#!/usr/bin/env python
import os
import re
import csv
import signal
import sys
import json
import decimal
import datetime
import argparse
try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
import subprocess

TEXT = "text"
INTEGER = "integer"
NUMERIC = "numeric"
DATE = "date"
DATETIME = "timestamp"
DATETIMETZ = "timestamptz"
BIGINTEGER = "bigint"
JSON = "json"
JSONB = "jsonb"
EMPTY = None  # special case, used as placeholder for null upgrades

# ignore pipe errors when output is piped through head, etc
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

# without this, we may encounter fields larger than can be read
csv.field_size_limit(sys.maxsize)

parser = argparse.ArgumentParser()
parser.add_argument("--file", "-f", help="csv file", required=True)
parser.add_argument("--copy", "-y", help="issue a copy statement, for import",
                    action="store_true", default=False)
parser.add_argument("--backslash", "-s", help=r"when issuing copy, use \copy",
                    action="store_true", default=False)
parser.add_argument("--delimiter", "-d", help="csv delimiter", default=",")
parser.add_argument("--quote", "-q", help="csv quote", default='"')
parser.add_argument("--table", "-t", help="table name", default=None)
parser.add_argument("--schema", help="schema name", default=None)
parser.add_argument("--create-schema", help="create the named schema",
                    action="store_true", default=False)
parser.add_argument("--integer", "-i", help="integer columns",
                    action="store_true", default=False)
parser.add_argument("--numeric", "-n", help="numeric parsing",
                    action="store_true", default=False)
parser.add_argument("--timestamp", "-p", help="timestamp parsing",
                    action="store_true", default=False)
parser.add_argument("--date", help="date parsing",
                    action="store_true", default=False)
parser.add_argument("--tz", help="use timestamptz instead of timestamp",
                    action="store_true", default=False)
parser.add_argument("--mogrify", "-m", help="clean names",
                    action="store_true", default=False)
parser.add_argument("--lower", "-l", help="lowercase names",
                    action="store_true", default=False)
parser.add_argument("--drop", "-x", help="drop table first",
                    action="store_true", default=False)
parser.add_argument("--truncate", help="truncate table first",
                    action="store_true", default=False)
parser.add_argument("--skip-parsing", help="skip type parsing for cols")
parser.add_argument("--parse-only", help="attempt type parsing only for cols")
parser.add_argument("--big-integer", "-b", help="use bigint instead of int",
                    default=False, action="store_true")
parser.add_argument("--no-create", "-z", help="don't issue create table",
                    default=False, action="store_true")
parser.add_argument("--fix-duplicates",
                    help=("handle duplicate column names by appending an "
                          "increasing suffix"),
                    default=False, action="store_true")
parser.add_argument("--transaction", "-1", help="run in a transaction",
                    default=False, action="store_true")
parser.add_argument("--temporary", help="create a temporary table",
                    default=False, action="store_true")
parser.add_argument("--file-column",
                    help=("add a column to the created table which will contain "
                          "the name of the file the data was loaded from"),
                    default=None)
parser.add_argument("--redshift",
                    help=("generate a copy command for redshift. takes an "
                          "optional argument, a file to look for s3 "
                          "credentials/bucket in. To try and read from the "
                          "environment, specify an empty string"),
                    default=None)
parser.add_argument("--redshift-bucket",
                    help=("when generating a copy command for redshift, using "
                          "this option specifies the bucket to store the file "
                          "in, if requested. This will override any bucket found "
                          "by the --redshift option."),
                    default=None)
parser.add_argument("--redshift-upload",
                    help="upload the specified file to redshift",
                    action="store_true", default=False)
parser.add_argument("--gzip", help="the file is in gzip format",
                    action="store_true", default=False)
parser.add_argument("--missing-headers",
                    help="file is missing headers, make up column names",
                    action="store_true", default=False)
parser.add_argument("--cine", help="create if not exists when creating a table",
                    action="store_true", default=False)
parser.add_argument("--keep-going",
                    help=("read the first N rows instead of the "
                          "first two rows, trading performance "
                          "for better type detection"),
                    type=int, metavar="N", default=None)
parser.add_argument("--primary-key",
                    help="column(s) to make part of the primary key")
parser.add_argument("--serial-column",
                    help="add serial column of the given name")
parser.add_argument("--array", help="attempt to detect and create array columns",
                    action="store_true", default=False)
group = parser.add_mutually_exclusive_group()
group.add_argument("--json", help="attempt to detect and create json columns",
                   action="store_true", default=False)
group.add_argument("--jsonb", help="attempt to detect and create jsonb columns",
                   action="store_true", default=False)

args = parser.parse_args()

# check that our given file exists
if not args.file or not os.path.exists(args.file):
    sys.stderr.write("file '{}' does not exist.\n".format(args.file))
    sys.exit(1)

# check that a schema was given if create schema was requested
if args.create_schema and args.schema is None:
    sys.stderr.write("--create-schema given but --schema not provided.\n")
    sys.exit(1)

# if --redshift-upload is specified, and --redshift is not set, force it to ""
if args.redshift_upload and not args.redshift:
    args.redshift = ""

# --backslash is incompatible with --redshift
if args.backslash and args.redshift:
    sys.stderr.write("--backslash is incompatible with the --redshift options.\n")
    sys.exit(1)

# redshift doesn't like plain NUMERIC, need to give it some precision otherwise
# it loses decimal places...
if args.redshift:
    NUMERIC = "numeric(24, 8)"
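# As a quick orientation (the file and column names here are hypothetical, not
# part of this script): an invocation like
#
#   -f people.csv --integer --copy
#
# against a people.csv whose first two rows are "name,age" and "alice,32"
# prints roughly:
#
#   create table "people" (
#    "name" text,
#    "age" integer
#   );
#   copy "people"("name", "age") from 'people.csv' with csv header delimiter ',' quote '"';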
# we assume that for redshift, the default date format is 'YYYY-MM-DD'.
# however, we may automatically detect a different one, so try to inform
# redshift of our choices.
redshift_date_format = "YYYY-MM-DD"
redshift_time_format = "HH:MI:SS"
redshift_datetime_format = "YYYY-MM-DD HH:MI:SS"


# identifier quoting function
def quote_ident(name):
    name = name.replace('"', '""')
    return '"' + name + '"'


# escape helper for sql string literals
def escape(val):
    return val.replace("'", "''")


# name cleaner function
empties = ""


def clean_name(name):
    global empties
    name = name.strip()
    name = re.sub(r'[^a-zA-Z0-9_]', "_", name)
    name = re.sub(r'_+', "_", name)
    name = re.sub(r'_$', "", name)
    if args.lower:
        name = name.lower()
    # if cleaning left nothing, invent a unique name made of underscores
    if not name:
        empties += "_"
        name = empties
    return name


# construct a table name from the file name
if args.table is None:
    path_parts = os.path.split(args.file)
    file_name = path_parts[1]
    pieces = file_name.split(".")
    if len(pieces) > 1:
        if pieces[-1] == "gz":
            pieces.pop()
        pieces = pieces[:-1]
    table_name = ".".join(pieces)
    if args.mogrify:
        table_name = clean_name(table_name)
else:
    table_name = args.table

# determine any columns we should not type parse
skip_type_parsing = []
if args.skip_parsing:
    cols = args.skip_parsing.split(",")
    skip_type_parsing = [c.strip() for c in cols]

# determine any columns which we should only type parse
parse_only_cols = None
if args.parse_only:
    cols = args.parse_only.split(",")
    parse_only_cols = [c.strip() for c in cols]

# get a handle on things
if args.gzip:
    process = subprocess.Popen(
        ["zcat", args.file],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    fp = process.stdout
else:
    fp = open(args.file, "r")


# attempt to deserialize an array
def parse_array(value):
    if not (value.startswith("{") and value.endswith("}")):
        raise ValueError("not an array")
    value = re.sub(r'^[{](.*)[}]$', r'\1', value)
    array_fp = StringIO(value)
    array_reader = csv.reader(array_fp)
    try:
        array = next(array_reader)
    except StopIteration:
        raise ValueError("not an array")
    return array
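# A couple of concrete cases for parse_array: '{1,2,3}' comes back as
# ['1', '2', '3'] (the elements are still strings; get_type below is what
# types them), while a plain scalar like '17' raises ValueError since it has
# no surrounding braces.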
# try and figure out the type of the given value
def get_type(value, args):
    global redshift_date_format
    global redshift_time_format
    global redshift_datetime_format

    # date formats will be crossed with time formats
    date_formats = [
        "%Y-%m-%d",
        "%Y%m%d",
        "%m/%d/%Y",
        "%b %d, %Y",
        "%m/%d/%y",
    ]
    # note: %P is not a directive python's strptime understands; those
    # formats raise ValueError and are harmlessly skipped below
    time_formats = [
        "%H:%M:%S",
        "%H:%M:%S %z",
        "%H:%M:%S %Z",
        "%I:%M:%S %p",
        "%I:%M:%S %P",
        "%H:%M:%S.%f",
        "%H:%M:%S.%f %z",
        "%H:%M:%S.%f %Z",
        "%I:%M:%S.%f %p",
        "%I:%M:%S.%f %P",
        "%H:%M",
    ]
    redshift_date_formats = {
        "%Y-%m-%d": "YYYY-MM-DD",
        "%Y%m%d": "YYYYMMDD",
        "%m/%d/%Y": "MM/DD/YYYY",
        "%b %d, %Y": "MON DD, YYYY",
        "%m/%d/%y": "MM/DD/YY",
    }
    redshift_time_formats = {
        "%H:%M:%S": "HH:MM:SS",
        "%H:%M:%S %z": "HH:MM:SS",  # XXX timezone is dropped
        "%H:%M:%S %Z": "HH:MM:SS",  # XXX timezone is dropped
        "%I:%M:%S %p": "HH12:MM:SS AM",
        "%I:%M:%S %P": "HH12:MM:SS AM",
        "%H:%M:%S.%f": "HH:MM:SS",
        "%H:%M:%S.%f %z": "HH:MM:SS",  # XXX
        "%H:%M:%S.%f %Z": "HH:MM:SS",  # XXX
        "%I:%M:%S.%f %p": "HH12:MM:SS AM",
        "%I:%M:%S.%f %P": "HH12:MM:SS AM",
        "%H:%M": "HH:MM",
    }
    redshift_datetime_formats = {}
    datetime_formats = []
    for date_format in date_formats:
        datetime_formats.append(date_format)
        for time_format in time_formats:
            datetime_formats.append(date_format + " " + time_format)
            datetime_formats.append(date_format + "T" + time_format)
            redshift_datetime_formats[date_format] = redshift_date_formats[date_format]
            redshift_datetime_formats[date_format + " " + time_format] = (
                redshift_date_formats[date_format] + " " +
                redshift_time_formats[time_format]
            )
            # the "T"-separated variant needs a redshift mapping too;
            # without it the timestamp branch below raises KeyError when an
            # ISO "T" timestamp matches
            redshift_datetime_formats[date_format + "T" + time_format] = (
                redshift_date_formats[date_format] + " " +
                redshift_time_formats[time_format]
            )

    # arrays: if every element parses to the same type, use that type
    if args.array:
        try:
            values = parse_array(value)
            types = []
            for v in values:
                t, _ = get_type(v, args)
                types.append(t)
            if len(set(types)) == 1:
                return types[0], True
        except ValueError:
            pass

    # first the date
    if args.date:
        for date_format in date_formats:
            try:
                datetime.datetime.strptime(value, date_format)
                if not (len(value) != 8 and date_format == "%Y%m%d"):
                    # hacks, but we'll sort of assume everything is in the same
                    # format
                    redshift_date_format = redshift_date_formats[date_format]
                    return DATE, False
            except ValueError:
                pass

    # then timestamp
    if args.timestamp:
        for timestamp_format in datetime_formats:
            try:
                datetime.datetime.strptime(value, timestamp_format)
                if not (len(value) != 8 and timestamp_format == "%Y%m%d"):
                    # hacks, but we'll sort of assume everything is in the same
                    # format.  note this must set redshift_datetime_format,
                    # not redshift_time_format, since that's the variable the
                    # copy statement's timeformat option uses
                    redshift_datetime_format = redshift_datetime_formats[timestamp_format]
                    if args.tz:
                        return DATETIMETZ, False
                    else:
                        return DATETIME, False
            except ValueError:
                pass

    # then integers
    if args.integer:
        try:
            int(value)
            if args.big_integer:
                return BIGINTEGER, False
            else:
                return INTEGER, False
        except ValueError:
            pass

    # then numeric
    if args.numeric:
        try:
            decimal.Decimal(value)
            return NUMERIC, False
        except decimal.InvalidOperation:
            pass

    # json/jsonb
    if args.json or args.jsonb:
        # yes, technically more values are accepted. but then we'll try to
        # convert any number to json, and that's probably not what we want
        possibly_json = (
            (value.startswith("{") and value.endswith("}")) or
            (value.startswith("[") and value.endswith("]"))
        )
        if possibly_json:
            try:
                json.loads(value)
                if args.json:
                    return JSON, False
                elif args.jsonb:
                    return JSONB, False
                else:
                    raise Exception("Implementation Error")
            except ValueError:
                pass

    # we got nothing
    return TEXT, False
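# To make the precedence above concrete (assuming --date, --integer and
# --numeric were all given): get_type("2021-06-09", args) -> ("date", False),
# get_type("12", args) -> ("integer", False), get_type("1.5", args) ->
# ("numeric", False), and anything unrecognized -> ("text", False). Dates are
# tried first, so "20210609" is detected as a date, not an integer.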
# some help to pass tabs as the delimiter
delimiter = args.delimiter
if delimiter in ("\\t", "\\\\t", "tab", "TAB"):
    delimiter = "\t"

# start processing the file
header_mapping = {}
reader = csv.DictReader(fp, delimiter=delimiter, quotechar=args.quote)

# figure out the column names and types
seen_column_names = {}


# try to strip out the unicode bom, thanks new excel
def strip_bom(field):
    if field[:1] == "\ufeff":
        field = field[1:]
    try:
        field_bytes = bytes(field)
    except TypeError as tex:
        # python 3: a str can't become bytes without an encoding, and the
        # decoded bom was already handled above
        if "string argument without an encoding" not in str(tex):
            raise
        return field
    if field_bytes[:3] == b"\xef\xbb\xbf":
        return field[3:]
    return field


# if a file name column was requested, pre-seed that name here
if args.file_column is not None:
    seen_column_names[args.file_column] = 0

columns = []
for record_idx, record in enumerate(reader):
    fields = reader.fieldnames
    if not header_mapping:
        if args.mogrify:
            for field in fields:
                header_mapping[field] = clean_name(strip_bom(field))
        else:
            # strip quotes to work with less chance of failure
            header_mapping = {f: strip_bom(f).replace('"', "") for f in fields}

    # by default, every column will be unset
    if not columns:
        columns = [None] * len(fields)

    # build a table with some hopefully reasonable types
    for i, field in enumerate(fields):
        value = record[field]

        # typed columns (with maybe nice names)
        if (field in skip_type_parsing
                or header_mapping[field] in skip_type_parsing):
            if not value:
                column_type, is_array = EMPTY, False
            else:
                column_type, is_array = TEXT, False
        else:
            if not value:
                column_type, is_array = EMPTY, False
            else:
                column_type, is_array = get_type(value, args)

        # but un-type the given columns if parse only was requested
        if parse_only_cols is not None:
            if field not in parse_only_cols:
                if not value:
                    column_type, is_array = EMPTY, False
                else:
                    column_type, is_array = TEXT, False

        if args.missing_headers:
            column_name = "col_{}".format(i)
        else:
            column_name = header_mapping[field]

        if args.fix_duplicates:
            if column_name in seen_column_names:
                original_name = column_name
                suffix = seen_column_names[original_name] + 1
                column_name = "{}_{}".format(original_name, suffix)
                seen_column_names[original_name] = suffix
            else:
                seen_column_names[column_name] = 0

        # not seen, use what we've got
        if columns[i] is None:
            columns[i] = (column_name, column_type, is_array)
        # column seen previously, figure out if the type needs to be adjusted
        else:
            # certain types will be automatically "broadened"
            upgrades = {
                INTEGER: [NUMERIC],
                BIGINTEGER: [NUMERIC],
                DATE: [DATETIME, DATETIMETZ],
            }
            prior_type = columns[i][1]
            prior_is_array = columns[i][2]

            # mis-matching array determination means we're just gonna use text
            if prior_is_array != is_array:
                columns[i] = (column_name, TEXT, False)
            elif prior_type != column_type:
                # empty is null for any type; if the previous type was empty,
                # upgrade to anything
                if prior_type == EMPTY:
                    columns[i] = (column_name, column_type, is_array)
                # if current type is empty, don't change whichever type we've
                # already guessed
                elif column_type == EMPTY:
                    pass
                # see if the type can be broadened
                elif prior_type in upgrades and column_type in upgrades[prior_type]:
                    columns[i] = (column_name, column_type, is_array)
                # see if the type is broad, to avoid narrowing
                elif column_type in upgrades and prior_type in upgrades[column_type]:
                    pass
                # otherwise, force to text
                else:
                    columns[i] = (column_name, TEXT, is_array)

    # enough of this boring crap
    if not args.keep_going or args.keep_going < record_idx:
        break

# if we're using gzip, kill the process we started
if args.gzip:
    process.kill()
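# To illustrate the widening rules above (with the relevant parsing flags
# enabled): a column holding "3" in one row and "3.5" in another widens
# integer -> numeric; "2021-01-02" followed by "2021-01-02 10:00:00" widens
# date -> timestamp; and any conflict with no upgrade path, say "3" followed
# by "abc", falls back to text.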
# if redshift was requested, attempt to figure out our credentials and bucket.
# note that we'll potentially need access to the credentials below, so define
# some variables up here
aws_access_key_id = None
aws_secret_access_key = None
s3_path = None

if args.redshift:
    def read_redshift_file(path):
        aws_access_key_id = None
        aws_secret_access_key = None
        s3_bucket = None
        s3_transfer_bucket = None
        if os.path.exists(path):
            # open and "parse" file. because this file can actually contain
            # no headers, we can't really use ConfigParser
            with open(path, "r") as rfp:
                for line in rfp.readlines():
                    if line.startswith("#"):
                        continue
                    # \b keeps the aws_secret_access_key line from also
                    # matching the access_key alternative and clobbering the
                    # key id with the secret
                    id_match = re.search(r'\b(AWSAccessKeyId|aws_access_key_id|access_key|s3.account_id)\s*=\s*(.+)$', line)
                    if id_match:
                        aws_access_key_id = id_match.groups()[1].strip()
                    key_match = re.search(r'(AWSSecretKey|aws_secret_access_key|secret_key|s3.private_key)\s*=\s*(.+)$', line)
                    if key_match:
                        aws_secret_access_key = key_match.groups()[1].strip()
                    bucket_match = re.search(r's3.bucket\s*=\s*(.+)$', line)
                    if bucket_match:
                        s3_bucket = bucket_match.groups()[0].strip()
                    transfer_match = re.search(r's3.transfer_bucket\s*=\s*(.+)$', line)
                    if transfer_match:
                        s3_transfer_bucket = transfer_match.groups()[0].strip()
        return (aws_access_key_id, aws_secret_access_key,
                (s3_transfer_bucket or s3_bucket))

    if args.redshift == "":
        aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
        aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        s3_bucket = os.getenv("S3_BUCKET")

        # maybe we didn't find a key, look for a credentials file specified
        # in the env
        if aws_access_key_id is None:
            credentials_path = os.getenv("AWS_CREDENTIAL_FILE")
            # guard against the variable being unset; os.path.exists(None)
            # would raise a TypeError
            if credentials_path:
                maybe_key_id, maybe_key, maybe_bucket = read_redshift_file(credentials_path)
                if aws_access_key_id is None:
                    aws_access_key_id = maybe_key_id
                if aws_secret_access_key is None:
                    aws_secret_access_key = maybe_key
                if s3_bucket is None:
                    s3_bucket = maybe_bucket
    else:
        aws_access_key_id, aws_secret_access_key, s3_bucket = read_redshift_file(args.redshift)

    if args.redshift_bucket:
        s3_bucket = args.redshift_bucket

    # we need to have access credentials and a bucket, abort if not found
    if aws_access_key_id is None:
        sys.stderr.write("unable to determine aws_access_key_id\n")
        sys.exit(1)
    if aws_secret_access_key is None:
        sys.stderr.write("unable to determine aws_secret_access_key\n")
        sys.exit(1)
    if s3_bucket is None:
        sys.stderr.write("unable to determine s3_bucket\n")
        sys.exit(1)

    # now, build a url from our components. we will then parse this url to get
    # our bucket and key name components, because the bucket may have been
    # specified with a prefix
    s3_path = "s3://" + os.path.join(s3_bucket, os.path.basename(args.file))
    parsed = urlparse(s3_path)
    real_bucket = parsed.netloc
    real_key = parsed.path.lstrip("/")

    # if redshift upload was requested, attempt to upload the file
    if args.redshift_upload:
        # we import botocore here so it's not always required
        import botocore.session
        bs = botocore.session.get_session()
        s3_client = bs.create_client(
            service_name="s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        with open(args.file, "rb") as upload_fp:
            s3_client.put_object(
                Bucket=real_bucket,
                Key=real_key,
                Body=upload_fp,
            )
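# For reference, a minimal credentials file that read_redshift_file above
# would accept (the values are placeholders, not real credentials):
#
#   aws_access_key_id = AKIAEXAMPLE
#   aws_secret_access_key = wJalrEXAMPLE
#   s3.bucket = some-bucket/optional/prefix
#
# a transfer bucket, if present as "s3.transfer_bucket = ...", wins over
# s3.bucket.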
# generate our name clause
name_clause = ""
if args.schema:
    name_clause += quote_ident(args.schema) + "."
name_clause += quote_ident(table_name)

# generate our columns clause for table create
columns_fragments = []
for column_name, column_type, is_array in columns:
    # a column that was empty in every sampled row has no better guess
    if column_type == EMPTY:
        column_type = TEXT
    column_fragment = " {} {}".format(quote_ident(column_name), column_type)
    if is_array:
        column_fragment += "[]"
    columns_fragments.append(column_fragment)

# if we requested a column containing the filename, add that in
if args.file_column is not None:
    file_column_frag = (" {} text default '{}'"
                        .format(quote_ident(args.file_column), escape(args.file)))
    columns_fragments = [file_column_frag] + columns_fragments

columns_clause = ",\n".join(columns_fragments)

# generate the target clause to emit with the copy statement
targets_clause = ", ".join([quote_ident(c[0]) for c in columns])

# maybe print a begin
if args.transaction:
    print("begin;")

# maybe print out a schema create
if args.create_schema:
    print("create schema {};".format(quote_ident(args.schema)))

# maybe print out a drop
if args.drop and not args.no_create:
    # redshift doesn't know about "if exists", sigh
    if args.redshift:
        print("drop table {};".format(name_clause))
    else:
        print("drop table if exists {};".format(name_clause))

# print out our table create statement
if not args.no_create:
    table_clause = "create table"
    if args.temporary:
        table_clause = "create temporary table"
    if args.cine:
        table_clause += " if not exists"
    print("{} {} (".format(table_clause, name_clause))
    if args.serial_column:
        print(' {} serial,'.format(quote_ident(args.serial_column)))
    print(columns_clause + ("," if args.primary_key else ""))
    if args.primary_key:
        pkey = [c.strip() for c in args.primary_key.split(",") if c.strip()]
        pkey_clause = []
        if args.serial_column in pkey:
            pkey_clause.append(quote_ident(args.serial_column))
        for field in fields:
            if field in pkey or header_mapping[field] in pkey:
                col = header_mapping[field]
                pkey_clause.append(quote_ident(col))
        pkey_clause = " primary key (" + ", ".join(pkey_clause) + ")"
        print(pkey_clause)
    print(");")

# print out our truncate statement
if args.truncate:
    print("truncate table {};".format(name_clause))

# print out our copy statement
if args.copy:
    # generate our options clause
    if args.redshift:
        options = ("'{}' with credentials "
                   "'aws_access_key_id={};aws_secret_access_key={}' "
                   "csv ignoreheader 1 delimiter '{}' quote '{}' dateformat "
                   "'{}' timeformat '{}'"
                   .format(escape(s3_path), escape(aws_access_key_id),
                           escape(aws_secret_access_key), escape(delimiter),
                           escape(args.quote), redshift_date_format,
                           redshift_datetime_format))
        if args.gzip:
            options += " gzip"
    else:
        if args.gzip:
            start = "program 'gunzip < {}'".format(escape(args.file))
        else:
            start = "'{}'".format(escape(args.file))
        options = start + (" with csv header delimiter '{}' quote '{}'"
                           .format(escape(delimiter), escape(args.quote)))
    copy = "copy"
    if args.backslash:
        copy = r'\copy'
    print("{} {}({}) from {};".format(copy, name_clause, targets_clause, options))

# maybe print out a commit
if args.transaction:
    print("commit;")
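# A worked example of the output above (names are hypothetical): running with
#
#   -f events.csv.gz --gzip --integer --copy --primary-key id
#
# against a gzipped file whose header row is "id,note" and first data row is
# "7,hi" prints roughly:
#
#   create table "events" (
#    "id" integer,
#    "note" text,
#    primary key ("id")
#   );
#   copy "events"("id", "note") from program 'gunzip < events.csv.gz' with csv header delimiter ',' quote '"';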