From 801a77c985f0f7dac074af8f732819a9af7550a0 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 16 Oct 2020 10:24:57 +0800 Subject: [PATCH 1/2] Cleanup cli and python dependencies Signed-off-by: Terence --- sdk/python/feast/cli.py | 108 +++++++++++++++++++++++++++++----------- sdk/python/setup.py | 2 - 2 files changed, 79 insertions(+), 31 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index cf1417a354..bd67774e83 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -23,6 +23,7 @@ from feast.client import Client from feast.config import Config +from feast.constants import CONFIG_SPARK_LAUNCHER from feast.entity import Entity from feast.feature_table import FeatureTable from feast.loaders.yaml import yaml_loader @@ -351,78 +352,127 @@ def project_list(): print(tabulate(table, headers=["NAME"], tablefmt="plain")) -@cli.command() +@cli.group(name="jobs") +def job(): + """ + Create and manage jobs + """ + pass + + +@job.command(name="sync-offline-to-online") @click.option( "--feature-table", "-t", - help="Feature table name to ingest data into", + help="Feature table name of data to be synced", + type=click.STRING, required=True, ) @click.option("--start-time", "-s", help="Interval start", required=True) @click.option("--end-time", "-e", help="Interval end", required=True) def sync_offline_to_online(feature_table: str, start_time: str, end_time: str): """ - Sync offline store to online. + Sync offline store data to online store """ - from datetime import datetime - client = Client() - table = client.get_feature_table(feature_table) - client.start_offline_to_online_ingestion( - table, datetime.fromisoformat(start_time), datetime.fromisoformat(end_time) - ) + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs -@cli.command() + client = Client() + table = client.get_feature_table(feature_table) + feast.pyspark.aws.jobs.sync_offline_to_online( + client, table, start_time, end_time + ) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) + + +@job.command(name="start-stream-to-online") @click.option( "--feature-table", "-t", - help="Feature table name to ingest data into", + help="Feature table name of job to be started", + type=click.STRING, required=True, ) @click.option( - "--jar", "-j", help="Feature table name to ingest data into", default="", + "--jar", + "-j", + help="The file path to the uber jar for offline to online ingestion spark job", + default="", ) def start_stream_to_online(feature_table: str, jar: str): """ - Start stream to online sync job. + Start stream to online sync job """ - client = Client() - table = client.get_feature_table(feature_table) - client.start_stream_to_online_ingestion(table, [jar] if jar else []) + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs -@cli.command() + client = Client() + table = client.get_feature_table(feature_table) + feast.pyspark.aws.jobs.start_stream_to_online( + client, table, [jar] if jar else [] + ) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) + + +@job.command(name="stop-stream-to-online") @click.option( "--feature-table", "-t", - help="Feature table name to ingest data into", + help="Feature table name of job to be stopped", + type=click.STRING, required=True, ) def stop_stream_to_online(feature_table: str): """ - Start stream to online sync job. + Stop stream to online sync job """ - import feast.pyspark.aws.jobs - feast.pyspark.aws.jobs.stop_stream_to_online(feature_table) + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs -@cli.command() -def list_emr_jobs(): + feast.pyspark.aws.jobs.stop_stream_to_online(feature_table) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) + + +@job.command() +def list_jobs(): """ - List jobs. + List jobs """ from tabulate import tabulate - import feast.pyspark.aws.jobs + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) - jobs = feast.pyspark.aws.jobs.list_jobs(None, None) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs - print( - tabulate(jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain") - ) + jobs = feast.pyspark.aws.jobs.list_jobs(None, None) + print( + tabulate( + jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain" + ) + ) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) @cli.command() diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 5ad33e4eaa..19e567ecd3 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -39,14 +39,12 @@ "protobuf>=3.10", "PyYAML==5.1.*", "fastavro>=0.22.11,<0.23", - "kafka-python==1.*", "tabulate==0.8.*", "toml==0.10.*", "tqdm==4.*", "pyarrow<0.16.0,>=0.15.1", "numpy", "google", - "confluent_kafka", ] # README file from Feast repo root directory From 06528a02a1233f24e521e5ca8fd72bc87c5cc5d0 Mon Sep 17 00:00:00 2001 From: Terence Date: Tue, 20 Oct 2020 17:44:04 +0800 Subject: [PATCH 2/2] Cleanup cli Signed-off-by: Terence --- sdk/python/feast/cli.py | 39 ++++++++++----------------------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index bd67774e83..46d0675b56 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -360,7 +360,7 @@ def job(): pass -@job.command(name="sync-offline-to-online") +@job.command(name="start-offline-to-online") @click.option( "--feature-table", "-t", @@ -374,21 +374,13 @@ def sync_offline_to_online(feature_table: str, start_time: str, end_time: str): """ Sync offline store data to online store """ + from datetime import datetime - spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) - - if spark_launcher == "emr": - import feast.pyspark.aws.jobs - - client = Client() - table = client.get_feature_table(feature_table) - feast.pyspark.aws.jobs.sync_offline_to_online( - client, table, start_time, end_time - ) - else: - raise NotImplementedError( - f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" - ) + client = Client() + table = client.get_feature_table(feature_table) + client.start_offline_to_online_ingestion( + table, datetime.fromisoformat(start_time), datetime.fromisoformat(end_time) + ) @job.command(name="start-stream-to-online") @@ -410,20 +402,9 @@ def start_stream_to_online(feature_table: str, jar: str): Start stream to online sync job """ - spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) - - if spark_launcher == "emr": - import feast.pyspark.aws.jobs - - client = Client() - table = client.get_feature_table(feature_table) - feast.pyspark.aws.jobs.start_stream_to_online( - client, table, [jar] if jar else [] - ) - else: - raise NotImplementedError( - f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" - ) + client = Client() + table = client.get_feature_table(feature_table) + client.start_stream_to_online_ingestion(table, [jar] if jar else []) @job.command(name="stop-stream-to-online")