diff --git a/docs/changelog/changelog.rst b/docs/changelog/changelog.rst index 6fd5a8a67468..ac6740bce261 100644 --- a/docs/changelog/changelog.rst +++ b/docs/changelog/changelog.rst @@ -8,6 +8,7 @@ 0.8.3 ----------------- +* Add support for GCS store * Fix a bug in data-docs' rendering of mostly parameter * Correct wording for expect_column_proportion_of_unique_values_to_be_between * Set charset and meta tags to avoid unicode decode error in some browser/backend configurations @@ -21,6 +22,7 @@ * Correct a packaging issue resulting in missing css files in tarball release + 0.8.2 ----------------- * Add easier support for customizing data-docs css @@ -231,7 +233,7 @@ v.0.7.0 Version 0.7 of Great Expectations is HUGE. It introduces several major new features and a large number of improvements, including breaking API changes. -The core vocabulary of expectations remains consistent. Upgrading to +The core vocabulary of expectations remains consistent. Upgrading to the new version of GE will primarily require changes to code that uses data contexts; existing expectation suites will require only changes to top-level names. @@ -310,7 +312,7 @@ v.0.6.0 ------------ * Add support for SparkDFDataset and caching (HUGE work from @cselig) * Migrate distributional expectations to new testing framework -* Add support for two new expectations: expect_column_distinct_values_to_contain_set +* Add support for two new expectations: expect_column_distinct_values_to_contain_set and expect_column_distinct_values_to_equal_set (thanks @RoyalTS) * FUTURE BREAKING CHANGE: The new cache mechanism for Datasets, \ when enabled, causes GE to assume that dataset does not change between evaluation of individual expectations. \ diff --git a/docs/features/data_docs.rst b/docs/features/data_docs.rst index 0043ea2d358a..5cff3097d53c 100644 --- a/docs/features/data_docs.rst +++ b/docs/features/data_docs.rst @@ -50,8 +50,8 @@ Users can specify * which datasources to document (by default, all) * whether to include expectations, validations and profiling results sections -* where the expectations and validations should be read from (filesystem or S3) -* where the HTML files should be written (filesystem or S3) +* where the expectations and validations should be read from (filesystem, S3, or GCS) +* where the HTML files should be written (filesystem, S3, or GCS) * which renderer and view class should be used to render each section ******************************** diff --git a/docs/getting_started/pipeline_integration.rst b/docs/getting_started/pipeline_integration.rst index 346c3f55d058..da726438db29 100644 --- a/docs/getting_started/pipeline_integration.rst +++ b/docs/getting_started/pipeline_integration.rst @@ -155,7 +155,7 @@ Validation Operators -------------------- Validation Operators and Actions make it possible to define collections of tasks together that should be done after a -validation. For example, we might store results (either on a local filesystem or to S3), send a slack notification, +validation. For example, we might store results (on a local filesystem, to S3, or to GCS), send a Slack notification, and update data documentation. The default configuration performs each of those actions. See the :ref:`validation_operators_and_actions` for more information. diff --git a/docs/reference/integrations/bigquery.rst b/docs/reference/integrations/bigquery.rst new file mode 100644 index 000000000000..b4987fcb9151 --- /dev/null +++ b/docs/reference/integrations/bigquery.rst @@ -0,0 +1,60 @@
+.. _BigQuery: +############## +BigQuery +############## + +To add a BigQuery datasource, do the following: + +1. Run ``great_expectations add-datasource`` +2. Choose the *SQL* option from the menu. +3. When asked which sqlalchemy driver to use, enter ``bigquery``. +4. Consult the PyBigQuery documentation +for authentication. + +Install the pybigquery package for the BigQuery sqlalchemy dialect (``pip install pybigquery``). diff --git a/great_expectations/data_context/store/__init__.py b/great_expectations/data_context/store/__init__.py index f72d736ad512..71ae488276c9 100644 --- a/great_expectations/data_context/store/__init__.py +++ b/great_expectations/data_context/store/__init__.py @@ -4,6 +4,7 @@ # FilesystemStoreBackend, FixedLengthTupleFilesystemStoreBackend, FixedLengthTupleS3StoreBackend, + FixedLengthTupleGCSStoreBackend, ) from .store import ( diff --git a/great_expectations/data_context/store/store_backend.py b/great_expectations/data_context/store/store_backend.py index 6ea075fae6ca..a8324134500d 100644 --- a/great_expectations/data_context/store/store_backend.py +++ b/great_expectations/data_context/store/store_backend.py @@ -414,3 +414,86 @@ def has_key(self, key): all_keys = self.list_keys() return key in all_keys + + +class FixedLengthTupleGCSStoreBackend(FixedLengthTupleStoreBackend): + """ + Uses a GCS bucket as a store. + + The key to this StoreBackend must be a tuple with fixed length equal to key_length. + The filepath_template is a string template used to convert the key to a filepath. + There's a bit of regex magic in _convert_filepath_to_key that reverses this process, + so that we can write AND read using filenames as keys. + """ + def __init__( + self, + root_directory, + filepath_template, + key_length, + bucket, + prefix, + project, + forbidden_substrings=None, + platform_specific_separator=False + ): + super(FixedLengthTupleGCSStoreBackend, self).__init__( + root_directory=root_directory, + filepath_template=filepath_template, + key_length=key_length, + forbidden_substrings=forbidden_substrings, + platform_specific_separator=platform_specific_separator + ) + self.bucket = bucket + self.prefix = prefix + self.project = project + + + def _get(self, key): + gcs_object_key = os.path.join( + self.prefix, + self._convert_key_to_filepath(key) + ) + + from google.cloud import storage + gcs = storage.Client(project=self.project) + bucket = gcs.get_bucket(self.bucket) + gcs_response_object = bucket.get_blob(gcs_object_key) + return gcs_response_object.download_as_string().decode("utf-8") + + def _set(self, key, value, content_encoding='utf-8', content_type='application/json'): + gcs_object_key = os.path.join( + self.prefix, + self._convert_key_to_filepath(key) + ) + + from google.cloud import storage + gcs = storage.Client(project=self.project) + bucket = gcs.get_bucket(self.bucket) + blob = bucket.blob(gcs_object_key) + blob.upload_from_string(value.encode(content_encoding), content_type=content_type) + return gcs_object_key + + def list_keys(self): + key_list = [] + + from google.cloud import storage + gcs = storage.Client(self.project) + + for blob in gcs.list_blobs(self.bucket, prefix=self.prefix): + gcs_object_name = blob.name + gcs_object_key = os.path.relpath( + gcs_object_name, + self.prefix, + ) + + key = self._convert_filepath_to_key(gcs_object_key) + if key: + key_list.append(key) + + return key_list + + def has_key(self, key): + assert isinstance(key, string_types) + + all_keys = self.list_keys() + return key in all_keys
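A minimal usage sketch of the new GCS backend follows, assuming ``google-cloud-storage`` is installed and default GCP credentials are available; the bucket, prefix, and project values are placeholders rather than anything referenced in this change.

```python
import os

from great_expectations.data_context.store import FixedLengthTupleGCSStoreBackend

# The bucket, prefix, and project values below are placeholders.
store_backend = FixedLengthTupleGCSStoreBackend(
    root_directory=os.path.abspath("ge_store"),  # still required by the base class (see NOTE in the test)
    filepath_template="my_file_{0}",             # renders a 1-tuple key into an object name
    key_length=1,
    bucket="my-ge-bucket",
    prefix="expectations",
    project="my-gcp-project",
)

# _set uploads gs://my-ge-bucket/expectations/my_file_AAA with content type application/json.
store_backend.set(("AAA",), '{"greeting": "hello"}')

# _get downloads the blob and decodes it as UTF-8.
assert store_backend.get(("AAA",)) == '{"greeting": "hello"}'

# list_keys walks the prefix and converts object names back into tuple keys, e.g. [("AAA",)].
print(store_backend.list_keys())
```

Deferring ``from google.cloud import storage`` into each method keeps ``google-cloud-storage`` an optional dependency for users who never configure a GCS store, mirroring how the other cloud backends are handled.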
diff --git a/great_expectations/dataset/sqlalchemy_dataset.py b/great_expectations/dataset/sqlalchemy_dataset.py index 1f1d5a42686d..4aac909a0a10 100644 --- a/great_expectations/dataset/sqlalchemy_dataset.py +++ b/great_expectations/dataset/sqlalchemy_dataset.py @@ -39,6 +39,11 @@ except ImportError: snowflake = None +try: + import pybigquery.sqlalchemy_bigquery +except ImportError: + pybigquery = None + class MetaSqlAlchemyDataset(Dataset): @@ -200,6 +205,9 @@ def __init__(self, table_name=None, engine=None, connection_string=None, if custom_sql and not table_name: # dashes are special characters in most databases so use underscores table_name = "ge_tmp_" + str(uuid.uuid4()).replace("-", "_") + generated_table_name = table_name + else: + generated_table_name = None if table_name is None: raise ValueError("No table_name provided.") @@ -226,6 +234,8 @@ def __init__(self, table_name=None, engine=None, connection_string=None, self.dialect = import_module("snowflake.sqlalchemy.snowdialect") elif self.engine.dialect.name.lower() == "redshift": self.dialect = import_module("sqlalchemy_redshift.dialect") + elif self.engine.dialect.name.lower() == "bigquery": + self.dialect = import_module("pybigquery.sqlalchemy_bigquery") else: self.dialect = None @@ -234,9 +244,17 @@ def __init__(self, table_name=None, engine=None, connection_string=None, # a user-defined schema raise ValueError("Cannot specify both schema and custom_sql.") + if custom_sql is not None and self.engine.dialect.name.lower() == "bigquery": + if generated_table_name is not None and self.engine.dialect.dataset_id is None: + raise ValueError("No BigQuery dataset specified. Use the bigquery_temp_table batch_kwarg or specify a default dataset in the engine URL.") + if custom_sql: self.create_temporary_table(table_name, custom_sql) + if generated_table_name is not None and self.engine.dialect.name.lower() == "bigquery": + logger.warning("Created permanent table {table_name}".format( + table_name=table_name)) + try: insp = reflection.Inspector.from_engine(self.engine) self.columns = insp.get_columns(table_name, schema=schema) @@ -258,14 +276,17 @@ def head(self, n=5): con=self.engine, chunksize=n )) - except ValueError: + except (ValueError, NotImplementedError): # it looks like MetaData that is used by pd.read_sql_table # cannot work on a temp table. # If it fails, we are trying to get the data using read_sql head_sql_str = "select * from " - if self._table.schema: + if self._table.schema and self.engine.dialect.name.lower() != "bigquery": head_sql_str += self._table.schema + "."
- head_sql_str += self._table.name + elif self.engine.dialect.name.lower() == "bigquery": + head_sql_str += "`" + self._table.name + "`" + else: + head_sql_str += self._table.name head_sql_str += " limit {0:d}".format(n) df = pd.read_sql(head_sql_str, con=self.engine) @@ -329,7 +350,7 @@ def get_column_min(self, column, parse_strings_as_datetimes=False): sa.select([sa.func.min(sa.column(column))]).select_from( self._table) ).scalar() - + def get_column_value_counts(self, column, sort="value", collate=None): if sort not in ["value", "count", "none"]: raise ValueError( @@ -461,7 +482,7 @@ def get_column_hist(self, column, bins): ) ).label("bin_" + str(len(bins)-1)) ) - else: + else: case_conditions.append( sa.func.sum( sa.case( @@ -504,7 +525,7 @@ def get_column_count_in_range(self, column, min_val=None, max_val=None, strict_m max_condition = sa.column(column) < max_val else: max_condition = sa.column(column) <= max_val - + if min_condition is not None and max_condition is not None: condition = sa.and_(min_condition, max_condition) elif min_condition is not None: @@ -522,7 +543,7 @@ def get_column_count_in_range(self, column, min_val=None, max_val=None, strict_m ) ) \ .select_from(self._table) - + return self.engine.execute(query).scalar() def create_temporary_table(self, table_name, custom_sql): @@ -532,13 +553,17 @@ def create_temporary_table(self, table_name, custom_sql): It hasn't been tested in all SQL dialects, and may change based on community feedback. :param custom_sql: """ - if self.engine.dialect.name == "mysql": + + if self.engine.dialect.name.lower() == "bigquery": + stmt = "CREATE OR REPLACE TABLE `{table_name}` AS {custom_sql}".format( + table_name=table_name, custom_sql=custom_sql) + + elif self.engine.dialect.name == "mysql": stmt = "CREATE TEMPORARY TABLE {table_name} AS {custom_sql}".format( table_name=table_name, custom_sql=custom_sql) else: stmt = "CREATE TEMPORARY TABLE \"{table_name}\" AS {custom_sql}".format( table_name=table_name, custom_sql=custom_sql) - self.engine.execute(stmt) def column_reflection_fallback(self): @@ -871,6 +896,13 @@ def _get_dialect_regex_fn(self, positive=True): except (AttributeError, TypeError): # TypeError can occur if the driver was not installed and so is None pass + try: + # Bigquery + if isinstance(self.engine.dialect, pybigquery.sqlalchemy_bigquery.BigQueryDialect): + return "REGEXP_CONTAINS" if positive else "NOT REGEXP_CONTAINS" + except (AttributeError, TypeError): # TypeError can occur if the driver was not installed and so is None + pass + @MetaSqlAlchemyDataset.column_map_expectation def expect_column_values_to_match_regex( self, diff --git a/great_expectations/datasource/generator/table_generator.py b/great_expectations/datasource/generator/table_generator.py index d1750bdac98f..6df3a5a59993 100644 --- a/great_expectations/datasource/generator/table_generator.py +++ b/great_expectations/datasource/generator/table_generator.py @@ -128,7 +128,12 @@ def _get_iterator(self, generator_asset, **kwargs): else: raise ValueError("Table name must be of shape '[SCHEMA.]TABLE'. 
Passed: " + split_generator_asset) tables = self.inspector.get_table_names(schema=schema_name) - tables.extend(self.inspector.get_view_names(schema=schema_name)) + try: + tables.extend(self.inspector.get_view_names(schema=schema_name)) + except NotImplementedError: + # Not implemented by bigquery dialect + pass + if table_name in tables: return iter([ SqlAlchemyDatasourceTableBatchKwargs( @@ -162,13 +167,17 @@ def get_available_data_asset_names(self): if table_name not in known_system_tables ] ) - tables.extend( - [table_name if self.inspector.default_schema_name == schema_name else - schema_name + "." + table_name - for table_name in self.inspector.get_view_names(schema=schema_name) - if table_name not in known_system_tables - ] - ) + try: + tables.extend( + [table_name if self.inspector.default_schema_name == schema_name else + schema_name + "." + table_name + for table_name in self.inspector.get_view_names(schema=schema_name) + if table_name not in known_system_tables + ] + ) + except NotImplementedError: + # Not implemented by bigquery dialect + pass return defined_assets + tables diff --git a/great_expectations/datasource/sqlalchemy_datasource.py b/great_expectations/datasource/sqlalchemy_datasource.py index 242401031cb5..fa8e0923ad64 100644 --- a/great_expectations/datasource/sqlalchemy_datasource.py +++ b/great_expectations/datasource/sqlalchemy_datasource.py @@ -187,10 +187,16 @@ def _get_data_asset(self, batch_kwargs, expectation_suite, **kwargs): ) elif "query" in batch_kwargs: + if "bigquery_temp_table" in batch_kwargs: + table_name = batch_kwargs.get("bigquery_temp_table") + else: + table_name = None + query = Template(batch_kwargs["query"]).safe_substitute(**kwargs) return data_asset_type( custom_sql=query, engine=self.engine, + table_name=table_name, data_context=self._data_context, expectation_suite=expectation_suite, batch_kwargs=batch_kwargs, diff --git a/requirements-dev.txt b/requirements-dev.txt index 43c0bf8c2d3f..d6cec487ba17 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -31,3 +31,5 @@ pytest-cov>=2.6.1 coveralls>=1.3 freezegun>=0.3.12 moto>=1.3.7 +google-cloud-storage>=1.20.0 +pybigquery>=0.4.11 diff --git a/tests/store/test_store_backends.py b/tests/store/test_store_backends.py index b86a7f4a88bf..1e5b3d89f1e8 100644 --- a/tests/store/test_store_backends.py +++ b/tests/store/test_store_backends.py @@ -5,6 +5,7 @@ import re import boto3 from moto import mock_s3 +from mock import patch import six if six.PY2: FileNotFoundError = IOError @@ -17,6 +18,7 @@ InMemoryStoreBackend, FixedLengthTupleFilesystemStoreBackend, FixedLengthTupleS3StoreBackend, + FixedLengthTupleGCSStoreBackend, ) from great_expectations.util import ( gen_directory_tree_str, @@ -210,7 +212,66 @@ def test_FixedLengthTupleS3StoreBackend(): print(my_store.list_keys()) assert set(my_store.list_keys()) == {("AAA",), ("BBB",)} + assert set([s3_object_info['Key'] for s3_object_info in boto3.client('s3').list_objects(Bucket=bucket, Prefix=prefix)['Contents']])\ + == set(['this_is_a_test_prefix/my_file_AAA', 'this_is_a_test_prefix/my_file_BBB']) - assert set([s3_object_info['Key'] for s3_object_info in boto3.client('s3').list_objects( - Bucket=bucket, Prefix=prefix)['Contents']]) == {'this_is_a_test_prefix/my_file_AAA', - 'this_is_a_test_prefix/my_file_BBB'} + +def test_FixedLengthTupleGCSStoreBackend(): + + """ + What does this test test and why? 
+ + Since no package like moto exists for GCP services, we mock the GCS client + and assert that the store backend makes the right calls for set, get, and list. + + """ + + path = "dummy_str" + bucket = "leakybucket" + prefix = "this_is_a_test_prefix" + project = "dummy-project" + + my_store = FixedLengthTupleGCSStoreBackend( + root_directory=os.path.abspath(path), # NOTE: Eugene: 2019-09-06: root_directory should be removed from the base class + key_length=1, + filepath_template="my_file_{0}", + bucket=bucket, + prefix=prefix, + project=project + ) + + with patch("google.cloud.storage.Client", autospec=True) as mock_gcs_client: + + mock_client = mock_gcs_client.return_value + mock_bucket = mock_client.get_bucket.return_value + mock_blob = mock_bucket.blob.return_value + + my_store.set(("AAA",), "aaa", content_type='text/html') + + mock_gcs_client.assert_called_once_with('dummy-project') + mock_client.get_bucket.assert_called_once_with("leakybucket") + mock_bucket.blob.assert_called_once_with("this_is_a_test_prefix/my_file_AAA") + mock_blob.upload_from_string.assert_called_once_with(b"aaa", content_type="text/html") + + with patch("google.cloud.storage.Client", autospec=True) as mock_gcs_client: + + mock_client = mock_gcs_client.return_value + mock_bucket = mock_client.get_bucket.return_value + mock_blob = mock_bucket.get_blob.return_value + mock_str = mock_blob.download_as_string.return_value + + my_store.get(("BBB",)) + + mock_gcs_client.assert_called_once_with('dummy-project') + mock_client.get_bucket.assert_called_once_with("leakybucket") + mock_bucket.get_blob.assert_called_once_with("this_is_a_test_prefix/my_file_BBB") + mock_blob.download_as_string.assert_called_once() + mock_str.decode.assert_called_once_with("utf-8") + + with patch("google.cloud.storage.Client", autospec=True) as mock_gcs_client: + + mock_client = mock_gcs_client.return_value + + my_store.list_keys() + + mock_client.list_blobs.assert_called_once_with("leakybucket", prefix="this_is_a_test_prefix")
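To tie the BigQuery side of this change together, here is a rough end-to-end sketch, assuming ``pybigquery`` is installed and GCP credentials are configured; the project, dataset, and table names are placeholders. A default dataset is supplied in the engine URL so that ``custom_sql`` can be materialized (on BigQuery this uses ``CREATE OR REPLACE TABLE``, i.e. a permanent table rather than a true temporary one).

```python
import sqlalchemy as sa

from great_expectations.dataset import SqlAlchemyDataset

# pybigquery accepts URLs like bigquery://<project> or bigquery://<project>/<default_dataset>;
# "my-gcp-project" and "my_dataset" are placeholders.
engine = sa.create_engine("bigquery://my-gcp-project/my_dataset")

# Because table_name is given explicitly, the materialized table is easy to find and clean
# up afterwards; if it were omitted, a ge_tmp_* name would be generated and a warning about
# the permanent table would be logged.
dataset = SqlAlchemyDataset(
    table_name="ge_tmp_demo_table",
    engine=engine,
    custom_sql="SELECT 1 AS id, 'a' AS label",
)

print(dataset.head())
print(dataset.expect_column_values_to_not_be_null("id"))
```

When batches are requested through ``SqlAlchemyDatasource`` with a ``query`` batch_kwarg instead, the destination table can be named the same way via the new ``bigquery_temp_table`` batch_kwarg.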