# Copyright (C) 2012 VT SuperDARN Lab # Full license can be found in LICENSE.txt """ ********************* **Module**: rbsp ********************* This module handles RBSP foorpoint calculations and plotting **Class**: * :class:`rbspFp`: FPs reading (or calculating) and plotting **Functions**: * :func:`isrFov`: calculate field-of-view for a given ISR **Prerequesite**: * `Numpy ` * `Matplotlib ` * `Basemap ` * `Pymongo ` * `Tsyganenko ` (optional, used only if the requested date is not found in the database) """ ############################################################################ # Foot Points (FPs) calculation and plotting ############################################################################ class rbspFp(object): """This class reads FPs from the SuperDARN FPs database, or generate them if necessary **Args**: * **sTime**: start date/time to get FPs * **[eTime]**: end date/time to get FPs (defaulst to 24 hours after `sTime`) * **[hemisphere]**: limit FPs loading to a specific hemisphere * **[spacecraft]**: limit FPs loading to a specific spacecraft * **[L_shell_min]**: limit FPs loading to L-shell values greater than this * **[L_shell_max]**: limit FPs loading to L-shell values lesser than this * **[apogees_only]**: record foot-points (usefull if all you want are apogees) **Example**: :: # Get all the FPs for 1/Sept/2012 from 0 to 6 UT from datetime import datetime import rbsp sTime = datetime(2012,9,1,0) eTime = datetime(2012,9,1,6) fps = rbsp.rbspFp(sTime, eTime) # Pretty print the apogees in that period print fps # Plot them on a map fps.map() .. warning:: try not to request more than 24 hours at once, unless all you want are apogees. written by Sebastien de Larquier, 2013-03 """ def __init__(self, sTime, eTime=None, spacecraft=None, L_shell_min=None, L_shell_max=None, apogees_only=False): from datetime import datetime, timedelta # MongoDB server self._db_user = 'sd_dbread' self._db_pswd = '5d' self._db_host = 'sd-work9.ece.vt.edu' self._db_name = 'rbsp' # Input self.sTime = sTime self.eTime = eTime if eTime else sTime + timedelta(hours=24) self._spacecraft = spacecraft.lower() if spacecraft else ['a','b'] self.L_shell_min = L_shell_min self.L_shell_max = L_shell_max self._apogees_only = apogees_only # Connect to DB isDb = self.__getFpsFromDb() if not isDb: orbit = self.__getOrbit() trace = self.__getTrace(orbit) self.scraft = orbit['scraft'] def map(self, hemisphere='north', boundinglat=35, spacecraft=None, legend=True, date=True, apogees=False): """Plot FPs on a map **Belongs to**: :class:`rbspFp` **Args**: * **[hemisphere]**: plot FPs in this hemisphere ('north' or 'south') * **[boundinglat]**: bounding latitude of map (absolute value) * **[spacecraft]**: limit the plotting to one spacecraft ('a' or 'b') * **[legend]**: to show or not to show the legend * **[date]**: to show or not to show the date * **[apogees]**: to show or not to show the apogees * **[isr]**: a list of ISRs to be plotted (codes include mho, sdt, eiscat, pfisr, risr) **Returns**: * **myMap**: a mpl_toolkits.basemap.Basemap object **Example**: :: # To plot 2 panels (1 per hemisphere) fig = figure(figsize=(11,8)) subplot(121) fps.map() subplot(122) fps.map(hemisphere='South') fig.tight_layout() """ from mpl_toolkits.basemap import Basemap from pylab import gca ax = gca() if hemisphere.lower() == 'north': projection = 'npstere' sgn = 1 lat = self.latNH lon = self.lonNH elif hemisphere.lower() == 'south': projection = 'spstere' sgn = -1 lat = self.latSH lon = self.lonSH # Generate map and background myMap = Basemap(projection=projection, lon_0=270, boundinglat=sgn*boundinglat, ax=ax) myMap.fillcontinents(color='.8') myMap.drawmeridians(range(0,360,20), alpha=0.5) myMap.drawparallels(range(-80,81,20), alpha=0.5) # Calculate FP coordinates x, y = myMap(lon, lat) # Scatter FPs if spacecraft is None or spacecraft == 'a': myMap.scatter(x[self.scraft == 'a'], y[self.scraft == 'a'], zorder=5, edgecolors='none', s=10, facecolors='r', alpha=.8) if legend: ax.text(0, -0.01, 'RBSP-A', transform=ax.transAxes, color='r', ha='left', va='top') if spacecraft is None or spacecraft == 'b': myMap.scatter(x[self.scraft == 'b'], y[self.scraft == 'b'], zorder=5, edgecolors='none', s=10, facecolors='b', alpha=.8) if legend: ax.text(1, -0.01, 'RBSP-B', transform=ax.transAxes, color='b', ha='right', va='top') # Show date/time interval if date: if self.sTime.date() == self.eTime.date(): dateTitle = '{:%d/%b/%Y %H:%M UT} - {:%H:%M UT}' elif self.sTime.time() == self.eTime.time(): dateTitle = '{:%d/%b/%Y} - {:%d/%b/%Y (%H:%M UT)}' else: dateTitle = '{:%d/%b/%Y %H:%M UT} - {:%d/%b/%Y %H:%M UT}' ax.text(0, 1.01, dateTitle.format(self.sTime, self.eTime), transform=ax.transAxes) if apogees: myMap.scatter(x[self.apogees], y[self.apogees], zorder=5, edgecolors='w', s=10, facecolors='k') for ap in self.apogees: self.__textHighlighted((x[ap],y[ap]), '{:%H:%M}'.format(self.times[ap])) return myMap def showISR(self, myMap, isr): """overlay ISR fovs on map **Belongs to**: :class:`rbspFp` **Args**: * **myMap**: Basemap object * **isr**: a list of ISRs to be plotted (codes include mho, sdt, eiscat, pfisr, risr) """ if isinstance(isr, str): isr = [isr] for rad in isr: dbConn = self.__dbConnect('isr') if not dbConn: print 'Could not access DB' return qIn = {'code': rad} qRes = dbConn.info.find(qIn) if qRes.count() == 0: print 'Radar {} not found in db'.format(rad) print 'Use one or more in {}'.format(dbConn.codes.find_one()['codes']) continue for el in qRes: x, y = myMap(el['pos']['lon'], el['pos']['lat']) myMap.scatter(x, y, zorder=6, s=20, facecolors='k') if isinstance(x, list): x, y = x[0], y[0] myMap.ax.text(x*1.04, y*0.96, el['code'].upper()) x, y = myMap(el['fov']['lon'], el['fov']['lat']) myMap.plot(x, y, 'g') def __getFpsFromDb(self): """Get FPs from DB **Belongs to**: :class:`rbspFp` **Args**: * **None** **Returns**: * **isSuccess**: True it worked, False otherwise """ import numpy as np dbConn = self.__dbConnect(self._db_name) if not dbConn: return False isAp = True if self._apogees_only else None # Build querry qIn = {'time': {'$gte': self.sTime, '$lte': self.eTime}} if not self._spacecraft == ['a', 'b']: qIn['spacecraft'] = self._spacecraft.upper() if self.L_shell_min: if 'L' not in qIn: qIn['L'] = {} qIn['L']['$gte'] = self.L_shell_min if self.L_shell_max: if 'L' not in qIn: qIn['L'] = {} qIn['L']['$lte'] = self.L_shell_max if self._apogees_only: qIn['isApogee'] = True # Launch query qRes = dbConn.ephemeris.find(qIn).sort('time') if qRes.count() == 0: return False # Store query results self.lonNH = [] self.latNH = [] self.lonSH = [] self.latSH = [] self.times = [] self.scraft = [] self.apogees = [] self.L = [] for i, el in enumerate(qRes): self.lonNH.append( el['lonNH'] ) self.latNH.append( el['latNH'] ) self.lonSH.append( el['lonSH'] ) self.latSH.append( el['latSH'] ) self.times.append( el['time'] ) self.scraft.append( el['scraft'] ) self.L.append( el['L'] ) if el['isApogee']: self.apogees.append( i ) self.lonNH = np.array(self.lonNH) self.latNH = np.array(self.latNH) self.lonSH = np.array(self.lonSH) self.latSH = np.array(self.latSH) self.times = np.array(self.times) self.scraft = np.array(self.scraft) self.apogees = np.array(self.apogees) self.L = np.array(self.L) return True def __dbConnect(self, dbName): """Try to establish a connection to remote db database **Belongs to**: :class:`rbspFp` **Args**: * **None** **Returns**: * **dbConn**: pymongo database object """ from pymongo import MongoClient import sys try: conn = MongoClient( 'mongodb://{}:{}@{}/{}'.format(self._db_user, self._db_pswd, self._db_host, dbName) ) dba = conn[dbName] except: print 'Could not connect to remote DB: ', sys.exc_info()[0] return False return dba def __getOrbit(self): """Get orbit data from APL **Belongs to**: :class:`rbspFp` **Args**: * **None** **Returns """ import urllib2, urllib from datetime import datetime print 'Get orbit from JHU/APL' header = 'on' cmode = 'geo' Cadence = 5 try: [i in self._spacecraft] scraft = set(self._spacecraft) except: scraft = (self._spacecraft) orbit = {'time': [], 'alt': [], 'lat': [], 'lon': [], 'scraft': []} for sc in scraft: params = urllib.urlencode({'sDay': str( self.sTime.day ), 'sMonth': str( self.sTime.month ), 'sYear': str( self.sTime.year ), 'sHour': str( self.sTime.hour ), 'sMinute': str( self.sTime.minute ), 'eDay': str( self.eTime.day ), 'eMonth': str( self.eTime.month ), 'eYear': str( self.eTime.year ), 'eHour': str( self.eTime.hour ), 'eMinute': str( self.eTime.minute ), 'Cadence': str( Cadence ), 'mode': str( cmode ), 'scraft': sc, 'header': header, 'getASCII': 'Get ASCII Output'}) f = urllib2.urlopen("http://athena.jhuapl.edu/LT_Position_Calc", params) # f = urllib2.urlopen("http://athena.jhuapl.edu/orbit_pos", params) out = f.read().splitlines() f.close() st = out.index('
')+1
			ed = out.index('
') header = out[st].split() lines = out[st+1:ed] for i,l in enumerate(lines): row = l.split() cTime = datetime( int(row[0]), int(row[1]), int(row[2]), int(row[3]), int(row[4]), int(row[5]) ) orbit['time'].append( cTime ) orbit['alt'].append( float(row[6]) ) orbit['lat'].append( float(row[7]) ) orbit['lon'].append( float(row[8]) ) orbit['scraft'].append( sc ) return orbit def __getTrace(self, data): """Trace orbit to the ionosphere **Belongs to**: :class:`rbspFp` **Args**: * **data**: a dictionnary containing ephemeris (with keys 'lat', 'lon', 'alt', 'time') """ import tsyganenko as ts import numpy as np fname = 'trace.{:%Y%m%d}.{:%Y%m%d}.dat'.format(self.sTime, self.eTime) try: trace = ts.tsygTrace(filename=fname) print 'Read tracing results...' except: print 'Tracing...' trace = ts.tsygTrace(data['lat'], data['lon'], data['alt'], datetime=data['time'], rmin=1.047) trace.save( fname ) self.lonNH = trace.lonNH self.latNH = trace.latNH self.lonSH = trace.lonSH self.latSH = trace.latSH self.times = trace.datetime # Mark apogees mins = np.r_[True, trace.rho[1:] >= trace.rho[:-1]] & np.r_[trace.rho[:-1] > trace.rho[1:], True] mins[0] = mins[-1] = False self.apogees = np.where(mins)[0] def __repr__(self): sOut = 'Van Allen Probes (a.k.a. RBSP) ionospheric footpoints\n' sOut += '{:%Y-%b-%d at %H:%M UT} to {:%Y-%b-%d at %H:%M UT}\n'.format(self.sTime, self.eTime) sOut += '\t{} points\n'.format(len(self.times)) sOut += '\t{} apogee(s):\n'.format(len(self.apogees)) if len(self.apogees) > 0: for i in self.apogees: sOut += '\t\t{:%H:%M} UT, {}: ({:6.2f} N, {:6.2f} E)\t({:6.2f} N, {:6.2f} E)\n'.format(self.times[i], self.scraft[i].upper(), self.latNH[i], self.lonNH[i], self.latSH[i], self.lonSH[i]) return sOut def __textHighlighted(self, xy, text, zorder=None, color='k', fontsize=None): """Plot highlighted annotation (with a white lining) **Belongs to**: :class:`rbspFp` **Args**: * **xy**: position of point to annotate * **text**: text to show * **[zorder]**: text zorder * **[color]**: text color * **[fontsize]**: text font size """ import matplotlib as mp from pylab import gca ax = gca() text_path = mp.text.TextPath( (0,0), text, size=fontsize) p1 = mp.patches.PathPatch(text_path, ec="w", lw=2, fc="w", alpha=0.7, zorder=zorder, transform=mp.transforms.IdentityTransform()) p2 = mp.patches.PathPatch(text_path, ec="none", fc=color, zorder=zorder, transform=mp.transforms.IdentityTransform()) offsetbox2 = mp.offsetbox.AuxTransformBox(mp.transforms.IdentityTransform()) offsetbox2.add_artist(p1) offsetbox2.add_artist(p2) ax2disp = ax.transAxes.transform disp2ax = ax.transAxes.inverted().transform data2disp = ax.transData.transform disp2data = ax.transData.inverted().transform xyA = disp2ax( data2disp( xy ) ) frac = 0.5 scatC = (-frac*(xyA[0]-0.5)+xyA[0], -frac*(xyA[1]-0.5)+xyA[1]) scatC = disp2data( ax2disp( scatC ) ) ab = mp.offsetbox.AnnotationBbox( offsetbox2, xy, xybox=(scatC[0], scatC[1]), boxcoords="data", box_alignment=(.5,.5), arrowprops=dict(arrowstyle="-|>", facecolor='none'), frameon=False ) ax.add_artist(ab) ############################################################################