From 10c562cb2bc98b4a1da110e6eeb78c722c998abb Mon Sep 17 00:00:00 2001 From: Nirzari gupta Date: Fri, 9 Jul 2021 18:49:45 +0530 Subject: [PATCH 1/5] Updated to LocalDagRunner --- example/taxi_example_local.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/example/taxi_example_local.py b/example/taxi_example_local.py index 6f20060..65cb489 100644 --- a/example/taxi_example_local.py +++ b/example/taxi_example_local.py @@ -26,12 +26,11 @@ import urllib import os from typing import Text -import tfx import absl from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen from tfx.orchestration import metadata from tfx.orchestration import pipeline -from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner +from tfx.orchestration.local import local_dag_runner sys.path.append(".") from schemacomponent.component import component @@ -45,7 +44,7 @@ urllib.request.urlretrieve(DATA_PATH, _data_filepath) _pipeline_name = 'taxi_pipeline' -_tfx_root = tfx.__path__[0] +_tfx_root = "schemacomponent" _pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name) _metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, 'metadata.db') @@ -73,7 +72,7 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=[example_gen, statistics_gen, schema_gen, schema_curation], - enable_cache=True, + enable_cache=False, metadata_connection_config=metadata.sqlite_metadata_connection_config( metadata_path)) @@ -82,7 +81,7 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, # $python taxi_pipeline_hello.py if __name__ == '__main__': absl.logging.set_verbosity(absl.logging.INFO) - BeamDagRunner().run( + local_dag_runner.LocalDagRunner().run( _create_pipeline( pipeline_name=_pipeline_name, pipeline_root=_pipeline_root, From 2ef400b058523242b9472c81865375b41027da6d Mon Sep 17 00:00:00 2001 From: Nirzari gupta Date: Fri, 9 Jul 2021 18:53:18 +0530 Subject: [PATCH 2/5] updated LocalDagRunner --- example/taxi_example_local.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/example/taxi_example_local.py b/example/taxi_example_local.py index 65cb489..7df29a3 100644 --- a/example/taxi_example_local.py +++ b/example/taxi_example_local.py @@ -43,7 +43,7 @@ _data_filepath = os.path.join(_data_root, "data.csv") urllib.request.urlretrieve(DATA_PATH, _data_filepath) -_pipeline_name = 'taxi_pipeline' +_pipeline_name = tfx.__path__[0] _tfx_root = "schemacomponent" _pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name) _metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, @@ -72,7 +72,7 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=[example_gen, statistics_gen, schema_gen, schema_curation], - enable_cache=False, + enable_cache=True, metadata_connection_config=metadata.sqlite_metadata_connection_config( metadata_path)) @@ -86,4 +86,4 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, pipeline_name=_pipeline_name, pipeline_root=_pipeline_root, data_root=_data_root, - metadata_path=_metadata_path)) \ No newline at end of file + metadata_path=_metadata_path)) From 882ac26e73fb1cd1deb5535a57d14293546d2dfa Mon Sep 17 00:00:00 2001 From: Nirzari gupta Date: Fri, 9 Jul 2021 18:55:56 +0530 Subject: [PATCH 3/5] Update taxi_example_local.py --- example/taxi_example_local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example/taxi_example_local.py b/example/taxi_example_local.py index 7df29a3..67edc61 100644 --- a/example/taxi_example_local.py +++ b/example/taxi_example_local.py @@ -43,8 +43,8 @@ _data_filepath = os.path.join(_data_root, "data.csv") urllib.request.urlretrieve(DATA_PATH, _data_filepath) -_pipeline_name = tfx.__path__[0] -_tfx_root = "schemacomponent" +_pipeline_name = 'taxi_pipeline' +_tfx_root = tfx.__path__[0] _pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name) _metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, 'metadata.db') From ccc57d2f89439e78a3b15920a2bdafd831dcbd4a Mon Sep 17 00:00:00 2001 From: Nirzari gupta Date: Fri, 9 Jul 2021 21:50:47 +0530 Subject: [PATCH 4/5] Update --- example/taxi_example_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/taxi_example_local.py b/example/taxi_example_local.py index 67edc61..ab59a0e 100644 --- a/example/taxi_example_local.py +++ b/example/taxi_example_local.py @@ -65,7 +65,7 @@ def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, # modifies infered schema with use of udf `schema_fn` defined in module file schema_curation = component.SchemaCuration(schema=schema_gen.outputs['schema'], - module_file='schemacomponent\example\module_file.py') + module_file=os.path.join('schemacomponent','example','module_file.py') return pipeline.Pipeline( From 4245af33f3ee2d1cf5fd810f69ef0067ea3d9035 Mon Sep 17 00:00:00 2001 From: Nirzari gupta Date: Fri, 9 Jul 2021 21:54:37 +0530 Subject: [PATCH 5/5] Updates --- example/taxi_example_local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/example/taxi_example_local.py b/example/taxi_example_local.py index ab59a0e..768de7e 100644 --- a/example/taxi_example_local.py +++ b/example/taxi_example_local.py @@ -27,6 +27,7 @@ import os from typing import Text import absl +import tfx from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen from tfx.orchestration import metadata from tfx.orchestration import pipeline