#!/usr/bin/env python3
#
# fetchIndexInfo.py: Fetch time series index information.
# Reference: https://github.com/iris-edu/mseedindex/wiki
#
# Read an EarthScope time series index table in an SQLite3 database and print
# readable details of the index entries; alternatively, a SYNC-style
# listing can be printed.
#
# Data may be selected by network, station, location, channel and time
# range.  A selection may be specified using command line options or by
# specifying a file containing one or more independent selections.
#
# If present, a '_summary' table that summarizes the time
# extents of each channel in the index is used to optimize the query
# to the index.  This table can be created with the following
# statement (assuming an index table of 'tsindex'):
#
# CREATE TABLE tsindex_summary AS
#   SELECT network,station,location,channel,
#     min(starttime) AS earliest, max(endtime) AS latest, datetime('now') as updt
#   FROM tsindex
#   GROUP BY 1,2,3,4;
#
# The summary table is used to:
# a) resolve wildcards, allowing the use of the '=' operator and thus the table index
# b) reduce the index table search to channels that are known to be present
#
# Modified: 2024.032
# Written by Chad Trabant, EarthScope Data Services
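#
# Example invocations (a minimal sketch; the database file name and the
# selection values below are hypothetical):
#
#   fetchIndexInfo.py -N IU -S ANMO -L 00 -C BHZ \
#       -s 2024-01-01T00:00:00 -e 2024-01-02T00:00:00 timeseries.sqlite
#
#   fetchIndexInfo.py -l selections.txt --SYNC timeseries.sqlite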

from collections import namedtuple
import threading
import time
import sys
import signal
import os
import getopt
import re
import datetime
import sqlite3

version = '1.6'
verbose = 0
table = 'tsindex'
dbconn = None

# Assume no time series index section (row) spans more than this many days
maxsectiondays = 10


def main():
    global verbose
    global table

    requestfile = None
    request = []
    index_rows = []
    network = None
    station = None
    location = None
    channel = None
    starttime = None
    endtime = None
    filename = None
    sync = False

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], "vhl:t:N:S:L:C:s:e:f:",
                                       ["verbose", "help", "listfile=", "table=",
                                        "network=", "station=", "location=", "channel=",
                                        "starttime=", "endtime=", "filename=", "SYNC"])
    except Exception as err:
        print(str(err))
        usage()
        sys.exit(2)

    for o, a in opts:
        if o == "-v":
            verbose = verbose + 1
        elif o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-l", "--listfile"):
            requestfile = a
        elif o in ("-t", "--table"):
            table = a
        elif o in ("-N", "--network"):
            network = a
        elif o in ("-S", "--station"):
            station = a
        elif o in ("-L", "--location"):
            location = a
        elif o in ("-C", "--channel"):
            channel = a
        elif o in ("-s", "--starttime"):
            starttime = a
        elif o in ("-e", "--endtime"):
            endtime = a
        elif o in ("-f", "--filename"):
            filename = a
        elif o == "--SYNC":
            sync = True
        else:
            assert False, "Unhandled option"

    if len(args) == 0:
        print("No database specified, try -h for usage\n")
        sys.exit()

    sqlitefile = args[0]

    # Read request from specified request file
    if requestfile:
        try:
            request = read_request_file(requestfile)
        except Exception as err:
            print("Error reading request file:\n  {0}".format(err))
            sys.exit(2)

    # Add request specified as command line options
    if network or station or location or channel or starttime or endtime:
        if network is None:
            network = '*'
        if station is None:
            station = '*'
        if location is None:
            location = '*'
        if channel is None:
            channel = '*'
        if starttime is None:
            starttime = '*'
        else:
            starttime = normalize_datetime(starttime)
        if endtime is None:
            endtime = '*'
        else:
            endtime = normalize_datetime(endtime)

        request.append([network, station, location, channel, starttime, endtime])

    if len(request) <= 0 and filename is None:
        print("No selection specified, try -h for usage")
        sys.exit()

    if verbose >= 2:
        print("Request:")
        for req in request:
            print("  {0}: {1}".format(len(req), req))

    try:
        index_rows = fetch_index_rows(sqlitefile, table, request, filename)
    except Exception as err:
        if str(err) != "interrupted":
            print("Error fetching index rows from '{0}':\n  {1}".format(sqlitefile, err))
        sys.exit(2)

    if sync:
        print_sync(index_rows)
    else:
        print_info(index_rows)

    if verbose >= 1:
        print("Fetched {0:d} index rows".format(len(index_rows)))

    return


def read_request_file(requestfile):
    '''Read a specified request file and return it as a list of tuples.

    Expected selection format is:
      Network Station Location Channel StartTime EndTime

    where the fields are space delimited, Network, Station, Location and
    Channel may contain '*' and '?' wildcards, and StartTime and EndTime
    are in YYYY-MM-DDThh:mm:ss.ssssss format or are '*'.

    Returned tuples have the same fields and ordering as the selection lines.
    '''
    global verbose

    if verbose >= 1:
        print("Reading select file: {0}".format(requestfile))

    request = []
    linenumber = 1
    linematch = re.compile(r'^[\w\?\*]{1,2}\s+[\w\?\*]{1,5}\s+[-\w\?\*]{1,2}\s+[\w\?\*]{1,3}\s+[-:T.*\d]+\s+[-:T.*\d]+$')

    with open(requestfile, mode="r") as fp:
        for line in fp:
            line = line.strip()

            # Add line to request list if it matches validation regex
            if linematch.match(line):
                fields = line.split()

                # Normalize start and end times to "YYYY-MM-DDThh:mm:ss.ffffff" if not wildcards
                if fields[4] != '*':
                    try:
                        fields[4] = normalize_datetime(fields[4])
                    except Exception:
                        raise ValueError("Cannot normalize start time (line {0:d}): {1:s}".format(linenumber, fields[4]))

                if fields[5] != '*':
                    try:
                        fields[5] = normalize_datetime(fields[5])
                    except Exception:
                        raise ValueError("Cannot normalize end time (line {0:d}): {1:s}".format(linenumber, fields[5]))

                request.append(fields)

            # Raise an error if the line is not empty and does not start with a #
            elif line and not line.startswith("#"):
                raise ValueError("Unrecognized selection line ({0:d}): '{1:s}'".format(linenumber, line))

            linenumber = linenumber + 1

    return request
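
# A hypothetical selections file accepted by read_request_file() might look
# like the following (codes and times are made up; '--' may be used for an
# empty location ID and '*' for an open time):
#
#   IU ANMO 00 BHZ 2024-01-01T00:00:00 2024-01-02T00:00:00
#   IU ANMO -- LH? * *
#   # Comment lines and blank lines are ignored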

def fetch_index_rows(sqlitefile, table, request, filename):
    '''Fetch index rows matching specified request[]

    `sqlitefile`: SQLite3 database file
    `table`: Target database table name
    `request`: List of tuples containing (net,sta,loc,chan,start,end)
    `filename`: Limit results to filename (or comma-separated list of)

    Request elements may contain '?' and '*' wildcards.  The start and
    end elements can be a single '*' if not a date-time string.

    Return rows as list of tuples containing:
    (network,station,location,channel,quality,starttime,endtime,samplerate,
     filename,byteoffset,bytes,hash,timeindex,timespans,timerates,
     format,filemodtime,updated,scanned)
    '''
    global verbose
    global dbconn

    index_rows = []

    if not os.path.exists(sqlitefile):
        raise ValueError("Database not found: {0}".format(sqlitefile))

    try:
        dbconn = sqlite3.connect(sqlitefile, 10.0)
    except Exception as err:
        raise ValueError(str(err))

    cursor = dbconn.cursor()

    # Store temporary table(s) in memory
    try:
        cursor.execute("PRAGMA temp_store=MEMORY")
    except Exception as err:
        raise ValueError(str(err))

    if len(request) > 0:
        index_rows = fetch_index_by_request(cursor, table, request, filename)
    elif filename:
        index_rows = fetch_index_by_filename(cursor, table, filename)
    else:
        print("No criteria (request selections or filename), nothing to do.")

    # Print time series index rows
    if verbose >= 3:
        print("TSINDEX:")
        for row in index_rows:
            print("  ", row)

    dbconn.close()
    dbconn = None

    return index_rows
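
# The header of this file describes an optional '<table>_summary' table.  The
# helper below is a minimal sketch (not called anywhere in this script) showing
# how that table could be (re)built programmatically; the function name and its
# arguments are illustrative only and the SQL mirrors the statement given in
# the header comment.
def create_summary_table(sqlitefile, table='tsindex'):
    '''Create or replace the '<table>_summary' table summarizing time extents'''
    conn = sqlite3.connect(sqlitefile)
    try:
        conn.execute("DROP TABLE IF EXISTS {0}_summary".format(table))
        conn.execute("CREATE TABLE {0}_summary AS "
                     "SELECT network,station,location,channel, "
                     "  min(starttime) AS earliest, max(endtime) AS latest, "
                     "  datetime('now') as updt "
                     "FROM {0} "
                     "GROUP BY 1,2,3,4".format(table))
        conn.commit()
    finally:
        conn.close()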

def fetch_index_by_request(cursor, table, request, filename):
    '''Fetch index rows matching specified request[]

    `cursor`: Database cursor
    `table`: Target database table
    `request`: List of tuples containing (net,sta,loc,chan,start,end)
    `filename`: Limit results to filename (or comma-separated list of)

    Request elements may contain '?' and '*' wildcards.  The start and
    end elements can be a single '*' if not a date-time string.

    Return rows as list of tuples containing:
    (network,station,location,channel,quality,starttime,endtime,samplerate,
     filename,byteoffset,bytes,hash,timeindex,timespans,timerates,
     format,filemodtime,updated,scanned)
    '''
    global verbose
    global maxsectiondays

    # Create temporary table and load request
    try:
        if verbose >= 2:
            print("Creating temporary request table")

        cursor.execute("CREATE TEMPORARY TABLE request "
                       "(network TEXT, station TEXT, location TEXT, channel TEXT, "
                       "starttime TEXT, endtime TEXT) ")

        if verbose >= 2:
            print("Populating temporary request table")

        for req in request:
            # Replace "--" location ID request alias with true empty value
            if req[2] == "--":
                req[2] = ""

            cursor.execute("INSERT INTO request (network,station,location,channel,starttime,endtime) "
                           "VALUES (?,?,?,?,?,?) ", req)

        if verbose >= 2:
            print("Request table populated")

    except Exception as err:
        raise ValueError(str(err))

    # Print request table
    if verbose >= 3:
        cursor.execute("SELECT * FROM request")
        rows = cursor.fetchall()
        print("REQUEST:")
        for row in rows:
            print("  ", row)

    # Determine if any wildcards are used
    wildcards = False
    for req in request:
        for field in req:
            if '*' in field or '?' in field:
                wildcards = True
                break

    # Determine if summary table exists
    summary_table = "{0}_summary".format(table)
    cursor.execute("SELECT count(*) FROM sqlite_master WHERE type='table' and name='{0}'".format(summary_table))
    summary_present = cursor.fetchone()[0]

    if wildcards:
        # Resolve wildcards using the summary table if present to:
        # a) resolve wildcards, allowing use of the '=' operator and the table index
        # b) reduce the index table search to channels known to be included
        if summary_present:
            resolve_request(cursor, summary_table, "request")
            wildcards = False

        # Replace wildcarded starttime and endtime with extreme date-times
        else:
            cursor.execute("UPDATE request SET starttime='0000-00-00T00:00:00' WHERE starttime='*'")
            cursor.execute("UPDATE request SET endtime='5000-00-00T00:00:00' WHERE endtime='*'")

    # Fetch final results by joining request and index table
    try:
        if verbose >= 2:
            print("Fetching index entries")

        # The index rows are selected by joining with the request for matching
        # network, station, location and channel entries with intersecting time ranges.
        #
        # The 'maxsectiondays' value is used to further constrain the search.  This is
        # needed because the endtime cannot be used by the index scan as only a single
        # field may be compared with inequalities (i.e. greater-than, less-than).
        # This optimization limits discovery of index section entries (rows) to those
        # not spanning more than 'maxsectiondays'.
        statement = ("SELECT ts.network,ts.station,ts.location,ts.channel,ts.quality, "
                     "  ts.starttime,ts.endtime,ts.samplerate, "
                     "  ts.filename,ts.byteoffset,ts.bytes,ts.hash, "
                     "  ts.timeindex,ts.timespans,ts.timerates, "
                     "  ts.format,ts.filemodtime,ts.updated,ts.scanned "
                     "FROM {0} ts, request r "
                     "WHERE "
                     "  ts.network {1} r.network "
                     "  AND ts.station {1} r.station "
                     "  AND ts.location {1} r.location "
                     "  AND ts.channel {1} r.channel "
                     "  AND ts.starttime <= r.endtime "
                     "  AND (r.starttime < 1 OR ts.starttime >= datetime(r.starttime,'-{2} days')) "
                     "  AND ts.endtime >= r.starttime "
                     .format(table, "GLOB" if wildcards else "=", maxsectiondays))

        if filename:
            statement += ("  AND ts.filename IN ({0}) ".
                          format(','.join("'" + item + "'" for item in filename.split(','))))

        if verbose >= 4:
            print("STATEMENT:\n{0}".format(statement))

        cursor.execute(statement)

    except Exception as err:
        raise ValueError(str(err))

    # Map each tuple to a named tuple for clear referencing
    NamedRow = namedtuple('NamedRow',
                          ['network', 'station', 'location', 'channel', 'quality',
                           'starttime', 'endtime', 'samplerate', 'filename',
                           'byteoffset', 'bytes', 'hash', 'timeindex', 'timespans',
                           'timerates', 'format', 'filemodtime', 'updated', 'scanned'])

    index_rows = []
    while True:
        row = cursor.fetchone()
        if row is None:
            break
        index_rows.append(NamedRow(*row))

    # Sort results in the application (ORDER BY in SQL triggers bad index usage)
    index_rows.sort()

    cursor.execute("DROP TABLE request")

    return index_rows
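
# Worked example of the 'maxsectiondays' constraint used above (dates are
# hypothetical): with maxsectiondays = 10 and a normalized request starttime of
# 2024-02-01T00:00:00, only index rows whose starttime is on or after
# datetime('2024-02-01T00:00:00','-10 days'), i.e. 2024-01-22, are considered.
# An index section starting before that but still overlapping the request
# (i.e. spanning more than 10 days) would be missed, so 'maxsectiondays'
# should be increased if such long sections exist in the index.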

def resolve_request(cursor, summary_table, request_table):
    '''Resolve the request table using the summary table

    `cursor`: Database cursor
    `summary_table`: summary table to resolve against
    `request_table`: request table to resolve

    Resolve any '?' and '*' wildcards in the specified request table.
    The original table is renamed, a new table is built by joining the
    renamed table with the summary table, and the renamed original is
    then removed.
    '''
    global verbose

    request_table_orig = request_table + "_orig"

    if verbose >= 1:
        print("Resolving request using summary")

    # Rename request table
    try:
        cursor.execute("ALTER TABLE {0} RENAME TO {1}".format(request_table, request_table_orig))
    except Exception as err:
        raise ValueError(str(err))

    # Create resolved request table by joining with summary
    try:
        cursor.execute("CREATE TEMPORARY TABLE {0} "
                       "(network TEXT, station TEXT, location TEXT, channel TEXT, "
                       "starttime TEXT, endtime TEXT) ".format(request_table))

        if verbose >= 2:
            print("Populating resolved request table")

        cursor.execute("INSERT INTO {0} (network,station,location,channel,starttime,endtime) "
                       "SELECT s.network,s.station,s.location,s.channel,"
                       "CASE WHEN r.starttime='*' THEN s.earliest ELSE r.starttime END,"
                       "CASE WHEN r.endtime='*' THEN s.latest ELSE r.endtime END "
                       "FROM {1} s, {2} r "
                       "WHERE "
                       "  (r.starttime='*' OR r.starttime <= s.latest) "
                       "  AND (r.endtime='*' OR r.endtime >= s.earliest) "
                       "  AND (r.network='*' OR s.network GLOB r.network) "
                       "  AND (r.station='*' OR s.station GLOB r.station) "
                       "  AND (r.location='*' OR s.location GLOB r.location) "
                       "  AND (r.channel='*' OR s.channel GLOB r.channel) ".
                       format(request_table, summary_table, request_table_orig))

    except Exception as err:
        raise ValueError(str(err))

    resolvedrows = cursor.execute("SELECT COUNT(*) FROM {0}".format(request_table)).fetchone()[0]

    # Print resolved request table
    if verbose >= 3:
        cursor.execute("SELECT * FROM {0}".format(request_table))
        rows = cursor.fetchall()
        print("RESOLVED ({0} rows):".format(resolvedrows))
        for row in rows:
            print("  ", row)

    cursor.execute("DROP TABLE {0}".format(request_table_orig))

    return
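
# Illustration of the resolution performed by resolve_request() (values are
# hypothetical): a request row ('IU','ANMO','*','BH?','*','*') joined against
# the summary table becomes one concrete row per matching channel, e.g.
#
#   ('IU','ANMO','00','BHZ','<earliest>','<latest>')
#   ('IU','ANMO','00','BH1','<earliest>','<latest>')
#
# where <earliest>/<latest> come from the summary when the requested times are
# '*'.  With the wildcards removed, the main query can compare with '=' instead
# of GLOB and therefore use the index table's index.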

def fetch_index_by_filename(cursor, table, filename):
    '''Fetch index rows matching specified filename(s)

    `cursor`: Database cursor
    `table`: Target database table
    `filename`: Limit results to filename (or comma-separated list of)

    Return rows as list of tuples containing:
    (network,station,location,channel,quality,starttime,endtime,samplerate,
     filename,byteoffset,bytes,hash,timeindex,timespans,timerates,
     format,filemodtime,updated,scanned)
    '''
    global verbose

    index_rows = []

    # Fetch final results limiting to specified filename(s)
    try:
        if verbose >= 2:
            print("Fetching index entries based on filename")

        statement = ("SELECT ts.network,ts.station,ts.location,ts.channel,ts.quality, "
                     "  ts.starttime,ts.endtime,ts.samplerate, "
                     "  ts.filename,ts.byteoffset,ts.bytes,ts.hash, "
                     "  ts.timeindex,ts.timespans,ts.timerates, "
                     "  ts.format,ts.filemodtime,ts.updated,ts.scanned "
                     "FROM {0} ts "
                     "WHERE "
                     "  ts.filename IN ({1}) ".
                     format(table, ','.join("'" + item + "'" for item in filename.split(','))))

        if verbose >= 4:
            print("STATEMENT:\n{0}".format(statement))

        cursor.execute(statement)

    except Exception as err:
        raise ValueError(str(err))

    # Map each tuple to a named tuple for clear referencing
    NamedRow = namedtuple('NamedRow',
                          ['network', 'station', 'location', 'channel', 'quality',
                           'starttime', 'endtime', 'samplerate', 'filename',
                           'byteoffset', 'bytes', 'hash', 'timeindex', 'timespans',
                           'timerates', 'format', 'filemodtime', 'updated', 'scanned'])

    index_rows = []
    while True:
        row = cursor.fetchone()
        if row is None:
            break
        index_rows.append(NamedRow(*row))

    # Sort results in the application (ORDER BY in SQL triggers bad index usage)
    index_rows.sort()

    return index_rows


def print_info(index_rows):
    '''Print index information given a list of named tuples containing:

    (0:network,1:station,2:location,3:channel,4:quality,5:starttime,6:endtime,7:samplerate,
     8:filename,9:byteoffset,10:bytes,11:hash,12:timeindex,13:timespans,14:timerates,
     15:format,16:filemodtime,17:updated,18:scanned)
    '''
    for NRow in index_rows:
        print("{0}:".format(NRow.filename))
        print("  {0}.{1}.{2}.{3}.{4}, samplerate: {5}, timerange: {6} - {7}".
              format(NRow.network, NRow.station, NRow.location, NRow.channel, NRow.quality,
                     NRow.samplerate, NRow.starttime, NRow.endtime))
        print("  byteoffset: {0}, bytes: {1}, endoffset: {2}, hash: {3}".
              format(NRow.byteoffset, NRow.bytes, NRow.byteoffset + NRow.bytes, NRow.hash))
        print("  filemodtime: {0}, updated: {1}, scanned: {2}".
              format(NRow.filemodtime, NRow.updated, NRow.scanned))

        print("Time index: (time => byteoffset)")
        for index in NRow.timeindex.split(','):
            (indextime, offset) = index.split('=>')

            # Convert epoch times to a more readable format
            if re.match(r"^[+-]?\d+(\.\d+)?$", indextime.strip()):
                indextime = timestamp_to_isoZ(indextime)

            print("    {0} => {1}".format(indextime, offset))

        if NRow.timespans:
            # If per-span sample rates are present create a list of them
            rates = None
            if NRow.timerates:
                rates = NRow.timerates.split(',')

            # Print time spans either with or without per-span rates
            print("Time spans:")
            for idx, span in enumerate(NRow.timespans.split(',')):
                (start, end) = span.lstrip('[').rstrip(']').split(':')

                if rates:
                    print("    {0} - {1} ({2})".format(timestamp_to_isoZ(start),
                                                       timestamp_to_isoZ(end),
                                                       rates[idx]))
                else:
                    print("    {0} - {1}".format(timestamp_to_isoZ(start),
                                                 timestamp_to_isoZ(end)))

    return


def print_sync(index_rows):
    '''Print a SYNC listing given a list of named tuples containing:

    (0:network,1:station,2:location,3:channel,4:quality,5:starttime,6:endtime,7:samplerate,
     8:filename,9:byteoffset,10:bytes,11:hash,12:timeindex,13:timespans,14:timerates,
     15:format,16:filemodtime,17:updated,18:scanned)
    '''
    for NRow in index_rows:
        if NRow.timespans:
            # If per-span sample rates are present create a list of them
            rates = None
            if NRow.timerates:
                rates = NRow.timerates.split(',')

            # Print time spans either with or without per-span rates
            for idx, span in enumerate(NRow.timespans.split(',')):
                (start, end) = span.lstrip('[').rstrip(']').split(':')

                starttime = datetime.datetime.utcfromtimestamp(float(start)).strftime("%Y,%j,%H:%M:%S.%f")
                endtime = datetime.datetime.utcfromtimestamp(float(end)).strftime("%Y,%j,%H:%M:%S.%f")
                updated = datetime.datetime.strptime(NRow.updated, "%Y-%m-%dT%H:%M:%S").strftime("%Y,%j")

                if rates:
                    print("{0}|{1}|{2}|{3}|{4}|{5}||{6}||||{7}||NC|{8}||".
                          format(NRow.network, NRow.station, NRow.location, NRow.channel,
                                 starttime, endtime, rates[idx], NRow.quality, updated))
                else:
                    print("{0}|{1}|{2}|{3}|{4}|{5}||{6}||||{7}||NC|{8}||".
                          format(NRow.network, NRow.station, NRow.location, NRow.channel,
                                 starttime, endtime, NRow.samplerate, NRow.quality, updated))

    return


def normalize_datetime(timestring):
    '''Normalize time string to strict YYYY-MM-DDThh:mm:ss.ffffff format
    '''
    # Split into Year, Month, Day, Hour, Min, Second, Fractional seconds
    timepieces = re.split("[-.:T]+", timestring)
    timepieces = [int(i) for i in timepieces]

    # Rebuild into target format
    return datetime.datetime(*timepieces).strftime("%Y-%m-%dT%H:%M:%S.%f")


def timestamp_to_isoZ(timestamp):
    '''Convert epoch timestamp to ISO8601 format with trailing Z
    '''
    return datetime.datetime.utcfromtimestamp(float(timestamp)).isoformat() + "Z"
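
# Example inputs and outputs for the two helpers above (inputs chosen
# arbitrarily for illustration):
#
#   normalize_datetime("2024-1-2T3:4:5")  -> "2024-01-02T03:04:05.000000"
#   timestamp_to_isoZ("0")                -> "1970-01-01T00:00:00Z"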

def usage():
    '''Print usage message
    '''
    global version
    print("Fetch time series index information from an SQLite database ({0})".format(version))
    print()
    print("Usage: {0} [options] sqlitefile".format(os.path.basename(sys.argv[0])))
    print()
    print(" -h        Print this help message")
    print(" -v        Be more verbose, may be specified multiple times")
    print(" -l file   Specify file containing a list of selections")
    print(" -t table  Specify time series index table, default: {0}".format(table))
    print()
    print(" -N net    Specify network code")
    print(" -S sta    Specify station code")
    print(" -L loc    Specify location IDs")
    print(" -C chan   Specify channel codes")
    print(" -s start  Specify start time in YYYY-MM-DDThh:mm:ss.ffffff format")
    print(" -e end    Specify end time in YYYY-MM-DDThh:mm:ss.ffffff format")
    print(" -f file   Limit results to specified filename")
    print()
    print(" --SYNC    Print results as a SYNC listing instead of the default details")
    print()
    print("The file containing a list of selections is expected to contain one or more")
    print("lines with a series of space-separated fields following this pattern:")
    print()
    print("Net Sta Loc Chan StartTime EndTime")
    print()
    print("where Net, Sta, Loc and Chan may contain '?' and '*' wildcards and")
    print("StartTime and EndTime are either 'YYYY-MM-DDThh:mm:ss.ffffff' or '*'")
    print()

    return


def interrupt_handler(signum, frame):
    '''Interruption signal handler.  When triggered, interrupt the database connection.
    '''
    global dbconn

    if dbconn:
        print("Termination requested (signal {0})".format(signum), file=sys.stderr)
        dbconn.interrupt()


# Run main() in a worker thread so that the main thread remains free to receive
# signals; the handlers above can then interrupt a long-running database query.
if __name__ == "__main__":
    signal.signal(signal.SIGINT, interrupt_handler)
    signal.signal(signal.SIGTERM, interrupt_handler)

    try:
        mainthread = threading.Thread(target=main)
        mainthread.start()
    except Exception as err:
        print(err)

    while mainthread.is_alive():
        time.sleep(0.2)