# Decode AIS raw data and save to HDF5

In [None]:
from pyais.stream import FileReaderStream
from pathlib import Path
import os

import h5py
import vaex

import numpy as np

import yaml

import pooch

In [None]:
vaex.multithreading.thread_count_default = 4

## Define constants

In [None]:
# Convert to meters from radians
EARTH_RADIUS_IN_M: int = 6371000

In [None]:
MAX_NUMBER_ROWS = 10000000

## Define paths for input and output files

In [None]:
data_folder = "./data"
input_folder = os.path.join(data_folder, "input")
output_folder = os.path.join(data_folder, "decoded")

### Create folders

In [None]:
for folder in [data_folder, input_folder, output_folder]:
 if not os.path.isdir(folder):
 os.mkdir(folder)

## Get input data

In [None]:
input_url = "https://raw.githubusercontent.com/aduvenhage/ais-decoder/master/data/nmea-sample.txt"

In [None]:
pooch.retrieve(
 url=input_url,
 known_hash=None,
 path=input_folder,
 fname=input_url.split("/")[-1]
)

## Define column names, types and attributes

- Read from XML template file containing information about data group, variables and their associated attributes such as types, missing values, bounds, etc.

In [None]:
# Load parameters
h5template_yaml: Path = "./ais_metadata.yaml"

## Analyse, decode and save to HDF5

In [None]:
class H5AIS:
 def __init__(self, ofilename, h5template=None, MAX_NUMBER_ROWS = 2000000):
 self.output_filename = ofilename
 self.columns = []
 self.coltypes = []
 self.colmissings = []
 self.colsizes = []
 self.colattrs = []
 self.buffers = []
 self.h5file = h5py.File(ofilename, "w")
 if h5template is None:
 self.h5group = "data"
 else:
 self.get_from_template(h5template)
 self.MAX_NUMBER_ROWS = MAX_NUMBER_ROWS
 self.h5columns = self.h5file.create_group(self.h5group) # vaex reads all datasets in the columns group
 
 
 def get_from_template(self, h5template):
 yaml_file = open(f"{h5template}")
 data_info = yaml.load(yaml_file, Loader=yaml.FullLoader)["group"]
 self.h5group = data_info["group_name"]
 self.h5template = data_info
 i = 0
 for var in self.h5template["variables"].keys():
 v = self.h5template["variables"][var]
 self.columns.append(var)
 attrs = { } 
 for vk in v.keys():
 if "type" == vk:
 self.coltypes.append(v[vk])
 elif "missing_value" == vk:
 self.colmissings.append(v[vk])
 else:
 attrs[vk] = v[vk]
 self.colattrs.append(attrs)
 self.buffers.append([])
 i+=1
 
 def _key2value(self, msg, key):
 idx = self.columns.index(key)
 attrs = self.colattrs[idx]
 if key == "status" and "status" in msg.asdict():
 colval = msg.asdict()[key].value
 elif key in msg.asdict():
 if "flag_meanings" in attrs:
 if type(msg.asdict()[key]) == bool:
 colval = int(msg.asdict()[key])
 else:
 idx_flag = attrs["flag_meanings"].index(msg.asdict()[key])
 colval = attrs["flag_values"][idx_flag]
 elif self.coltypes[idx] == "str":
 colval = str(msg.asdict()[key])
 else:
 colval = msg.asdict()[key]
 else:
 colval = self.colmissings[idx]
 
 if "scaling_factor" in self.colattrs[idx]:
 scaling = attrs["scaling_factor"]
 colval = scaling * colval
 return colval
 
 def create_column(self, colname, coltype, colmissing, colsize, colattr):
 idx = len(self.columns)
 self.columns.append(colname)
 self.coltypes[colname] = coltype
 self.colmissings[colname] = colmissing
 self.colsizes.append(colsize)
 self.colattrs.append(colattr)
 self.buffers.append([])
 
 def create_columns(self, colnames, coltypes, colmissings, colsizes, colattrs):
 print(coltypes)
 for cname, ctype, cmiss, csize, cattr in zip(colnames, list(coltypes.values()), list(colmissings.values()), colsizes, colattrs):
 self.create_column(cname, ctype, cmiss, csize, cattr)
 
 def decode_message(self, msg):
 try:
 decoded_message = msg.decode()
 if msg.tag_block is not None:
 msg.tag_block.init()
 tags = msg.tag_block
 except:
 print(" Message ", msg, " ignored")
 else:
 for cname in self.columns:
 idx = self.columns.index(cname)
 if "tag_block" in self.colattrs[idx]:
 if msg.tag_block is not None:
 val = self._key2value(tags, cname)
 else:
 val = self.colmissings[idx]
 else:
 val = self._key2value(decoded_message, cname)
 self.buffers[idx].append(val) 
 
 
 def write(self, colsize=None):
 for idx, colname in zip(range(len(self.columns)), self.columns):
 if colsize is None:
 colsize = self.MAX_NUMBER_ROWS
 coltype = self.coltypes[idx]
 print("Writing Column ", colname, coltype, self.colmissings[idx])
 if coltype == "str": 
 coltype = h5py.string_dtype(encoding='utf-8')
 ds = self.h5columns.create_dataset(colname, data=np.array(self.buffers[idx], dtype='S'))
 else:
 ds = self.h5columns.create_dataset(colname, data=np.array(self.buffers[idx], dtype=coltype), dtype=coltype)
 attrs = self.colattrs[idx]
 for attr in self.colattrs[idx].keys():
 ds.attrs[attr] = attrs[attr]

 def close(self):
 self.h5file.close()

