diff --git a/datahub-web-react/src/utils/sort/topologicalSort.ts b/datahub-web-react/src/utils/sort/topologicalSort.ts index 792c7207d152b..ceca4b72b963d 100644 --- a/datahub-web-react/src/utils/sort/topologicalSort.ts +++ b/datahub-web-react/src/utils/sort/topologicalSort.ts @@ -6,6 +6,7 @@ function topologicalSortHelper( explored: Set, result: Array, urnsArray: Array, + nodes: Array, ) { if (!node.entity?.urn) { return; @@ -16,11 +17,14 @@ function topologicalSortHelper( .filter((entity) => entity?.entity?.urn && urnsArray.includes(entity?.entity?.urn)) .forEach((n) => { if (n?.entity?.urn && !explored.has(n?.entity?.urn)) { - topologicalSortHelper(n, explored, result, urnsArray); + topologicalSortHelper(n, explored, result, urnsArray, nodes); } }); if (urnsArray.includes(node?.entity?.urn)) { - result.push(node); + const fullyFetchedEntity = nodes.find((n) => n?.entity?.urn === node?.entity?.urn); + if (fullyFetchedEntity) { + result.push(fullyFetchedEntity); + } } } @@ -34,7 +38,7 @@ export function topologicalSort(input: Array) { .map((node) => node.entity?.urn) as Array; nodes.forEach((node) => { if (node.entity?.urn && !explored.has(node.entity?.urn)) { - topologicalSortHelper(node, explored, result, urnsArray); + topologicalSortHelper(node, explored, result, urnsArray, nodes); } }); diff --git a/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl b/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl index b6d58df214eb3..2102ea5b6e7ce 100644 --- a/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl +++ b/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl @@ -7,7 +7,7 @@ import com.linkedin.common.Status import com.linkedin.common.GlobalTags /** - * Metadata bout DataJob + * Metadata about DataJob */ record DataJob includes DataJobKey, ChangeAuditStamps { /** @@ -28,7 +28,7 @@ record DataJob includes DataJobKey, ChangeAuditStamps { /** * Input and output datasets of job */ - inputOutput: optional DataJobInputOutput + inputOutput: optional DataJobInputOutput /** * Status information for the chart such as removed or not diff --git a/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json index 9c16a60fab6c1..1dc322312120a 100644 --- a/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json @@ -378,7 +378,7 @@ "type" : "record", "name" : "DataJob", "namespace" : "com.linkedin.datajob", - "doc" : "Metadata bout DataJob", + "doc" : "Metadata about DataJob", "include" : [ { "type" : "record", "name" : "DataJobKey", @@ -438,9 +438,10 @@ "name" : "AzkabanJobType", "namespace" : "com.linkedin.datajob.azkaban", "doc" : "The various types of support azkaban jobs", - "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL" ], + "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL", "GLUE" ], "symbolDocs" : { "COMMAND" : "The command job type is one of the basic built-in types. It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "GLUE" : "Glue type is for running AWS Glue job transforms.", "HADOOP_JAVA" : "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", "HADOOP_SHELL" : "In large part, this is the same Command type. 
The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", "HIVE" : "Hive type is for running Hive jobs.", diff --git a/gms/api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 7f184170ac5c2..bd6a09a7b4f04 100644 --- a/gms/api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -956,9 +956,10 @@ "name" : "AzkabanJobType", "namespace" : "com.linkedin.datajob.azkaban", "doc" : "The various types of support azkaban jobs", - "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL" ], + "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL", "GLUE" ], "symbolDocs" : { "COMMAND" : "The command job type is one of the basic built-in types. It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "GLUE" : "Glue type is for running AWS Glue job transforms.", "HADOOP_JAVA" : "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", "HADOOP_SHELL" : "In large part, this is the same Command type. The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", "HIVE" : "Hive type is for running Hive jobs.", diff --git a/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index b9a2bec0902f9..6ce21bf375603 100644 --- a/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -1231,9 +1231,10 @@ "name" : "AzkabanJobType", "namespace" : "com.linkedin.datajob.azkaban", "doc" : "The various types of support azkaban jobs", - "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL" ], + "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL", "GLUE" ], "symbolDocs" : { "COMMAND" : "The command job type is one of the basic built-in types. It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "GLUE" : "Glue type is for running AWS Glue job transforms.", "HADOOP_JAVA" : "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", "HADOOP_SHELL" : "In large part, this is the same Command type. The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", "HIVE" : "Hive type is for running Hive jobs.", diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 3d6e7a452a6dc..918440ff2c859 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -485,12 +485,14 @@ Extracts: - List of tables - Column types associated with each table - Table metadata, such as owner, description and parameters +- Jobs and their component transformations, data sources, and data sinks ```yml source: type: glue config: aws_region: # aws_region_name, i.e. "eu-west-1" + extract_transforms: True # whether to ingest Glue jobs, defaults to True env: # environment for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". Defaults to "PROD". 
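+      # note: job extraction reads each job's script from S3 to build its DAG, so these credentials also need read access to the script buckets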
# Filtering patterns for databases and tables to scan @@ -683,8 +685,8 @@ Pull metadata from dbt artifacts files: - [data platforms](https://github.com/linkedin/datahub/blob/master/gms/impl/src/main/resources/DataPlatformInfo.json) - load_schemas: - Load schemas from dbt catalog file, not necessary when the underlying data platform already has this data. -- node_type_pattern: - - Use this filter to exclude and include node types using allow or deny method +- node_type_pattern: + - Use this filter to exclude and include node types using allow or deny method ```yml source: @@ -698,7 +700,7 @@ source: deny: - ^test.* allow: - - ^.* + - ^.* ``` Note: when `load_schemas` is False, models that use [identifiers](https://docs.getdbt.com/reference/resource-properties/identifier) to reference their source tables are ingested using the model identifier as the model name to preserve the lineage. diff --git a/metadata-ingestion/examples/recipes/glue_to_datahub.yml b/metadata-ingestion/examples/recipes/glue_to_datahub.yml index 56e6292abfaab..21c87353c2ef8 100644 --- a/metadata-ingestion/examples/recipes/glue_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/glue_to_datahub.yml @@ -1,9 +1,10 @@ source: type: glue config: - aws_region: "us-east-1" + aws_region: "us-west-2" + extract_transforms: true sink: type: "datahub-rest" config: - server: 'http://localhost:8080' \ No newline at end of file + server: "http://localhost:8080" diff --git a/metadata-ingestion/scripts/update_golden_files.sh b/metadata-ingestion/scripts/update_golden_files.sh index 3861943c181b1..68a737b5e7a55 100755 --- a/metadata-ingestion/scripts/update_golden_files.sh +++ b/metadata-ingestion/scripts/update_golden_files.sh @@ -14,6 +14,7 @@ cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mce cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json +cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index 989cfe68d2f5a..1692760ac327f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -1,13 +1,18 @@ +import json import time +import typing +from collections import defaultdict from dataclasses import dataclass from dataclasses import field as dataclass_field from functools import reduce -from typing import Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union +from urllib.parse import urlparse import boto3 from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern +from datahub.emitter import mce_builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -31,8 +36,14 @@ ) from datahub.metadata.schema_classes import ( AuditStampClass, + DataFlowInfoClass, + DataFlowSnapshotClass, + DataJobInfoClass, + DataJobInputOutputClass, + 
DataJobSnapshotClass, DatasetPropertiesClass, MapTypeClass, + MetadataChangeEventClass, OwnerClass, OwnershipClass, OwnershipTypeClass, @@ -59,23 +70,26 @@ def assume_role( class GlueSourceConfig(ConfigModel): env: str = "PROD" + database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + + extract_transforms: Optional[bool] = True + aws_access_key_id: Optional[str] = None aws_secret_access_key: Optional[str] = None aws_session_token: Optional[str] = None aws_role: Optional[Union[str, List[str]]] = None aws_region: str - @property - def glue_client(self): + def get_client(self, service: str) -> boto3.client: if ( self.aws_access_key_id and self.aws_secret_access_key and self.aws_session_token ): return boto3.client( - "glue", + service, aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, aws_session_token=self.aws_session_token, @@ -83,7 +97,7 @@ def glue_client(self): ) elif self.aws_access_key_id and self.aws_secret_access_key: return boto3.client( - "glue", + service, aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, region_name=self.aws_region, @@ -100,14 +114,22 @@ def glue_client(self): {}, ) return boto3.client( - "glue", + service, aws_access_key_id=credentials["AccessKeyId"], aws_secret_access_key=credentials["SecretAccessKey"], aws_session_token=credentials["SessionToken"], region_name=self.aws_region, ) else: - return boto3.client("glue", region_name=self.aws_region) + return boto3.client(service, region_name=self.aws_region) + + @property + def glue_client(self): + return self.get_client("glue") + + @property + def s3_client(self): + return self.get_client("s3") @dataclass @@ -131,6 +153,8 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext): self.source_config = config self.report = GlueSourceReport() self.glue_client = config.glue_client + self.s3_client = config.s3_client + self.extract_transforms = config.extract_transforms self.env = config.env @classmethod @@ -138,37 +162,313 @@ def create(cls, config_dict, ctx): config = GlueSourceConfig.parse_obj(config_dict) return cls(config, ctx) - def get_workunits(self) -> Iterable[MetadataWorkUnit]: - def get_all_tables() -> List[dict]: - def get_tables_from_database(database_name: str) -> List[dict]: - new_tables = [] - paginator = self.glue_client.get_paginator("get_tables") - for page in paginator.paginate(DatabaseName=database_name): - new_tables += page["TableList"] - - return new_tables - - def get_database_names() -> List[str]: - database_names = [] - paginator = self.glue_client.get_paginator("get_databases") - for page in paginator.paginate(): - for db in page["DatabaseList"]: - if self.source_config.database_pattern.allowed(db["Name"]): - database_names.append(db["Name"]) - - return database_names - - if self.source_config.database_pattern.is_fully_specified_allow_list(): - database_names = self.source_config.database_pattern.get_allowed_list() + def get_all_jobs(self): + """ + List all jobs in Glue. + """ + + jobs = [] + + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_jobs + paginator = self.glue_client.get_paginator("get_jobs") + for page in paginator.paginate(): + jobs += page["Jobs"] + + return jobs + + def get_dataflow_graph(self, script_path: str) -> Dict[str, Any]: + """ + Get the DAG of transforms and data sources/sinks for a job. 
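+        Downloads the job's script from S3 and passes it to Glue's get_dataflow_graph API, which parses it into a DAG of nodes (DagNodes) and edges (DagEdges).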
+ + Parameters + ---------- + script_path: + S3 path to the job's Python script. + """ + + # extract the script's bucket and key + url = urlparse(script_path, allow_fragments=False) + bucket = url.netloc + key = url.path[1:] + + # download the script contents + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object + obj = self.s3_client.get_object(Bucket=bucket, Key=key) + script = obj["Body"].read().decode("utf-8") + + # extract the job DAG from the script + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_dataflow_graph + return self.glue_client.get_dataflow_graph(PythonScript=script) + + def get_dataflow_s3_names( + self, dataflow_graph: Dict[str, Any] + ) -> Iterator[Tuple[str, Optional[str]]]: + + # iterate through each node to populate processed nodes + for node in dataflow_graph["DagNodes"]: + + node_type = node["NodeType"] + + # for nodes representing datasets, we construct a dataset URN accordingly + if node_type in ["DataSource", "DataSink"]: + + node_args = {x["Name"]: json.loads(x["Value"]) for x in node["Args"]} + + # if data object is S3 bucket + if node_args.get("connection_type") == "s3": + + # remove S3 prefix (s3://) + s3_name = node_args["connection_options"]["path"][5:] + + if s3_name.endswith("/"): + s3_name = s3_name[:-1] + + extension = node_args.get("format") + + yield s3_name, extension + + def process_dataflow_node( + self, + node: Dict[str, Any], + flow_urn: str, + new_dataset_ids: List[str], + new_dataset_mces: List[MetadataChangeEvent], + s3_formats: typing.DefaultDict[str, Set[Union[str, None]]], + ) -> Dict[str, Any]: + + node_type = node["NodeType"] + + # for nodes representing datasets, we construct a dataset URN accordingly + if node_type in ["DataSource", "DataSink"]: + + node_args = {x["Name"]: json.loads(x["Value"]) for x in node["Args"]} + + # if data object is Glue table + if "database" in node_args and "table_name" in node_args: + + full_table_name = f"{node_args['database']}.{node_args['table_name']}" + + # we know that the table will already be covered when ingesting Glue tables + node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})" + + # if data object is S3 bucket + elif node_args.get("connection_type") == "s3": + + # remove S3 prefix (s3://) + s3_name = node_args["connection_options"]["path"][5:] + + if s3_name.endswith("/"): + s3_name = s3_name[:-1] + + # append S3 format if different ones exist + if len(s3_formats[s3_name]) > 1: + node_urn = f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{node_args.get('format')},{self.env})" + + else: + node_urn = ( + f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{self.env})" + ) + + dataset_snapshot = DatasetSnapshot( + urn=node_urn, + aspects=[], + ) + + dataset_snapshot.aspects.append(Status(removed=False)) + dataset_snapshot.aspects.append( + OwnershipClass( + owners=[], + lastModified=AuditStampClass( + time=int(time.time() * 1000), + actor="urn:li:corpuser:datahub", + ), + ) + ) + dataset_snapshot.aspects.append( + DatasetPropertiesClass( + customProperties={k: str(v) for k, v in node_args.items()}, + tags=[], + ) + ) + + new_dataset_mces.append( + MetadataChangeEvent(proposedSnapshot=dataset_snapshot) + ) + new_dataset_ids.append(f"{node['NodeType']}-{node['Id']}") + + else: + + raise ValueError(f"Unrecognized Glue data object type: {node_args}") + + # otherwise, a node represents a transformation + else: + node_urn = 
mce_builder.make_data_job_urn_with_flow( + flow_urn, job_id=f'{node["NodeType"]}-{node["Id"]}' + ) + + return { + **node, + "urn": node_urn, + # to be filled in after traversing edges + "inputDatajobs": [], + "inputDatasets": [], + "outputDatasets": [], + } + + def process_dataflow_graph( + self, + dataflow_graph: Dict[str, Any], + flow_urn: str, + s3_names: typing.DefaultDict[str, Set[Union[str, None]]], + ) -> Tuple[Dict[str, Dict[str, Any]], List[str], List[MetadataChangeEvent]]: + """ + Prepare a job's DAG for ingestion. + Parameters + ---------- + dataflow_graph: + Job DAG returned from get_dataflow_graph() + flow_urn: + URN of the flow (i.e. the AWS Glue job itself). + """ + + new_dataset_ids: List[str] = [] + new_dataset_mces: List[MetadataChangeEvent] = [] + + nodes: dict = {} + + # iterate through each node to populate processed nodes + for node in dataflow_graph["DagNodes"]: + + nodes[node["Id"]] = self.process_dataflow_node( + node, flow_urn, new_dataset_ids, new_dataset_mces, s3_names + ) + + # traverse edges to fill in node properties + for edge in dataflow_graph["DagEdges"]: + + source_node = nodes[edge["Source"]] + target_node = nodes[edge["Target"]] + + source_node_type = source_node["NodeType"] + target_node_type = target_node["NodeType"] + + # note that source nodes can't be data sinks + if source_node_type == "DataSource": + target_node["inputDatasets"].append(source_node["urn"]) + # keep track of input data jobs (as defined in schemas) else: - database_names = get_database_names() + target_node["inputDatajobs"].append(source_node["urn"]) + # track output datasets (these can't be input datasets) + if target_node_type == "DataSink": + source_node["outputDatasets"].append(target_node["urn"]) + + return nodes, new_dataset_ids, new_dataset_mces + + def get_dataflow_wu(self, flow_urn: str, job: Dict[str, Any]) -> MetadataWorkUnit: + """ + Generate a DataFlow workunit for a Glue job. + + Parameters + ---------- + flow_urn: + URN for the flow + job: + Job object from get_all_jobs() + """ + mce = MetadataChangeEventClass( + proposedSnapshot=DataFlowSnapshotClass( + urn=flow_urn, + aspects=[ + DataFlowInfoClass( + name=job["Name"], + description=job["Description"], + # specify a few Glue-specific properties + customProperties={ + "role": job["Role"], + "created": str(job["CreatedOn"]), + "modified": str(job["LastModifiedOn"]), + "command": job["Command"]["ScriptLocation"], + }, + ), + ], + ) + ) + + return MetadataWorkUnit(id=job["Name"], mce=mce) + + def get_datajob_wu( + self, node: Dict[str, Any], job: Dict[str, Any] + ) -> MetadataWorkUnit: + """ + Generate a DataJob workunit for a component (node) in a Glue job. 
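+        Each transform node becomes a DataJob, with its inputs and outputs taken from the DAG edges traversed in process_dataflow_graph().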
+ + Parameters + ---------- + node: + Node from process_dataflow_graph() + job: + Job object from get_all_jobs() + """ + mce = MetadataChangeEventClass( + proposedSnapshot=DataJobSnapshotClass( + urn=node["urn"], + aspects=[ + DataJobInfoClass( + name=f"{job['Name']}:{node['NodeType']}-{node['Id']}", + type="GLUE", + customProperties={ + **{x["Name"]: x["Value"] for x in node["Args"]}, + "transformType": node["NodeType"], + "nodeId": node["Id"], + }, + ), + DataJobInputOutputClass( + inputDatasets=node["inputDatasets"], + outputDatasets=node["outputDatasets"], + inputDatajobs=node["inputDatajobs"], + ), + ], + ) + ) + + return MetadataWorkUnit(id=f'{job["Name"]}-{node["Id"]}', mce=mce) + + def get_all_tables(self) -> List[dict]: + def get_tables_from_database(database_name: str) -> List[dict]: + new_tables = [] + + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_tables + paginator = self.glue_client.get_paginator("get_tables") + for page in paginator.paginate(DatabaseName=database_name): + new_tables += page["TableList"] + + return new_tables - all_tables: List[dict] = [] - for database in database_names: - all_tables += get_tables_from_database(database) - return all_tables + def get_database_names() -> List[str]: + database_names = [] - tables = get_all_tables() + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_databases + paginator = self.glue_client.get_paginator("get_databases") + for page in paginator.paginate(): + for db in page["DatabaseList"]: + if self.source_config.database_pattern.allowed(db["Name"]): + database_names.append(db["Name"]) + + return database_names + + if self.source_config.database_pattern.is_fully_specified_allow_list(): + database_names = self.source_config.database_pattern.get_allowed_list() + else: + database_names = get_database_names() + + all_tables: List[dict] = [] + for database in database_names: + all_tables += get_tables_from_database(database) + return all_tables + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + + tables = self.get_all_tables() for table in tables: database_name = table["DatabaseName"] @@ -186,6 +486,55 @@ def get_database_names() -> List[str]: self.report.report_workunit(workunit) yield workunit + if self.extract_transforms: + + dags = {} + + for job in self.get_all_jobs(): + + flow_urn = mce_builder.make_data_flow_urn("glue", job["Name"], self.env) + + flow_wu = self.get_dataflow_wu(flow_urn, job) + self.report.report_workunit(flow_wu) + yield flow_wu + + dag = self.get_dataflow_graph(job["Command"]["ScriptLocation"]) + + dags[flow_urn] = dag + + # run a first pass to pick up s3 bucket names and formats + # in Glue, it's possible for two buckets to have files of different extensions + # if this happens, we append the extension in the URN so the sources can be distinguished + # see process_dataflow_node() for details + + s3_formats: typing.DefaultDict[str, Set[Union[str, None]]] = defaultdict( + lambda: set() + ) + + for dag in dags.values(): + for s3_name, extension in self.get_dataflow_s3_names(dag): + s3_formats[s3_name].add(extension) + + # run second pass to generate node workunits + for flow_urn, dag in dags.items(): + + nodes, new_dataset_ids, new_dataset_mces = self.process_dataflow_graph( + dag, flow_urn, s3_formats + ) + + for node in nodes.values(): + + if node["NodeType"] not in ["DataSource", "DataSink"]: + job_wu = self.get_datajob_wu(node, job) + self.report.report_workunit(job_wu) + yield 
job_wu + + for dataset_id, dataset_mce in zip(new_dataset_ids, new_dataset_mces): + + dataset_wu = MetadataWorkUnit(id=dataset_id, mce=dataset_mce) + self.report.report_workunit(dataset_wu) + yield dataset_wu + def _extract_record(self, table: Dict, table_name: str) -> MetadataChangeEvent: def get_owner(time: int) -> OwnershipClass: owner = table.get("Owner") diff --git a/metadata-ingestion/src/datahub/metadata/schema.avsc b/metadata-ingestion/src/datahub/metadata/schema.avsc index 85674cb3cb440..4b4e88c416b69 100644 --- a/metadata-ingestion/src/datahub/metadata/schema.avsc +++ b/metadata-ingestion/src/datahub/metadata/schema.avsc @@ -1543,6 +1543,7 @@ "type": "enum", "symbolDocs": { "COMMAND": "The command job type is one of the basic built-in types. It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "GLUE": "Glue type is for running AWS Glue job transforms.", "HADOOP_JAVA": "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", "HADOOP_SHELL": "In large part, this is the same Command type. The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", "HIVE": "Hive type is for running Hive jobs.", @@ -1557,7 +1558,8 @@ "HADOOP_SHELL", "HIVE", "PIG", - "SQL" + "SQL", + "GLUE" ], "doc": "The various types of support azkaban jobs" } diff --git a/metadata-ingestion/src/datahub/metadata/schema_classes.py b/metadata-ingestion/src/datahub/metadata/schema_classes.py index 29e786f2d2b0a..23b7b6a4ec30c 100644 --- a/metadata-ingestion/src/datahub/metadata/schema_classes.py +++ b/metadata-ingestion/src/datahub/metadata/schema_classes.py @@ -2262,6 +2262,9 @@ class AzkabanJobTypeClass(object): """SQL is for running Presto, mysql queries etc""" SQL = "SQL" + """Glue type is for running AWS Glue job transforms.""" + GLUE = "GLUE" + class DataPlatformInfoClass(DictWrapper): """Information about a data platform""" diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc index 6d36de82a3b1e..b547ea471f459 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc @@ -1539,10 +1539,12 @@ "HADOOP_SHELL", "HIVE", "PIG", - "SQL" + "SQL", + "GLUE" ], "symbolDocs": { "COMMAND": "The command job type is one of the basic built-in types. It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "GLUE": "Glue type is for running AWS Glue job transforms.", "HADOOP_JAVA": "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", "HADOOP_SHELL": "In large part, this is the same Command type. 
The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", "HIVE": "Hive type is for running Hive jobs.", diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc index 95f194d363219..49b407ded70d2 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc @@ -1538,10 +1538,12 @@ "HADOOP_SHELL", "HIVE", "PIG", - "SQL" + "SQL", + "GLUE" ], "symbolDocs": { "COMMAND": "The command job type is one of the basic built-in types. It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "GLUE": "Glue type is for running AWS Glue job transforms.", "HADOOP_JAVA": "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", "HADOOP_SHELL": "In large part, this is the same Command type. The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", "HIVE": "Hive type is for running Hive jobs.", diff --git a/metadata-ingestion/tests/integration/lookml/expected_output.json b/metadata-ingestion/tests/integration/lookml/expected_output.json index e4464445071e1..a18bddef60866 100644 --- a/metadata-ingestion/tests/integration/lookml/expected_output.json +++ b/metadata-ingestion/tests/integration/lookml/expected_output.json @@ -3,7 +3,7 @@ "auditHeader": null, "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker, my_view, PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,my_view,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.common.Status": { diff --git a/metadata-ingestion/tests/unit/__init__.py b/metadata-ingestion/tests/unit/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/unit/glue/glue_mces_golden.json b/metadata-ingestion/tests/unit/glue/glue_mces_golden.json new file mode 100644 index 0000000000000..c4c2a1e66f09c --- /dev/null +++ b/metadata-ingestion/tests/unit/glue/glue_mces_golden.json @@ -0,0 +1,959 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": 
"{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}", + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + "sizeKey": "9503351413", + "typeOfData": "file", + "Location": "s3://crawler-public-us-west-2/flight/avro/", + "InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe', 'Parameters': {'avro.schema.literal': '{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}', 'serialization.format': '1'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "externalUrl": null, + "description": null, + "uri": null, + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "flights-database.avro", + "platform": "urn:li:dataPlatform:glue", + "version": 0, + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "yr", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "flightdate", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": 
"string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "uniquecarrier", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "airlineid", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "carrier", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "flightnum", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "origin", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + } + ], + "primaryKeys": null, + "foreignKeysSpecs": null + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test-jsons", + "averageRecordSize": "273", + "classification": "json", + "compressionType": "none", + "objectCount": "1", + "recordCount": "1", + "sizeKey": "273", + "typeOfData": "file", + "Location": "s3://test-glue-jsons/markers/", + "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe', 'Parameters': {'paths': 'markers'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "externalUrl": null, + "description": null, + "uri": null, + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "test-database.test_jsons_markers", + "platform": "urn:li:dataPlatform:glue", + "version": 0, + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + 
"platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "markers", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.ArrayType": { + "nestedType": null + } + } + }, + "nativeDataType": "array,location:array>>", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + } + ], + "primaryKeys": null, + "foreignKeysSpecs": null + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test", + "averageRecordSize": "19", + "classification": "parquet", + "compressionType": "none", + "objectCount": "60", + "recordCount": "167497743", + "sizeKey": "4463574900", + "typeOfData": "file", + "Location": "s3://crawler-public-us-west-2/flight/parquet/", + "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe', 'Parameters': {'serialization.format': '1'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "externalUrl": null, + "description": null, + "uri": null, + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "test-database.test_parquet", + "platform": "urn:li:dataPlatform:glue", + "version": 0, + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "yr", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "quarter", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "month", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "dayofmonth", + "jsonPath": null, + 
"nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + } + ], + "primaryKeys": null, + "foreignKeysSpecs": null + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": { + "urn": "urn:li:dataFlow:(glue,test-job-1,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataFlowInfo": { + "customProperties": { + "role": "arn:aws:iam::123412341234:role/service-role/AWSGlueServiceRole-glue-crawler", + "created": "2021-06-10 16:51:25.690000", + "modified": "2021-06-10 16:55:35.307000", + "command": "s3://aws-glue-assets-123412341234-us-west-2/scripts/job-1.py" + }, + "externalUrl": null, + "name": "test-job-1", + "description": "The first test job", + "project": null + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": { + "urn": "urn:li:dataFlow:(glue,test-job-2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataFlowInfo": { + "customProperties": { + "role": "arn:aws:iam::123412341234:role/service-role/AWSGlueServiceRole-glue-crawler", + "created": "2021-06-10 16:58:32.469000", + "modified": "2021-06-10 16:58:32.469000", + "command": "s3://aws-glue-assets-123412341234-us-west-2/scripts/job-2.py" + }, + "externalUrl": null, + "name": "test-job-2", + "description": "The second test job", + "project": null + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Filter-Transform0)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "f": "lambda row : ()", + "transformation_ctx": "\"Transform0\"", + "transformType": "Filter", + "nodeId": "Transform0" + }, + "externalUrl": null, + "name": "test-job-2:Filter-Transform0", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform2)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\"), (\"flightnum\", \"string\", \"flightnum\", \"string\"), (\"origin\", \"string\", \"origin\", \"string\"), (\"dest\", \"string\", \"dest\", \"string\"), (\"depdelay\", \"int\", \"depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"weatherdelay\", \"int\"), (\"year\", \"string\", \"year\", \"string\")]", + "transformation_ctx": 
"\"Transform1\"", + "transformType": "ApplyMapping", + "nodeId": "Transform1" + }, + "externalUrl": null, + "name": "test-job-2:ApplyMapping-Transform1", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Filter-Transform0)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\"), (\"flightnum\", \"string\", \"flightnum\", \"string\"), (\"origin\", \"string\", \"origin\", \"string\"), (\"dest\", \"string\", \"dest\", \"string\"), (\"depdelay\", \"int\", \"depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"weatherdelay\", \"int\"), (\"year\", \"string\", \"year\", \"string\")]", + "transformation_ctx": "\"Transform2\"", + "transformType": "ApplyMapping", + "nodeId": "Transform2" + }, + "externalUrl": null, + "name": "test-job-2:ApplyMapping-Transform2", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Join-Transform3)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "keys2": "[\"(right) flightdate\"]", + "transformation_ctx": "\"Transform3\"", + "keys1": "[\"yr\"]", + "transformType": "Join", + "nodeId": "Transform3" + }, + "externalUrl": null, + "name": "test-job-2:Join-Transform3", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform4)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform4)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", 
\"carrier\", \"string\"), (\"flightnum\", \"string\", \"flightnum\", \"string\"), (\"origin\", \"string\", \"origin\", \"string\"), (\"dest\", \"string\", \"dest\", \"string\"), (\"depdelay\", \"int\", \"depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"weatherdelay\", \"int\"), (\"year\", \"string\", \"year\", \"string\")]", + "transformation_ctx": "\"Transform4\"", + "transformType": "ApplyMapping", + "nodeId": "Transform4" + }, + "externalUrl": null, + "name": "test-job-2:ApplyMapping-Transform4", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform5)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"(right) yr\", \"int\"), (\"flightdate\", \"string\", \"(right) flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"(right) uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"(right) airlineid\", \"int\"), (\"carrier\", \"string\", \"(right) carrier\", \"string\"), (\"flightnum\", \"string\", \"(right) flightnum\", \"string\"), (\"origin\", \"string\", \"(right) origin\", \"string\"), (\"dest\", \"string\", \"(right) dest\", \"string\"), (\"depdelay\", \"int\", \"(right) depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"(right) carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"(right) weatherdelay\", \"int\"), (\"year\", \"string\", \"(right) year\", \"string\")]", + "transformation_ctx": "\"Transform5\"", + "transformType": "ApplyMapping", + "nodeId": "Transform5" + }, + "externalUrl": null, + "name": "test-job-2:ApplyMapping-Transform5", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [], + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "connection_type": "s3", + "format": "json", + "connection_options": "{'path': 's3://test-glue-jsons/', 'partitionKeys': []}", + "transformation_ctx": "DataSink1" + }, + "externalUrl": null, + "description": null, + "uri": null, + "tags": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),SplitFields-Transform0)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { 
+ "customProperties": { + "paths": "[\"yr\", \"quarter\", \"month\", \"dayofmonth\", \"dayofweek\", \"flightdate\", \"uniquecarrier\"]", + "name2": "\"Transform0Output1\"", + "name1": "\"Transform0Output0\"", + "transformation_ctx": "\"Transform0\"", + "transformType": "SplitFields", + "nodeId": "Transform0" + }, + "externalUrl": null, + "name": "test-job-2:SplitFields-Transform0", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"quarter\", \"int\", \"quarter\", \"int\"), (\"month\", \"int\", \"month\", \"int\"), (\"dayofmonth\", \"int\", \"dayofmonth\", \"int\"), (\"dayofweek\", \"int\", \"dayofweek\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\")]", + "transformation_ctx": "\"Transform1\"", + "transformType": "ApplyMapping", + "nodeId": "Transform1" + }, + "externalUrl": null, + "name": "test-job-2:ApplyMapping-Transform1", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),FillMissingValues-Transform2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "missing_values_column": "\"dayofmonth\"", + "transformation_ctx": "\"Transform2\"", + "transformType": "FillMissingValues", + "nodeId": "Transform2" + }, + "externalUrl": null, + "name": "test-job-2:FillMissingValues-Transform2", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),SelectFields-Transform3)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "paths": "[]", + "transformation_ctx": "\"Transform3\"", + "transformType": "SelectFields", + "nodeId": "Transform3" + }, + "externalUrl": null, + 
"name": "test-job-2:SelectFields-Transform3", + "description": null, + "type": { + "com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE" + }, + "flowUrn": null + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons,PROD)" + ], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),FillMissingValues-Transform2)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [], + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "connection_type": "s3", + "format": "json", + "connection_options": "{'path': 's3://test-glue-jsons/', 'partitionKeys': []}", + "transformation_ctx": "DataSink0" + }, + "externalUrl": null, + "description": null, + "uri": null, + "tags": [] + } + } + ] + } + }, + "proposedDelta": null + } +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_glue_source.py b/metadata-ingestion/tests/unit/test_glue_source.py index 4ef9e6ca7a832..cb5e3727c371a 100644 --- a/metadata-ingestion/tests/unit/test_glue_source.py +++ b/metadata-ingestion/tests/unit/test_glue_source.py @@ -1,188 +1,141 @@ -import unittest -from datetime import datetime +import json from botocore.stub import Stubber from freezegun import freeze_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.glue import GlueSource, GlueSourceConfig, get_column_type -from datahub.ingestion.source.metadata_common import MetadataWorkUnit -from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status -from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot -from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayTypeClass, MapTypeClass, - MySqlDDL, - NumberTypeClass, - SchemaField, SchemaFieldDataType, - SchemaMetadata, StringTypeClass, ) -from datahub.metadata.schema_classes import ( - AuditStampClass, - DatasetPropertiesClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, +from tests.test_helpers import mce_helpers +from tests.unit.test_glue_source_stubs import ( + get_databases_response, + get_dataflow_graph_response_1, + get_dataflow_graph_response_2, + get_jobs_response, + get_object_body_1, + get_object_body_2, + get_object_response_1, + get_object_response_2, + get_tables_response_1, + get_tables_response_2, ) FROZEN_TIME = "2020-04-14 07:00:00" -class GlueSourceTest(unittest.TestCase): - glue_source = GlueSource( +def glue_source() -> GlueSource: + return GlueSource( ctx=PipelineContext(run_id="glue-source-test"), - config=GlueSourceConfig(aws_region="us-east-1"), + config=GlueSourceConfig(aws_region="us-west-2", extract_transforms=True), ) - def test_get_column_type_contains_key(self): - field_type = "char" - data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field") - self.assertEqual( - data_type.to_obj(), 
SchemaFieldDataType(type=StringTypeClass()).to_obj() - ) +def test_get_column_type_contains_key(): - def test_get_column_type_contains_array(self): + field_type = "char" + data_type = get_column_type(glue_source(), field_type, "a_table", "a_field") + assert data_type.to_obj() == SchemaFieldDataType(type=StringTypeClass()).to_obj() - field_type = "array_lol" - data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field") - self.assertEqual( - data_type.to_obj(), SchemaFieldDataType(type=ArrayTypeClass()).to_obj() - ) - def test_get_column_type_contains_map(self): +def test_get_column_type_contains_array(): - field_type = "map_hehe" - data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field") - self.assertEqual( - data_type.to_obj(), SchemaFieldDataType(type=MapTypeClass()).to_obj() - ) + field_type = "array_lol" + data_type = get_column_type(glue_source(), field_type, "a_table", "a_field") + assert data_type.to_obj() == SchemaFieldDataType(type=ArrayTypeClass()).to_obj() - def test_get_column_type_contains_set(self): - field_type = "set_yolo" - data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field") - self.assertEqual( - data_type.to_obj(), SchemaFieldDataType(type=ArrayTypeClass()).to_obj() - ) +def test_get_column_type_contains_map(): - def test_get_column_type_not_contained(self): + field_type = "map_hehe" + data_type = get_column_type(glue_source(), field_type, "a_table", "a_field") + assert data_type.to_obj() == SchemaFieldDataType(type=MapTypeClass()).to_obj() - field_type = "bad_column_type" - data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field") - self.assertEqual( - data_type.to_obj(), SchemaFieldDataType(type=StringTypeClass()).to_obj() - ) - self.assertEqual( - self.glue_source.report.warnings["bad_column_type"], - [ - "The type 'bad_column_type' is not recognised for field 'a_field' in table 'a_table', " - "setting as StringTypeClass." 
- ], - ) - @freeze_time(FROZEN_TIME) - def test_turn_boto_glue_data_to_metadata_event(self): - stringy_timestamp = datetime.strptime(FROZEN_TIME, "%Y-%m-%d %H:%M:%S") - timestamp = int(datetime.timestamp(stringy_timestamp) * 1000) +def test_get_column_type_contains_set(): - get_databases_response = { - "DatabaseList": [ - { - "Name": "datalake_grilled", - "Description": "irrelevant", - "LocationUri": "irrelevant", - "Parameters": {}, - "CreateTime": datetime(2015, 1, 1), - "CreateTableDefaultPermissions": [], - "CatalogId": "irrelevant", - }, - ], - } - get_tables_response = { - "TableList": [ - { - "Name": "Barbeque", - "Owner": "Susan", - "DatabaseName": "datalake_grilled", - "Description": "Grilled Food", - "StorageDescriptor": { - "Columns": [ - { - "Name": "Size", - "Type": "int", - "Comment": "Maximum attendees permitted", - } - ] - }, - } - ] - } + field_type = "set_yolo" + data_type = get_column_type(glue_source(), field_type, "a_table", "a_field") + assert data_type.to_obj() == SchemaFieldDataType(type=ArrayTypeClass()).to_obj() - with Stubber(self.glue_source.glue_client) as stubber: - stubber.add_response("get_databases", get_databases_response, {}) - stubber.add_response( - "get_tables", get_tables_response, {"DatabaseName": "datalake_grilled"} - ) - actual_work_unit = list(self.glue_source.get_workunits())[0] - expected_metadata_work_unit = create_metadata_work_unit(timestamp) +def test_get_column_type_not_contained(): - self.assertEqual(expected_metadata_work_unit, actual_work_unit) + glue_source_instance = glue_source() + field_type = "bad_column_type" + data_type = get_column_type(glue_source_instance, field_type, "a_table", "a_field") + assert data_type.to_obj() == SchemaFieldDataType(type=StringTypeClass()).to_obj() + assert glue_source_instance.report.warnings["bad_column_type"] == [ + "The type 'bad_column_type' is not recognised for field 'a_field' in table 'a_table', " + "setting as StringTypeClass." 
+ ] -def create_metadata_work_unit(timestamp): - dataset_snapshot = DatasetSnapshot( - urn="urn:li:dataset:(urn:li:dataPlatform:glue,datalake_grilled.Barbeque,PROD)", - aspects=[], - ) - dataset_snapshot.aspects.append(Status(removed=False)) - - dataset_snapshot.aspects.append( - OwnershipClass( - owners=[ - OwnerClass( - owner="urn:li:corpuser:Susan", type=OwnershipTypeClass.DATAOWNER - ) - ], - lastModified=AuditStampClass( - time=timestamp, actor="urn:li:corpuser:datahub" - ), - ) - ) +@freeze_time(FROZEN_TIME) +def test_glue_ingest(tmp_path, pytestconfig): - dataset_snapshot.aspects.append( - DatasetPropertiesClass( - description="Grilled Food", - ) - ) + glue_source_instance = glue_source() + + with Stubber(glue_source_instance.glue_client) as glue_stubber: - fields = [ - SchemaField( - fieldPath="Size", - nativeDataType="int", - type=SchemaFieldDataType(type=NumberTypeClass()), - description="Maximum attendees permitted", - nullable=True, - recursive=False, + glue_stubber.add_response("get_databases", get_databases_response, {}) + glue_stubber.add_response( + "get_tables", + get_tables_response_1, + {"DatabaseName": "flights-database"}, + ) + glue_stubber.add_response( + "get_tables", + get_tables_response_2, + {"DatabaseName": "test-database"}, + ) + glue_stubber.add_response("get_jobs", get_jobs_response, {}) + glue_stubber.add_response( + "get_dataflow_graph", + get_dataflow_graph_response_1, + {"PythonScript": get_object_body_1}, + ) + glue_stubber.add_response( + "get_dataflow_graph", + get_dataflow_graph_response_2, + {"PythonScript": get_object_body_2}, ) - ] - schema_metadata = SchemaMetadata( - schemaName="datalake_grilled.Barbeque", - version=0, - fields=fields, - platform="urn:li:dataPlatform:glue", - created=AuditStamp(time=timestamp, actor="urn:li:corpuser:etl"), - lastModified=AuditStamp(time=timestamp, actor="urn:li:corpuser:etl"), - hash="", - platformSchema=MySqlDDL(tableSchema=""), - ) - dataset_snapshot.aspects.append(schema_metadata) + with Stubber(glue_source_instance.s3_client) as s3_stubber: - mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) - return MetadataWorkUnit(id="glue-datalake_grilled.Barbeque", mce=mce) + s3_stubber.add_response( + "get_object", + get_object_response_1, + { + "Bucket": "aws-glue-assets-123412341234-us-west-2", + "Key": "scripts/job-1.py", + }, + ) + s3_stubber.add_response( + "get_object", + get_object_response_2, + { + "Bucket": "aws-glue-assets-123412341234-us-west-2", + "Key": "scripts/job-2.py", + }, + ) + + mce_objects = [ + wu.mce.to_obj() for wu in glue_source_instance.get_workunits() + ] + + with open(str(tmp_path / "glue_mces.json"), "w") as f: + json.dump(mce_objects, f, indent=2) + + output = mce_helpers.load_json_file(str(tmp_path / "glue_mces.json")) + + test_resources_dir = pytestconfig.rootpath / "tests/unit/glue" + golden = mce_helpers.load_json_file( + str(test_resources_dir / "glue_mces_golden.json") + ) + mce_helpers.assert_mces_equal(output, golden) diff --git a/metadata-ingestion/tests/unit/test_glue_source_stubs.py b/metadata-ingestion/tests/unit/test_glue_source_stubs.py new file mode 100644 index 0000000000000..68b6b80b0b19c --- /dev/null +++ b/metadata-ingestion/tests/unit/test_glue_source_stubs.py @@ -0,0 +1,763 @@ +import datetime +import io +from typing import Any, Dict + +from botocore.response import StreamingBody + +get_databases_response = { + "DatabaseList": [ + { + "Name": "flights-database", + "CreateTime": datetime.datetime(2021, 6, 9, 14, 14, 19), + "CreateTableDefaultPermissions": [ + { 
+ "Principal": { + "DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS" + }, + "Permissions": ["ALL"], + } + ], + "CatalogId": "123412341234", + }, + { + "Name": "test-database", + "CreateTime": datetime.datetime(2021, 6, 1, 14, 55, 2), + "CreateTableDefaultPermissions": [ + { + "Principal": { + "DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS" + }, + "Permissions": ["ALL"], + } + ], + "CatalogId": "123412341234", + }, + ] +} +get_tables_response_1 = { + "TableList": [ + { + "Name": "avro", + "DatabaseName": "flights-database", + "Owner": "owner", + "CreateTime": datetime.datetime(2021, 6, 9, 14, 17, 35), + "UpdateTime": datetime.datetime(2021, 6, 9, 14, 17, 35), + "LastAccessTime": datetime.datetime(2021, 6, 9, 14, 17, 35), + "Retention": 0, + "StorageDescriptor": { + "Columns": [ + {"Name": "yr", "Type": "int"}, + {"Name": "flightdate", "Type": "string"}, + {"Name": "uniquecarrier", "Type": "string"}, + {"Name": "airlineid", "Type": "int"}, + {"Name": "carrier", "Type": "string"}, + {"Name": "flightnum", "Type": "string"}, + {"Name": "origin", "Type": "string"}, + ], + "Location": "s3://crawler-public-us-west-2/flight/avro/", + "InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", + "Compressed": False, + "NumberOfBuckets": -1, + "SerdeInfo": { + "SerializationLibrary": "org.apache.hadoop.hive.serde2.avro.AvroSerDe", + "Parameters": { + "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}', + "serialization.format": "1", + }, + }, + "BucketColumns": [], + "SortColumns": [], + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}', + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + "sizeKey": "9503351413", + "typeOfData": "file", + }, + "StoredAsSubDirectories": False, + }, + "PartitionKeys": [{"Name": "year", "Type": "string"}], + "TableType": "EXTERNAL_TABLE", + 
"Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}', + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + "sizeKey": "9503351413", + "typeOfData": "file", + }, + "CreatedBy": "arn:aws:sts::123412341234:assumed-role/AWSGlueServiceRole-flights-crawler/AWS-Crawler", + "IsRegisteredWithLakeFormation": False, + "CatalogId": "123412341234", + } + ] +} +get_tables_response_2 = { + "TableList": [ + { + "Name": "test_jsons_markers", + "DatabaseName": "test-database", + "Owner": "owner", + "CreateTime": datetime.datetime(2021, 6, 2, 12, 6, 59), + "UpdateTime": datetime.datetime(2021, 6, 2, 12, 6, 59), + "LastAccessTime": datetime.datetime(2021, 6, 2, 12, 6, 59), + "Retention": 0, + "StorageDescriptor": { + "Columns": [ + { + "Name": "markers", + "Type": "array,location:array>>", + } + ], + "Location": "s3://test-glue-jsons/markers/", + "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "Compressed": False, + "NumberOfBuckets": -1, + "SerdeInfo": { + "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe", + "Parameters": {"paths": "markers"}, + }, + "BucketColumns": [], + "SortColumns": [], + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test-jsons", + "averageRecordSize": "273", + "classification": "json", + "compressionType": "none", + "objectCount": "1", + "recordCount": "1", + "sizeKey": "273", + "typeOfData": "file", + }, + "StoredAsSubDirectories": False, + }, + "PartitionKeys": [], + "TableType": "EXTERNAL_TABLE", + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test-jsons", + "averageRecordSize": "273", + "classification": "json", + "compressionType": "none", + "objectCount": "1", + "recordCount": "1", + "sizeKey": "273", + "typeOfData": "file", + }, + "CreatedBy": "arn:aws:sts::795586375822:assumed-role/AWSGlueServiceRole-test-crawler/AWS-Crawler", + "IsRegisteredWithLakeFormation": False, + "CatalogId": "795586375822", + }, + { + "Name": "test_parquet", + "DatabaseName": "test-database", + "Owner": "owner", + "CreateTime": datetime.datetime(2021, 6, 1, 16, 14, 53), + "UpdateTime": datetime.datetime(2021, 6, 1, 16, 14, 53), + "LastAccessTime": datetime.datetime(2021, 6, 1, 16, 14, 53), + "Retention": 0, + "StorageDescriptor": { + "Columns": [ + {"Name": "yr", "Type": "int"}, + {"Name": "quarter", "Type": "int"}, + {"Name": "month", "Type": "int"}, + {"Name": "dayofmonth", "Type": "int"}, + ], + "Location": 
"s3://crawler-public-us-west-2/flight/parquet/", + "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + "Compressed": False, + "NumberOfBuckets": -1, + "SerdeInfo": { + "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + "Parameters": {"serialization.format": "1"}, + }, + "BucketColumns": [], + "SortColumns": [], + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test", + "averageRecordSize": "19", + "classification": "parquet", + "compressionType": "none", + "objectCount": "60", + "recordCount": "167497743", + "sizeKey": "4463574900", + "typeOfData": "file", + }, + "StoredAsSubDirectories": False, + }, + "PartitionKeys": [{"Name": "year", "Type": "string"}], + "TableType": "EXTERNAL_TABLE", + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test", + "averageRecordSize": "19", + "classification": "parquet", + "compressionType": "none", + "objectCount": "60", + "recordCount": "167497743", + "sizeKey": "4463574900", + "typeOfData": "file", + }, + "CreatedBy": "arn:aws:sts::795586375822:assumed-role/AWSGlueServiceRole-test-crawler/AWS-Crawler", + "IsRegisteredWithLakeFormation": False, + "CatalogId": "795586375822", + }, + ] +} +get_jobs_response = { + "Jobs": [ + { + "Name": "test-job-1", + "Description": "The first test job", + "Role": "arn:aws:iam::123412341234:role/service-role/AWSGlueServiceRole-glue-crawler", + "CreatedOn": datetime.datetime(2021, 6, 10, 16, 51, 25, 690000), + "LastModifiedOn": datetime.datetime(2021, 6, 10, 16, 55, 35, 307000), + "ExecutionProperty": {"MaxConcurrentRuns": 1}, + "Command": { + "Name": "glueetl", + "ScriptLocation": "s3://aws-glue-assets-123412341234-us-west-2/scripts/job-1.py", + "PythonVersion": "3", + }, + "DefaultArguments": { + "--TempDir": "s3://aws-glue-assets-123412341234-us-west-2/temporary/", + "--class": "GlueApp", + "--enable-continuous-cloudwatch-log": "true", + "--enable-glue-datacatalog": "true", + "--enable-metrics": "true", + "--enable-spark-ui": "true", + "--encryption-type": "sse-s3", + "--job-bookmark-option": "job-bookmark-enable", + "--job-language": "python", + "--spark-event-logs-path": "s3://aws-glue-assets-123412341234-us-west-2/sparkHistoryLogs/", + }, + "MaxRetries": 3, + "AllocatedCapacity": 10, + "Timeout": 2880, + "MaxCapacity": 10.0, + "WorkerType": "G.1X", + "NumberOfWorkers": 10, + "GlueVersion": "2.0", + }, + { + "Name": "test-job-2", + "Description": "The second test job", + "Role": "arn:aws:iam::123412341234:role/service-role/AWSGlueServiceRole-glue-crawler", + "CreatedOn": datetime.datetime(2021, 6, 10, 16, 58, 32, 469000), + "LastModifiedOn": datetime.datetime(2021, 6, 10, 16, 58, 32, 469000), + "ExecutionProperty": {"MaxConcurrentRuns": 1}, + "Command": { + "Name": "glueetl", + "ScriptLocation": "s3://aws-glue-assets-123412341234-us-west-2/scripts/job-2.py", + "PythonVersion": "3", + }, + "DefaultArguments": { + "--TempDir": "s3://aws-glue-assets-123412341234-us-west-2/temporary/", + "--class": "GlueApp", + "--enable-continuous-cloudwatch-log": "true", + "--enable-glue-datacatalog": "true", + "--enable-metrics": "true", + "--enable-spark-ui": "true", + "--encryption-type": "sse-s3", + "--job-bookmark-option": "job-bookmark-enable", + "--job-language": "python", + "--spark-event-logs-path": 
"s3://aws-glue-assets-123412341234-us-west-2/sparkHistoryLogs/", + }, + "MaxRetries": 3, + "AllocatedCapacity": 10, + "Timeout": 2880, + "MaxCapacity": 10.0, + "WorkerType": "G.1X", + "NumberOfWorkers": 10, + "GlueVersion": "2.0", + }, + ] +} +# for job 1 +get_dataflow_graph_response_1 = { + "DagNodes": [ + { + "Id": "Transform0", + "NodeType": "Filter", + "Args": [ + {"Name": "f", "Value": "lambda row : ()", "Param": False}, + { + "Name": "transformation_ctx", + "Value": '"Transform0"', + "Param": False, + }, + ], + "LineNumber": 32, + }, + { + "Id": "Transform1", + "NodeType": "ApplyMapping", + "Args": [ + { + "Name": "mappings", + "Value": '[("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")]', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform1"', + "Param": False, + }, + ], + "LineNumber": 37, + }, + { + "Id": "Transform2", + "NodeType": "ApplyMapping", + "Args": [ + { + "Name": "mappings", + "Value": '[("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")]', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform2"', + "Param": False, + }, + ], + "LineNumber": 22, + }, + { + "Id": "Transform3", + "NodeType": "Join", + "Args": [ + { + "Name": "keys2", + "Value": '["(right) flightdate"]', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform3"', + "Param": False, + }, + {"Name": "keys1", "Value": '["yr"]', "Param": False}, + ], + "LineNumber": 47, + }, + { + "Id": "DataSource0", + "NodeType": "DataSource", + "Args": [ + { + "Name": "database", + "Value": '"flights-database"', + "Param": False, + }, + {"Name": "table_name", "Value": '"avro"', "Param": False}, + { + "Name": "transformation_ctx", + "Value": '"DataSource0"', + "Param": False, + }, + ], + "LineNumber": 17, + }, + { + "Id": "DataSink0", + "NodeType": "DataSink", + "Args": [ + { + "Name": "database", + "Value": '"test-database"', + "Param": False, + }, + { + "Name": "table_name", + "Value": '"test_jsons_markers"', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"DataSink0"', + "Param": False, + }, + ], + "LineNumber": 57, + }, + { + "Id": "Transform4", + "NodeType": "ApplyMapping", + "Args": [ + { + "Name": "mappings", + "Value": '[("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", 
"depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")]', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform4"', + "Param": False, + }, + ], + "LineNumber": 27, + }, + { + "Id": "Transform5", + "NodeType": "ApplyMapping", + "Args": [ + { + "Name": "mappings", + "Value": '[("yr", "int", "(right) yr", "int"), ("flightdate", "string", "(right) flightdate", "string"), ("uniquecarrier", "string", "(right) uniquecarrier", "string"), ("airlineid", "int", "(right) airlineid", "int"), ("carrier", "string", "(right) carrier", "string"), ("flightnum", "string", "(right) flightnum", "string"), ("origin", "string", "(right) origin", "string"), ("dest", "string", "(right) dest", "string"), ("depdelay", "int", "(right) depdelay", "int"), ("carrierdelay", "int", "(right) carrierdelay", "int"), ("weatherdelay", "int", "(right) weatherdelay", "int"), ("year", "string", "(right) year", "string")]', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform5"', + "Param": False, + }, + ], + "LineNumber": 42, + }, + { + "Id": "DataSink1", + "NodeType": "DataSink", + "Args": [ + {"Name": "connection_type", "Value": '"s3"', "Param": False}, + {"Name": "format", "Value": '"json"', "Param": False}, + { + "Name": "connection_options", + "Value": '{"path": "s3://test-glue-jsons/", "partitionKeys": []}', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"DataSink1"', + "Param": False, + }, + ], + "LineNumber": 52, + }, + ], + "DagEdges": [ + { + "Source": "Transform2", + "Target": "Transform0", + "TargetParameter": "frame", + }, + { + "Source": "Transform0", + "Target": "Transform1", + "TargetParameter": "frame", + }, + { + "Source": "DataSource0", + "Target": "Transform2", + "TargetParameter": "frame", + }, + { + "Source": "Transform4", + "Target": "Transform3", + "TargetParameter": "frame1", + }, + ], +} +# for job 2 +get_dataflow_graph_response_2 = { + "DagNodes": [ + { + "Id": "Transform0", + "NodeType": "SplitFields", + "Args": [ + { + "Name": "paths", + "Value": '["yr", "quarter", "month", "dayofmonth", "dayofweek", "flightdate", "uniquecarrier"]', + "Param": False, + }, + { + "Name": "name2", + "Value": '"Transform0Output1"', + "Param": False, + }, + { + "Name": "name1", + "Value": '"Transform0Output0"', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform0"', + "Param": False, + }, + ], + "LineNumber": 42, + }, + { + "Id": "Transform1", + "NodeType": "ApplyMapping", + "Args": [ + { + "Name": "mappings", + "Value": '[("yr", "int", "yr", "int"), ("quarter", "int", "quarter", "int"), ("month", "int", "month", "int"), ("dayofmonth", "int", "dayofmonth", "int"), ("dayofweek", "int", "dayofweek", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string")]', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform1"', + "Param": False, + }, + ], + "LineNumber": 22, + }, + { + "Id": "Transform2", + "NodeType": "FillMissingValues", + "Args": [ + { + "Name": "missing_values_column", + "Value": '"dayofmonth"', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"Transform2"', + "Param": False, + }, + ], + "LineNumber": 27, + }, + { + "Id": "Transform3", + "NodeType": "SelectFields", + "Args": [ + {"Name": "paths", "Value": "[]", "Param": 
False}, + { + "Name": "transformation_ctx", + "Value": '"Transform3"', + "Param": False, + }, + ], + "LineNumber": 32, + }, + { + "Id": "DataSource0", + "NodeType": "DataSource", + "Args": [ + { + "Name": "database", + "Value": '"test-database"', + "Param": False, + }, + { + "Name": "table_name", + "Value": '"test_parquet"', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"DataSource0"', + "Param": False, + }, + ], + "LineNumber": 17, + }, + { + "Id": "DataSink0", + "NodeType": "DataSink", + "Args": [ + {"Name": "connection_type", "Value": '"s3"', "Param": False}, + {"Name": "format", "Value": '"json"', "Param": False}, + { + "Name": "connection_options", + "Value": '{"path": "s3://test-glue-jsons/", "partitionKeys": []}', + "Param": False, + }, + { + "Name": "transformation_ctx", + "Value": '"DataSink0"', + "Param": False, + }, + ], + "LineNumber": 37, + }, + ], + "DagEdges": [ + { + "Source": "Transform1", + "Target": "Transform0", + "TargetParameter": "frame", + }, + { + "Source": "DataSource0", + "Target": "Transform1", + "TargetParameter": "frame", + }, + { + "Source": "Transform1", + "Target": "Transform2", + "TargetParameter": "frame", + }, + { + "Source": "Transform2", + "Target": "Transform3", + "TargetParameter": "frame", + }, + { + "Source": "Transform3", + "Target": "DataSink0", + "TargetParameter": "frame", + }, + ], +} + +get_object_body_1 = """ +import sys +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext +from awsglue.context import GlueContext +from awsglue.job import Job +import re + +## @params: [JOB_NAME] +args = getResolvedOptions(sys.argv, ['JOB_NAME']) + +sc = SparkContext() +glueContext = GlueContext(sc) +spark = glueContext.spark_session +job = Job(glueContext) +job.init(args['JOB_NAME'], args) +## @type: DataSource +## @args: [database = "flights-database", table_name = "avro", transformation_ctx = "DataSource0"] +## @return: DataSource0 +## @inputs: [] +DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "flights-database", table_name = "avro", transformation_ctx = "DataSource0") +## @type: ApplyMapping +## @args: [mappings = [("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")], transformation_ctx = "Transform2"] +## @return: Transform2 +## @inputs: [frame = DataSource0] +Transform2 = ApplyMapping.apply(frame = DataSource0, mappings = [("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")], transformation_ctx = "Transform2") +## @type: ApplyMapping +## @args: [mappings = [("yr", "int", "yr", "int"), ("flightdate", "string", 
"flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")], transformation_ctx = "Transform4"] +## @return: Transform4 +## @inputs: [frame = Transform2] +Transform4 = ApplyMapping.apply(frame = Transform2, mappings = [("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")], transformation_ctx = "Transform4") +## @type: Filter +## @args: [f = lambda row : (), transformation_ctx = "Transform0"] +## @return: Transform0 +## @inputs: [frame = Transform2] +Transform0 = Filter.apply(frame = Transform2, f = lambda row : (), transformation_ctx = "Transform0") +## @type: ApplyMapping +## @args: [mappings = [("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")], transformation_ctx = "Transform1"] +## @return: Transform1 +## @inputs: [frame = Transform0] +Transform1 = ApplyMapping.apply(frame = Transform0, mappings = [("yr", "int", "yr", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string"), ("flightnum", "string", "flightnum", "string"), ("origin", "string", "origin", "string"), ("dest", "string", "dest", "string"), ("depdelay", "int", "depdelay", "int"), ("carrierdelay", "int", "carrierdelay", "int"), ("weatherdelay", "int", "weatherdelay", "int"), ("year", "string", "year", "string")], transformation_ctx = "Transform1") +## @type: ApplyMapping +## @args: [mappings = [("yr", "int", "(right) yr", "int"), ("flightdate", "string", "(right) flightdate", "string"), ("uniquecarrier", "string", "(right) uniquecarrier", "string"), ("airlineid", "int", "(right) airlineid", "int"), ("carrier", "string", "(right) carrier", "string"), ("flightnum", "string", "(right) flightnum", "string"), ("origin", "string", "(right) origin", "string"), ("dest", "string", "(right) dest", "string"), ("depdelay", "int", "(right) depdelay", "int"), ("carrierdelay", "int", "(right) carrierdelay", "int"), ("weatherdelay", "int", "(right) weatherdelay", "int"), ("year", "string", "(right) year", "string")], transformation_ctx = "Transform5"] +## @return: Transform5 +## @inputs: [frame = Transform1] +Transform5 = ApplyMapping.apply(frame = Transform1, mappings = [("yr", 
"int", "(right) yr", "int"), ("flightdate", "string", "(right) flightdate", "string"), ("uniquecarrier", "string", "(right) uniquecarrier", "string"), ("airlineid", "int", "(right) airlineid", "int"), ("carrier", "string", "(right) carrier", "string"), ("flightnum", "string", "(right) flightnum", "string"), ("origin", "string", "(right) origin", "string"), ("dest", "string", "(right) dest", "string"), ("depdelay", "int", "(right) depdelay", "int"), ("carrierdelay", "int", "(right) carrierdelay", "int"), ("weatherdelay", "int", "(right) weatherdelay", "int"), ("year", "string", "(right) year", "string")], transformation_ctx = "Transform5") +## @type: Join +## @args: [keys2 = ["(right) flightdate"], keys1 = ["yr"], transformation_ctx = "Transform3"] +## @return: Transform3 +## @inputs: [frame1 = Transform4, frame2 = Transform5] +Transform3 = Join.apply(frame1 = Transform4, frame2 = Transform5, keys2 = ["(right) flightdate"], keys1 = ["yr"], transformation_ctx = "Transform3") +## @type: DataSink +## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://test-glue-jsons/", "partitionKeys": []}, transformation_ctx = "DataSink1"] +## @return: DataSink1 +## @inputs: [frame = Transform3] +DataSink1 = glueContext.write_dynamic_frame.from_options(frame = Transform3, connection_type = "s3", format = "json", connection_options = {"path": "s3://test-glue-jsons/", "partitionKeys": []}, transformation_ctx = "DataSink1") +## @type: DataSink +## @args: [database = "test-database", table_name = "test_jsons_markers", transformation_ctx = "DataSink0"] +## @return: DataSink0 +## @inputs: [frame = Transform3] +DataSink0 = glueContext.write_dynamic_frame.from_catalog(frame = Transform3, database = "test-database", table_name = "test_jsons_markers", transformation_ctx = "DataSink0") +job.commit() +""" + +get_object_body_2 = """ +import sys +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext +from awsglue.context import GlueContext +from awsglue.job import Job +from awsglueml.transforms import FillMissingValues + +## @params: [JOB_NAME] +args = getResolvedOptions(sys.argv, ['JOB_NAME']) + +sc = SparkContext() +glueContext = GlueContext(sc) +spark = glueContext.spark_session +job = Job(glueContext) +job.init(args['JOB_NAME'], args) +## @type: DataSource +## @args: [database = "test-database", table_name = "test_parquet", transformation_ctx = "DataSource0"] +## @return: DataSource0 +## @inputs: [] +DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_parquet", transformation_ctx = "DataSource0") +## @type: ApplyMapping +## @args: [mappings = [("yr", "int", "yr", "int"), ("quarter", "int", "quarter", "int"), ("month", "int", "month", "int"), ("dayofmonth", "int", "dayofmonth", "int"), ("dayofweek", "int", "dayofweek", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), ("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string")], transformation_ctx = "Transform1"] +## @return: Transform1 +## @inputs: [frame = DataSource0] +Transform1 = ApplyMapping.apply(frame = DataSource0, mappings = [("yr", "int", "yr", "int"), ("quarter", "int", "quarter", "int"), ("month", "int", "month", "int"), ("dayofmonth", "int", "dayofmonth", "int"), ("dayofweek", "int", "dayofweek", "int"), ("flightdate", "string", "flightdate", "string"), ("uniquecarrier", "string", "uniquecarrier", "string"), 
("airlineid", "int", "airlineid", "int"), ("carrier", "string", "carrier", "string")], transformation_ctx = "Transform1") +## @type: FillMissingValues +## @args: [missing_values_column = "dayofmonth", transformation_ctx = "Transform2"] +## @return: Transform2 +## @inputs: [frame = Transform1] +Transform2 = FillMissingValues.apply(frame = Transform1, missing_values_column = "dayofmonth", transformation_ctx = "Transform2") +## @type: SelectFields +## @args: [paths = [], transformation_ctx = "Transform3"] +## @return: Transform3 +## @inputs: [frame = Transform2] +Transform3 = SelectFields.apply(frame = Transform2, paths = [], transformation_ctx = "Transform3") +## @type: DataSink +## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://test-glue-jsons/", "partitionKeys": []}, transformation_ctx = "DataSink0"] +## @return: DataSink0 +## @inputs: [frame = Transform3] +DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform3, connection_type = "s3", format = "json", connection_options = {"path": "s3://test-glue-jsons/", "partitionKeys": []}, transformation_ctx = "DataSink0") +## @type: SplitFields +## @args: [paths = ["yr", "quarter", "month", "dayofmonth", "dayofweek", "flightdate", "uniquecarrier", "airlineid", "carrier"], name2 = "Transform0Output1", name1 = "Transform0Output0", transformation_ctx = "Transform0"] +## @return: Transform0 +## @inputs: [frame = Transform1] +Transform0 = SplitFields.apply(frame = Transform1, paths = ["yr", "quarter", "month", "dayofmonth", "dayofweek", "flightdate", "uniquecarrier", "airlineid", "carrier"], name2 = "Transform0Output1", name1 = "Transform0Output0", transformation_ctx = "Transform0") +job.commit() +""" + + +def mock_get_object_response(raw_body: str) -> Dict[str, Any]: + """ + Mock s3 client get_object() response object. + + See https://gist.github.com/grantcooksey/132ddc85274a50b94b821302649f9d7b + + Parameters + ---------- + raw_body: + Content of the 'Body' field to return + """ + + encoded_message = raw_body.encode("utf-8") + raw_stream = StreamingBody(io.BytesIO(encoded_message), len(encoded_message)) + + return {"Body": raw_stream} + + +get_object_response_1 = mock_get_object_response(get_object_body_1) +get_object_response_2 = mock_get_object_response(get_object_body_2) diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/azkaban/AzkabanJobType.pdl b/metadata-models/src/main/pegasus/com/linkedin/datajob/azkaban/AzkabanJobType.pdl index 1f852da41763a..b2d960fad3957 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/azkaban/AzkabanJobType.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/azkaban/AzkabanJobType.pdl @@ -37,4 +37,9 @@ enum AzkabanJobType { * SQL is for running Presto, mysql queries etc */ SQL + + /** + * Glue type is for running AWS Glue job transforms. + */ + GLUE } \ No newline at end of file