Uses ImportSchemaGen in taxi and penguin templates.
PiperOrigin-RevId: 402739430
jiyongjung authored and tfx-copybara committed Oct 13, 2021
1 parent f314cc7 commit dcf03fa
Showing 9 changed files with 141 additions and 133 deletions.
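In short, each template's create_pipeline gains an optional schema_path argument: left unset, the pipeline infers a schema with SchemaGen as before; when set, it imports the user-provided schema file with ImportSchemaGen instead. A minimal sketch of the new selection logic, mirroring the pipeline.py hunks below (the helper name make_schema_gen is invented for illustration; the templates inline this logic in create_pipeline):

from typing import Optional

from tfx import v1 as tfx  # same import alias the templates use


def make_schema_gen(statistics_gen, schema_path: Optional[str] = None):
  """Returns a schema-producing component for the pipeline (sketch)."""
  if schema_path is None:
    # Infer the schema from the statistics computed by StatisticsGen.
    return tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics'])
  # Import the user-provided schema file (e.g. 'schema.pbtxt').
  return tfx.components.ImportSchemaGen(schema_file=schema_path)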
74 changes: 18 additions & 56 deletions tfx/experimental/templates/penguin/e2e_tests/local_e2e_test.py
@@ -26,25 +26,13 @@

@unittest.skipIf(tf.__version__ < '2',
'Uses keras Model only compatible with TF 2.x')
class PenguinTemplateLocalEndToEndTest(test_utils.BaseEndToEndTest):
class PenguinTemplateLocalEndToEndTest(test_utils.BaseLocalEndToEndTest):
"""This test runs all components in the template."""

def setUp(self):
super().setUp()
self._pipeline_name = 'PENGUIN_TEMPLATE_E2E_TEST'

def _getAllUnitTests(self):
for root, _, files in os.walk(self._project_dir):
base_dir = os.path.relpath(root, self._project_dir)
if base_dir == '.': # project_dir == root
base_module = ''
else:
base_module = base_dir.replace(os.path.sep, '.') + '.'

for filename in files:
if filename.endswith('_test.py'):
yield base_module + filename[:-3]

def testGeneratedUnitTests(self):
self._copyTemplate('penguin')
for m in self._getAllUnitTests():
@@ -56,59 +44,33 @@ def testLocalPipeline(self):
self._copyTemplate('penguin')
os.environ['LOCAL_HOME'] = os.path.join(self._temp_dir, 'local')

# Create a pipeline with only initial components.
result = self._runCli([
'pipeline',
'create',
'--engine',
'local',
'--pipeline_path',
'local_runner.py',
])
self.assertIn(
'Pipeline "{}" created successfully.'.format(self._pipeline_name),
result)

# Run the pipeline.
self._runCli([
'run',
'create',
'--engine',
'local',
'--pipeline_name',
self._pipeline_name,
])
# Create a pipeline with only one component.
self._create_pipeline()
self._run_pipeline()

self._copy_schema()

# Update the pipeline to include all components.
updated_pipeline_file = self._addAllComponents()
logging.info('Updated %s to add all components to the pipeline.',
updated_pipeline_file)

# Update the pipeline to use ImportSchemaGen.
self._uncomment('local_runner.py', ['schema_path=generated_schema_path'])
self._replaceFileContent(
'local_runner.py',
[('schema_path=generated_schema_path', 'schema_path=\'schema.pbtxt\'')])

# Lowers required threshold to make tests stable.
self._replaceFileContent(
os.path.join('pipeline', 'configs.py'), [
('EVAL_ACCURACY_THRESHOLD = 0.6', 'EVAL_ACCURACY_THRESHOLD = 0.1'),
])
result = self._runCli([
'pipeline',
'update',
'--engine',
'local',
'--pipeline_path',
'local_runner.py',
])
self.assertIn(
'Pipeline "{}" updated successfully.'.format(self._pipeline_name),
result)

# Run the updated pipeline.
self._runCli([
'run',
'create',
'--engine',
'local',
'--pipeline_name',
self._pipeline_name,
])

logging.info(
'Updated pipeline to add all components and use user provided schema.')
self._update_pipeline()
self._run_pipeline()


if __name__ == '__main__':
2 changes: 2 additions & 0 deletions tfx/experimental/templates/penguin/kubeflow_runner.py
@@ -74,6 +74,8 @@ def run():
data_path=DATA_PATH,
# NOTE: Use `query` instead of `data_path` to use BigQueryExampleGen.
# query=configs.BIG_QUERY_QUERY,
# NOTE: Set the path of a customized schema, if any.
# schema_path=generated_schema_path,
preprocessing_fn=configs.PREPROCESSING_FN,
run_fn=configs.RUN_FN,
train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
2 changes: 2 additions & 0 deletions tfx/experimental/templates/penguin/local_runner.py
@@ -60,6 +60,8 @@ def run():
data_path=DATA_PATH,
# NOTE: Use `query` instead of `data_path` to use BigQueryExampleGen.
# query=configs.BIG_QUERY_QUERY,
# NOTE: Set the path of a customized schema, if any.
# schema_path=generated_schema_path,
preprocessing_fn=configs.PREPROCESSING_FN,
run_fn=configs.RUN_FN,
train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
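As the new NOTE lines above indicate, switching a runner to the imported schema only requires uncommenting schema_path. A sketch of the resulting call in local_runner.py, assuming the generated schema has been copied to schema.pbtxt in the project root (all other arguments exactly as in the template; constants such as PIPELINE_ROOT and DATA_PATH are the module-level values defined there):

# Hypothetical excerpt of run() after enabling the schema_path NOTE.
tfx.orchestration.LocalDagRunner().run(
    pipeline.create_pipeline(
        pipeline_name=configs.PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        schema_path='schema.pbtxt',  # ImportSchemaGen instead of SchemaGen
        preprocessing_fn=configs.PREPROCESSING_FN,
        run_fn=configs.RUN_FN,
        train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
        eval_args=tfx.proto.EvalArgs(num_steps=configs.EVAL_NUM_STEPS),
        eval_accuracy_threshold=configs.EVAL_ACCURACY_THRESHOLD,
        serving_model_dir=SERVING_MODEL_DIR))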
26 changes: 16 additions & 10 deletions tfx/experimental/templates/penguin/pipeline/pipeline.py
@@ -35,6 +35,7 @@ def create_pipeline(
eval_args: tfx.proto.EvalArgs,
eval_accuracy_threshold: float,
serving_model_dir: str,
schema_path: Optional[str] = None,
metadata_connection_config: Optional[
metadata_store_pb2.ConnectionConfig] = None,
beam_pipeline_args: Optional[List[str]] = None,
@@ -53,16 +54,21 @@
examples=example_gen.outputs['examples'])
components.append(statistics_gen)

# Generates schema based on statistics files.
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
components.append(schema_gen)

# Performs anomaly detection based on statistics and data schema.
example_validator = tfx.components.ExampleValidator( # pylint: disable=unused-variable
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
components.append(example_validator)
if schema_path is None:
# Generates schema based on statistics files.
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'])
components.append(schema_gen)
else:
# Import the user-provided schema into the pipeline.
schema_gen = tfx.components.ImportSchemaGen(schema_file=schema_path)
components.append(schema_gen)

# Performs anomaly detection based on statistics and data schema.
example_validator = tfx.components.ExampleValidator( # pylint: disable=unused-variable
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
components.append(example_validator)

# Performs transformations and feature engineering in training and serving.
transform = tfx.components.Transform( # pylint: disable=unused-variable
69 changes: 15 additions & 54 deletions tfx/experimental/templates/taxi/e2e_tests/local_e2e_test.py
@@ -26,24 +26,12 @@

@unittest.skipIf(tf.__version__ < '2',
'Uses keras Model only compatible with TF 2.x')
class TaxiTemplateLocalEndToEndTest(test_utils.BaseEndToEndTest):
class TaxiTemplateLocalEndToEndTest(test_utils.BaseLocalEndToEndTest):
"""This test covers step 1~6 of the accompanying document[1] for taxi template.
[1]https://github.com/tensorflow/tfx/blob/master/docs/tutorials/tfx/template.ipynb
"""

def _getAllUnitTests(self):
for root, _, files in os.walk(self._project_dir):
base_dir = os.path.relpath(root, self._project_dir)
if base_dir == '.': # project_dir == root
base_module = ''
else:
base_module = base_dir.replace(os.path.sep, '.') + '.'

for filename in files:
if filename.endswith('_test.py'):
yield base_module + filename[:-3]

def testGeneratedUnitTests(self):
self._copyTemplate('taxi')
for m in self._getAllUnitTests():
@@ -56,53 +44,26 @@ def testLocalPipeline(self):
os.environ['LOCAL_HOME'] = os.path.join(self._temp_dir, 'local')

# Create a pipeline with only one component.
result = self._runCli([
'pipeline',
'create',
'--engine',
'local',
'--pipeline_path',
'local_runner.py',
])
self.assertIn(
'Pipeline "{}" created successfully.'.format(self._pipeline_name),
result)

# Run the pipeline.
self._runCli([
'run',
'create',
'--engine',
'local',
'--pipeline_name',
self._pipeline_name,
])
self._create_pipeline()
self._run_pipeline()

# Update the pipeline to include all components.
updated_pipeline_file = self._addAllComponents()
logging.info('Updated %s to add all components to the pipeline.',
updated_pipeline_file)
result = self._runCli([
'pipeline',
'update',
'--engine',
'local',
'--pipeline_path',
'local_runner.py',
])
self.assertIn(
'Pipeline "{}" updated successfully.'.format(self._pipeline_name),
result)
self._update_pipeline()
self._run_pipeline()

self._copy_schema()

# Run the updated pipeline.
self._runCli([
'run',
'create',
'--engine',
'local',
'--pipeline_name',
self._pipeline_name,
])
# Update the pipeline to use ImportSchemaGen.
self._uncomment('local_runner.py', ['schema_path=generated_schema_path'])
self._replaceFileContent(
'local_runner.py',
[('schema_path=generated_schema_path', 'schema_path=\'schema.pbtxt\'')])
logging.info('Updated pipeline to use user provided schema.')
self._update_pipeline()
self._run_pipeline()


if __name__ == '__main__':
2 changes: 2 additions & 0 deletions tfx/experimental/templates/taxi/kubeflow_runner.py
@@ -74,6 +74,8 @@ def run():
data_path=DATA_PATH,
# TODO(step 7): (Optional) Uncomment below to use BigQueryExampleGen.
# query=configs.BIG_QUERY_QUERY,
# TODO(step 5): (Optional) Set the path of the customized schema.
# schema_path=generated_schema_path,
preprocessing_fn=configs.PREPROCESSING_FN,
run_fn=configs.RUN_FN,
train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
2 changes: 2 additions & 0 deletions tfx/experimental/templates/taxi/local_runner.py
@@ -59,6 +59,8 @@ def run():
data_path=DATA_PATH,
# TODO(step 7): (Optional) Uncomment here to use BigQueryExampleGen.
# query=configs.BIG_QUERY_QUERY,
# TODO(step 5): (Optional) Set the path of the customized schema.
# schema_path=generated_schema_path,
preprocessing_fn=configs.PREPROCESSING_FN,
run_fn=configs.RUN_FN,
train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
35 changes: 22 additions & 13 deletions tfx/experimental/templates/taxi/pipeline/pipeline.py
@@ -36,6 +36,7 @@ def create_pipeline(
eval_args: tfx.proto.EvalArgs,
eval_accuracy_threshold: float,
serving_model_dir: str,
schema_path: Optional[str] = None,
metadata_connection_config: Optional[
metadata_store_pb2.ConnectionConfig] = None,
beam_pipeline_args: Optional[List[str]] = None,
@@ -59,18 +60,26 @@
# TODO(step 5): Uncomment here to add StatisticsGen to the pipeline.
# components.append(statistics_gen)

# Generates schema based on statistics files.
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
# TODO(step 5): Uncomment here to add SchemaGen to the pipeline.
# components.append(schema_gen)

# Performs anomaly detection based on statistics and data schema.
example_validator = tfx.components.ExampleValidator( # pylint: disable=unused-variable
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
# TODO(step 5): Uncomment here to add ExampleValidator to the pipeline.
# components.append(example_validator)
if schema_path is None:
# Generates schema based on statistics files.
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'])
# TODO(step 5): Uncomment here to add SchemaGen to the pipeline.
# components.append(schema_gen)
else:
# Import the user-provided schema into the pipeline.
schema_gen = tfx.components.ImportSchemaGen(schema_file=schema_path)
# TODO(step 5): (Optional) Uncomment here to add ImportSchemaGen to the
# pipeline.
# components.append(schema_gen)

# Performs anomaly detection based on statistics and data schema.
example_validator = tfx.components.ExampleValidator( # pylint: disable=unused-variable
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
# TODO(step 5): (Optional) Uncomment here to add ExampleValidator to the
# pipeline.
# components.append(example_validator)

# Performs transformations and feature engineering in training and serving.
transform = tfx.components.Transform(
@@ -83,7 +92,7 @@
# Uses user-provided Python function that implements a model.
trainer_args = {
'run_fn': run_fn,
'transformed_examples': transform.outputs['transformed_examples'],
'examples': transform.outputs['transformed_examples'],
'schema': schema_gen.outputs['schema'],
'transform_graph': transform.outputs['transform_graph'],
'train_args': train_args,
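A related change rides along in the hunk above: the Trainer argument key is renamed from transformed_examples to examples. Current tfx.components.Trainer accepts transformed examples through its examples parameter when a transform_graph is supplied, and the separate transformed_examples parameter is deprecated. A sketch of the equivalent direct construction, under that assumption:

# How trainer_args is consumed (the template expands the dict with
# tfx.components.Trainer(**trainer_args)); `examples` now carries the
# transformed examples produced by Transform.
trainer = tfx.components.Trainer(
    run_fn=run_fn,
    examples=transform.outputs['transformed_examples'],
    schema=schema_gen.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=train_args,
    eval_args=eval_args)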
62 changes: 62 additions & 0 deletions tfx/experimental/templates/test_utils.py
@@ -140,3 +140,65 @@ def _copyTemplate(self, model):
model,
])
self.assertIn('Copying {} pipeline template'.format(model), result)


class BaseLocalEndToEndTest(BaseEndToEndTest):
"""Common tests for local engine."""

def _getAllUnitTests(self):
for root, _, files in os.walk(self._project_dir):
base_dir = os.path.relpath(root, self._project_dir)
if base_dir == '.': # project_dir == root
base_module = ''
else:
base_module = base_dir.replace(os.path.sep, '.') + '.'

for filename in files:
if filename.endswith('_test.py'):
yield base_module + filename[:-3]

def _create_pipeline(self):
result = self._runCli([
'pipeline',
'create',
'--engine',
'local',
'--pipeline_path',
'local_runner.py',
])
self.assertIn(
'Pipeline "{}" created successfully.'.format(self._pipeline_name),
result)

def _update_pipeline(self):
result = self._runCli([
'pipeline',
'update',
'--engine',
'local',
'--pipeline_path',
'local_runner.py',
])
self.assertIn(
'Pipeline "{}" updated successfully.'.format(self._pipeline_name),
result)

def _run_pipeline(self):
self._runCli([
'run',
'create',
'--engine',
'local',
'--pipeline_name',
self._pipeline_name,
])

def _copy_schema(self):
self._runCli([
'pipeline',
'schema',
'--engine',
'local',
'--pipeline_name',
self._pipeline_name,
])
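With these helpers, a template end-to-end test reduces to short sequences of create/run/update calls, as the penguin and taxi tests above show. A minimal sketch of a hypothetical subclass (the class and pipeline name are invented for illustration):

import os

from tfx.experimental.templates import test_utils


class MyTemplateLocalEndToEndTest(test_utils.BaseLocalEndToEndTest):
  """Hypothetical example showing how the new helpers compose."""

  def setUp(self):
    super().setUp()
    self._pipeline_name = 'MY_TEMPLATE_E2E_TEST'  # invented name

  def testLocalPipeline(self):
    self._copyTemplate('penguin')
    os.environ['LOCAL_HOME'] = os.path.join(self._temp_dir, 'local')
    self._create_pipeline()  # tfx pipeline create --engine local
    self._run_pipeline()     # tfx run create --engine local
    self._copy_schema()      # copies the generated schema into the project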
