-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
122 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,32 @@ | ||
# Copyright 2021 Google LLC. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Supplement for chicago taxi pipeline example with specifics schema modification. | ||
This module file will be used in the custom schema curation component. | ||
""" | ||
|
||
|
||
|
||
import tensorflow_data_validation as tfdv | ||
|
||
# TFX schema curation component will call this function. | ||
|
||
def schema_fn(infered_schema): | ||
"""Curate the infered schema. | ||
Args: | ||
infered_schema:schema generated by SchemaGen component of tfx | ||
""" | ||
|
||
print(infered_schema) | ||
feature = tfdv.get_feature(infered_schema, fare) # feature_name = fare | ||
feature.presence.min_fraction = 0.9 | ||
tfdv.display_schema(infered_schema) | ||
|
||
modified_schema = infered_schema | ||
|
||
return modified_schema | ||
# Copyright 2021 Google LLC. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Supplement for chicago taxi pipeline example with specifics schema modification. | ||
This module file will be used in the custom schema curation component. | ||
""" | ||
|
||
import tensorflow_data_validation as tfdv | ||
|
||
# TFX schema curation component will call this function. | ||
|
||
def schema_fn(schema): | ||
"""modifies the infered schema. | ||
Args: | ||
schema:schema generated by SchemaGen component of tfx | ||
""" | ||
#changing "tips" into optional feature | ||
feature = tfdv.get_feature(schema, 'tips') | ||
feature.presence.min_fraction = 0.9 | ||
|
||
return schema |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# Lint as: python3 | ||
# Copyright 2021 Google LLC. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Chicago taxi example using TFX schema curation custom component. | ||
base code taken from: https://github.com/tensorflow/tfx/blob/master/tfx/examples/custom_components/hello_world/example/taxi_pipeline_hello.py | ||
This example demonstrate the use of schema curation custom component. | ||
user defined function `schema_fn` defined in `module_file.py` is used | ||
to change feature `tips` from required to optional. | ||
""" | ||
|
||
import sys | ||
import tempfile | ||
import urllib | ||
import os | ||
from typing import Text | ||
import tfx | ||
import absl | ||
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen | ||
from tfx.orchestration import metadata | ||
from tfx.orchestration import pipeline | ||
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner | ||
|
||
sys.path.append(".") | ||
from schemacomponent.component import component | ||
|
||
|
||
|
||
# downloading data and setting up required paths | ||
_data_root = tempfile.mkdtemp(prefix='tfx-data') | ||
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv' | ||
_data_filepath = os.path.join(_data_root, "data.csv") | ||
urllib.request.urlretrieve(DATA_PATH, _data_filepath) | ||
|
||
_pipeline_name = 'taxi_pipeline' | ||
_tfx_root = tfx.__path__[0] | ||
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name) | ||
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, | ||
'metadata.db') | ||
|
||
|
||
def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text, | ||
metadata_path: Text) -> pipeline.Pipeline: | ||
"""Implements the chicago taxi pipeline with TFX.""" | ||
|
||
# Brings data into the pipeline or otherwise joins/converts training data. | ||
example_gen = CsvExampleGen(input_base=data_root) | ||
|
||
# Computes statistics over data for visualization and example validation. | ||
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples']) | ||
|
||
# inferes a schema | ||
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True) | ||
|
||
# modifies infered schema with use of udf `schema_fn` defined in module file | ||
schema_curation = component.SchemaCuration(schema=schema_gen.outputs['schema'], | ||
module_file='schemacomponent\example\module_file.py') | ||
|
||
|
||
return pipeline.Pipeline( | ||
pipeline_name=pipeline_name, | ||
pipeline_root=pipeline_root, | ||
components=[example_gen, statistics_gen, schema_gen, schema_curation], | ||
enable_cache=True, | ||
metadata_connection_config=metadata.sqlite_metadata_connection_config( | ||
metadata_path)) | ||
|
||
|
||
# To run this pipeline from the python CLI: | ||
# $python taxi_pipeline_hello.py | ||
if __name__ == '__main__': | ||
absl.logging.set_verbosity(absl.logging.INFO) | ||
BeamDagRunner().run( | ||
_create_pipeline( | ||
pipeline_name=_pipeline_name, | ||
pipeline_root=_pipeline_root, | ||
data_root=_data_root, | ||
metadata_path=_metadata_path)) |