######################## # logstash Configuration Files - Bro IDS Logs # Created by 505Forensics (http://www.505forensics.com) # MIT License, so do what you want with it! # # For use with logstash, elasticsearch, and kibana to analyze logs # # Usage: Reference this config file for your instance of logstash to parse Bro conn logs # # Limitations: Standard Bro log delimiter is tab. # # Dependencies: Utilizing the logstash 'translate' filter requires having the logstash contrib plugins added, which are community supported and not part of the official release. Visit logstash.net to find out how to install these # ####################### input { file { type => "bro-conn_log" start_position => "end" sincedb_path => "/var/tmp/.bro_conn_sincedb" #Edit the following path to reflect the location of your log files. You can also change the extension if you use something else path => "/nsm/bro/logs/current/conn.log" } } filter { #Let's get rid of those header lines; they begin with a hash if [message] =~ /^#/ { drop { } } #Now, using the csv filter, we can define the Bro log fields if [type] == "bro-conn_log" { csv { columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"] #If you use a custom delimiter, change the following value in between the quotes to your delimiter. Otherwise, insert a literal in between the two quotes on your logstash system, use a text editor like nano that doesn't convert tabs to spaces. separator => " " } #Let's convert our timestamp into the 'ts' field, so we can use Kibana features natively date { match => [ "ts", "UNIX" ] } # add geoip attributes geoip { source => "id.orig_h" target => "orig_geoip" } geoip { source => "id.resp_h" target => "resp_geoip" } #The following makes use of the translate filter (logstash contrib) to convert conn_state into human text. Saves having to look up values for packet introspection translate { field => "conn_state" destination => "conn_state_full" dictionary => [ "S0", "Connection attempt seen, no reply", "S1", "Connection established, not terminated", "S2", "Connection established and close attempt by originator seen (but no reply from responder)", "S3", "Connection established and close attempt by responder seen (but no reply from originator)", "SF", "Normal SYN/FIN completion", "REJ", "Connection attempt rejected", "RSTO", "Connection established, originator aborted (sent a RST)", "RSTR", "Established, responder aborted", "RSTOS0", "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder", "RSTRH", "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator", "SH", "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)", "SHR", "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator", "OTH", "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)" ] } mutate { convert => [ "id.orig_p", "integer" ] convert => [ "id.resp_p", "integer" ] convert => [ "orig_bytes", "integer" ] convert => [ "duration", "float" ] convert => [ "resp_bytes", "integer" ] convert => [ "missed_bytes", "integer" ] convert => [ "orig_pkts", "integer" ] convert => [ "orig_ip_bytes", "integer" ] convert => [ "resp_pkts", "integer" ] convert => [ "resp_ip_bytes", "integer" ] rename => [ "id.orig_h", "id_orig_host" ] rename => [ "id.orig_p", "id_orig_port" ] rename => [ "id.resp_h", "id_resp_host" ] rename => [ "id.resp_p", "id_resp_port" ] } } } output { # stdout { codec => rubydebug } elasticsearch { hosts => localhost } }