import sys
import os
from datetime import datetime, timedelta
import logging
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
import string
from shutil import move
import pandas as pd
import numpy as np
from netCDF4 import Dataset, num2date, date2num
from opendrift.models.basemodel import Mode
# Module with functions to export/import trajectory data to/from netCDF file
# Strives to be compliant with netCDF CF-convention on trajectories
# https://cfconventions.org/Data/cf-conventions/cf-conventions-1.6/build/cf-conventions.html#idp8377728
# https://geo-ide.noaa.gov/wiki/index.php?title=NODC_NetCDF_Trajectory_Template
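# A file written by this module has, schematically, the structure below
# (a hedged sketch; variables beyond 'trajectory', 'time' and 'status'
# depend on the element type and environment variables of the simulation):
#
#   dimensions:
#       trajectory = <num_elements> ;
#       time = <num_time_steps> ;
#   variables:
#       int trajectory(trajectory) ;    // cf_role = "trajectory_id"
#       double time(time) ;             // seconds since 1970-01-01 00:00:00
#       float lon(trajectory, time) ;
#       float lat(trajectory, time) ;
#       int status(trajectory, time) ;  // flag_values / flag_meanings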
skip_parameters = ['ID'] # Do not write to file
def init(self, filename):
self.outfile_name = filename
self.outfile = Dataset(filename, 'w')
self.outfile.createDimension('trajectory', self.num_elements_total())
self.outfile.createVariable('trajectory', 'i4', ('trajectory',))
self.outfile.createDimension('time', None) # Unlimited time dimension
self.outfile.createVariable('time', 'f8', ('time',))
# NB: trajectory_id must be changed for future ragged array representation
self.outfile.variables['trajectory'][:] = \
np.arange(self.num_elements_total())+1
self.outfile.variables['trajectory'].cf_role = 'trajectory_id'
self.outfile.variables['trajectory'].units = '1'
self.outfile.Conventions = 'CF-1.11, ACDD-1.3'
self.outfile.standard_name_vocabulary = 'CF Standard Name Table v85'
self.outfile.featureType = 'trajectory'
self.outfile.title = 'OpenDrift trajectory simulation'
self.outfile.summary = 'Output from simulation with OpenDrift framework'
self.outfile.keywords = 'trajectory, drift, lagrangian, simulation'
self.outfile.history = 'Created ' + str(datetime.now())
self.outfile.date_created = datetime.now().isoformat()
self.outfile.source = 'Output from simulation with OpenDrift'
self.outfile.model_url = 'https://github.com/OpenDrift/opendrift'
self.outfile.opendrift_class = self.__class__.__name__
self.outfile.opendrift_module = self.__class__.__module__
self.outfile.readers = str(self.env.readers.keys())
self.outfile.time_coverage_start = str(self.start_time)
self.outfile.time_step_calculation = str(self.time_step)
self.outfile.time_step_output = str(self.time_step_output)
# Time
self.timeStr = 'seconds since 1970-01-01 00:00:00'
self.outfile.variables['time'].units = self.timeStr
self.outfile.variables['time'].standard_name = 'time'
self.outfile.variables['time'].long_name = 'time'
# Apparently axis attribute shall not be given for time, lon and lat
#self.outfile.variables['time'].axis = 'T'
# Write config settings
for key in self._config:
value = self.get_config(key)
if isinstance(value, (bool, type(None))):
value = str(value)
self.outfile.setncattr('config_' + key, value)
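        # E.g. a config key such as 'general:use_auto_landmask' (illustrative
        # key name) is stored as global attribute
        # 'config_general:use_auto_landmask'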
    # Write additional metadata attributes, if given
if hasattr(self, 'metadata_dict'):
for key, value in self.metadata_dict.items():
self.outfile.setncattr(key, str(value))
# Add all element properties as variables
for prop in self.history.dtype.fields:
if prop in skip_parameters:
continue
        # Note: Should use 'f8' if 'f4' is not accurate enough,
        # at the expense of larger files
        try:
            dtype = self.history.dtype[prop]
        except KeyError:
            dtype = 'f4'
        if np.issubdtype(dtype, np.integer):
            fill = -999
        else:
            fill = np.nan
var = self.outfile.createVariable(prop, dtype, ('trajectory', 'time'), fill_value=fill)
        for attr, attr_value in self.history_metadata[prop].items():
            if attr in ['dtype', 'constant', 'default', 'seed']:
                continue
            # Apparently axis attribute shall not be given for lon and lat:
            if prop in ['lon', 'lat'] and attr == 'axis':
                continue
            var.setncattr(attr, attr_value)
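# Hedged usage sketch: these functions are not called directly, but are bound
# to the simulation object, which calls init()/write_buffer()/close() when an
# outfile is given to run(). The OceanDrift model below is used only for
# illustration and is not defined in this module:
#
#     from opendrift.models.oceandrift import OceanDrift
#     o = OceanDrift()
#     o.seed_elements(lon=4.5, lat=60.0, number=100, time=datetime.now())
#     o.run(duration=timedelta(hours=12), outfile='simulation.nc')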
def write_buffer(self):
    if not self.outfile.isopen():  # Re-open file if closed after previous buffer write
        self.outfile = Dataset(self.outfile_name, 'a')
num_steps_to_export = self.steps_output - self.steps_exported
for prop in self.history_metadata:
if prop in skip_parameters:
continue
var = self.outfile.variables[prop]
var[:, self.steps_exported:self.steps_exported+num_steps_to_export] = \
self.history[prop][:, 0:num_steps_to_export]
times = [self.start_time + n*self.time_step_output for n in
range(self.steps_exported, self.steps_output)]
self.outfile.variables['time'][
self.steps_exported:self.steps_exported+len(times)] = \
date2num(times, self.timeStr)
    # Status category metadata (valid_range, flag_values, flag_meanings)
    # is written once in close(), and need not be repeated here for every
    # exported buffer.
    logger.info('Wrote %s steps to file %s', num_steps_to_export,
                self.outfile_name)
self.history.mask = True # Reset history array, for new data
self.steps_exported = self.steps_exported + num_steps_to_export
self.outfile.steps_exported = self.steps_exported
self.outfile.sync() # Flush from memory to disk
self.outfile.close() # close file temporarily
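# For reference, the date2num/num2date round-trip used when writing and
# reading the time variable (a minimal, self-contained sketch with
# illustrative values):
#
#     from netCDF4 import date2num, num2date
#     units = 'seconds since 1970-01-01 00:00:00'
#     t = date2num(datetime(2020, 1, 1), units)   # -> 1577836800.0
#     num2date(t, units)                          # -> 2020-01-01 00:00:00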
def close(self):
self.outfile = Dataset(self.outfile_name, 'a')
# Write status categories metadata
status_dtype = self.ElementType.variables['status']['dtype']
self.outfile.variables['status'].valid_range = np.array(
(0, len(self.status_categories) - 1)).astype(status_dtype)
self.outfile.variables['status'].flag_values = \
np.array(np.arange(len(self.status_categories)), dtype=status_dtype)
self.outfile.variables['status'].flag_meanings = \
" ".join(self.status_categories)
# Write origin_marker definitions
if 'origin_marker' in self.outfile.variables:
self.outfile.variables['origin_marker'].flag_values = \
np.array(np.arange(len(self.origin_marker)))
self.outfile.variables['origin_marker'].flag_meanings = \
" ".join(self.origin_marker.values())
# Write final timesteps to file
self.outfile.time_coverage_end = str(self.time)
self.outfile.time_coverage_duration = pd.Timedelta(self.time-self.start_time).isoformat()
self.outfile.time_coverage_resolution = pd.Timedelta(self.time_step).isoformat()
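    # pd.Timedelta(...).isoformat() yields ISO 8601 duration strings,
    # e.g. pd.Timedelta(hours=26).isoformat() -> 'P1DT2H0M0S'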
# Write performance data
self.outfile.performance = self.performance()
    # Re-write metadata items, in case any were added during the simulation
if hasattr(self, 'metadata_dict'):
for key, value in self.metadata_dict.items():
self.outfile.setncattr(key, str(value))
# Write min and max values as variable attributes
for var in self.history_metadata:
if var in self.outfile.variables:
self.outfile.variables[var].setncattr('minval', self.minvals[var])
self.outfile.variables[var].setncattr('maxval', self.maxvals[var])
# Write bounds metadata
self.outfile.geospatial_bounds_crs = 'EPSG:4326'
self.outfile.geospatial_bounds_vertical_crs = 'EPSG:5831'
self.outfile.geospatial_lat_min = self.minvals['lat']
self.outfile.geospatial_lat_max = self.maxvals['lat']
self.outfile.geospatial_lat_units = 'degrees_north'
self.outfile.geospatial_lat_resolution = 'point'
self.outfile.geospatial_lon_min = self.minvals['lon']
self.outfile.geospatial_lon_max = self.maxvals['lon']
self.outfile.geospatial_lon_units = 'degrees_east'
self.outfile.geospatial_lon_resolution = 'point'
if 'z' in self.minvals:
self.outfile.geospatial_vertical_min = self.minvals['z']
self.outfile.geospatial_vertical_max = self.maxvals['z']
self.outfile.geospatial_vertical_positive = 'up'
self.outfile.runtime = str(self.timing['total time'])
self.outfile.close() # Finally close file
# Finally changing UNLIMITED time dimension to fixed, for CDM compliance.
# Fortunately this is quite fast.
# https://www.unidata.ucar.edu/software/thredds/current/netcdf-java/reference/FeatureDatasets/CFpointImplement.html
try:
logger.debug('Making netCDF file CDM compliant with fixed dimensions')
if self.num_elements_scheduled() > 0:
logger.info('Removing %i unseeded elements already written to file' % self.num_elements_scheduled())
mask = np.ones(self.history.shape[0], dtype=bool)
mask[self.elements_scheduled.ID-1] = False
with Dataset(self.outfile_name) as src, \
Dataset(self.outfile_name + '_tmp', 'w') as dst:
for name, dimension in src.dimensions.items():
                if name == 'trajectory':
# Truncate dimension length to number actually seeded
dst.createDimension(name, self.num_elements_activated())
else:
dst.createDimension(name, len(dimension))
for name, variable in src.variables.items():
if '_FillValue' in variable.ncattrs():
fill = variable.getncattr('_FillValue')
else:
fill = None
dstVar = dst.createVariable(name, variable.datatype,
variable.dimensions, fill_value=fill)
srcVar = src.variables[name]
# Truncate data to number actually seeded
if 'trajectory' in variable.dimensions:
if self.num_elements_scheduled() > 0:
if len(variable.dimensions) == 2:
dstVar[:] = srcVar[mask, :]
else:
dstVar[:] = srcVar[mask] # Copy data
else:
dstVar[:] = srcVar[:]
else:
dstVar[:] = srcVar[:]
for att in src.variables[name].ncattrs():
# Copy variable attributes
                    if att == '_FillValue':
continue
dstVar.setncattr(att, srcVar.getncattr(att))
for att in src.ncattrs(): # Copy global attributes
dst.setncattr(att, src.getncattr(att))
move(self.outfile_name + '_tmp', self.outfile_name) # Replace original
    except Exception as e:
        logger.warning(e)
        logger.warning('Could not convert netCDF file from unlimited to fixed '
                       'dimension, possibly due to a netCDF library incompatibility')
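# The conversion above follows the generic pattern for copying a netCDF file
# while changing its dimensions. A minimal standalone sketch of that pattern
# (filenames are hypothetical, and this helper is for illustration only):
def _example_copy_with_fixed_dimensions(src_name='in.nc', dst_name='out.nc'):
    with Dataset(src_name) as src, Dataset(dst_name, 'w') as dst:
        for name, dim in src.dimensions.items():
            # len() of an unlimited dimension gives its current size,
            # so the copy is created with fixed dimensions only
            dst.createDimension(name, len(dim))
        for name, var in src.variables.items():
            # _FillValue can only be set at variable creation time
            fill = var.getncattr('_FillValue') \
                if '_FillValue' in var.ncattrs() else None
            out = dst.createVariable(name, var.datatype, var.dimensions,
                                     fill_value=fill)
            out[...] = var[...]
            out.setncatts({a: var.getncattr(a) for a in var.ncattrs()
                           if a != '_FillValue'})
        dst.setncatts({a: src.getncattr(a) for a in src.ncattrs()})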
def import_file_xarray(self, filename, chunks, elements=None):
import xarray as xr
logger.debug('Importing with Xarray from ' + filename)
self.ds = xr.open_dataset(filename, chunks=chunks)
if elements is not None:
self.ds = self.ds.isel(trajectory=elements)
self.steps_output = len(self.ds.time)
ts0 = (self.ds.time[0] - np.datetime64('1970-01-01T00:00:00')) / np.timedelta64(1, 's')
self.start_time = datetime.utcfromtimestamp(float(ts0))
tse = (self.ds.time[-1] - np.datetime64('1970-01-01T00:00:00')) / np.timedelta64(1, 's')
self.end_time = datetime.utcfromtimestamp(float(tse))
if len(self.ds.time) > 1:
ts1 = (self.ds.time[1] - np.datetime64('1970-01-01T00:00:00')) / np.timedelta64(1, 's')
self.time_step_output = timedelta(seconds=float(ts1 - ts0))
self.time = self.end_time # Using end time as default
self.status_categories = self.ds.status.flag_meanings.split()
    if 'origin_marker' in self.ds.variables:
if 'flag_meanings' in self.ds.origin_marker.attrs:
self.origin_marker = [s.replace('_', ' ') for s in self.ds.origin_marker.flag_meanings.split()]
    num_elements = len(self.ds.trajectory)
    elements = np.arange(num_elements)
# Data types for variables
dtype = np.dtype([(var[0], var[1]['dtype'])
for var in self.ElementType.variables.items()])
history_dtype_fields = [
(name, self.ElementType.variables[name]['dtype'])
for name in self.ElementType.variables]
# Add environment variables
self.history_metadata = self.ElementType.variables.copy()
for env_var in self.required_variables:
if env_var in self.ds.variables:
history_dtype_fields.append((env_var, np.dtype('float32')))
self.history_metadata[env_var] = {}
history_dtype = np.dtype(history_dtype_fields)
    # Mask positions where elements have not (yet) been seeded (status < 0)
    # TODO: also mask from deactivation towards the end
for da in ['lon', 'lat']:
self.ds[da] = self.ds[da].where(self.ds.status>=0)
if 'minval' in self.ds.lon.attrs:
self.lonmin = np.float32(self.ds.lon.minval)
self.latmin = np.float32(self.ds.lat.minval)
self.lonmax = np.float32(self.ds.lon.maxval)
self.latmax = np.float32(self.ds.lat.maxval)
self.mode = Mode.Result
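# Hedged usage sketch: importing a previous simulation lazily for analysis;
# opendrift.open_xarray is assumed to be the public entry point that calls
# this function (it is not defined in this module):
#
#     import opendrift
#     o = opendrift.open_xarray('simulation.nc', chunks={'trajectory': 50000})
#     o.animation()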
def import_file(self, filename, times=None, elements=None, load_history=True):
"""Create OpenDrift object from imported file.
times: indices of time steps to be imported, must be contineous range.
elements: indices of elements to be imported
"""
logger.debug('Importing from ' + filename)
infile = Dataset(filename, 'r')
    # 'times' can be used to import only a subset of the time steps
    if times is None and hasattr(infile, 'steps_exported'):
        times = np.arange(infile.steps_exported)
    elif times is None:
        times = np.arange(len(infile.dimensions['time']))
    self.steps_output = len(times)
filetime = infile.variables['time'][times]
units = infile.variables['time'].units
self.start_time = num2date(filetime[0], units)
if len(filetime) > 1:
        self.end_time = num2date(filetime[-1], units)
self.time_step_output = num2date(filetime[1], units) - self.start_time
else:
self.time_step_output = timedelta(hours=1)
self.end_time = self.start_time
self.time = self.end_time # Using end time as default
self.status_categories = infile.variables['status'].flag_meanings.split()
    if elements is None:
        num_elements = len(infile.dimensions['trajectory'])
        elements = np.arange(num_elements)
    else:
        num_elements = len(elements)
dtype = np.dtype([(var[0], var[1]['dtype'])
for var in self.ElementType.variables.items()])
history_dtype_fields = []
self.history_metadata = self.ElementType.variables.copy()
for env_var in infile.variables:
history_dtype_fields.append((env_var, np.dtype('float32')))
self.history_metadata[env_var] = {}
history_dtype = np.dtype(history_dtype_fields)
# Import dataset (history)
status = infile.variables['status'][elements, times]
firstlast = np.ma.notmasked_edges(status, axis=1)
index_of_last = firstlast[1][1]
actual_num_elements = len(index_of_last)
if actual_num_elements < num_elements:
num_elements = actual_num_elements
elements = firstlast[0][0]
logger.warning('A subset is requested, and number of active elements is %d'
% num_elements)
    if load_history:
self.history = np.ma.array(
np.zeros([num_elements, self.steps_output]),
dtype=history_dtype)
self.history[:] = np.ma.masked
for var in infile.variables:
if var in ['time', 'trajectory']:
continue
            try:
                self.history[var] = infile.variables[var][elements, times]
            except Exception as e:
                logger.info(e)
# Initialise elements from given (or last) state/time
firstlast = np.ma.notmasked_edges(self.history['status'], axis=1)
index_of_last = firstlast[1][1]
kwargs = {}
for var in infile.variables:
if var in self.ElementType.variables:
kwargs[var] = self.history[var][
np.arange(num_elements), index_of_last]
        kwargs['ID'] = np.asarray(elements) + 1
self.elements = self.ElementType(**kwargs)
self.elements_deactivated = self.ElementType()
else:
self.history = None
logger.warning('Not importing history')
# Remove elements which are scheduled for deactivation
self.remove_deactivated_elements()
# Import and apply config settings
attributes = infile.ncattrs()
self.mode = Mode.Config # To allow setting config
for attr in attributes:
if attr.startswith('config_'):
value = infile.getncattr(attr)
conf_key = attr[7:]
            if value == 'True':
                value = True
            elif value == 'False':
                value = False
            elif value == 'None':
                value = None
try:
self.set_config(conf_key, value)
logger.debug('Setting imported config: %s -> %s' %
(conf_key, value))
except Exception as e:
logger.warning(e)
logger.warning('Could not set config: %s -> %s' %
(conf_key, value))
self.mode = Mode.Result
# Import time steps from metadata
def timedelta_from_string(timestring):
if 'day' in timestring:
days = int(timestring.split('day')[0])
hs = timestring.split(' ')[-1]
th = datetime.strptime(hs, '%H:%M:%S')
return timedelta(days=days, hours=th.hour, minutes=th.minute, seconds=th.second)
else:
t = datetime.strptime(timestring, '%H:%M:%S')
return timedelta(
hours=t.hour, minutes=t.minute, seconds=t.second)
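    # Example strings accepted by timedelta_from_string; these match the
    # str(timedelta) format written by init() as time_step_calculation and
    # time_step_output:
    #   '1:00:00'         -> timedelta(hours=1)
    #   '2 days, 3:30:00' -> timedelta(days=2, hours=3, minutes=30)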
try:
self.time_step = timedelta_from_string(infile.time_step_calculation)
self.time_step_output = timedelta_from_string(infile.time_step_output)
except Exception as e:
logger.warning(e)
logger.warning('Could not parse time_steps from netCDF file')
infile.close()
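# Hedged usage sketch: opendrift.open is assumed to be the public entry point
# that instantiates the right model class and calls import_file (it is not
# defined in this module):
#
#     import opendrift
#     o = opendrift.open('simulation.nc')
#     print(o)       # summary of the imported simulation
#     o.animation()  # plotting works on imported results as on a live run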