Source code for opendrift.readers.interpolation.interpolators

import logging
import numpy as np
from scipy.ndimage import map_coordinates, grey_dilation
import logging; logging.captureWarnings(True); logger = logging.getLogger(__name__)
from scipy.interpolate import interp1d, LinearNDInterpolator

logger = logging.getLogger('opendrift')  # using common logger

[docs] def expand_numpy_array(data): if isinstance(data, np.ma.MaskedArray): logger.warning('Converting masked array to numpy array before interpolating') data = np.ma.filled(data, fill_value=np.nan) if not np.isfinite(data).any(): logger.warning('Only NaNs, returning') return mask = ~np.isfinite(data) data[mask] = np.finfo(np.float64).min data[mask] = grey_dilation(data, size=3)[mask] data[data==np.finfo(np.float64).min] = np.nan
########################### # 2D interpolator classes ###########################
[docs] class Nearest2DInterpolator(): def __init__(self, xgrid, ygrid, x, y): self.x = x self.y = y self.xi = (x - xgrid.min())/(xgrid.max()-xgrid.min())*len(xgrid) self.yi = (y - ygrid.min())/(ygrid.max()-ygrid.min())*len(ygrid) self.xi = np.round(self.xi).astype(np.uint32) self.yi = np.round(self.yi).astype(np.uint32) self.xi[self.xi >= len(xgrid)] = len(xgrid)-1 self.yi[self.yi >= len(ygrid)] = len(ygrid)-1
[docs] def __call__(self, array2d): return array2d[self.yi, self.xi]
[docs] class NDImage2DInterpolator(): def __init__(self, xgrid, ygrid, x, y): self.x = x self.y = y self.xi = (x - xgrid.min())/(xgrid.max()-xgrid.min())*len(xgrid) self.yi = (y - ygrid.min())/(ygrid.max()-ygrid.min())*len(ygrid)
[docs] def __call__(self, array2d): try: array2d = np.ma.array(array2d, mask=array2d.mask) array2d[array2d.mask] = np.nan # Gives holes except: pass return np.ma.masked_invalid( map_coordinates(array2d, [self.yi, self.xi], cval=np.nan, order=0))
[docs] class LinearND2DInterpolator(): logger = logging.getLogger('opendrift') def __init__(self, xgrid, ygrid, x, y): self.block_x, self.block_y = np.meshgrid(xgrid, ygrid) self.block_x = self.block_x.ravel() self.block_y = self.block_y.ravel() self.x = x self.y = y
[docs] def __call__(self, array2d): array_ravel = array2d.ravel() valid = np.isfinite(array_ravel) #if isinstance(array2d.mask, np.ndarray): # valid = ~array2d.ravel().mask #elif array2d.mask == False: # valid = np.ones(array_ravel.shape, dtype=bool) #elif array2d.mask == True: # valid = np.zeros(array_ravel.shape, dtype=bool) if hasattr(self, 'interpolator'): if not np.array_equal(valid, self.interpolator.valid): logger.debug('Cannot reuse interpolator - validity of ' 'array is different from original.') if hasattr(self, 'interpolator') and (np.array_equal( valid, self.interpolator.valid)): # Reuse stored interpolator with new data self.interpolator.values[:, 0] = \ (array_ravel[valid]) else: # Make new interpolator for given x,y self.interpolator = LinearNDInterpolator( (self.block_y[valid], self.block_x[valid]), array_ravel[valid]) # Store valid array, to determine if can be used again self.interpolator.valid = valid # Call interpolator to avoid threading-problem: # https://github.com/scipy/scipy/issues/8856 self.interpolator((0,0)) return self.interpolator(self.y, self.x)
[docs] class Linear2DInterpolator(): logger = logging.getLogger('opendrift') def __init__(self, xgrid, ygrid, x, y): self.x = x self.y = y self.xi = (x - xgrid[0])/(xgrid[-1]-xgrid[0])*(len(xgrid)-1) self.yi = (y - ygrid[0])/(ygrid[-1]-ygrid[0])*(len(ygrid)-1)
[docs] def __call__(self, array2d): if isinstance(array2d,np.ma.MaskedArray): logger.debug('Converting masked array to numpy array for interpolation') array2d = np.ma.filled(array2d, fill_value=np.nan) if not np.isfinite(array2d).any(): logger.warning('Only NaNs input to linearNDFast - returning') return np.nan*np.ones(len(self.xi)) # Fill NaN-values with nearby real values interp = map_coordinates(array2d, [self.yi, self.xi], cval=np.nan, order=1) missing = np.where(~np.isfinite(interp))[0] i=0 while len(missing) > 0: i += 1 if i > 10: logger.warning('Still NaN-values after 10 iterations, exiting!') return interp logger.debug('Linear2DInterpolator informational: NaN values for %i elements, expanding data %i' % (len(missing), i)) expand_numpy_array(array2d) interp[missing] = map_coordinates( array2d, [self.yi[missing], self.xi[missing]], cval=np.nan, order=1, mode='nearest') missing = np.where(~np.isfinite(interp))[0] return interp
horizontal_interpolation_methods = { 'nearest': Nearest2DInterpolator, 'ndimage': NDImage2DInterpolator, 'linearND': LinearND2DInterpolator, 'linearNDFast': Linear2DInterpolator} ########################### # 1D interpolator classes ###########################
[docs] class Nearest1DInterpolator(): def __init__(self, zgrid, z): # Truncating above and below z[z < zgrid.min()] = zgrid.min() z[z > zgrid.max()] = zgrid.max() # Interpolator zgrid -> index if zgrid[1] > zgrid[0]: # increasing z_interpolator = interp1d(zgrid, range(len(zgrid))) else: # decreasing values, must flip for interpolator z_interpolator = interp1d(zgrid[::-1], range(len(zgrid))[::-1]) # Indices corresponding to nearest value in zgrid self.zi = np.round(z_interpolator(z)).astype(np.uint8) self.zi[self.zi < 0] = 0 self.zi[self.zi >= len(zgrid)] = len(zgrid) - 1
[docs] def __call__(self, array2d): return array2d[self.zi, range(len(self.zi))]
[docs] class Linear1DInterpolator(): def __init__(self, zgrid, z): # Truncating above and below z[z < zgrid.min()] = zgrid.min() z[z > zgrid.max()] = zgrid.max() # Interpolator zgrid -> index if zgrid[1] > zgrid[0]: # increasing z_interpolator = interp1d(zgrid, range(len(zgrid))) else: # decreasing values, must flip for interpolator z_interpolator = interp1d(zgrid[::-1], range(len(zgrid))[::-1]) z_interpolator(z[0]) # to prevent threading issues # Indices corresponding to layers above and below interp_zi = z_interpolator(z) self.index_above = np.floor(interp_zi).astype(np.int8) self.index_above[self.index_above < 0] = 0 self.index_below = np.minimum(self.index_above + 1, len(zgrid) - 1) self.weight_above = 1 - (interp_zi - self.index_above) self.xi = range(len(z))
[docs] def __call__(self, array2d): return array2d[self.index_above, self.xi]*self.weight_above + \ array2d[self.index_below, self.xi]*(1 - self.weight_above)
vertical_interpolation_methods = { 'nearest': Nearest1DInterpolator, 'linear': Linear1DInterpolator}
[docs] def fill_NaN_towards_seafloor(array): """Extrapolate NaN-values (missing) towards seafloor""" filled = False for i in range(1, array.shape[0]): mask = np.isnan(array[i,:,:]) if np.sum(mask) > 0: array[i, mask] = array[i-1, mask] filled = True return filled