From ba93e5d5cd1dc1e01f8ce2f62cc5ed217369266e Mon Sep 17 00:00:00 2001 From: Tomas Aschan Date: Mon, 8 Jun 2020 10:01:17 +0200 Subject: [PATCH 1/6] Expose the BQ job ID when running a job This enables client code to do further things with the job id, e.g. look up logs, status or other information. --- luigi/contrib/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index ce2953a0d6..d9f85d2b23 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -349,7 +349,7 @@ def run_job(self, project_id, body, dataset=None): if status['status']['state'] == 'DONE': if status['status'].get('errorResult'): raise Exception('BigQuery job failed: {}'.format(status['status']['errorResult'])) - return + return job_id logger.info('Waiting for job %s:%s to complete...', project_id, job_id) time.sleep(5) From 1bcd4e0deaed2913c1e06b4c3640bdeb6ed986bf Mon Sep 17 00:00:00 2001 From: Tomas Aschan Date: Mon, 8 Jun 2020 10:33:00 +0200 Subject: [PATCH 2/6] Expose job_id also on error --- luigi/contrib/bigquery.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index d9f85d2b23..09ec5c72ef 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -348,7 +348,7 @@ def run_job(self, project_id, body, dataset=None): status = self.client.jobs().get(projectId=project_id, jobId=job_id).execute(num_retries=10) if status['status']['state'] == 'DONE': if status['status'].get('errorResult'): - raise Exception('BigQuery job failed: {}'.format(status['status']['errorResult'])) + raise BigQueryExecutionError(job_id, status['status']['errorResult']) return job_id logger.info('Waiting for job %s:%s to complete...', project_id, job_id) @@ -786,3 +786,9 @@ def run(self): BigqueryRunQueryTask = BigQueryRunQueryTask BigqueryCreateViewTask = BigQueryCreateViewTask ExternalBigqueryTask = ExternalBigQueryTask + +class 
BigQueryExecutionError(Exception): + def __init__(self, job_id, error_message) -> None: + super(BigQueryExecutionError, self).__init__('BigQuery job {} failed: {}'.format(job_id, error_message)) + self.error_message = error_message + self.job_id = job_id From 63d469f620e8bb273b7af060c3290484f08a99bb Mon Sep 17 00:00:00 2001 From: Tomas Aschan Date: Mon, 8 Jun 2020 10:53:14 +0200 Subject: [PATCH 3/6] Fix formatting --- luigi/contrib/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index 09ec5c72ef..0a4e8a9a05 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -787,6 +787,7 @@ def run(self): BigqueryCreateViewTask = BigQueryCreateViewTask ExternalBigqueryTask = ExternalBigQueryTask + class BigQueryExecutionError(Exception): def __init__(self, job_id, error_message) -> None: super(BigQueryExecutionError, self).__init__('BigQuery job {} failed: {}'.format(job_id, error_message)) From a7698aad5a649db2382bb1273ff2d7b349716b48 Mon Sep 17 00:00:00 2001 From: Tomas Aschan Date: Mon, 8 Jun 2020 13:22:12 +0200 Subject: [PATCH 4/6] Improve docstrings --- luigi/contrib/bigquery.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index 0a4e8a9a05..f449cb99fd 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -336,6 +336,9 @@ def run_job(self, project_id, body, dataset=None): :param dataset: :type dataset: BQDataset + :return: the job id of the job. + :rtype: str + :raises luigi.contrib.bigquery.BigQueryExecutionError: if the job fails. 
""" if dataset and not self.dataset_exists(dataset): @@ -790,6 +793,12 @@ def run(self): class BigQueryExecutionError(Exception): def __init__(self, job_id, error_message) -> None: + """ + :param job_id: BigQuery Job ID + :type job_id: str + :param error_message: status['status']['errorResult'] for the failed job + :type error_message: str + """ super(BigQueryExecutionError, self).__init__('BigQuery job {} failed: {}'.format(job_id, error_message)) self.error_message = error_message self.job_id = job_id From 715334a7842a7bfeaa726b7e76ce1306b529931e Mon Sep 17 00:00:00 2001 From: Tomas Aschan Date: Mon, 8 Jun 2020 13:22:26 +0200 Subject: [PATCH 5/6] Use Python3 syntax for super constructor --- luigi/contrib/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index f449cb99fd..55fc3b72c8 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -799,6 +799,6 @@ def __init__(self, job_id, error_message) -> None: :param error_message: status['status']['errorResult'] for the failed job :type error_message: str """ - super(BigQueryExecutionError, self).__init__('BigQuery job {} failed: {}'.format(job_id, error_message)) + super().__init__('BigQuery job {} failed: {}'.format(job_id, error_message)) self.error_message = error_message self.job_id = job_id From 9d59f4623075dcfcc94abe1cf66a75cd8300e0ff Mon Sep 17 00:00:00 2001 From: Tomas Aschan Date: Mon, 8 Jun 2020 13:53:31 +0200 Subject: [PATCH 6/6] Add bq client tests --- test/contrib/bigquery_gcloud_test.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/contrib/bigquery_gcloud_test.py b/test/contrib/bigquery_gcloud_test.py index 68f71b3a08..c8e19f2bbc 100644 --- a/test/contrib/bigquery_gcloud_test.py +++ b/test/contrib/bigquery_gcloud_test.py @@ -38,6 +38,7 @@ from avro.datafile import DataFileWriter from avro.io import DatumWriter from luigi.contrib.gcs import GCSTarget +from luigi.contrib.bigquery 
import BigQueryExecutionError from nose.plugins.attrib import attr from helpers import unittest @@ -322,6 +323,19 @@ def test_run_query(self): self.assertTrue(self.bq_client.table_exists(self.table)) + def test_run_successful_job(self): + body = {'configuration': {'query': {'query': 'select count(*) from unnest([1,2,3])'}}} + + job_id = self.bq_client.run_job(PROJECT_ID, body) + + self.assertIsNotNone(job_id) + self.assertNotEqual('', job_id) + + def test_run_failing_job(self): + body = {'configuration': {'query': {'query': 'this is not a valid query'}}} + + self.assertRaises(BigQueryExecutionError, lambda: self.bq_client.run_job(PROJECT_ID, body)) + @attr('gcloud') class BigQueryLoadAvroTest(unittest.TestCase):