input { file { path => "/Users/olli/Development/ml/raw_data/expo2009_airline/2001.csv" type => "flight" start_position => "beginning" codec => plain { charset => "ISO-8859-1" } } } # http://www.transtats.bts.gov/Fields.asp?Table_ID=236 # https://www.elastic.co/guide/en/logstash/current/plugins-filters-csv.html filter { csv { columns => ["Year","Month","DayofMonth","DayOfWeek","DepTime","CRSDepTime","ArrTime","CRSArrTime","UniqueCarrier","FlightNum","TailNum","ActualElapsedTime","CRSElapsedTime","AirTime","ArrDelay","DepDelay","Origin","Dest","Distance","TaxiIn","TaxiOut","Cancelled","CancellationCode","Diverted","CarrierDelay","WeatherDelay","NASDelay","SecurityDelay","LateAircraftDelay"] separator => "," } # skip header line with this trick # https://discuss.elastic.co/t/skip-header-line-in-csv-input-v-1-5-0/1386 if ([Year] == "Year") { drop { } } # when there are not enough digits, we just assume leading 0 ruby { code => "event['CRSDepTime'] = '000' + event['CRSDepTime'] if event['CRSDepTime'].length == 1" } ruby { code => "event['CRSDepTime'] = '00' + event['CRSDepTime'] if event['CRSDepTime'].length == 2" } # pad with leading 0 then hour is a single digit only ruby { code => "event['CRSDepTime'] = '0' + event['CRSDepTime'] if event['CRSDepTime'].length == 3" } ruby { code => "event['CRSArrTime'] = '0' + event['CRSArrTime'] if event['CRSArrTime'].length == 3" } ruby { code => "event['ArrTime'] = '0' + event['ArrTime'] if event['ArrTime'].length == 3" } ruby { code => "event['DepTime'] = '0' + event['DepTime'] if event['DepTime'].length == 3" } mutate { add_field => ["timestamp", "%{Year}-%{Month}-%{DayofMonth};%{CRSDepTime}"] } mutate { add_field => ["actual_timestamp", "%{Year}-%{Month}-%{DayofMonth};%{DepTime}"] } date { locale => "en" match => ["timestamp", "YYYY-MM-dd;HHmm"] # should depend on timezone of origin, but this is too complicated and makes things hard to compare timezone => "America/Chicago" target => "@timestamp" } mutate { convert => { "ActualElapsedTime" => "integer" } } mutate { convert => { "CRSElapsedTime" => "integer" } } mutate { convert => { "ArrDelay" => "integer" } } mutate { convert => { "DepDelay" => "integer" } } mutate { convert => { "AirTime" => "integer" } } mutate { convert => { "Distance" => "integer" } } mutate { convert => { "TaxiIn" => "integer" } } mutate { convert => { "TaxiOut" => "integer" } } mutate { convert => { "Cancelled" => "boolean" } } mutate { convert => { "Diverted" => "boolean" } } } output { elasticsearch { action => "index" hosts => "localhost:9200" index => "expo2009_airline" workers => 1 } }