input { udp { port => 1514 type => barracuda } } filter { if [type] == "barracuda" { # Extract Log Fields ruby { init => " HEADER_FIELDS = ['cef_version','Vendor','Product','DeviceVersion','SignatureId','EventName','Severity'] #event_new = LogStash::Event.new def store_header_field(event,field_name,field_data) #Special Condition for CONNECTION LOGS in ADC as in syslog EventName, Severity headers are missing if field_data =~ /LogType\=CONN/ if field_name == 'EventName' field_data = '' end if field_name == 'Severity' field_data = '' end end #Unescape pipes and backslash in header fields event.set(field_name,field_data.gsub(/\\\|/, '|').gsub(/\\\\/, '\\')) unless field_data.nil? end " code => " if event.get('[message][0]') == '\"' event.set('[message]' , event.get('[message]')[1..-2]) end split_data = event.get('[message]').to_s.split /(?<=[^\\]\\\\)[\|]|(? 1 split_data.reject(&:blank?) #split_data[HEADER_FIELDS.size..-1].join('|') split_data.join('|') end if split_data.length > 1 if event.get('cef_version').include? ' ' split_cef_version= event.get('cef_version').rpartition(' ') event.set('syslog', split_cef_version[0]) event.set('cef_version',split_cef_version[2]) end end if event.get('cef_version') =~ /^CEF:/ event.set('cef_version', event.get('cef_version').sub(/^CEF:/, '')) unless event.get('cef_version').nil? end if not msg.nil? and msg.include? '=' msg = msg.strip # If the last KVP has no value, add an empty string, this prevents hash errors below if msg.end_with?('=') msg=msg + ' ' unless msg.end_with?('\=') end # Now parse the key value pairs into it msg = msg.split(/[ ]*([\w\.]+)=/) #msg = msg.split(/ ([\w\.]+)=/) msg.shift() Hash[*msg].each{ |k, v| event.set(k,v.gsub(/\\=/, '=').gsub(/\\\\/, '\\')) unless v.nil? } hash2 = event.to_hash hash2.each { |key2,value2| logger.info('Key 2:', 'value' => key2) logger.info('Value 2 :', 'value' => value2) } end " remove_field => ['message'] } # Filtering LogFields which are common to all Log Types mutate { convert => {"Severity" => "integer" } } grok { match => {"cef_version" => ".+\:%{INT:cef_version:int}" } overwrite => ["cef_version"] } grok { match => {"DeviceReceiptTime" => "^\s*%{DATA:DeviceReceiptTime}\s*$" } overwrite => ["DeviceReceiptTime"] } mutate { gsub => ["StartTime", '\"', ""] } grok { match => {"StartTime" => "^\s*%{DATA:StartTime}\s*$" } overwrite => ["StartTime"] } date { match => ["StartTime","MMM dd YYYY HH:mm:ss"] target => "StartTime" } date { match => ["DeviceReceiptTime","UNIX_MS"] target => "DeviceReceiptTime" } # LogType Specific Filtering if [LogType] == "SYS" { # No filtering needed } else if [LogType] == "AUDIT" { mutate { convert => {"TransactionID" => "integer"} } grok { match => {"LoginPort" => "%{INT:LoginPort:int}" } overwrite => ["LoginPort"] } } else if [LogType] == "NF" { grok { match => {"DestinationPort" => "%{INT:DestinationPort:int}" } overwrite => ["DestinationPort"] } grok { match => {"SourcePort" => "%{INT:SourcePort:int}" } overwrite => ["SourcePort"] } geoip { source => "SourceIP" target => "geoip" #database => "/etc/logstash/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } else if [LogType] == "WF" { grok { match => {"ProxyPort" => "%{INT:ProxyPort:int}" } overwrite => ["ProxyPort"] } grok { match => {"ServicePort" => "%{INT:ServicePort:int}" } overwrite => ["ServicePort"] } grok { match => {"ClientPort" => "%{INT:ClientPort:int}" } overwrite => ["ClientPort"] } geoip { source => "ClientIP" target => "geoip" #database => "/etc/logstash/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } else if [LogType] == "TR" { grok { match => {"ServicePort" => "%{INT:ServicePort:int}" } overwrite => ["ServicePort"] } grok { match => {"CacheHit" => "%{INT:CacheHit:int}" } overwrite => ["CacheHit"] } grok { match => {"ProxyPort" => "%{INT:ProxyPort:int}" } overwrite => ["ProxyPort"] } grok { match => {"ServerTime" => "%{INT:ServerTime:int}" } overwrite => ["ServerTime"] } grok { match => {"TimeTaken" => "%{INT:TimeTaken:int}" } overwrite => ["TimeTaken"] } grok { match => {"ServerPort" => "%{INT:ServerPort:int}" } overwrite => ["ServerPort"] } grok { match => {"BytesReceived" => "%{INT:BytesReceived:int}" } overwrite => ["BytesReceived"] } grok { match => {"BytesSent" => "%{INT:BytesSent:int}" } overwrite => ["BytesSent"] } grok { match => {"ClientPort" => "%{INT:ClientPort:int}" } overwrite => ["ClientPort"] } geoip { source => "ClientIP" target => "geoip" #database => "/etc/logstash/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } # if "_grokparsefailure" in [tags] { # drop {} # } if "_rubyexception" in [tags] { drop {} } if ![LogType] { drop {} } } } output { file { path => "/home/logstash/output.txt" } if [type] == "barracuda" { if [LogType] == "SYS" { elasticsearch { hosts => [ "localhost:9200" ] index => "system_logs" } } else if [LogType] == "TR" { elasticsearch { hosts => [ "localhost:9200" ] index => "access_logs" } } else if [LogType] == "WF" { elasticsearch { hosts => [ "localhost:9200" ] index => "web_firewall_logs" } } else if [LogType] == "NF" { elasticsearch { hosts => [ "localhost:9200" ] index => "network_firewall_logs" } } else if [LogType] == "AUDIT" { elasticsearch { hosts => [ "localhost:9200" ] index => "audit_logs" } } else if [LogType] == "CONN" { elasticsearch { hosts => [ "localhost:9200" ] index => "connection_logs" } } } }