name: BigQueryExampleGen description: |- Official TFX BigQueryExampleGen component. The BigQuery examplegen component takes a query, and generates train and eval examples for downsteam components. Args: query: BigQuery sql string, query result will be treated as a single split, can be overwritten by input_config. input_config: An example_gen_pb2.Input instance with Split.pattern as BigQuery sql string. If set, it overwrites the 'query' arg, and allows different queries per split. output_config: An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. Returns: examples: Optional channel of 'ExamplesPath' for output train and eval examples. Raises: RuntimeError: Only one of query and input_config should be set. inputs: - {name: query, type: String, optional: true} - name: input_config type: JsonObject: {data_type: 'proto:tfx.components.example_gen.Input'} optional: true - name: output_config type: JsonObject: {data_type: 'proto:tfx.components.example_gen.Output'} optional: true - name: custom_config type: JsonObject: {data_type: 'proto:tfx.components.example_gen.CustomConfig'} optional: true outputs: - {name: examples, type: Examples} implementation: container: image: tensorflow/tfx:0.21.4 command: - python3 - -u - -c - | def _make_parent_dirs_and_return_path(file_path: str): import os os.makedirs(os.path.dirname(file_path), exist_ok=True) return file_path def BigQueryExampleGen( examples_path , query = None, input_config = None, output_config = None, custom_config = None, ): """ Official TFX BigQueryExampleGen component. The BigQuery examplegen component takes a query, and generates train and eval examples for downsteam components. Args: query: BigQuery sql string, query result will be treated as a single split, can be overwritten by input_config. input_config: An example_gen_pb2.Input instance with Split.pattern as BigQuery sql string. If set, it overwrites the 'query' arg, and allows different queries per split. output_config: An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. Returns: examples: Optional channel of 'ExamplesPath' for output train and eval examples. Raises: RuntimeError: Only one of query and input_config should be set. """ from tfx.components.example_gen.csv_example_gen.component import BigQueryExampleGen as component_class #Generated code import json import os import tensorflow from google.protobuf import json_format, message from tfx.types import Artifact, channel_utils, artifact_utils arguments = locals().copy() component_class_args = {} for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): argument_value_obj = argument_value = arguments.get(name, None) if argument_value is None: continue parameter_type = execution_parameter.type if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple argument_value_obj = parameter_type() json_format.Parse(argument_value, argument_value_obj) component_class_args[name] = argument_value_obj for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): artifact_path = arguments[name + '_path'] if artifact_path: artifact = channel_parameter.type() artifact.uri = artifact_path + '/' # ? if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES: # Recovering splits subdirs = tensorflow.io.gfile.listdir(artifact_path) artifact.split_names = artifact_utils.encode_split_names(sorted(subdirs)) component_class_args[name] = channel_utils.as_channel([artifact]) component_class_instance = component_class(**component_class_args) input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} exec_properties = component_class_instance.exec_properties # Generating paths for output artifacts for name, artifacts in output_dict.items(): base_artifact_path = arguments[name + '_path'] # Are there still cases where output channel has multiple artifacts? for idx, artifact in enumerate(artifacts): subdir = str(idx + 1) if idx > 0 else '' artifact.uri = os.path.join(base_artifact_path, subdir) # Ends with '/' print('component instance: ' + str(component_class_instance)) #executor = component_class.EXECUTOR_SPEC.executor_class() # Same executor = component_class_instance.executor_spec.executor_class() executor.Do( input_dict=input_dict, output_dict=output_dict, exec_properties=exec_properties, ) import argparse _parser = argparse.ArgumentParser(prog='BigQueryExampleGen', description="Official TFX BigQueryExampleGen component.\n\n The BigQuery examplegen component takes a query, and generates train\n and eval examples for downsteam components.\n\n\n Args:\n query: BigQuery sql string, query result will be treated as a single\n split, can be overwritten by input_config.\n input_config: An example_gen_pb2.Input instance with Split.pattern as\n BigQuery sql string. If set, it overwrites the 'query' arg, and allows\n different queries per split.\n output_config: An example_gen_pb2.Output instance, providing output\n configuration. If unset, default splits will be 'train' and 'eval' with\n size 2:1.\n Returns:\n examples: Optional channel of 'ExamplesPath' for output train and\n eval examples.\n\n Raises:\n RuntimeError: Only one of query and input_config should be set.") _parser.add_argument("--query", dest="query", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--input-config", dest="input_config", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--output-config", dest="output_config", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--custom-config", dest="custom_config", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--examples", dest="examples_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) _parsed_args = vars(_parser.parse_args()) _output_files = _parsed_args.pop("_output_paths", []) _outputs = BigQueryExampleGen(**_parsed_args) _output_serializers = [ ] import os for idx, output_file in enumerate(_output_files): try: os.makedirs(os.path.dirname(output_file)) except OSError: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) args: - if: cond: {isPresent: query} then: - --query - {inputValue: query} - if: cond: {isPresent: input_config} then: - --input-config - {inputValue: input_config} - if: cond: {isPresent: output_config} then: - --output-config - {inputValue: output_config} - if: cond: {isPresent: custom_config} then: - --custom-config - {inputValue: custom_config} - --examples - {outputPath: examples}