In [None]:
%%time

nb_file = 1

input_filename = os.path.join(input_folder, input_url.split("/")[-1])
output_filename = os.path.join(output_folder, "ais_" + f"{nb_file:02}" + ".h5")

list_filenames = []
if os.path.isfile(output_filename):
 print(input_filename, " has been already decoded and ", output_filename, " have been generated")
else:
 
 print("Start...")
 ofile = H5AIS(output_filename, h5template=h5template_yaml, MAX_NUMBER_ROWS=MAX_NUMBER_ROWS)
 nb_msg = 0
 for msg in FileReaderStream(input_filename):
 ofile.decode_message(msg)
 nb_msg += 1
 if len(ofile.buffers[0]) >= ofile.MAX_NUMBER_ROWS:
 print("Writing buffer to ", output_filename)
 nb_msg = 0
 ofile.write()
 ofile.close()
 list_filenames.append(output_filename)
 nb_file += 1
 output_filename = output_prefix + "/ais_" + f"{nb_file:02}" + ".h5"
 ofile = H5AIS(output_filename, h5template=h5template_yaml, MAX_NUMBER_ROWS=MAX_NUMBER_ROWS)
 
 # Need to write last part
 print("Writing last buffer to ", output_filename)
 ofile.write(nb_msg) 
 ofile.close()

## Open decoded AIS data 

In [None]:
dset = vaex.open(os.path.join(output_folder, 'ais_*.h5'))
dset

## Visualization
- Simple interactive visualization with plotly to visualize vessels' locations;

In [None]:
df = dset.to_pandas_df()

In [None]:
# visualization
import plotly.express as px

In [None]:
fig = px.scatter_mapbox(df, lat="lat", lon="lon", color="mmsi", zoom=1, height=500)
fig.update_layout(
 mapbox_style="white-bg",
 mapbox_layers=[
 {
 "below": 'traces',
 "sourcetype": "raster",
 "sourceattribution": "United States Geological Survey",
 "source": [
 "https://basemap.nationalmap.gov/arcgis/rest/services/USGSImageryOnly/MapServer/tile/{z}/{y}/{x}"
 ]
 }
 ])
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()