Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
ajoymajumdar authored Apr 4, 2024
2 parents fbb9f82 + b5615fa commit 3002950
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 43 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion-modules/airflow-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ task cleanPythonCache(type: Exec) {
commandLine 'bash', '-c',
"find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete"
}
task buildWheel(type: Exec, dependsOn: [environmentSetup, cleanPythonCache]) {
task buildWheel(type: Exec, dependsOn: [environmentSetup]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}

build.dependsOn install
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
set -euxo pipefail

if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then
if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew install
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/dagster-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) {
}
task buildWheel(type: Exec, dependsOn: [environmentSetup]) {
commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " +
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}

task cleanPythonCache(type: Exec) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
set -euxo pipefail

if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then
if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew install
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ task cleanPythonCache(type: Exec) {
task buildWheel(type: Exec, dependsOn: [install, codegen, cleanPythonCache]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && " +
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}

build.dependsOn install
Expand Down
20 changes: 11 additions & 9 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. |
| `is_user` | | bool | `true` | Whether the extracted owner should be considered a user. If `false`, the owner is considered a group. |
| `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. |
| `tag_character_mapping` | | dict[str, str] | | A mapping of tag characters to DataHub owner characters. If provided, the `tag_pattern` config is matched against the tag after it has been converted using this mapping.|
| `email_domain` | | str | | If set then this is appended to create owner URN. |
| `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from the first group of the provided tag pattern. If `true`, there is no need to provide the owner_type and owner_type_urn configs. For example: if the provided tag pattern is `(.*)_owner_email:` and the actual tag is `developer_owner_email`, then the extracted owner type will be `developer`.|
| `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. |
Expand All @@ -40,14 +40,14 @@ transformers:
```
So if we have input dataset tag like
- `urn:li:tag:dataset_owner_email:[email protected]`
- `urn:li:tag:dataset_owner_email:[email protected]`
- `urn:li:tag:owner_email:[email protected]`
- `urn:li:tag:owner_email:[email protected]`

The portion of the tag after the matched tag pattern will be converted into an owner. Hence users `[email protected]` and `[email protected]` will be added as owners.

### Examples

- Add owners, however owner should be considered as group and also email domain not provided in tag string. For example: from tag urn `urn:li:tag:dataset_owner:abc` extracted owner urn should be `urn:li:corpGroup:[email protected]` then config would look like this:
- Add owners, where the owner should be considered a group and the email domain is not provided in the tag string. For example: from tag urn `urn:li:tag:owner:abc` the extracted owner urn should be `urn:li:corpGroup:[email protected]`; then the config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
Expand All @@ -56,7 +56,7 @@ The portion of the tag after the matched tag pattern will be converted into an o
is_user: false
email_domain: "email.com"
```
- Add owners, however owner type and owner type urn wanted to provide externally. For example: from tag urn `urn:li:tag:dataset_owner_email:[email protected]` owner type should be `CUSTOM` and owner type urn as `"urn:li:ownershipType:data_product"` then config would look like this:
- Add owners, where the owner type and owner type urn are to be provided externally. For example: from tag urn `urn:li:tag:owner_email:[email protected]` the owner type should be `CUSTOM` and the owner type urn should be `"urn:li:ownershipType:data_product"`; then the config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
Expand All @@ -65,15 +65,17 @@ The portion of the tag after the matched tag pattern will be converted into an o
owner_type: "CUSTOM"
owner_type_urn: "urn:li:ownershipType:data_product"
```
- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: from tag urn `urn:li:tag:dataset_owner_email:abc_xyz-email_com` extracted owner urn should be `urn:li:corpGroup:[email protected]` then config would look like this:
- Add owners, where some tag characters need to be replaced with other characters before extracting the owner. For example: from tag urn `urn:li:tag:owner__email:abc--xyz-email_com` the extracted owner urn should be `urn:li:corpGroup:[email protected]`; then the config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
config:
tag_pattern: "owner_email:"
owner_character_mapping:
"_": ".",
"-": "@",
tag_character_mapping:
"_": "."
"-": "@"
"--": "-"
"__": "_"
```
- Add owners, where the owner type also needs to be extracted from the tag pattern. For example: from tag urn `urn:li:tag:data_producer_owner_email:[email protected]` the extracted owner type should be `data_producer`; then the config would look like this:
```yaml
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/scripts/release.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
set -euxo pipefail

if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then
if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../gradlew install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
class ExtractOwnersFromTagsConfig(ConfigModel):
tag_pattern: str = ""
is_user: bool = True
owner_character_mapping: Optional[Dict[str, str]] = None
tag_character_mapping: Optional[Dict[str, str]] = None
email_domain: Optional[str] = None
extract_owner_type_from_tag_pattern: bool = False
owner_type: str = "TECHNICAL_OWNER"
Expand Down Expand Up @@ -70,18 +70,35 @@ def get_owner_urn(self, owner_str: str) -> str:
return owner_str + "@" + self.config.email_domain
return owner_str

def convert_owner_as_per_mapping(self, owner: str) -> str:
if self.config.owner_character_mapping:
# Sort the provided mapping by its length.
# Eg: Suppose we have {"_":".", "__":"#"} character mapping.
# In this case "__" character should get replace first compare to "_" character.
for key in sorted(
self.config.owner_character_mapping.keys(),
def convert_tag_as_per_mapping(self, tag: str) -> str:
"""
Function to modify tag as per provided tag character mapping. It also handles the overlappings in the mapping.
Eg: '--':'-' & '-':'@' should not cause incorrect mapping.
"""
if self.config.tag_character_mapping:
# indices list to keep track of the indices where replacements have been made
indices: List[int] = list()
for old_char in sorted(
self.config.tag_character_mapping.keys(),
key=len,
reverse=True,
):
owner = owner.replace(key, self.config.owner_character_mapping[key])
return owner
new_char = self.config.tag_character_mapping[old_char]
index = tag.find(old_char)
while index != -1:
if index not in indices:
tag = tag[:index] + new_char + tag[index + len(old_char) :]
# Adjust indices for overlapping replacements
indices = [
each + (len(new_char) - len(old_char))
if each > index
else each
for each in indices
]
indices.append(index)
# Find the next occurrence of old_char, starting from the next index
index = tag.find(old_char, index + len(new_char))
return tag

def handle_end_of_stream(
self,
Expand All @@ -100,10 +117,10 @@ def transform_aspect(

for tag_class in tags:
tag_str = TagUrn.from_string(tag_class.tag).name
tag_str = self.convert_tag_as_per_mapping(tag_str)
re_match = re.search(self.config.tag_pattern, tag_str)
if re_match:
owner_str = tag_str[re_match.end() :].strip()
owner_str = self.convert_owner_as_per_mapping(owner_str)
owner_urn_str = self.get_owner_urn(owner_str)
owner_urn = (
str(CorpuserUrn(owner_urn_str))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import (
RuntimeDataBatchSpec,
RuntimeQueryBatchSpec,
SqlAlchemyDatasourceBatchSpec,
)
Expand All @@ -24,6 +25,7 @@
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.execution_engine.sqlalchemy_execution_engine import (
SqlAlchemyExecutionEngine,
)
Expand Down Expand Up @@ -566,10 +568,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset):

logger.debug("Finding datasets being validated")

# for now, we support only v3-api and sqlalchemy execution engine
if isinstance(data_asset, Validator) and isinstance(
data_asset.execution_engine, SqlAlchemyExecutionEngine
):
# for now, we support only v3-api and sqlalchemy execution engine and Pandas engine
is_sql_alchemy = isinstance(data_asset, Validator) and (
isinstance(data_asset.execution_engine, SqlAlchemyExecutionEngine)
)
is_pandas = isinstance(data_asset.execution_engine, PandasExecutionEngine)
if is_sql_alchemy or is_pandas:
ge_batch_spec = data_asset.active_batch_spec
partitionSpec = None
batchSpecProperties = {
Expand All @@ -581,10 +585,14 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
),
}
sqlalchemy_uri = None
if isinstance(data_asset.execution_engine.engine, Engine):
if is_sql_alchemy and isinstance(
data_asset.execution_engine.engine, Engine
):
sqlalchemy_uri = data_asset.execution_engine.engine.url
# For snowflake sqlalchemy_execution_engine.engine is actually instance of Connection
elif isinstance(data_asset.execution_engine.engine, Connection):
elif is_sql_alchemy and isinstance(
data_asset.execution_engine.engine, Connection
):
sqlalchemy_uri = data_asset.execution_engine.engine.engine.url

if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec):
Expand Down Expand Up @@ -680,6 +688,30 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
"batchSpec": batchSpec,
}
)
elif isinstance(ge_batch_spec, RuntimeDataBatchSpec):
data_platform = self.get_platform_instance(
data_asset.active_batch_definition.datasource_name
)
dataset_urn = builder.make_dataset_urn_with_platform_instance(
platform=data_platform
if self.platform_alias is None
else self.platform_alias,
name=data_asset.active_batch_definition.datasource_name,
platform_instance="",
env=self.env,
)
batchSpec = BatchSpec(
nativeBatchId=batch_identifier,
query="",
customProperties=batchSpecProperties,
)
dataset_partitions.append(
{
"dataset_urn": dataset_urn,
"partitionSpec": partitionSpec,
"batchSpec": batchSpec,
}
)
else:
warn(
"DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format(
Expand Down
Loading

0 comments on commit 3002950

Please sign in to comment.