Skip to content

Commit

Permalink
refactor(ingest): clean up exception types (datahub-project#6818)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and szalai1 committed Dec 22, 2022
1 parent bdf5c7b commit 7bab889
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 71 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/glue/glue.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
## Compatibility
### Compatibility

To capture lineage across Glue jobs and databases, a requirement must be met – otherwise the AWS API is unable to report any lineage. The job must be created in Glue Studio with the "Generate classic script" option turned on (this option can be accessed in the "Script" tab). Any custom scripts that do not have the proper annotations will not have reported lineage.
16 changes: 10 additions & 6 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,19 @@ class DynamicTypedConfig(ConfigModel):


class MetaError(Exception):
"""A base class for all meta exceptions"""
"""A base class for all meta exceptions."""


class PipelineExecutionError(MetaError):
"""An error occurred when executing the pipeline"""
"""An error occurred when executing the pipeline."""


class OperationalError(PipelineExecutionError):
"""An error occurred because of client-provided metadata"""
class GraphError(MetaError):
"""An error in communicating with the DataHub Graph."""


class OperationalError(GraphError):
"""A GraphError with extra debug annotations."""

message: str
info: dict
Expand All @@ -147,11 +151,11 @@ def __init__(self, message: str, info: Optional[dict] = None):


class ConfigurationError(MetaError):
"""A configuration error has happened"""
"""A configuration error."""


class IgnorableError(MetaError):
"""An error that can be ignored"""
"""An error that can be ignored."""


class ConfigurationMechanism(ABC):
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from requests.models import HTTPError

from datahub.cli.cli_utils import get_boolean_env_variable
from datahub.configuration.common import ConfigModel, OperationalError
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.serialization_helper import post_json_transform
Expand Down Expand Up @@ -157,7 +157,7 @@ def get_aspect(
post_json_obj = post_json_transform(aspect_json)
return aspect_type.from_obj(post_json_obj)
else:
raise OperationalError(
raise GraphError(
f"Failed to find {aspect_type_name} in response {response_json}"
)

Expand Down Expand Up @@ -297,7 +297,7 @@ def get_latest_timeseries_value(
if aspect_json:
return aspect_type.from_obj(json.loads(aspect_json), tuples=False)
else:
raise OperationalError(
raise GraphError(
f"Failed to find {aspect_type} in response {aspect_json}"
)
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class GlueSource(StatefulIngestionSourceBase):
- Table metadata, such as owner, description and parameters
- Jobs and their component transformations, data sources, and data sinks
## IAM permissions
### IAM permissions
For ingesting datasets, the following IAM permissions are required:
```json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,9 @@ def _extract_metadata_from_sql_query(
sql_query = derived_table["sql"]
reporter.query_parse_attempts += 1

# Skip queries that contain liquid variables. We currently don't parse them correctly
# Skip queries that contain liquid variables. We currently don't parse them correctly.
# Docs: https://cloud.google.com/looker/docs/liquid-variable-reference.
# TODO: also support ${EXTENDS} and ${TABLE}
if "{%" in sql_query:
try:
# test if parsing works
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import json
import re
import time
import warnings
from typing import Any, Dict, Generator, List, Optional, Tuple

import requests
import yaml
from requests.auth import HTTPBasicAuth

from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
OtherSchemaClass,
SchemaField,
Expand Down Expand Up @@ -385,16 +383,12 @@ def set_metadata(
)
canonical_schema.append(field)

actor = "urn:li:corpuser:etl"
sys_time = int(time.time() * 1000)
schema_metadata = SchemaMetadata(
schemaName=dataset_name,
platform=f"urn:li:dataPlatform:{platform}",
version=0,
hash="",
platformSchema=OtherSchemaClass(rawSchema=""),
created=AuditStamp(time=sys_time, actor=actor),
lastModified=AuditStamp(time=sys_time, actor=actor),
fields=canonical_schema,
)
return schema_metadata
5 changes: 1 addition & 4 deletions metadata-ingestion/tests/integration/lookml/test_lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,8 @@ def test_lookml_bad_sql_parser(pytestconfig, tmp_path, mock_time):
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
try:
with pytest.raises(PipelineExecutionError): # we expect the source to have warnings
pipeline.raise_from_status(raise_warnings=True)
assert False, "Pipeline should have generated warnings"
except PipelineExecutionError:
pass

mce_helpers.check_golden_file(
pytestconfig,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {},
"externalUrl": null,
"description": "List API versions",
"uri": null,
"tags": []
}
},
Expand All @@ -27,8 +24,7 @@
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
"actor": "urn:li:corpuser:etl",
"impersonator": null
"actor": "urn:li:corpuser:etl"
}
}
]
Expand All @@ -40,18 +36,13 @@
"platform": "urn:li:dataPlatform:api",
"version": 0,
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:etl",
"impersonator": null
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:etl",
"impersonator": null
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
Expand All @@ -61,7 +52,6 @@
"fields": [
{
"fieldPath": "foo",
"jsonPath": null,
"nullable": false,
"description": "",
"type": {
Expand All @@ -71,38 +61,28 @@
},
"nativeDataType": "str",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": null
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00",
"properties": null
"runId": "openapi-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {},
"externalUrl": null,
"description": "Show API version details",
"uri": null,
"tags": []
}
},
Expand All @@ -119,8 +99,7 @@
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
"actor": "urn:li:corpuser:etl",
"impersonator": null
"actor": "urn:li:corpuser:etl"
}
}
]
Expand All @@ -132,18 +111,13 @@
"platform": "urn:li:dataPlatform:api",
"version": 0,
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:etl",
"impersonator": null
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:etl",
"impersonator": null
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
Expand All @@ -153,7 +127,6 @@
"fields": [
{
"fieldPath": "foo",
"jsonPath": null,
"nullable": false,
"description": "",
"type": {
Expand All @@ -163,24 +136,17 @@
},
"nativeDataType": "str",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": null
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00",
"properties": null
"runId": "openapi-2020_04_14-07_00_00"
}
}
]
]

0 comments on commit 7bab889

Please sign in to comment.