name: Pandas Transform DataFrame in ApacheParquet format description: |- Transform DataFrame loaded from an ApacheParquet file. Inputs: table: DataFrame to transform. transform_code: Transformation code. Code is written in Python and can consist of multiple lines. The DataFrame variable is called "df". Examples: - `df['prod'] = df['X'] * df['Y']` - `df = df[['X', 'prod']]` - `df.insert(0, "is_positive", df["X"] > 0)` Outputs: transformed_table: Transformed DataFrame. Annotations: author: Alexey Volkov inputs: - {name: table, type: ApacheParquet} - {name: transform_code, type: PythonCode} outputs: - {name: transformed_table, type: ApacheParquet} implementation: container: image: python:3.7 command: - sh - -c - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'pandas==1.0.4' 'pyarrow==0.14.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'pandas==1.0.4' 'pyarrow==0.14.1' --user) && "$0" "$@" - python3 - -u - -c - | def _make_parent_dirs_and_return_path(file_path: str): import os os.makedirs(os.path.dirname(file_path), exist_ok=True) return file_path def Pandas_Transform_DataFrame_in_ApacheParquet_format( table_path, transformed_table_path, transform_code, ): '''Transform DataFrame loaded from an ApacheParquet file. Inputs: table: DataFrame to transform. transform_code: Transformation code. Code is written in Python and can consist of multiple lines. The DataFrame variable is called "df". Examples: - `df['prod'] = df['X'] * df['Y']` - `df = df[['X', 'prod']]` - `df.insert(0, "is_positive", df["X"] > 0)` Outputs: transformed_table: Transformed DataFrame. Annotations: author: Alexey Volkov ''' import pandas df = pandas.read_parquet(table_path) # The namespace is needed so that the code can replace `df`. For example df = df[['X']] namespace = locals() exec(transform_code, namespace) namespace['df'].to_parquet(transformed_table_path) import argparse _parser = argparse.ArgumentParser(prog='Pandas Transform DataFrame in ApacheParquet format', description='Transform DataFrame loaded from an ApacheParquet file.\n\n Inputs:\n table: DataFrame to transform.\n transform_code: Transformation code. Code is written in Python and can consist of multiple lines.\n The DataFrame variable is called "df".\n Examples:\n - `df[\'prod\'] = df[\'X\'] * df[\'Y\']`\n - `df = df[[\'X\', \'prod\']]`\n - `df.insert(0, "is_positive", df["X"] > 0)`\n\n Outputs:\n transformed_table: Transformed DataFrame.\n\n Annotations:\n author: Alexey Volkov ') _parser.add_argument("--table", dest="table_path", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--transform-code", dest="transform_code", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--transformed-table", dest="transformed_table_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) _parsed_args = vars(_parser.parse_args()) _outputs = Pandas_Transform_DataFrame_in_ApacheParquet_format(**_parsed_args) args: - --table - {inputPath: table} - --transform-code - {inputValue: transform_code} - --transformed-table - {outputPath: transformed_table}