Merge branch 'master' into kubernetes
* master:
  [AIRFLOW-520] Fix Version Info in Flask UI (apache#4072)
  [AIRFLOW-XXX] Add Neoway to companies list (apache#4081)
  [AIRFLOW-XXX] Add Surfline to companies list (apache#4079)
  Revert "[AIRFLOW-461] Restore parameter position for BQ run_load method (apache#4077)"
  [AIRFLOW-461] Restore parameter position for BQ run_load method (apache#4077)
  [AIRFLOW-461]  Support autodetected schemas in BigQuery run_load (apache#3880)
  [AIRFLOW-3238] Fix models.DAG to deactivate unknown DAGs on initdb (apache#4073)
  [AIRFLOW-3239] Fix test recovery further (apache#4074)
  [AIRFLOW-3203] Fix DockerOperator & some operator test (apache#4049)
  [AIRFLOW-1867] Add sandbox mode and py3k bug  (apache#2824)
  [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators (apache#3828)
  [AIRFLOW-XXX] BigQuery Hook - Minor Refactoring (apache#4066)
  [AIRFLOW-3232] More readable GCF operator documentation (apache#4067)
Anders Åslund committed Oct 26, 2018
2 parents d2445c4 + 7df4405 commit 2b0f007
Showing 28 changed files with 713 additions and 142 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -224,6 +224,7 @@ Currently **officially** using Airflow:
1. [New Relic](https://www.newrelic.com) [[@marcweil](https://github.com/marcweil)]
1. [Newzoo](https://www.newzoo.com) [[@newzoo-nexus](https://github.com/newzoo-nexus)]
1. [Nextdoor](https://nextdoor.com) [[@SivaPandeti](https://github.com/SivaPandeti), [@zshapiro](https://github.com/zshapiro) & [@jthomas123](https://github.com/jthomas123)]
1. [Neoway](https://www.neoway.com.br/) [[@neowaylabs](https://github.com/orgs/NeowayLabs/people)]
1. [OdysseyPrime](https://www.goprime.io/) [[@davideberdin](https://github.com/davideberdin)]
1. [OfferUp](https://offerupnow.com)
1. [OneFineStay](https://www.onefinestay.com) [[@slangwald](https://github.com/slangwald)]
@@ -262,6 +263,7 @@ Currently **officially** using Airflow:
1. [Stripe](https://stripe.com) [[@jbalogh](https://github.com/jbalogh)]
1. [Strongmind](https://www.strongmind.com) [[@tomchapin](https://github.com/tomchapin) & [@wongstein](https://github.com/wongstein)]
1. [Square](https://squareup.com/)
1. [Surfline](https://www.surfline.com/) [[@jawang35](https://github.com/jawang35)]
1. [Tails.com](https://tails.com/) [[@alanmcruickshank](https://github.com/alanmcruickshank)]
1. [Tesla](https://www.tesla.com/) [[@thoralf-gutierrez](https://github.com/thoralf-gutierrez)]
1. [The Home Depot](https://www.homedepot.com/)[[@apekshithr](https://github.com/apekshithr)]
25 changes: 18 additions & 7 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -229,7 +229,8 @@ def create_empty_table(self,
:param table_id: The Name of the table to be created.
:type table_id: str
:param schema_fields: If set, the schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
:type schema_fields: list
:param labels: a dictionary containing labels for the table, passed to BigQuery
:type labels: dict
@@ -238,7 +239,6 @@ def create_empty_table(self,
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
:type schema_fields: list
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
@@ -480,7 +480,7 @@ def create_external_table(self,
)

def run_query(self,
sql=None,
sql,
destination_dataset_table=None,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
@@ -522,6 +522,7 @@ def run_query(self,
:type flatten_results: bool
:param udf_config: The User Defined Function configuration for the query.
See https://cloud.google.com/bigquery/user-defined-functions for details.
:type udf_config: list
:param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
If `None`, defaults to `self.use_legacy_sql`.
:type use_legacy_sql: bool
@@ -532,7 +533,6 @@ def run_query(self,
if you need to provide some params that are not supported by the
BigQueryHook like args.
:type api_resource_configs: dict
:type udf_config: list
:param maximum_billing_tier: Positive integer that serves as a
multiplier of the basic price.
:type maximum_billing_tier: int
@@ -832,8 +832,8 @@ def run_copy(self,

def run_load(self,
destination_project_dataset_table,
schema_fields,
source_uris,
schema_fields=None,
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
@@ -847,7 +847,8 @@ def run_load(self,
schema_update_options=(),
src_fmt_configs=None,
time_partitioning=None,
cluster_fields=None):
cluster_fields=None,
autodetect=False):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
to BigQuery. See here:
@@ -865,7 +866,11 @@ def run_load(self,
:type destination_project_dataset_table: str
:param schema_fields: The schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
Required if autodetect=False; optional if autodetect=True.
:type schema_fields: list
:param autodetect: Attempt to autodetect the schema for CSV and JSON
source files.
:type autodetect: bool
:param source_uris: The source Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild
per-object name can be used.
@@ -920,6 +925,11 @@ def run_load(self,
# if it's not, we raise a ValueError
# Refer to this link for more details:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat

if schema_fields is None and not autodetect:
raise ValueError(
'You must either pass a schema or autodetect=True.')

if src_fmt_configs is None:
src_fmt_configs = {}

@@ -954,6 +964,7 @@ def run_load(self,

configuration = {
'load': {
'autodetect': autodetect,
'createDisposition': create_disposition,
'destinationTable': {
'projectId': destination_project,
@@ -1717,7 +1728,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):

if '.' not in table_input:
raise ValueError(
'Expected deletion_dataset_table name in the format of '
'Expected target table name in the format of '
'<dataset>.<table>. Got: {}'.format(table_input))

if not default_project_id:
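Below is a minimal usage sketch of the reworked `run_load` signature, assuming the cursor-based call pattern used by the contrib operators; the connection id, project, dataset, and bucket names are placeholders, not values from this commit.

```python
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

# Obtain a BigQuery cursor from the hook; the connection id is a placeholder.
hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

# With autodetect=True, schema_fields can now be omitted and BigQuery infers
# the schema from the CSV/JSON source files. Passing neither a schema nor
# autodetect=True raises the ValueError added in this commit.
cursor.run_load(
    destination_project_dataset_table='my-project.my_dataset.my_table',
    source_uris=['gs://my-bucket/data/*.csv'],
    source_format='CSV',
    skip_leading_rows=1,
    autodetect=True,
)
```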
26 changes: 13 additions & 13 deletions airflow/contrib/hooks/gcp_function_hook.py
@@ -35,7 +35,7 @@
# noinspection PyAbstractClass
class GcfHook(GoogleCloudBaseHook):
"""
Hook for Google Cloud Functions APIs.
Hook for the Google Cloud Functions APIs.
"""
_conn = None

@@ -48,7 +48,7 @@ def __init__(self,

def get_conn(self):
"""
Retrieves connection to cloud functions.
Retrieves the connection to Cloud Functions.
:return: Google Cloud Function services object
:rtype: dict
@@ -61,7 +61,7 @@ def get_conn(self):

def get_function(self, name):
"""
Returns the function with a given name.
Returns the Cloud Function with the given name.
:param name: name of the function
:type name: str
@@ -73,9 +73,9 @@ def get_function(self, name):

def list_functions(self, full_location):
"""
Lists all functions created in the location.
Lists all Cloud Functions created in the location.
:param full_location: full location including project. On the form
:param full_location: full location including the project in the form of
of /projects/<PROJECT>/location/<LOCATION>
:type full_location: str
:return: array of CloudFunction objects - representing functions in the location
@@ -87,12 +12,12 @@ def list_functions(self, full_location):

def create_new_function(self, full_location, body):
"""
Creates new cloud function in location given with body specified.
Creates a new function in Cloud Function in the location specified in the body.
:param full_location: full location including project. On the form
:param full_location: full location including the project in the form of
of /projects/<PROJECT>/location/<LOCATION>
:type full_location: str
:param body: body required by the cloud function insert API
:param body: body required by the Cloud Functions insert API
:type body: dict
:return: response returned by the operation
:rtype: dict
@@ -106,7 +106,7 @@ def create_new_function(self, full_location, body):

def update_function(self, name, body, update_mask):
"""
Updates cloud function according to the update mask specified.
Updates Cloud Functions according to the specified update mask.
:param name: name of the function
:type name: str
@@ -129,10 +129,10 @@ def upload_function_zip(self, parent, zip_path):
"""
Uploads zip file with sources.
:param parent: project and location in which signed upload URL should be generated
in the form of /projects/<PROJECT>/location/<LOCATION>
:param parent: Google Cloud Platform project id and region where zip file should
be uploaded in the form of /projects/<PROJECT>/location/<LOCATION>
:type parent: str
:param zip_path: path of the file to upload (should point to valid .zip file)
:param zip_path: path of the valid .zip file to upload
:type zip_path: str
:return: Upload URL that was returned by generateUploadUrl method
"""
@@ -156,7 +156,7 @@ def upload_function_zip(self, parent, zip_path):

def delete_function(self, name):
"""
Deletes cloud function specified by name.
Deletes the specified Cloud Function.
:param name: name of the function
:type name: str
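For orientation, a hedged sketch of calling the hook directly; the constructor arguments are assumed from other GCP contrib hooks (this diff does not show GcfHook's full `__init__` signature), the project and region are placeholders, and the returned items are assumed to be plain dicts from the REST API.

```python
from airflow.contrib.hooks.gcp_function_hook import GcfHook

# api_version and gcp_conn_id are assumed constructor arguments.
hook = GcfHook(api_version='v1', gcp_conn_id='google_cloud_default')

# full_location identifies the project and region, following the
# projects/<PROJECT>/locations/<LOCATION> form of the Cloud Functions API.
for function in hook.list_functions('projects/my-project/locations/europe-west1'):
    print(function.get('name'))
```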
4 changes: 2 additions & 2 deletions airflow/contrib/operators/bigquery_operator.py
@@ -289,7 +289,7 @@ def __init__(self,
project_id=None,
schema_fields=None,
gcs_schema_object=None,
time_partitioning={},
time_partitioning=None,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
@@ -306,7 +306,7 @@ def __init__(self,
self.bigquery_conn_id = bigquery_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to
self.time_partitioning = time_partitioning
self.time_partitioning = {} if time_partitioning is None else time_partitioning
self.labels = labels

def execute(self, context):
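The hunk above replaces a mutable default argument (`time_partitioning={}`) with `None`; a standalone sketch of the pitfall this avoids (not Airflow code):

```python
def append_bad(item, acc=[]):
    # The default list is created once and shared by every call.
    acc.append(item)
    return acc

def append_good(item, acc=None):
    # A fresh list is created per call, mirroring the operator fix.
    acc = [] if acc is None else acc
    acc.append(item)
    return acc

print(append_bad(1), append_bad(2))    # [1, 2] [1, 2] -- state leaks between calls
print(append_good(1), append_good(2))  # [1] [2]
```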
35 changes: 18 additions & 17 deletions airflow/contrib/operators/gcp_function_operator.py
@@ -80,28 +80,29 @@ def _validate_max_instances(value):

class GcfFunctionDeployOperator(BaseOperator):
"""
Create a function in Google Cloud Functions.
Creates a function in Google Cloud Functions.
:param project_id: Project ID that the operator works on
:param project_id: Google Cloud Platform Project ID where the function should
be created.
:type project_id: str
:param location: Region where the operator operates on
:param location: Google Cloud Platform region where the function should be created.
:type location: str
:param body: Body of the cloud function definition. The body must be a CloudFunction
dictionary as described in:
:param body: Body of the Cloud Functions definition. The body must be a
Cloud Functions dictionary as described in:
https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions
(note that different API versions require different
variants of the CloudFunction dictionary)
. Different API versions require different variants of the Cloud Functions
dictionary.
:type body: dict or google.cloud.functions.v1.CloudFunction
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: Version of the API used (for example v1).
:param api_version: API version used (for example v1 or v1beta1).
:type api_version: str
:param zip_path: Path to zip file containing source code of the function. If it is
set, then sourceUploadUrl should not be specified in the body (or it should
be empty), then the zip file will be uploaded using upload URL generated
via generateUploadUrl from cloud functions API
:param zip_path: Path to zip file containing source code of the function. If the path
is set, the sourceUploadUrl should not be specified in the body or it should
be empty. Then the zip file will be uploaded using the upload URL generated
via generateUploadUrl from the Cloud Functions API.
:type zip_path: str
:param validate_body: If set to False, no body validation is performed.
:param validate_body: If set to False, body validation is not performed.
:type validate_body: bool
"""

@@ -265,14 +266,14 @@ def preprocess_body(self):

class GcfFunctionDeleteOperator(BaseOperator):
"""
Delete a function with specified name from Google Cloud Functions.
Deletes the specified function from Google Cloud Functions.
:param name: A fully-qualified function name, matching
the pattern: `^projects/[^/]+/locations/[^/]+/functions/[^/]+$`
:type name: str
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: Version of the API used (for example v1).
:param api_version: API version used (for example v1 or v1beta1).
:type api_version: str
"""

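A hedged sketch of instantiating the deploy operator with the parameters documented above; the task id, project, region, zip path, and body fields are illustrative placeholders, and the body deliberately omits sourceUploadUrl so the zip file is uploaded via generateUploadUrl, as the docstring describes.

```python
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeployOperator

# Placeholder CloudFunction body; sourceUploadUrl is omitted so the zip file
# below is uploaded through the generateUploadUrl flow.
body = {
    'name': 'projects/my-project/locations/europe-west1/functions/hello_gcf',
    'entryPoint': 'hello_gcf',
    'runtime': 'python37',
    'httpsTrigger': {},
}

deploy_function = GcfFunctionDeployOperator(
    task_id='deploy_hello_gcf',
    project_id='my-project',
    location='europe-west1',
    body=body,
    zip_path='/tmp/hello_gcf.zip',
    validate_body=True,
)
```

In a real pipeline the operator would be created inside a DAG context (or given an explicit `dag` argument) rather than standalone as shown here.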
24 changes: 15 additions & 9 deletions airflow/contrib/operators/gcs_to_bq.py
@@ -152,6 +152,7 @@ def __init__(self,
external_table=False,
time_partitioning=None,
cluster_fields=None,
autodetect=False,
*args, **kwargs):

super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -190,20 +191,24 @@ def __init__(self,
self.src_fmt_configs = src_fmt_configs
self.time_partitioning = time_partitioning
self.cluster_fields = cluster_fields
self.autodetect = autodetect

def execute(self, context):
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

if not self.schema_fields and \
self.schema_object and \
self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
schema_fields = json.loads(gcs_hook.download(
self.bucket,
self.schema_object).decode("utf-8"))
if not self.schema_fields:
if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
schema_fields = json.loads(gcs_hook.download(
self.bucket,
self.schema_object).decode("utf-8"))
elif self.schema_object is None and self.autodetect is False:
raise ValueError('At least one of `schema_fields`, `schema_object`, '
'or `autodetect` must be passed.')

else:
schema_fields = self.schema_fields

@@ -234,6 +239,7 @@ def execute(self, context):
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
autodetect=self.autodetect,
create_disposition=self.create_disposition,
skip_leading_rows=self.skip_leading_rows,
write_disposition=self.write_disposition,
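A hedged usage sketch of the operator with the new autodetect flag; the bucket, source_objects, and destination_project_dataset_table parameters are assumed from the operator's existing signature (they sit outside these hunks), and all names are placeholders.

```python
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# With autodetect=True neither schema_fields nor schema_object is needed;
# omitting all three now raises the ValueError added in this commit.
load_events = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_events',
    bucket='my-bucket',
    source_objects=['events/*.csv'],
    destination_project_dataset_table='my-project.my_dataset.events',
    source_format='CSV',
    skip_leading_rows=1,
    autodetect=True,
)
```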