From b615b5c3ef04b8b9a1a027c8a75a7b18808bc638 Mon Sep 17 00:00:00 2001
From: codyjlin <31944154+codyjlin@users.noreply.github.com>
Date: Wed, 14 Jul 2021 17:30:30 -0400
Subject: [PATCH] Cancel BigQuery job if block_until_done call times out or is
 interrupted (#1699)

* Cancel job if to_bigquery is cancelled by user

Signed-off-by: Cody Lin

* cancel job in _upload_entity_df_into_bq as well

Signed-off-by: Cody Lin

* Fix _is_done logic?

Signed-off-by: Cody Lin

* make cancel job code more readable

Signed-off-by: Cody Lin

* move KeyboardInterrupt catch outside retry logic; fix retry logic

Signed-off-by: Cody Lin

* make block_until_done public; add custom exception for BQJobStillRunning

Signed-off-by: Cody Lin

* fix retry logic to catch specific exception

Signed-off-by: Cody Lin

* Make retry params configurable; use finally clause to catch more cancellation cases

Signed-off-by: Cody Lin

* Modify docstring

Signed-off-by: Cody Lin

* Typo in docstring

Signed-off-by: Cody Lin

* Fix lint

Signed-off-by: Cody Lin

Signed-off-by: CS <2498638+charliec443@users.noreply.github.com>
---
 sdk/python/feast/errors.py                 |  5 ++
 .../feast/infra/offline_stores/bigquery.py | 73 ++++++++++++++-----
 2 files changed, 58 insertions(+), 20 deletions(-)

diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py
index b855dd57ed..61fee8f918 100644
--- a/sdk/python/feast/errors.py
+++ b/sdk/python/feast/errors.py
@@ -157,6 +157,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
         )
 
 
+class BigQueryJobStillRunning(Exception):
+    def __init__(self, job_id):
+        super().__init__(f"The BigQuery job with ID '{job_id}' is still running.")
+
+
 class BigQueryJobCancelled(Exception):
     def __init__(self, job_id):
         super().__init__(f"The BigQuery job with ID '{job_id}' was cancelled")
diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index ffff021c09..7ad2eefd97 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -10,11 +10,15 @@
 from pandas import Timestamp
 from pydantic import StrictStr
 from pydantic.typing import Literal
-from tenacity import retry, stop_after_delay, wait_fixed
+from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
 
 from feast import errors
 from feast.data_source import BigQuerySource, DataSource
-from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
+from feast.errors import (
+    BigQueryJobCancelled,
+    BigQueryJobStillRunning,
+    FeastProviderLoginError,
+)
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
 from feast.infra.provider import (
@@ -263,12 +267,20 @@ def to_sql(self) -> str:
         """
         return self.query
 
-    def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
+    def to_bigquery(
+        self,
+        job_config: bigquery.QueryJobConfig = None,
+        timeout: int = 1800,
+        retry_cadence: int = 10,
+    ) -> Optional[str]:
         """
         Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
+        Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
 
         Args:
             job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
+            timeout: An optional number of seconds for setting the time limit of the QueryJob.
+            retry_cadence: An optional number of seconds for setting how often the job should be checked for completion.
 
         Returns:
             Returns the destination table name or returns None if job_config.dry_run is True.
@@ -288,10 +300,7 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st
             )
             return None
 
-        block_until_done(client=self.client, bq_job=bq_job)
-
-        if bq_job.exception():
-            raise bq_job.exception()
+        block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
 
         print(f"Done writing to '{job_config.destination}'.")
         return str(job_config.destination)
@@ -300,23 +309,47 @@ def to_arrow(self) -> pyarrow.Table:
         return self.client.query(self.query).to_arrow()
 
 
-def block_until_done(client, bq_job):
-    def _is_done(job_id):
-        return client.get_job(job_id).state in ["PENDING", "RUNNING"]
+def block_until_done(
+    client: Client,
+    bq_job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob],
+    timeout: int = 1800,
+    retry_cadence: int = 10,
+):
+    """
+    Waits for bq_job to finish running, up to a maximum amount of time specified by
+    the timeout parameter (defaulting to 30 minutes).
+
+    Args:
+        client: A bigquery.client.Client to monitor the bq_job.
+        bq_job: The bigquery.job.QueryJob that blocks until done running.
+        timeout: An optional number of seconds for setting the time limit of the job.
+        retry_cadence: An optional number of seconds for setting how often the job should be checked for completion.
+
+    Raises:
+        BigQueryJobStillRunning exception if the function has blocked longer than the timeout.
+        BigQueryJobCancelled exception to signify that the job has been cancelled (i.e. from timeout or KeyboardInterrupt).
+    """
 
-    @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
     def _wait_until_done(job_id):
-        return _is_done(job_id)
+        if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
+            raise BigQueryJobStillRunning(job_id=job_id)
 
     job_id = bq_job.job_id
-    _wait_until_done(job_id=job_id)
+    try:
+        retryer = Retrying(
+            wait=wait_fixed(retry_cadence),
+            stop=stop_after_delay(timeout),
+            retry=retry_if_exception_type(BigQueryJobStillRunning),
+            reraise=True,
+        )
+        retryer(_wait_until_done, job_id)
 
-    if bq_job.exception():
-        raise bq_job.exception()
+    finally:
+        if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
+            client.cancel_job(job_id)
+            raise BigQueryJobCancelled(job_id=job_id)
 
-    if not _is_done(job_id):
-        client.cancel_job(job_id)
-        raise BigQueryJobCancelled(job_id=job_id)
+        if bq_job.exception():
+            raise bq_job.exception()
 
 
 @dataclass(frozen=True)
@@ -368,7 +401,7 @@ def _upload_entity_df_into_bigquery(
 
     if type(entity_df) is str:
         job = client.query(f"CREATE TABLE {table_id} AS ({entity_df})")
-        job.result()
+        block_until_done(client, job)
     elif isinstance(entity_df, pandas.DataFrame):
         # Drop the index so that we don't have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
@@ -378,7 +411,7 @@
         job = client.load_table_from_dataframe(
             entity_df, table_id, job_config=job_config
         )
-        job.result()
+        block_until_done(client, job)
     else:
         raise ValueError(
             f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
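
Reviewer note: a minimal usage sketch of the new to_bigquery parameters.
The FeatureStore setup below (repo path, entity SQL, feature references) is
illustrative and assumed, not taken from this patch; only the timeout and
retry_cadence keywords and the BigQueryJobCancelled behaviour come from the
change itself.

    from feast import FeatureStore
    from feast.errors import BigQueryJobCancelled

    store = FeatureStore(repo_path=".")  # hypothetical feature repo
    job = store.get_historical_features(
        entity_df="SELECT * FROM my_project.my_dataset.entities",  # assumed entity table
        feature_refs=["driver_stats:conv_rate"],  # assumed feature reference
    )

    try:
        # Poll every 5 seconds; if the query is still running after 10
        # minutes, block_until_done cancels the BigQuery job and raises.
        destination = job.to_bigquery(timeout=600, retry_cadence=5)
        print(f"Results written to {destination}")
    except BigQueryJobCancelled:
        # Raised when the job is cancelled, whether because the timeout
        # elapsed or because the user hit Ctrl-C while waiting.
        print("BigQuery job was cancelled before completing.")

Because the cancellation check runs in a finally clause, the single except
branch above covers both the timeout path and a KeyboardInterrupt.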