diff --git a/example/taxi_example_local.py b/example/taxi_example_local.py index 6f20060..768de7e 100644 --- a/example/taxi_example_local.py +++ b/example/taxi_example_local.py @@ -26,12 +26,12 @@ import urllib import os from typing import Text -import tfx import absl +import tfx from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen from tfx.orchestration import metadata from tfx.orchestration import pipeline -from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner +from tfx.orchestration.local import local_dag_runner sys.path.append(".") from schemacomponent.component import component @@ -66,7 +66,7 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, # modifies infered schema with use of udf `schema_fn` defined in module file schema_curation = component.SchemaCuration(schema=schema_gen.outputs['schema'], - module_file='schemacomponent\example\module_file.py') + module_file=os.path.join('schemacomponent','example','module_file.py') return pipeline.Pipeline( @@ -82,9 +82,9 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, # $python taxi_pipeline_hello.py if __name__ == '__main__': absl.logging.set_verbosity(absl.logging.INFO) - BeamDagRunner().run( + local_dag_runner.LocalDagRunner().run( _create_pipeline( pipeline_name=_pipeline_name, pipeline_root=_pipeline_root, data_root=_data_root, - metadata_path=_metadata_path)) \ No newline at end of file + metadata_path=_metadata_path))