name: ImportExampleGen description: |- TFX ImportExampleGen component. The ImportExampleGen component takes TFRecord files with TF Example data format, and generates train and eval examples for downsteam components. This component provides consistent and configurable partition, and it also shuffle the dataset for ML best practice. Args: input: A Channel of 'ExternalPath' type, which includes one artifact whose uri is an external directory with TFRecord files inside (required). input_config: An example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split. output_config: An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. Returns: examples: Optional channel of 'ExamplesPath' for output train and eval examples. Raises: RuntimeError: Only one of query and input_config should be set. inputs: - {name: input_base, type: ExternalPath} - {name: input_config, type: 'JsonObject: example_gen_pb2.Input', optional: true} - {name: output_config, type: 'JsonObject: example_gen_pb2.Output', optional: true} outputs: - {name: examples, type: Examples} implementation: container: image: tensorflow/tfx:0.21.4 command: - python3 - -u - -c - | def _make_parent_dirs_and_return_path(file_path: str): import os os.makedirs(os.path.dirname(file_path), exist_ok=True) return file_path def ImportExampleGen( input_base_path , #input_path: InputPath('ExternalPath'), examples_path , input_config = None, output_config = None, ): """ TFX ImportExampleGen component. The ImportExampleGen component takes TFRecord files with TF Example data format, and generates train and eval examples for downsteam components. This component provides consistent and configurable partition, and it also shuffle the dataset for ML best practice. Args: input: A Channel of 'ExternalPath' type, which includes one artifact whose uri is an external directory with TFRecord files inside (required). input_config: An example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split. output_config: An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. Returns: examples: Optional channel of 'ExamplesPath' for output train and eval examples. Raises: RuntimeError: Only one of query and input_config should be set. """ from tfx.components.example_gen.import_example_gen.component import ImportExampleGen as component_class #Generated code import json import os import tensorflow from google.protobuf import json_format, message from tfx.types import Artifact, channel_utils, artifact_utils arguments = locals().copy() component_class_args = {} for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): argument_value_obj = argument_value = arguments.get(name, None) if argument_value is None: continue parameter_type = execution_parameter.type if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple argument_value_obj = parameter_type() json_format.Parse(argument_value, argument_value_obj) component_class_args[name] = argument_value_obj for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): artifact_path = arguments[name + '_path'] if artifact_path: artifact = channel_parameter.type() artifact.uri = artifact_path + '/' # ? if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES: # Recovering splits subdirs = tensorflow.io.gfile.listdir(artifact_path) artifact.split_names = artifact_utils.encode_split_names(sorted(subdirs)) component_class_args[name] = channel_utils.as_channel([artifact]) component_class_instance = component_class(**component_class_args) input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} exec_properties = component_class_instance.exec_properties # Generating paths for output artifacts for name, artifacts in output_dict.items(): base_artifact_path = arguments[name + '_path'] # Are there still cases where output channel has multiple artifacts? for idx, artifact in enumerate(artifacts): subdir = str(idx + 1) if idx > 0 else '' artifact.uri = os.path.join(base_artifact_path, subdir) # Ends with '/' print('component instance: ' + str(component_class_instance)) #executor = component_class.EXECUTOR_SPEC.executor_class() # Same executor = component_class_instance.executor_spec.executor_class() executor.Do( input_dict=input_dict, output_dict=output_dict, exec_properties=exec_properties, ) import argparse _parser = argparse.ArgumentParser(prog='ImportExampleGen', description="TFX ImportExampleGen component.\n\n The ImportExampleGen component takes TFRecord files with TF Example data\n format, and generates train and eval examples for downsteam components.\n This component provides consistent and configurable partition, and it also\n shuffle the dataset for ML best practice.\n\n Args:\n input: A Channel of 'ExternalPath' type, which includes one artifact\n whose uri is an external directory with TFRecord files inside\n (required).\n input_config: An example_gen_pb2.Input instance, providing input\n configuration. If unset, the files under input_base will be treated as a\n single split.\n output_config: An example_gen_pb2.Output instance, providing output\n configuration. If unset, default splits will be 'train' and 'eval' with\n size 2:1.\n Returns:\n examples: Optional channel of 'ExamplesPath' for output train and\n eval examples.\n\n Raises:\n RuntimeError: Only one of query and input_config should be set.") _parser.add_argument("--input-base", dest="input_base_path", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--input-config", dest="input_config", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--output-config", dest="output_config", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--examples", dest="examples_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) _parsed_args = vars(_parser.parse_args()) _output_files = _parsed_args.pop("_output_paths", []) _outputs = ImportExampleGen(**_parsed_args) _output_serializers = [ ] import os for idx, output_file in enumerate(_output_files): try: os.makedirs(os.path.dirname(output_file)) except OSError: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) args: - --input-base - {inputPath: input_base} - if: cond: {isPresent: input_config} then: - --input-config - {inputValue: input_config} - if: cond: {isPresent: output_config} then: - --output-config - {inputValue: output_config} - --examples - {outputPath: examples}