From 4cb2f010de767e8d1aef827c0446a0b968baded9 Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Mon, 14 Jun 2021 16:21:44 -0400 Subject: [PATCH 1/7] Add to_bigquery() function for bq retrieval job Signed-off-by: Vivian Tao --- .../feast/infra/offline_stores/bigquery.py | 32 +++++++++++++++++-- sdk/python/tests/test_historical_retrieval.py | 16 ++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 2c0b115e0c..59f419d6a6 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -1,6 +1,7 @@ import time +import uuid from dataclasses import asdict, dataclass -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta from typing import List, Optional, Set, Union import pandas @@ -118,7 +119,7 @@ def get_historical_features( entity_df_event_timestamp_col=entity_df_event_timestamp_col, ) - job = BigQueryRetrievalJob(query=query, client=client) + job = BigQueryRetrievalJob(query=query, client=client, config=config) return job @@ -206,15 +207,40 @@ def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str: class BigQueryRetrievalJob(RetrievalJob): - def __init__(self, query, client): + def __init__(self, query, client, config): self.query = query self.client = client + self.config = config def to_df(self): # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df() df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True) return df + def to_bigquery(self, dry_run=False) -> Optional[str]: + today = date.today().strftime("%Y%m%d") + rand_id = str(uuid.uuid4())[:7] + path = f"{self.client.project}.{self.config.offline_store.dataset}.training_{today}_{rand_id} " + + job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run) + bq_job = self.client.query(self.query, job_config=job_config) + + if dry_run: + print( + "This query will process {} bytes.".format(bq_job.total_bytes_processed) + ) + return None + + while True: + query_job = self.client.get_job(bq_job.job_id) + if query_job.state in ["PENDING", "RUNNING"]: + print(f"The job is still '{bq_job.state}'. Will wait for 30 seconds") + time.sleep(30) + else: + break + + return path + @dataclass(frozen=True) class FeatureViewQueryContext: diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py index 83f48ccd96..5cced84bb6 100644 --- a/sdk/python/tests/test_historical_retrieval.py +++ b/sdk/python/tests/test_historical_retrieval.py @@ -441,6 +441,22 @@ def test_historical_features_from_bigquery_sources( ], ) + # Just a dry run, should not create table + bq_dry_run = job_from_sql.to_bigquery(dry_run=True) + assert bq_dry_run is None + + bq_temp_table_path = job_from_sql.to_bigquery() + assert bq_temp_table_path.split(".")[0] == gcp_project + + if provider_type == "gcp_custom_offline_config": + assert bq_temp_table_path.split(".")[1] == "foo" + else: + assert bq_temp_table_path.split(".")[1] == bigquery_dataset + + # Check that this table actually exists + actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path) + assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1] + start_time = datetime.utcnow() actual_df_from_sql_entities = job_from_sql.to_df() end_time = datetime.utcnow() From e49ce8f33bfda686c6d55277a5baa9b95197543c Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Wed, 16 Jun 2021 09:13:41 -0400 Subject: [PATCH 2/7] Using tenacity for retries Signed-off-by: Vivian Tao --- sdk/python/feast/infra/offline_stores/bigquery.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 59f419d6a6..13d2fa3afa 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -7,6 +7,7 @@ import pandas import pyarrow from jinja2 import BaseLoader, Environment +from tenacity import retry, stop_after_delay, wait_fixed from feast import errors from feast.data_source import BigQuerySource, DataSource @@ -231,13 +232,17 @@ def to_bigquery(self, dry_run=False) -> Optional[str]: ) return None - while True: + @retry(wait=wait_fixed(30), stop=stop_after_delay(1800)) + def _block_until_done(): query_job = self.client.get_job(bq_job.job_id) if query_job.state in ["PENDING", "RUNNING"]: print(f"The job is still '{bq_job.state}'. Will wait for 30 seconds") - time.sleep(30) + raise bq_job.exception() else: - break + return + + _block_until_done() + print(f"Done writing to '{path}'.") return path From c45765c99398027e773b5ce59ee6b616ce69948e Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Wed, 16 Jun 2021 13:33:25 -0400 Subject: [PATCH 3/7] Refactoring to_biquery function Signed-off-by: Vivian Tao --- .../feast/infra/offline_stores/bigquery.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 13d2fa3afa..7a326b2ce3 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -219,10 +219,13 @@ def to_df(self): return df def to_bigquery(self, dry_run=False) -> Optional[str]: + @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True) + def _block_until_done(): + return bigquery.Client().get_job(bq_job.job_id).state in ["PENDING", "RUNNING"] + today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] - path = f"{self.client.project}.{self.config.offline_store.dataset}.training_{today}_{rand_id} " - + path = f"{self.client.project}.{self.config.offline_store.dataset}.training_{today}_{rand_id}" job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run) bq_job = self.client.query(self.query, job_config=job_config) @@ -232,18 +235,12 @@ def to_bigquery(self, dry_run=False) -> Optional[str]: ) return None - @retry(wait=wait_fixed(30), stop=stop_after_delay(1800)) - def _block_until_done(): - query_job = self.client.get_job(bq_job.job_id) - if query_job.state in ["PENDING", "RUNNING"]: - print(f"The job is still '{bq_job.state}'. Will wait for 30 seconds") - raise bq_job.exception() - else: - return - _block_until_done() - print(f"Done writing to '{path}'.") + if bq_job.exception(): + raise bq_job.exception() + + print(f"Done writing to '{path}'.") return path From 5e5d470d2eca3acd6bd45cadbb96a16dac7c1dc4 Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Wed, 16 Jun 2021 14:00:33 -0400 Subject: [PATCH 4/7] Adding tenacity dependency and changing temp table prefix to historical Signed-off-by: Vivian Tao --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- sdk/python/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 7a326b2ce3..f96116d003 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -225,7 +225,7 @@ def _block_until_done(): today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] - path = f"{self.client.project}.{self.config.offline_store.dataset}.training_{today}_{rand_id}" + path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run) bq_job = self.client.query(self.query, job_config=job_config) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index c2b8d75248..b32f577f7c 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -54,6 +54,7 @@ "pydantic>=1.0.0", "PyYAML==5.3.*", "tabulate==0.8.*", + "tenacity", "toml==0.10.*", "tqdm==4.*", ] @@ -92,7 +93,6 @@ "pytest-mock==1.10.4", "Sphinx!=4.0.0", "sphinx-rtd-theme", - "tenacity", "adlfs==0.5.9", "firebase-admin==4.5.2", "pre-commit", From d7717effb517a721b9e483352eeeabd447561961 Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Wed, 16 Jun 2021 14:02:08 -0400 Subject: [PATCH 5/7] Use self.client instead of creating a new client Signed-off-by: Vivian Tao --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index f96116d003..44b83caff1 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -221,7 +221,7 @@ def to_df(self): def to_bigquery(self, dry_run=False) -> Optional[str]: @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True) def _block_until_done(): - return bigquery.Client().get_job(bq_job.job_id).state in ["PENDING", "RUNNING"] + return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"] today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] From 0ef414d16aa8f4d9a1eb5cded5e8d3ec2e726a3d Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Wed, 16 Jun 2021 14:22:08 -0400 Subject: [PATCH 6/7] pin tenacity to major version Signed-off-by: Vivian Tao --- sdk/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index b32f577f7c..65a27e4835 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -54,7 +54,7 @@ "pydantic>=1.0.0", "PyYAML==5.3.*", "tabulate==0.8.*", - "tenacity", + "tenacity==7.*", "toml==0.10.*", "tqdm==4.*", ] From d2090f236aba8b4f202a6c65d94e92ac6f414d33 Mon Sep 17 00:00:00 2001 From: Vivian Tao Date: Wed, 16 Jun 2021 14:48:42 -0400 Subject: [PATCH 7/7] Tenacity dependency range Signed-off-by: Vivian Tao --- sdk/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 65a27e4835..c198e4ff75 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -54,7 +54,7 @@ "pydantic>=1.0.0", "PyYAML==5.3.*", "tabulate==0.8.*", - "tenacity==7.*", + "tenacity>=7.*", "toml==0.10.*", "tqdm==4.*", ]