from types import LambdaType
from ..basereader import BaseReader
import numpy as np
import matplotlib.pyplot as plt
import pyproj
[docs]
def none_or_cmp(a, b, cmp):
if a is None:
return b
if b is None:
return a
return cmp(a, b)
[docs]
class Combined(BaseReader):
"""
A combination of two readers.
"""
a: BaseReader
b: BaseReader
op: LambdaType
def __init__(self, a, b, op, op_type = "easy", external_params = None):
'''Combine two readers a and b followinf the operator op. If needed,
you can ad an op_type that will enable you to use the external parameters
you want in your op between lines 63 and 73.'''
self.a = a
self.b = b
self.op = op
self.op_type = op_type
self.external_params = external_params
if isinstance(b, list):
self.variables = set(self.a.variables)
self.start_time = self.a.start_time
self.end_time = self.a.end_time
self.name = f'Combined({a.name}'
for reader in self.b:
self.variables = self.variables.intersection(reader.variables)
self.start_time = none_or_cmp(self.start_time, reader.start_time, max)
self.end_time = none_or_cmp(self.end_time, reader.end_time, min)
self.name = self.name + f' | {reader.name})'
self.variables = list(self.variables)
else:
self.variables = list(set(self.a.variables).intersection(self.b.variables))
self.start_time = none_or_cmp(self.a.start_time, self.b.start_time, max)
self.end_time = none_or_cmp(self.a.end_time, self.b.end_time, min)
self.name = f'Combined({a.name} | {b.name})'
self.xmin = -180
self.xmax = 180
self.ymin = -90
self.ymax = 90
self.proj4 = '+proj=latlong'
self.projected = None
super().__init__()
[docs]
def covers_positions(self, lon, lat):
if isinstance(self.b, list):
res = self.a.covers_positions(lon, lat)
for reader in self.b:
res = np.intersect1d(res, reader.covers_positions(lon, lat))
else :
res = np.intersect1d(self.a.covers_positions(lon, lat), self.b.covers_positions(lon, lat))
return res
[docs]
def covers_time(self, time):
if isinstance(self.b, list):
res = self.a.covers_time(time)
for reader in self.b:
res = res and reader.covers_time(time)
else :
res = self.a.covers_time(time) and self.b.covers_time(time)
return res
[docs]
def get_variables_interpolated(self, variables, profiles=None, profiles_depth=None,time=None,lon=None, lat=None, z=None,rotate_to_proj=None):
assert set(variables).issubset(self.variables), f"{variables} is not subset of {self.variables}"
if isinstance(self.b, list):
N = len(self.b)
env_a, env_profiles_a = self.a.get_variables_interpolated(variables, time=time,lon=lon, lat=lat, z=z)
#TODO Maybe change structure of profiles_b if there is a list
env_b = {}
env_profiles_b = {}
temp = []
for reader in self.b:
temp.append(reader.get_variables_interpolated(variables,time=time,lon=lon, lat=lat, z=z))
for var in variables:
env_b[var] = np.array([env_i[var] for env_i, _ in temp])
profiles_temp = []
for _, env_profile_i in temp:
if env_profile_i == None:
profiles_temp.append(None)
else:
profiles_temp.append(env_profile_i[var])
env_profiles_b[var] = np.array(profiles_temp)
else:
N = -1
env_a, env_profiles_a = self.a.get_variables_interpolated(variables, time=time,lon=lon, lat=lat, z=z)
env_b, env_profiles_b = self.b.get_variables_interpolated(variables,time=time,lon=lon, lat=lat, z=z)
variables = [ var for var in env_a.keys() if var not in ['x', 'y', 'z', 'time'] ]
#Making disctinction between easy functions or more complex ones that need some external parameters
env_c = {}
for var in variables:
# Easy
if self.op_type == "easy":
if N == -1:
env_c[var] = self.op(env_a[var], env_b[var])
else:
for i in range(N):
env_c[var] = self.op(env_c[var], env_b[i][var])
#Gaussian
elif self.op_type == "combine_gaussian":
if N == -1:
lon_center, lat_center = self.b.lon, self.b.lat
std = self.external_params
exponential_factor= self.op(lon, lat, lon_center, lat_center, std)
env_c[var] = env_a[var] * (1 - exponential_factor) + env_b[var] * exponential_factor
else:
std = self.external_params
lon_centers, lat_centers = np.ones(N), np.ones(N)
for i in range(N):
lon_centers[i], lat_centers[i] = self.b[i].lon, self.b[i].lat
exponential_factors = self.op(lon, lat, lon_centers, lat_centers, std)
env_c[var] = env_a[var] * (1-np.sum(exponential_factors, axis = 0)) + np.sum(env_b[var]*exponential_factors, axis = 0)
else:
raise ValueError('Op_type not recognised. You should verify the definition of the Reader operator you are using.')
#Profile -> still need to implement for lists of readers, should just copy above structure
env_profiles_c = None
if env_profiles_a is not None:
env_profiles_c = {}
variables = [var for var in env_profiles_a.keys() if var not in ['x', 'y', 'z', 'time'] ]
for var in variables:
if N == -1:
env_profiles_c[var] = self.op(env_profiles_a[var], env_profiles_b[var])
else:
pass #TODO HERE
return env_c, env_profiles_c
[docs]
def get_variables(self, requested_variables, time=None,
x=None, y=None, z=None):
pm1 = self.a.pixel_size()
pm2 = self.b.pixel_size()
if pm1 == None and pm2 == None:
raise Exception("Neither {} or {} in {} have pixel_size well defined. Plot is not possible.")
elif pm1 == None:
pm=pm2
elif pm2 == None:
pm=pm1
else:
pm = np.lcm(int(pm1), int(pm2))
delta_x = pm/111000
delta_y = pm/ 111000 * np.abs(np.cos(np.radians(np.mean(y))))
x = np.arange(np.min(x), np.max(x), delta_x)
y = np.arange(np.min(y), np.max(y), delta_y)
X, Y = np.meshgrid(x, y)
shape = X.shape
X = X.flatten()
Y = Y.flatten()
variables, _ = self.get_variables_interpolated(requested_variables, lon = X, lat = Y, time = time, z = z)
for key in variables.keys():
variables[key] = np.reshape(variables[key], shape)
variables['x'] = x
variables['y'] = y
return variables