feat(ingest): great-expectations - add more logs (#4832)
mayurinehate authored May 5, 2022
1 parent 3775e79 commit b2c82dc
Showing 1 changed file with 29 additions and 9 deletions.
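
The hunks below are from the Great Expectations DataHubValidationAction in the ingestion integration. As a quick orientation before the diff, the notification flow that the new log lines trace runs roughly as in this sketch; the notify_datahub wrapper, the emitter argument, and the warnings.warn import are illustrative assumptions, not the module's actual layout:

    import logging
    from warnings import warn  # assumption: stands in for the module's warn

    logger = logging.getLogger(__name__)

    def notify_datahub(datasets, assertions, emitter):  # hypothetical wrapper
        # No dataset URNs resolved: warn and skip emission, as in the first hunk.
        if len(datasets) == 0 or datasets[0]["dataset_urn"] is None:
            warn("Metadata not sent to datahub. No datasets found.")
            return {"datahub_notification_result": "none required"}
        # New logs: announce the send and the dataset/assertion URNs involved.
        logger.info("Sending metadata to datahub ...")
        logger.info("Dataset URN - {urn}".format(urn=datasets[0]["dataset_urn"]))
        for assertion in assertions:
            logger.info("Assertion URN - {urn}".format(urn=assertion["assertionUrn"]))
            emitter.emit_mcp(assertion)  # stands in for the real MCP emissions
        logger.info("Metadata sent to datahub.")
        return {"datahub_notification_result": "DataHub notification succeeded"}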
@@ -131,7 +131,7 @@ def _run(
             datasets = self.get_dataset_partitions(batch_identifier, data_asset)
 
             if len(datasets) == 0 or datasets[0]["dataset_urn"] is None:
-                logger.info("Metadata not sent to datahub. No datasets found.")
+                warn("Metadata not sent to datahub. No datasets found.")
                 return {"datahub_notification_result": "none required"}
 
             # Returns assertion info and assertion results
@@ -143,7 +143,15 @@ def _run(
                 datasets,
             )
 
+            logger.info("Sending metadata to datahub ...")
+            logger.info("Dataset URN - {urn}".format(urn=datasets[0]["dataset_urn"]))
+
             for assertion in assertions:
+
+                logger.info(
+                    "Assertion URN - {urn}".format(urn=assertion["assertionUrn"])
+                )
+
                 # Construct a MetadataChangeProposalWrapper object.
                 assertion_info_mcp = MetadataChangeProposalWrapper(
                     entityType="assertion",
@@ -175,7 +183,7 @@ def _run(
 
                 # Emit Result! (timseries aspect)
                 emitter.emit_mcp(dataset_assertionResult_mcp)
-
+            logger.info("Metadata sent to datahub.")
             result = "DataHub notification succeeded"
         except Exception as e:
             result = "DataHub notification failed"
@@ -245,6 +253,11 @@ def get_assertions_with_results(
                     }
                 )
             )
+            logger.debug(
+                "GE expectation_suite_name - {name}, expectation_type - {type}, Assertion URN - {urn}".format(
+                    name=expectation_suite_name, type=expectation_type, urn=assertionUrn
+                )
+            )
             assertionInfo: AssertionInfo = self.get_assertion_info(
                 expectation_type,
                 kwargs,
@@ -541,6 +554,8 @@ def get_min_max(kwargs, type=AssertionStdParameterType.UNKNOWN):
     def get_dataset_partitions(self, batch_identifier, data_asset):
         dataset_partitions = []
 
+        logger.debug("Finding datasets being validated")
+
         # for now, we support only v3-api and sqlalchemy execution engine
         if isinstance(data_asset, Validator) and isinstance(
             data_asset.execution_engine, SqlAlchemyExecutionEngine
@@ -645,13 +660,16 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
                 )
             else:
                 warn(
-                    f"DataHubValidationAction does not recognize this GE batch spec type- {type(ge_batch_spec)}."
+                    "DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format(
+                        batch_spec_type=type(ge_batch_spec)
+                    )
                 )
         else:
             # TODO - v2-spec - SqlAlchemyDataset support
             warn(
-                f"DataHubValidationAction does not recognize this GE data asset type - {type(data_asset)}. \
-                    This is either using v2-api or execution engine other than sqlalchemy."
+                "DataHubValidationAction does not recognize this GE data asset type - {asset_type}. This is either using v2-api or execution engine other than sqlalchemy.".format(
+                    asset_type=type(data_asset)
+                )
             )
 
         return dataset_partitions
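
One detail worth noting in the two warn() rewrites above: the old f-strings continued across lines with a backslash inside the string literal, which splices the continuation line's indentation into the emitted message. A minimal standalone illustration (not code from this repository):

    msg = f"first half of a warning \
        and its second half"
    print(repr(msg))
    # -> 'first half of a warning         and its second half'
    # The run of spaces is the continuation line's indentation; the
    # single-line .format() templates introduced by this commit avoid it.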
@@ -695,8 +713,9 @@ def make_dataset_urn_from_sqlalchemy_uri(
     elif data_platform in ["trino", "snowflake"]:
         if schema_name is None or url_instance.database is None:
             warn(
-                f"DataHubValidationAction failed to locate schema name and/or database name \
-                    for {data_platform}."
+                "DataHubValidationAction failed to locate schema name and/or database name for {data_platform}.".format(
+                    data_platform=data_platform
+                )
             )
             return None
         # If data platform is snowflake, we artificially lowercase the Database name.
@@ -711,8 +730,9 @@ def make_dataset_urn_from_sqlalchemy_uri(
     elif data_platform == "bigquery":
         if url_instance.host is None or url_instance.database is None:
             warn(
-                f"DataHubValidationAction failed to locate host and/or database name for \
-                    {data_platform}. "
+                "DataHubValidationAction failed to locate host and/or database name for {data_platform}. ".format(
+                    data_platform=data_platform
+                )
             )
             return None
         schema_name = "{}.{}".format(url_instance.host, url_instance.database)
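
For context on the url_instance fields these last two hunks inspect, here is a small sketch using SQLAlchemy's URL parser; the bigquery URI below is a made-up example:

    from sqlalchemy.engine.url import make_url

    url_instance = make_url("bigquery://my-project/my_dataset")  # hypothetical URI
    print(url_instance.host)      # -> my-project
    print(url_instance.database)  # -> my_dataset
    # Mirrors the schema_name construction in the hunk above:
    print("{}.{}".format(url_instance.host, url_instance.database))  # -> my-project.my_dataset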
