# Ground Observatory Data - FTP

> Authors: Luca Mariani, Clemens Kloss
>
> Abstract: Demonstrates ground observatory data by direct access to the BGS FTP server (AUX_OBS dataset). Note that in the future there will be a VirES-based access method (work in progress).



## Contents

- [Settings and functions](#settings)
- [Hourly mean values](#obs)
 - [Read data from ASCII files](#obs-read-ascii)
 - [Read data from multiple files](#obs-multifiles)
 - [Examples](#obs-examples)
- [Minute and second mean values](#obsms)
 - [Read data from CDF files](#obsms-read-cdf)
 - [Read data from multiple files](#obsms-multifiles)

In [None]:
%load_ext watermark
%watermark -i -v -p viresclient,pandas,xarray,matplotlib

In [None]:
# Python standard library
import os
import re
from contextlib import closing
from datetime import datetime
from ftplib import FTP
from pathlib import Path
from tempfile import TemporaryFile
from zipfile import ZipFile

# Extra libraries
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import cdflib
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from viresclient import SwarmRequest



## Settings and functions

[[TOP]](#top)

First we define a number of functions to enable convenient searching, downloading and reading from the FTP server.

In [None]:
# FTP server
HOST = 'ftp.nerc-murchison.ac.uk'

# Local directories (update paths according to your environment)
OBS_HOUR_LOCAL_DIR = Path('~/data/AUX_OBS/hour').expanduser()
OBS_MINUTE_LOCAL_DIR = Path('~/data/AUX_OBS/minute').expanduser()
OBS_SECOND_LOCAL_DIR = Path('~/data/AUX_OBS/second').expanduser()

# Create directories to use
os.makedirs(OBS_HOUR_LOCAL_DIR, exist_ok=True)
os.makedirs(OBS_MINUTE_LOCAL_DIR, exist_ok=True)
os.makedirs(OBS_SECOND_LOCAL_DIR, exist_ok=True)


def search(obstype, start_date=None, end_date=None):
 """Search OBS data file on the FTP server.

 Parameters
 ----------
 obstype : str
 OBS file type: `hour`, `minute`, `second`.
 start_date : str or numpy.datetime64
 lower bound of the time interval (default: no time interval).
 stop_date : str or numpy.datetime64
 upper bound of the time interval (default: no time interval).
 
 Returns
 -------
 list(str)
 OBS data files.
 
 Raises
 ------
 ValueError
 if `obstype` is not valid.
 ftplib.all_errors
 in case of FTP errors.
 
 """

 OBS_HOUR_DIR = '/geomag/Swarm/AUX_OBS/hour'
 OBS_MINUTE_DIR = '/geomag/Swarm/AUX_OBS/minute'
 OBS_SECOND_DIR = '/geomag/Swarm/AUX_OBS/second'
 PATTERN = re.compile(
 r'SW_OPER_AUX_OBS[_MS]2__(?P\d{8}T\d{6})_'
 r'(?P\d{8}T\d{6})_\d{4}\.ZIP$'
 )
 MINDATE = np.datetime64('0000', 's')
 MAXDATE = np.datetime64('9999', 's')
 
 def _callback(line, result, start_date, end_date):
 if line[0] == '-':
 match = PATTERN.match(line[56:])
 if match:
 start, stop = match.groupdict().values()
 start = np.datetime64(datetime.strptime(start, '%Y%m%dT%H%M%S'))
 stop = np.datetime64(datetime.strptime(stop, '%Y%m%dT%H%M%S'))
 if end_date >= start and start_date <= stop:
 result.append(line[56:])

 start_date = MINDATE if start_date is None else np.datetime64(start_date)
 end_date = MAXDATE if end_date is None else np.datetime64(end_date)
 paths = {
 'hour': OBS_HOUR_DIR,
 'minute': OBS_MINUTE_DIR,
 'second': OBS_SECOND_DIR
 }
 if obstype not in paths:
 raise ValueError(
 f'obstype must be hour, minute or second, not {obstype}'
 )

 result = []
 with FTP(HOST) as ftp:
 ftp.login()
 ftp.dir(paths[obstype], lambda line: _callback(line, result, start_date, end_date))
 return [f'{paths[obstype]}/{name}' for name in sorted(result)]


def loacal_search(obstype, start_date=None, end_date=None):
 """Search OBS data file on local filesystem.

 Parameters
 ----------
 obstype : str
 OBS file type: `hour`, `minute`, `second`.
 start_date : str or numpy.datetime64
 lower bound of the time interval (default: no time interval).
 stop_date : str or numpy.datetime64
 upper bound of the time interval (default: no time interval).
 
 Returns
 -------
 list(pathlib.Path)
 OBS data files.
 
 Raises
 ------
 ValueError
 if `obstype` is not valid.
 
 """

 PATTERN = re.compile(
 r'SW_OPER_AUX_OBS[_MS]2__(?P\d{8}T\d{6})_'
 r'(?P\d{8}T\d{6})_\d{4}\.\w{3}$'
 )
 MINDATE = np.datetime64('0000', 's')
 MAXDATE = np.datetime64('9999', 's')
 
 start_date = MINDATE if start_date is None else np.datetime64(start_date)
 end_date = MAXDATE if end_date is None else np.datetime64(end_date)
 paths = {
 'hour': OBS_HOUR_LOCAL_DIR,
 'minute': OBS_MINUTE_LOCAL_DIR,
 'second': OBS_SECOND_LOCAL_DIR
 }
 if obstype not in paths:
 raise ValueError(
 f'obstype must be hour, minute or second, not {obstype}'
 )

 result = []
 for file in (elm for elm in paths[obstype].iterdir() if elm.is_file()):
 match = PATTERN.match(file.name)
 if match:
 start, stop = match.groupdict().values()
 start = np.datetime64(datetime.strptime(start, '%Y%m%dT%H%M%S'))
 stop = np.datetime64(datetime.strptime(stop, '%Y%m%dT%H%M%S'))
 if end_date >= start and start_date <= stop:
 result.append(file)
 return sorted(result)


def download(files, outdir='', show_progress=True):
 """Download files from the FTP server.

 Parameters
 ----------
 outdir : str or os.PathLike
 output directory (default: current directory).
 files : collections.abc.Iterable(str)
 path(s) of the file(s) to be downloaded
 
 Returns
 -------
 list(pathlib.Path)
 list of downloaded files.
 
 Raises
 ------
 ftplib.all_errors
 in case of FTP errors.
 
 """
 def _callback(data, fh, pbar):
 pbar.update(len(data))
 fh.write(data)

 outdir = Path(outdir)
 downloaded = []
 with FTP(HOST) as ftp:
 ftp.login()
 for file in files:
 file = str(file)
 basename = file.split('/')[-1]
 with TemporaryFile(dir=outdir) as tmp:
 with tqdm(total=ftp.size(file), unit='B',
 unit_scale=True, desc=basename,
 disable=not show_progress) as pbar:
 ftp.retrbinary(f'RETR {file}', callback=lambda x: _callback(x, tmp, pbar))
 with ZipFile(tmp) as zf:
 hdr = Path(basename).with_suffix('.HDR').name
 datafile = [elm for elm in zf.namelist()if elm != hdr][0]
 outfile = zf.extract(datafile, outdir)
 downloaded.append(Path(outfile))
 return downloaded


def ascii_to_pandas(file):
 """Convert an OBS ASCII file to a pandas DataFrame.
 
 Parameters
 ----------
 file : str or os.PathLike
 OBS ASCII file.
 
 Returns
 -------
 pandas.DataFrame
 data contained in the OBS ASCII file.

 """
 df = pd.read_csv(
 file,
 comment='#',
 delim_whitespace=True,
 names = ['IAGA_code', 'Latitude', 'Longitude', 'Radius',
 'yyyy', 'mm', 'dd', 'UT', 'B_N', 'B_E', 'B_C'],
 parse_dates={'Timestamp': [4, 5, 6]},
 infer_datetime_format=True
 )
 df['Timestamp'] = df['Timestamp'] + pd.to_timedelta(df['UT'], 'h')
 df.drop(columns='UT', inplace=True)
 df.set_index('Timestamp', inplace=True)
 return df


def cdf_to_pandas(file):
 """Convert an OBS CDF file to a pandas DataFrame.
 
 Parameters
 ----------
 file : str or os.PathLike
 OBS CDF file.
 
 Returns
 -------
 pandas.DataFrame
 data contained in the OBS CDF file.

 """
 with closing(cdflib.cdfread.CDF(file)) as data:
 ts = pd.DatetimeIndex(
 cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True),
 name='Timestamp'
 )
 df = pd.DataFrame(
 {
 'IAGA_code': data.varget('IAGA_code')[:,0,0],
 'Latitude': data.varget('Latitude'),
 'Longitude': data.varget('Longitude'),
 'Radius': data.varget('Radius'),
 'B_N': data.varget('B_NEC')[:,0],
 'B_E': data.varget('B_NEC')[:,1],
 'B_C': data.varget('B_NEC')[:,2]
 },
 index=ts
 )
 return df


def download_obslist(outdir=''):
 """Search observatory list file on the FTP server.

 Parameters
 ----------
 outdir : str or os.PathLike
 output directory (default: current directory).

 Returns
 -------
 str
 Observatory list file.
 
 Raises
 ------
 ftplib.all_errors
 in case of FTP errors.
 
 """

 OBS_HOUR_DIR = '/geomag/Swarm/AUX_OBS/hour'
 
 def _callback(line, result):
 if line[0] == '-':
 match = re.match('obslist.+_gd\.all$', line[56:])
 if match:
 result.append(line[56:])

 outdir = Path(outdir)
 files = []
 with FTP(HOST) as ftp:
 ftp.login()
 ftp.dir(OBS_HOUR_DIR, lambda line: _callback(line, files))
 remote_obslist_file = f'{OBS_HOUR_DIR}/{files[0]}'
 local_obslist_file = outdir / files[0]
 with local_obslist_file.open('w') as fh:
 ftp.retrlines(f'RETR {remote_obslist_file}', lambda line: print(line, file=fh))
 return local_obslist_file
 

def read_obslist(file):
 """Convert observatory list ASCII file to a pandas DataFrame.
 
 Parameters
 ----------
 file : str or os.PathLike
 observatory list ASCII file.
 
 Returns
 -------
 pandas.DataFrame
 data contained in the observatory list ASCII file.

 """
 df = pd.read_csv(
 file,
 delim_whitespace=True,
 names = ['IAGA_code', 'Latitude', 'Longitude', 'Altitude'],
 )
 return df



## Hourly mean values

[[TOP]](#top)

Hourly means hosted at:
- ftp://ftp.nerc-murchison.ac.uk/geomag/Swarm/AUX_OBS/hour/

Processing methodology:
- Macmillan, S., Olsen, N. Observatory data and the Swarm mission. Earth Planet Sp 65, 15 (2013). https://doi.org/10.5047/eps.2013.07.011



### Read data from ASCII files

[[TOP]](#top)

Use the `search()` function (see [Settings and functions](#settings)) to search OBS hourly data from 2018-01-01T00:00:00 to 2019-12-31T23:59:59 on the FTP server:

In [None]:
result = search('hour', '2018-01-01', '2019-12-31T23:59:59')
result

Use the `download()` function (see [Settings and functions](#settings)) to download data:

In [None]:
downloaded = download(result, outdir=OBS_HOUR_LOCAL_DIR)
downloaded

Select one of the AUX_OBS_2_ files (e.g. the first one):

In [None]:
file1 = downloaded[0]
file1

Read ASCII file and convert data to a `pandas.DataFrame`:

In [None]:
df1 = pd.read_csv(
 file1,
 comment='#',
 delim_whitespace=True,
 names = ['IAGA_code', 'Latitude', 'Longitude',
 'Radius', 'yyyy', 'mm', 'dd', 'UT', 'B_N', 'B_E', 'B_C'],
 parse_dates={'Timestamp': [4, 5, 6]},
 infer_datetime_format=True
)
df1['Timestamp'] = df1['Timestamp'] + pd.to_timedelta(df1['UT'], 'h')
df1.drop(columns='UT', inplace=True)
df1.set_index('Timestamp', inplace=True)

df1

For more information on `pandas.Dataframe` see: https://pandas.pydata.org/docs/reference/frame.

The same result can be obtained with the `ascii_to_pandas()` function (see [Settings and functions](#settings)).

In [None]:
new = ascii_to_pandas(file1)
new

Compare the two data frames:

In [None]:
pd.testing.assert_frame_equal(df1, new)

Example: get minimum and maximum dates:

In [None]:
df1.index.min(), df1.index.max()

Example: get list of observatories (IAGA codes) stored in the files:

In [None]:
df1['IAGA_code'].unique()



### Read data from multiple files

[[TOP]](#top)

Pandas dataframes can be concatenated to represent data obtained from more than one file. E.g. read data from the next AUX_OBS_2_ file:

In [None]:
file2 = downloaded[1]
df2 = ascii_to_pandas(file2)
df2

The two dataframes can be concatenated using the `pandas.concat()` function (for more information see: https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat):

In [None]:
concatenated = pd.concat([df1, df2])
concatenated.sort_values(by=['IAGA_code', 'Timestamp'], inplace=True)

concatenated.index.min(), concatenated.index.max()

In [None]:
concatenated



### Examples

[[TOP]](#top)

Plot hourly mean values on a map:

In [None]:
df = ascii_to_pandas(file1)

# Add F column
df['F'] = np.linalg.norm(df[['B_N', 'B_E', 'B_C']], axis=1)

# Select date
date = '2018-01-01T01:30:00'

fig = plt.figure(figsize=(16, 10))

# Draw map
ax = plt.subplot2grid((1, 1), (0, 0), projection=ccrs.PlateCarree())
ax.coastlines()
ax.add_feature(cfeature.OCEAN, facecolor='lightgrey')
ax.gridlines()

# Plot observatory measurements at date
cm = ax.scatter(
 df[date]['Longitude'], df[date]['Latitude'], c=df[date]['F'],
 marker='D', transform=ccrs.PlateCarree(),
 label=f'OBS F hourly mean value at {date}'
)

# Add IAGA codes
for row in df[date].itertuples():
 ax.annotate(
 row.IAGA_code, (row.Longitude, row.Latitude),
 xycoords=ccrs.PlateCarree()._as_mpl_transform(ax)
 )

# Set title and legendbb
plt.title('Magnetic field intensities')
plt.legend()

# Add colorbar
cax = fig.add_axes([0.92, 0.2, 0.02, 0.6])
plt.colorbar(cm, cax=cax, label='F [nT]')

plt.show()

Read list of all observatories (use the `download_obslist()` and `read_obslist()` functions defined in [Settings and functions](#settings)):

In [None]:
obslist = download_obslist(outdir=OBS_HOUR_LOCAL_DIR)
obslist

In [None]:
obs = read_obslist(obslist)
obs

Add the missing observatories, i.e. those not included in the observatory hourly mean values, to the plot:

In [None]:
df = ascii_to_pandas(file1)

# Add F column
df['F'] = np.linalg.norm(df[['B_N', 'B_E', 'B_C']], axis=1)

# Select date
date = '2018-01-01T01:30:00'

fig = plt.figure(figsize=(16, 10))

# Draw map
ax = plt.subplot2grid((1, 1), (0, 0), projection=ccrs.PlateCarree())
ax.coastlines()
ax.add_feature(cfeature.OCEAN, facecolor='lightgrey')
ax.gridlines()

# Plot observatory measurements at date
cm = ax.scatter(
 df[date]['Longitude'], df[date]['Latitude'], c=df[date]['F'],
 marker='D', transform=ccrs.PlateCarree(),
 label=f'OBS F hourly mean value at {date}'
)

# Add IAGA codes
for row in df[date].itertuples():
 ax.annotate(
 row.IAGA_code, (row.Longitude, row.Latitude),
 xycoords=ccrs.PlateCarree()._as_mpl_transform(ax)
 )

# Add missing observatories from obslist (position only)
missing = obs[~obs['IAGA_code'].isin(df[date]['IAGA_code'].unique())]
cm2 = ax.scatter(missing['Longitude'], missing['Latitude'], c='black', marker='D', alpha=0.1)

# Set title and legendbb
plt.title('Magnetic field intensities')
plt.legend()

# Add colorbar
cax = fig.add_axes([0.92, 0.2, 0.02, 0.6])
plt.colorbar(cm, cax=cax, label='F [nT]')

plt.show()

Add Swarm F measurements between 01:00:00 and 02:00:00 of the same day:

In [None]:
# using viresclient
request = SwarmRequest()
request.set_collection('SW_OPER_MAGA_LR_1B')
request.set_products(measurements='F')

start_date = '2018-01-01T01:00:00'
end_date = '2018-01-01T02:00:00'

data = request.get_between(start_date, end_date)

In [None]:
df = ascii_to_pandas(file1)

# Add F column
df['F'] = np.linalg.norm(df[['B_N', 'B_E', 'B_C']], axis=1)

# Select date
date = '2018-01-01T01:30:00'

fig = plt.figure(figsize=(16, 10))

# Draw map
ax = plt.subplot2grid((1, 1), (0, 0), projection=ccrs.PlateCarree())
ax.coastlines()
ax.add_feature(cfeature.OCEAN, facecolor='lightgrey')
ax.gridlines()

# Plot observatory measurements at date
cm = ax.scatter(
 df[date]['Longitude'], df[date]['Latitude'], c=df[date]['F'],
 marker='D', transform=ccrs.PlateCarree(),
 label=f'OBS F hourly mean value at {date}'
)

# Add IAGA codes
for row in df[date].itertuples():
 ax.annotate(
 row.IAGA_code, (row.Longitude, row.Latitude),
 xycoords=ccrs.PlateCarree()._as_mpl_transform(ax)
 )

# Add missing observatories from obslist (position only)
missing = obs[~obs['IAGA_code'].isin(df[date]['IAGA_code'].unique())]
ax.scatter(missing['Longitude'], missing['Latitude'], c='black', marker='D', alpha=0.1)

# Add Swarm A data
swarm = data.as_dataframe()
ax.scatter(
 swarm['Longitude'], swarm['Latitude'], c=swarm['F'],
 transform=ccrs.PlateCarree(),
 label=f'Swarm A - F measurements between {start_date} and {end_date}'
)

# Set title and legendbb
plt.title('Magnetic field intensities')
plt.legend()

# Add colorbar
cax = fig.add_axes([0.92, 0.2, 0.02, 0.6])
plt.colorbar(cm, cax=cax, label='F [nT]')

plt.show()



## Minute and second mean values

[[TOP]](#top)

Files containing observatory minute and second mean values have CDF format. They can be downloade from:

- ftp://ftp.nerc-murchison.ac.uk/geomag/Swarm/AUX_OBS/minute/
- ftp://ftp.nerc-murchison.ac.uk/geomag/Swarm/AUX_OBS/second/



### Read data from CDF files

[[TOP]](#top)

Use the `search()` function (see [Settings and functions](#settings)) to search OBS minute/second data from 2019-12-01T00:00:00 to 2019-12-31T23:59:59 on the FTP server:

In [None]:
minute = search('minute', '2019-12-01', '2019-12-31T23:59:59')
minute

In [None]:
second = search('second', '2019-12-01', '2019-12-31T23:59:59')
second

Use the `download()` function (see [Settings and functions](#settings)) to download data:

In [None]:
dl_minute = download(minute, outdir=OBS_MINUTE_LOCAL_DIR)

In [None]:
dl_second = download(second, outdir=OBS_SECOND_LOCAL_DIR)

Select one of the AUX_OBSM2_ files (e.g. the first one):

In [None]:
file1 = dl_minute[0]
file1

Read CDF file using `cdflib` (for more information on `cdflib`, see: https://github.com/MAVENSDC/cdflib)

In [None]:
data = cdflib.CDF(file1)

Get info about the file as a Python dictionary:

In [None]:
data.cdf_info()

You can see that measurements are stored as *zVariables*:

In [None]:
data.cdf_info()['zVariables']

Data can be retrieved via the `.varget()` method, e.g:

In [None]:
data.varget('B_NEC')

Data is returned as a `numpy.ndarray` object (for more information on `numpy.ndarray`, see: https://docs.scipy.org/doc/numpy/reference/arrays.ndarray.html).

Variable attributes can be retrieved using the `.varattsget()` method, e.g.:

In [None]:
data.varattsget('B_NEC')

Attributes are returned as a Python dictionary.

Let's retrieve the timestamps:

In [None]:
data.varget('Timestamp')

`Timestamp` type is:

In [None]:
data.varget('Timestamp').dtype

Timestamps are represented as NumPy `float64` values. Why? Get info about `Timestamp` variable using the `.varinq()` method:

In [None]:
data.varinq('Timestamp')

The returned dictionary shows that the data type is *CDF_EPOCH* consising in a floating point value representing the number of milliseconds since 01-Jan-0000 00:00:00.000. It can be converted to a more readable format (list of strings) using the `cdflib.cdfepoch.encode()` function:

In [None]:
ts = cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True)
ts[:5]

Or to a numpy array of `numpy.datetime64` values:

In [None]:
ts = np.array(cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True), dtype='datetime64')
ts[:5]

You may be interested also in the CDF global attributes:

In [None]:
data.globalattsget()

Close the file when you have finished:

In [None]:
data.close()

AUX_OBSS2_ data contains the same variables:

In [None]:
with closing(cdflib.cdfread.CDF(dl_second[0])) as data:
 zvariables = data.cdf_info()['zVariables']

zvariables

Data can be represented as a `pandas.DataFrame` object:

In [None]:
with closing(cdflib.cdfread.CDF(file1)) as data:
 ts = pd.DatetimeIndex(
 cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True),
 name='Timestamp'
 )
 df1 = pd.DataFrame(
 {
 'IAGA_code': data.varget('IAGA_code')[:,0,0],
 'Latitude': data.varget('Latitude'),
 'Longitude': data.varget('Longitude'),
 'Radius': data.varget('Radius'),
 'B_N': data.varget('B_NEC')[:,0],
 'B_E': data.varget('B_NEC')[:,1],
 'B_C': data.varget('B_NEC')[:,2]
 },
 index=ts
 )

df1

For more information on `pandas.Dataframe` see: https://pandas.pydata.org/docs/reference/frame.

The same result can be obtained with the `cdf_to_pandas()` function (see [Settings and functions](#settings)).

In [None]:
new = cdf_to_pandas(file1)

new

Compare the two data frames:

In [None]:
pd.testing.assert_frame_equal(df1, new)

Example: get minimum and maximum dates:

In [None]:
df1.index.min(), df1.index.max()

Example: get list of observatories (IAGA codes) stored in the files:

In [None]:
df1['IAGA_code'].unique()

Example: get list of observatories (IAGA codes) included in the following ranges of coordinates:
- $30 \leq Latitude \leq 70$
- $-10 \leq Longitude \leq 40$

In [None]:
df1[(df1['Latitude'] >= 30) & (df1['Latitude'] <= 70) & (df1['Longitude'] >= -10) & (df1['Longitude'] <= 40)]['IAGA_code'].unique()

You can do the same using the `.query()` method (see: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html#pandas.DataFrame.query):

In [None]:
df1.query('(30 <= Latitude <= 70) and (-10 <= Longitude <= 40)')['IAGA_code'].unique()



### Read data from multiple files

[[TOP]](#top)

Pandas dataframes can be concatenated to represent data obtained from more than one file. E.g. read data from the next AUX_OBSM2_ file:

In [None]:
file2 = dl_minute[1]

df2 = cdf_to_pandas(file2)

df2

The two dataframes can be concatenated using the `pandas.concat()` function (for more information see: https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat):

In [None]:
concatenated = pd.concat([df1, df2])
concatenated.sort_values(by=['IAGA_code', 'Timestamp'], inplace=True)

concatenated.index.min(), concatenated.index.max()

In [None]:
concatenated

With AUX_OBSS2_ data:

In [None]:
files = dl_second[:2]

files

In [None]:
concatenated = pd.concat([cdf_to_pandas(file) for file in files])
concatenated.sort_values(by=['IAGA_code', 'Timestamp'], inplace=True)

concatenated.index.min(), concatenated.index.max()

In [None]:
concatenated