import apache_beam as beam
import datetime, os

PROJECT = 'pme-cx'
BUCKET = 'cloud-training-demos-ml_02'
REGION = 'us-central1'

def to_csv(rowdict):
  # Pull columns from BigQuery and create a CSV line
  import hashlib
  import copy
  CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

  # Create synthetic data where we assume that no ultrasound has been performed,
  # so we don't know the sex of the baby. Assume that we can tell the difference
  # between a single and a multiple birth, but that determining the exact number
  # is error-prone in the absence of an ultrasound.
  no_ultrasound = copy.deepcopy(rowdict)
  w_ultrasound = copy.deepcopy(rowdict)

  no_ultrasound['is_male'] = 'Unknown'
  if rowdict['plurality'] > 1:
    no_ultrasound['plurality'] = 'Multiple(2+)'
  else:
    no_ultrasound['plurality'] = 'Single(1)'

  # Change the plurality column to strings
  w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)',
                               'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

  # Write out two rows for each input row: one with ultrasound and one without
  for result in [no_ultrasound, w_ultrasound]:
    data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
    key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
    yield str('{},{}'.format(data, key))

def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    os.makedirs(OUTPUT_DIR)
  else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
    try:
      subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
    except:
      pass  # it is fine if the output directory does not exist yet

  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  RUNNER = 'DirectRunner' if in_test_mode else 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options=opts)

  query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  ABS(FARM_FINGERPRINT(CONCAT(CAST(year AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
  AND weight_pounds > 0
  AND mother_age > 0
  AND plurality > 0
  AND gestation_weeks > 0
  AND month > 0
"""
  if in_test_mode:
    query = query + ' LIMIT 100'

  for step in ['train', 'eval']:
    # Hash on month keeps all records from a given month in the same split:
    # 3/4 of the hash buckets go to train, 1/4 to eval.
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) < 3'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) = 3'.format(query)

    (p
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print('Done!')

preprocess(in_test_mode = False)
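
# A minimal smoke test before launching the full Dataflow job: the same pipeline can
# be run locally with the DirectRunner on a 100-row sample by passing in_test_mode=True.
# Beam's WriteToText produces sharded output such as ./preproc/train.csv-00000-of-00001,
# which can be inspected (e.g. with `head`) before running at full scale.
#
#   preprocess(in_test_mode = True)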