Source code for opendrift.readers.reader_netCDF_CF_generic

# This file is part of OpenDrift.
#
# OpenDrift is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2
#
# OpenDrift is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenDrift.  If not, see <https://www.gnu.org/licenses/>.
#
# Copyright 2015, Knut-Frode Dagestad, MET Norway

from datetime import datetime
import pyproj
import numpy as np
from netCDF4 import num2date
import logging
logger = logging.getLogger(__name__)

import pandas as pd
import xarray as xr
from opendrift.readers.basereader import BaseReader, StructuredReader
from opendrift.readers import open_dataset_opendrift, datetime_from_variable


[docs] class Reader(StructuredReader, BaseReader): """ A reader for `CF-compliant <https://cfconventions.org/>`_ netCDF files. It can take a single file, a file pattern, a URL or an xarray Dataset. Args: :param filename: A single netCDF file, a pattern of files, or a xr.Dataset. The netCDF file can also be an URL to an OPeNDAP server. :type filename: string, xr.Dataset (required). :param name: Name of reader :type name: string, optional :param proj4: PROJ.4 string describing projection of data. :type proj4: string, optional Example: .. code:: from opendrift.readers.reader_netCDF_CF_generic import Reader r = Reader("arome_subset_16Nov2015.nc") Several files can be specified by using a pattern: .. code:: from opendrift.readers.reader_netCDF_CF_generic import Reader r = Reader("*.nc") An OPeNDAP URL can be used: .. code:: from opendrift.readers.reader_netCDF_CF_generic import Reader r = Reader('https://thredds.met.no/thredds/dodsC/mepslatest/meps_lagged_6_h_latest_2_5km_latest.nc') A xr.Dataset or a zarr dataset in an object store with auth can be used: .. code:: from opendrift.readers.reader_netCDF_CF_generic import Reader r = Reader(ds, zarr_storage_options) """ def __init__(self, filename=None, zarr_storage_options=None, name=None, proj4=None, standard_name_mapping={}, ensemble_member=None): #if isinstance(filename, xr.Dataset): # self.Dataset = filename # if name is not None: # self.name = name # elif hasattr(self.Dataset, 'name'): # self.name = self.Dataset.name # else: # self.name = str(filename) #else: # if zarr_storage_options is not None: # self.Dataset = xr.open_zarr(filename, storage_options=zarr_storage_options) # if name is None: # self.name = filename # else: # self.name = name # else: # if filename is None: # raise ValueError('Need filename as argument to constructor') # filestr = str(filename) # if name is None: # self.name = filestr # else: # self.name = name # try: # # Open file, check that everything is ok # logger.info('Opening dataset: ' + filestr) # if ('*' in filestr) or ('?' in filestr) or ('[' in filestr): # logger.info('Opening files with MFDataset') # self.Dataset = xr.open_mfdataset(filename, data_vars='minimal', coords='minimal', # chunks={'time': 1}, decode_times=False) # elif ensemble_member is not None: # self.Dataset = xr.open_dataset(filename, decode_times=False).isel(ensemble_member=ensemble_member) # else: # self.Dataset = xr.open_dataset(filename, decode_times=False) # except Exception as e: # raise ValueError(e) self.Dataset = open_dataset_opendrift(source=filename, zarr_storage_options=zarr_storage_options) if name is None: self.name = str(filename) else: self.name = name # NB: check below might not be waterproof if 'ocean_time' in self.Dataset.dims and 'eta_u' in self.Dataset.dims and \ 'eta_rho' in self.Dataset.dims: raise ValueError('This seems to be a ROMS native file, should use ROMS native reader instead') logger.debug('Finding coordinate variables.') if proj4 is not None: # If user has provided a projection apriori self.proj4 = proj4 # Find x, y and z coordinates lon_var_name = None lat_var_name = None self.unitfactor = 1 self.realizations = None self.ensemble_dimension = None self.dimensions = {} for var_name in self.Dataset.variables: var = self.Dataset.variables[var_name] if self.proj4 is None: if 'grid_mapping_name' in var.attrs: logger.debug( ('Parsing CF grid mapping dictionary:' ' ' + str(var.attrs))) try: # parse proj4 with pyproj.CRS crs = pyproj.CRS.from_cf(var.attrs) import warnings with warnings.catch_warnings(): warnings.simplefilter("ignore") self.proj4 = crs.to_proj4() except: logger.info('Could not parse CF grid_mapping') if self.proj4 is None: if 'proj4' in var.attrs: self.proj4 = str(var.attrs['proj4']) elif 'proj4_string' in var.attrs: self.proj4 = str(var.attrs['proj4_string']) standard_name = var.attrs['standard_name'] if 'standard_name' in var.attrs else '' long_name = var.attrs['long_name'] if 'long_name' in var.attrs else '' axis = var.attrs['axis'] if 'axis' in var.attrs else '' units = var.attrs['units'] if 'units' in var.attrs else '' CoordinateAxisType = var.attrs['CoordinateAxisType'] if 'CoordinateAxisType' in var.attrs else '' if standard_name == 'longitude' or \ CoordinateAxisType.lower() == 'lon' or \ long_name.lower() == 'longitude' or \ var_name.lower() in ['longitude', 'lon']: lon_var_name = var_name if standard_name == 'latitude' or \ CoordinateAxisType.lower() == 'lat' or \ long_name.lower() == 'latitude' or \ var_name.lower() in ['latitude', 'lat']: lat_var_name = var_name if (axis == 'X' or standard_name == 'projection_x_coordinate' or standard_name == 'grid_longitude') \ and var.ndim == 1: if len(var.dims)==1: self.dimensions['x'] = var.dims[0] # Fix for units; should ideally use udunits package if units == 'km': self.unitfactor = 1000 elif units == '100 km': self.unitfactor = 100000 var_data = var.values x = var_data*self.unitfactor if (axis == 'Y' or standard_name == 'projection_y_coordinate' or standard_name == 'grid_latitude') \ and var.ndim == 1: if len(var.dims)==1: self.dimensions['y'] = var.dims[0] # Fix for units; should ideally use udunits package if units == 'km': self.unitfactor = 1000 elif units == '100 km': self.unitfactor = 100000 var_data = var.values y = var_data*self.unitfactor if (standard_name == 'depth' or axis == 'Z') and var.ndim==1: var_data = var.values if len(var.dims)==1: self.dimensions['z'] = var.dims[0] if var_data.ndim == 1: # Earlier this was not a requirement above if 'positive' not in var.attrs or \ var.attrs['positive'] == 'up': self.z = var_data else: self.z = -var_data if standard_name == 'time' or axis == 'T' or var_name in ['time', 'vtime']: # Read and store time coverage (of this particular file) #var_data = var.values #time = var_data #time_units = units if len(var.dims)==1: self.dimensions['time'] = var.dims[0] #if isinstance(time[0], np.bytes_): # # This hack is probably only necessary for CERSAT/GELOBCURRENT # time = [t.decode('ascii') for t in time] # self.times = [datetime.fromisoformat(t.replace('Z', '')) for t in time] #elif time.ndim == 2: # self.times = [datetime.fromisoformat(''.join(t).replace('Z', '')) for t in time.astype(str)] #else: # if 'calendar' in var.attrs: # calendar = var.attrs['calendar'] # else: # calendar = 'standard' # if np.issubdtype(var.dtype, np.datetime64): # import pandas as pd # self.times = [pd.to_datetime(str(d)) for d in time] # else: # self.times = num2date(time, time_units, calendar=calendar) #self.times = pd.to_datetime(var).to_pydatetime() self.times = datetime_from_variable(var) self.start_time = self.times[0] self.end_time = self.times[-1] if len(self.times) > 1: self.time_step = self.times[1] - self.times[0] else: self.time_step = None if standard_name == 'realization': if ensemble_member == None: var_data = np.atleast_1d(var.values) self.realizations = var_data logger.debug('%i ensemble members available' % len(self.realizations)) if len(var.dims)==1: self.ensemble_dimension = var.dims[0] # Temporary workaround for Barents EPS model if self.realizations is None and 'ensemble_member' in self.Dataset.dims: self.realizations = np.arange(self.Dataset.dims['ensemble_member']) ######################### # Summary of geolocation ######################### if 'x' in locals(): if x[1]-x[0] == 1 and y[1]-y[0] == 1: logger.info('deltaX and deltaY are 1, interpreting dataset as unprojected') projected = False else: projected = True if 'x' not in locals() or projected is False: # No x/y-coordinates were detected if lon_var_name is None: raise ValueError('No geospatial coordinates were detected, cannot geolocate dataset') # We load lon and lat arrays into memory lon_var = self.Dataset.variables[lon_var_name] lat_var = self.Dataset.variables[lat_var_name] if lon_var.ndim == 1: logger.debug('Lon and lat are 1D arrays - using as projection coordinates') x = lon_var.data y = lat_var.data self.dimensions['x'] = lon_var.dims[0] self.dimensions['y'] = lat_var.dims[0] if self.proj4 is None: self.proj4 = '+proj=latlong' elif lon_var.ndim == 2: logger.debug('Lon and lat are 2D arrays - dataset is unprojected') self.lon = lon_var.data self.lat = lat_var.data # Check if 2D arrays are repeated 1D arrays if np.array_equal(self.lon[0,:], self.lon[-1,:]) and np.array_equal(self.lat[:,0], self.lat[:,-1]): logger.info('Lon and lat are repeated 1D arrays - i.e. lonlat projection') x = self.lon[0,:] y = self.lat[:,0] self.dimensions['x'] = lon_var.dims[1] self.dimensions['y'] = lat_var.dims[0] if self.proj4 is None: self.proj4 = '+proj=latlong' else: self.dimensions['x'] = lon_var.dims[0] self.dimensions['y'] = lat_var.dims[1] self.projected = False self.proj = None self.proj4 = None elif lon_var.ndim == 3: logger.debug('Lon lat are 3D arrays, reading first time') self.lon = lon_var[0,:,:].data self.lat = lat_var[0,:,:].data self.dimensions['x'] = lon_var.dims[1] self.dimensions['y'] = lat_var.dims[2] self.projected = False self.proj = None self.proj4 = None else: if self.proj4 is None: logger.info('Grid coordinates are detected, but proj4 string not given: assuming latlong') self.proj4 = '+proj=latlong' if 'x' in locals() and 'y' in locals(): self.numx = len(x) self.numy = len(y) self.xmin, self.xmax = x.min(), x.max() self.ymin, self.ymax = y.min(), y.max() self.delta_x = np.abs(x[1] - x[0]) self.delta_y = np.abs(y[1] - y[0]) rel_delta_x = (x[1::] - x[0:-1]) rel_delta_x = np.abs((rel_delta_x.max() - rel_delta_x.min())/self.delta_x) rel_delta_y = (y[1::] - y[0:-1]) rel_delta_y = np.abs((rel_delta_y.max() - rel_delta_y.min())/self.delta_y) if rel_delta_x > 0.05: # Allow 5 % deviation print(rel_delta_x) print(x[1::] - x[0:-1]) raise ValueError('delta_x is not constant!') if rel_delta_y > 0.05: print(rel_delta_y) print(y[1::] - y[0:-1]) raise ValueError('delta_y is not constant!') self.x = x # Store coordinate vectors self.y = y if self.proj4 is not None and 'latlong' in self.proj4 and self.xmax is not None and self.xmax > 360: logger.info('Longitudes > 360 degrees, subtracting 360') self.xmin -= 360 self.xmax -= 360 self.x -= 360 logger.info(f'Detected dimensions: {self.dimensions}') ########################################## # Find all variables having standard_name ########################################## self.variable_mapping = {} skipvars = [] for var_name in self.Dataset.variables: var = self.Dataset.variables[var_name] if var.ndim < 2: continue if var_name in standard_name_mapping: # User may specify mapping if standard_name is missing, or to override existing standard_name = standard_name_mapping[var_name] self.variable_mapping[standard_name] = str(var_name) elif 'standard_name' in var.attrs and 'hybrid' not in var.dims: # Skipping hybrid dim is workaround to prevent parsing upper winds from ECMWF # A permanent solution for selecting correct variable is needed standard_name = str(var.attrs['standard_name']) if standard_name in self.variable_aliases: # Mapping if needed standard_name = self.variable_aliases[standard_name] self.variable_mapping[standard_name] = str(var_name) else: skipvars.append(var_name) # Necessary hack for ECMWF with winds at several levels # TODO: must provide variable mapping along with URLs to create readers if self.variable_mapping.get('x_wind') is not None: if 'x_wind_200m' in self.Dataset.variables and 'x_wind_10m' in self.Dataset.variables: logger.warning('Shifting variable names of ECMWF data, to get winds at 10m') self.variable_mapping['x_wind'] = 'x_wind_10m' self.variable_mapping['y_wind'] = 'y_wind_10m' if self.dimensions.get('y') == 'latitude1' and 'latitude' in self.Dataset.variables: self.dimensions['x'] = 'longitude' self.dimensions['y'] = 'latitude' if len(skipvars) > 0: logger.debug('Skipped variables without standard_name: %s' % skipvars) self.variables = list(self.variable_mapping.keys()) for vn, va in self.variable_mapping.items(): if vn == 'sea_floor_depth_below_sea_level': var = self.Dataset.variables[va] if 'ensemble_member' in var.dims: # Workaround for datasets with unnecessary ensemble dimension for static variables logger.info(f'Removing ensemble dimension from {vn}') var = var.isel(ensemble_member=0).squeeze() self.Dataset[va] = var # Run constructor of parent Reader class super().__init__()
[docs] def get_variables(self, requested_variables, time=None, x=None, y=None, z=None, indrealization=None): requested_variables, time, x, y, z, _outside = self.check_arguments( requested_variables, time, x, y, z) nearestTime, dummy1, dummy2, indxTime, dummy3, dummy4 = \ self.nearest_time(time) if hasattr(self, 'z') and (z is not None): # Find z-index range # NB: may need to flip if self.z is ascending indices = np.searchsorted(-self.z, [-z.min(), -z.max()]) indz = np.arange(np.maximum(0, indices.min() - 1 - self.verticalbuffer), np.minimum(len(self.z), indices.max() + 1 + self.verticalbuffer)) if len(indz) == 1: indz = indz[0] # Extract integer to read only one layer else: indz = 0 if indrealization == None: if self.realizations is not None: indrealization = range(len(self.realizations)) else: indrealization = None # Find indices corresponding to requested x and y if hasattr(self, 'clipped'): clipped = self.clipped else: clipped = 0 buffer = self.buffer # Adding buffer, to cover also future positions of elements indy = np.floor(np.abs(y-self.y[0])/self.delta_y-clipped).astype(int) + clipped indy = np.arange(np.max([0, indy.min()-buffer]), np.min([indy.max()+buffer, self.numy])) if self.global_coverage(): # Treatment of cyclic longitudes (x-coordinate) if self.lon_range() == '0to360': x = np.mod(x, 360) # Shift x/lons to 0-360 elif self.lon_range() == '-180to180': x = np.mod(x + 180, 360) - 180 # Shift x/lons to -180-180 indx = np.floor(np.abs(x-self.x[0])/self.delta_x-clipped).astype(int) + clipped split = False if self.global_coverage(): # Check if need to split in two blocks uniqx = np.unique(indx) diff_xind = np.diff(uniqx) # We split if >800 pixels between left/west and right/east blocks if len(diff_xind)>1 and diff_xind.max() > np.minimum(800, 0.6*self.numx): logger.debug('Requested data block crosses lon-border, reading and concatinating two parts') split = True splitind = np.argmax(diff_xind) indx_left = np.arange(0, uniqx[splitind] + buffer) indx_right = np.arange(uniqx[splitind+1] - buffer, self.numx) indx = np.concatenate((indx_right, indx_left)) if split is False: indx = np.arange(np.maximum(0, indx.min()-buffer), np.minimum(indx.max()+buffer+1, self.numx)) variables = {} for par in requested_variables: if hasattr(self, 'rotate_mapping') and par in self.rotate_mapping: logger.debug('Using %s to retrieve %s' % (self.rotate_mapping[par], par)) if par not in self.variable_mapping: self.variable_mapping[par] = \ self.variable_mapping[ self.rotate_mapping[par]] var = self.Dataset.variables[self.variable_mapping[par]] ensemble_dim = None dimindices = {'x': indx, 'y': indy, 'time': indxTime, 'z': indz} dimorder = list(var.dims) xnum = dimorder.index(self.dimensions['x']) ynum = dimorder.index(self.dimensions['y']) if xnum < ynum: # We must have y before x, since returning numpy arrays and not Xarrays logger.debug(f'Swapping order of x-y dimensions for {par}') dimorder[xnum] = self.dimensions['y'] dimorder[ynum] = self.dimensions['x'] var = var.permute_dims(*dimorder) if split is False: if True: # new dynamic way subset = {vdim:dimindices[dim] for dim,vdim in self.dimensions.items() if vdim in var.dims} variables[par] = var.isel(subset) # Remove any unknown dimensions for dim in variables[par].dims: if dim not in self.dimensions.values() and dim != self.ensemble_dimension: logger.debug(f'Removing unknown dimension: {dim}') variables[par] = variables[par].squeeze(dim=dim) if self.ensemble_dimension is not None and self.ensemble_dimension in variables[par].dims: ensemble_dim = 0 # hardcoded, may not work for MEPS #else: # old hardcoded way, to be removed # if var.ndim == 2: # variables[par] = var[indy, indx] # elif var.ndim == 3: # variables[par] = var[indxTime, indy, indx] # elif var.ndim == 4: # variables[par] = var[indxTime, indz, indy, indx] # elif var.ndim == 5: # Ensemble data # variables[par] = var[indxTime, indz, indrealization, indy, indx] # ensemble_dim = 0 # Hardcoded ensemble dimension for now # else: # raise Exception('Wrong dimension of variable: ' + # self.variable_mapping[par]) # The below should also be updated to dynamic subsetting else: # We need to read left and right parts separately d_left = dimindices.copy() d_right = dimindices.copy() d_left.update({'x': indx_left}) d_right.update({'x': indx_right}) subset_left = {vdim:d_left[dim] for dim,vdim in self.dimensions.items() if vdim in var.dims} subset_right = {vdim:d_right[dim] for dim,vdim in self.dimensions.items() if vdim in var.dims} left = var.isel(subset_left) right = var.isel(subset_right) #if var.ndim == 2: # left = var[indy, indx_left] # right = var[indy, indx_right] # variables[par] = xr.Variable.concat([left, right], dim=self.dimensions['x']) #elif var.ndim == 3: # left = var[indxTime, indy, indx_left] # right = var[indxTime, indy, indx_right] # variables[par] = xr.Variable.concat([left, right], dim=self.dimensions['x']) #elif var.ndim == 4: # left = var[indxTime, indz, indy, indx_left] # right = var[indxTime, indz, indy, indx_right] # variables[par] = xr.Variable.concat([left, right], dim=self.dimensions['x']) #elif var.ndim == 5: # Ensemble data # left = var[indxTime, indz, indrealization, # indy, indx_left] # right = var[indxTime, indz, indrealization, # indy, indx_right] # variables[par] = xr.Variable.concat([left, right], dim=self.dimensions['x']) #variables[par] = xr.Variable.concat([left, right], dim=self.dimensions['x']) variables[par] = xr.Variable.concat([right, left], dim=self.dimensions['x']) variables[par] = np.ma.masked_invalid(variables[par]) # Mask values outside domain variables[par] = np.ma.array(variables[par], ndmin=2, mask=False) # Mask extreme values which might have slipped through with np.errstate(invalid='ignore'): variables[par] = np.ma.masked_outside( variables[par], -30000, 30000) # Ensemble blocks are split into lists if ensemble_dim is not None: num_ensembles = variables[par].shape[ensemble_dim] logger.debug(f'Num ensembles for {par}: {num_ensembles}') newvar = [0]*num_ensembles for ensemble_num in range(num_ensembles): newvar[ensemble_num] = \ np.take(variables[par], ensemble_num, ensemble_dim) variables[par] = newvar # Store coordinates of returned points try: variables['z'] = self.z[indz] except: variables['z'] = None if self.projected is True: variables['x'] = self.x[indx] variables['y'] = self.y[indy] if split is True and variables['x'][0] > variables['x'][-1]: # We need to shift so that x-coordinate (longitude) is continous if self.lon_range() == '-180to180': variables['x'][variables['x']>0] -= 360 elif self.lon_range() == '0to360': variables['x'][variables['x']<0] += 360 else: variables['x'] = indx variables['y'] = indy variables['x'] = np.asarray(variables['x'], dtype=np.float32) variables['y'] = np.asarray(variables['y'], dtype=np.float32) variables['time'] = nearestTime # Rotate any east/north vectors if necessary if hasattr(self, 'rotate_mapping'): if self.y_is_north() is True: logger.debug('North is up, no rotation necessary') else: rx, ry = np.meshgrid(variables['x'], variables['y']) lon, lat = self.xy2lonlat(rx, ry) from opendrift.readers.basereader import vector_pairs_xy for vectorpair in vector_pairs_xy: if vectorpair[0] in self.rotate_mapping and vectorpair[0] in variables.keys(): if self.proj.__class__.__name__ == 'fakeproj': logger.warning('Rotation from fakeproj is not yet implemented, skipping.') continue logger.debug(f'Rotating vector from east/north to xy orientation: {vectorpair[0:2]}') variables[vectorpair[0]], variables[vectorpair[1]] = self.rotate_vectors( lon, lat, variables[vectorpair[0]], variables[vectorpair[1]], pyproj.Proj('+proj=latlong'), self.proj) if hasattr(self, 'shift_x'): # "hidden feature": if reader.shift_x and reader.shift_y are defined, # the returned fields are shifted this many meters in the x- and y directions # E.g. reader.shift_x=10000 gives a shift 10 km eastwards (if x is east direction) if self.proj.crs.is_geographic: # meters to degrees shift_y = (self.shift_y/111000) shift_x = (self.shift_x/111000)*np.cos(np.radians(variables['y'])) logger.info('Shifting x between %s and %s' % (shift_x.min(), shift_x.max())) logger.info('Shifting y with %s m' % shift_y) else: shift_x = self.shift_x shift_y = self.shift_y logger.info('Shifting x with %s m' % shift_x) logger.info('Shifting y with %s m' % shift_y) variables['x'] += shift_x variables['y'] += shift_y return variables