diff --git a/smoke-test/test_e2e.py b/smoke-test/test_e2e.py
index cde8fc616f60c..b331188430743 100644
--- a/smoke-test/test_e2e.py
+++ b/smoke-test/test_e2e.py
@@ -1,16 +1,20 @@
 import time
 import urllib
+from contextlib import contextmanager
+from typing import Optional
 
 import pytest
 import requests
-
 from datahub.cli.docker import check_local_docker_containers
 from datahub.ingestion.run.pipeline import Pipeline
-from tests.utils import ingest_file_via_rest
 
-GMS_ENDPOINT = "http://localhost:8080"
-FRONTEND_ENDPOINT = "http://localhost:9002"
-KAFKA_BROKER = "localhost:9092"
+from tests.utils import (
+    get_frontend_url,
+    get_gms_url,
+    get_kafka_broker_url,
+    get_sleep_info,
+    ingest_file_via_rest,
+)
 
 bootstrap_sample_data = "../metadata-ingestion/examples/mce_files/bootstrap_mce.json"
 usage_sample_data = (
@@ -44,15 +48,77 @@ def frontend_session(wait_for_healthchecks):
         "Content-Type": "application/json",
     }
     data = '{"username":"datahub", "password":"datahub"}'
-    response = session.post(f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=data)
+    response = session.post(f"{get_frontend_url()}/logIn", headers=headers, data=data)
     response.raise_for_status()
 
     yield session
 
 
+@contextmanager
+def with_sleep_times(
+    sleep_between: Optional[int] = None, sleep_times: Optional[int] = None
+):
+    _sleep_between, _sleep_times = get_sleep_info()
+    if sleep_times is None:
+        sleep_times = _sleep_times
+    while True:
+        try:
+            yield
+        except Exception as e:
+            if sleep_times > 0:
+                sleep_time = sleep_between or _sleep_between
+                sleep_times -= 1
+                print(
+                    f"Sleeping for {sleep_time}. Will sleep for {sleep_times} more if needed"
+                )
+                time.sleep(sleep_time)
+            else:
+                raise e
+        finally:
+            break
+
+
+def _ensure_user_present(
+    urn: str, sleep_between: Optional[int] = None, sleep_times: Optional[int] = None
+):
+    with with_sleep_times(sleep_between, sleep_times):
+        response = requests.get(
+            f"{get_gms_url()}/entities/{urllib.parse.quote(urn)}",
+            headers={
+                **restli_default_headers,
+            },
+        )
+        response.raise_for_status()
+        data = response.json()
+
+        user_key = "com.linkedin.metadata.snapshot.CorpUserSnapshot"
+        assert data["value"]
+        assert data["value"][user_key]
+        assert data["value"][user_key]["urn"] == urn
+
+
+def _ensure_dataset_present(
+    urn: str, sleep_between: Optional[int] = None, sleep_times: Optional[int] = None
+):
+    with with_sleep_times(sleep_between, sleep_times):
+        response = requests.get(
+            f"{get_gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn)})&aspects=List(datasetProperties)",
+            headers={
+                **restli_default_headers,
+                "X-RestLi-Method": "batch_get",
+            },
+        )
+        response.raise_for_status()
+        res_data = response.json()
+        assert res_data["results"]
+        assert res_data["results"][urn]
+        assert res_data["results"][urn]["aspects"]["datasetProperties"]
+
+
 @pytest.mark.dependency(depends=["test_healthchecks"])
 def test_ingestion_via_rest(wait_for_healthchecks):
     ingest_file_via_rest(bootstrap_sample_data)
+    _ensure_user_present(urn="urn:li:corpuser:datahub", sleep_between=10, sleep_times=6)
 
 
 @pytest.mark.dependency(depends=["test_healthchecks"])
@@ -72,7 +138,7 @@ def test_ingestion_via_kafka(wait_for_healthchecks):
                 "type": "datahub-kafka",
                 "config": {
                     "connection": {
-                        "bootstrap": KAFKA_BROKER,
+                        "bootstrap": get_kafka_broker_url(),
                     }
                 },
             },
@@ -80,6 +146,9 @@
     )
     pipeline.run()
     pipeline.raise_from_status()
+    _ensure_dataset_present(
"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times,PROD)" + ) # Since Kafka emission is asynchronous, we must wait a little bit so that # the changes are actually processed. @@ -102,20 +171,7 @@ def test_run_ingestion(wait_for_healthchecks): def test_gms_get_user(): username = "jdoe" urn = f"urn:li:corpuser:{username}" - response = requests.get( - f"{GMS_ENDPOINT}/entities/{urllib.parse.quote(urn)}", - headers={ - **restli_default_headers, - }, - ) - response.raise_for_status() - data = response.json() - - assert data["value"] - assert data["value"]["com.linkedin.metadata.snapshot.CorpUserSnapshot"] - assert ( - data["value"]["com.linkedin.metadata.snapshot.CorpUserSnapshot"]["urn"] == urn - ) + _ensure_user_present(urn=urn) @pytest.mark.parametrize( @@ -145,7 +201,7 @@ def test_gms_get_dataset(platform, dataset_name, env): urn = f"urn:li:dataset:({platform},{dataset_name},{env})" response = requests.get( - f"{GMS_ENDPOINT}/entities/{urllib.parse.quote(urn)}", + f"{get_gms_url()}/entities/{urllib.parse.quote(urn)}", headers={ **restli_default_headers, "X-RestLi-Method": "get", @@ -172,7 +228,7 @@ def test_gms_batch_get_v2(): urn2 = f"urn:li:dataset:({platform},{name_2},{env})" response = requests.get( - f"{GMS_ENDPOINT}/entitiesV2?ids=List({urllib.parse.quote(urn1)},{urllib.parse.quote(urn2)})&aspects=List(datasetProperties,ownership)", + f"{get_gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn1)},{urllib.parse.quote(urn2)})&aspects=List(datasetProperties,ownership)", headers={ **restli_default_headers, "X-RestLi-Method": "batch_get", @@ -206,7 +262,7 @@ def test_gms_search_dataset(query, min_expected_results): json = {"input": f"{query}", "entity": "dataset", "start": 0, "count": 10} print(json) response = requests.post( - f"{GMS_ENDPOINT}/entities?action=search", + f"{get_gms_url()}/entities?action=search", headers=restli_default_headers, json=json, ) @@ -231,7 +287,7 @@ def test_gms_search_across_entities(query, min_expected_results): json = {"input": f"{query}", "entities": [], "start": 0, "count": 10} print(json) response = requests.post( - f"{GMS_ENDPOINT}/entities?action=searchAcrossEntities", + f"{get_gms_url()}/entities?action=searchAcrossEntities", headers=restli_default_headers, json=json, ) @@ -246,7 +302,7 @@ def test_gms_search_across_entities(query, min_expected_results): @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_gms_usage_fetch(): response = requests.post( - f"{GMS_ENDPOINT}/usageStats?action=queryRange", + f"{get_gms_url()}/usageStats?action=queryRange", headers=restli_default_headers, json={ "resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)", @@ -304,7 +360,7 @@ def test_frontend_browse_datasets(frontend_session): "variables": {"input": {"type": "DATASET", "path": ["prod"]}}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -347,7 +403,7 @@ def test_frontend_search_datasets(frontend_session, query, min_expected_results) }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -390,7 +446,7 @@ def 
test_frontend_search_across_entities(frontend_session, query, min_expected_r }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -426,7 +482,7 @@ def test_frontend_user_info(frontend_session): }""", "variables": {"urn": urn}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -475,7 +531,7 @@ def test_frontend_datasets(frontend_session, platform, dataset_name, env): "variables": {"urn": urn}, } # Basic dataset info. - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -490,7 +546,7 @@ def test_frontend_datasets(frontend_session, platform, dataset_name, env): @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_ingest_with_system_metadata(): response = requests.post( - f"{GMS_ENDPOINT}/entities?action=ingest", + f"{get_gms_url()}/entities?action=ingest", headers=restli_default_headers, json={ "entity": { @@ -523,7 +579,7 @@ def test_ingest_with_system_metadata(): @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_ingest_with_blank_system_metadata(): response = requests.post( - f"{GMS_ENDPOINT}/entities?action=ingest", + f"{get_gms_url()}/entities?action=ingest", headers=restli_default_headers, json={ "entity": { @@ -553,7 +609,7 @@ def test_ingest_with_blank_system_metadata(): @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_ingest_without_system_metadata(): response = requests.post( - f"{GMS_ENDPOINT}/entities?action=ingest", + f"{get_gms_url()}/entities?action=ingest", headers=restli_default_headers, json={ "entity": { @@ -618,7 +674,7 @@ def test_frontend_list_policies(frontend_session): } }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -631,62 +687,50 @@ def test_frontend_list_policies(frontend_session): @pytest.mark.dependency( - depends=["test_healthchecks", "test_run_ingestion", "test_frontend_list_policies"] + depends=[ + "test_healthchecks", + "test_run_ingestion", + "test_frontend_list_policies", + ] ) -def test_frontend_update_policy(frontend_session): +def test_frontend_create_update_delete_policy(frontend_session): + # Policy tests are not idempotent. If you rerun this test it will be wrong. json = { - "query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n - updatePolicy(urn: $urn, input: $input) }""", + "query": """mutation createPolicy($input: PolicyUpdateInput!) 
{\n + createPolicy(input: $input) }""", "variables": { - "urn": "urn:li:dataHubPolicy:7", "input": { - "type": "PLATFORM", - "state": "INACTIVE", - "name": "Updated Platform Policy", + "type": "METADATA", + "name": "Test Metadata Policy", "description": "My Metadaata Policy", - "privileges": ["MANAGE_POLICIES"], + "state": "ACTIVE", + "resources": {"type": "dataset", "allResources": True}, + "privileges": ["EDIT_ENTITY_TAGS"], "actors": { "users": ["urn:li:corpuser:datahub"], "resourceOwners": False, "allUsers": False, "allGroups": False, }, - }, + } }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["updatePolicy"] - assert res_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:7" - - -@pytest.mark.dependency( - depends=[ - "test_healthchecks", - "test_run_ingestion", - "test_frontend_list_policies", - "test_frontend_update_policy", - ] -) -def test_frontend_delete_policy(frontend_session): + assert res_data["data"]["createPolicy"] - json = { - "query": """mutation deletePolicy($urn: String!) {\n - deletePolicy(urn: $urn) }""", - "variables": {"urn": "urn:li:dataHubPolicy:7"}, - } + new_urn = res_data["data"]["createPolicy"] - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) - response.raise_for_status() - res_data = response.json() + # Sleep for eventual consistency + time.sleep(3) - # Now verify the policy has been removed. + # Now verify the policy has been added. json = { "query": """query listPolicies($input: ListPoliciesInput!) {\n listPolicies(input: $input) {\n @@ -705,7 +749,7 @@ def test_frontend_delete_policy(frontend_session): } }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -713,60 +757,54 @@ def test_frontend_delete_policy(frontend_session): assert res_data["data"] assert res_data["data"]["listPolicies"] - # Verify that the URN is no longer in the list + # Verify that the URN appears in the list result = filter( - lambda x: x["urn"] == "urn:li:dataHubPolicy:7", - res_data["data"]["listPolicies"]["policies"], + lambda x: x["urn"] == new_urn, res_data["data"]["listPolicies"]["policies"] ) - assert len(list(result)) == 0 - - -@pytest.mark.dependency( - depends=[ - "test_healthchecks", - "test_run_ingestion", - "test_frontend_list_policies", - "test_frontend_delete_policy", - ] -) -def test_frontend_create_policy(frontend_session): + assert len(list(result)) == 1 - # Policy tests are not idempotent. If you rerun this test it will be wrong. + # update policy json = { - "query": """mutation createPolicy($input: PolicyUpdateInput!) {\n - createPolicy(input: $input) }""", + "query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) 
{\n + updatePolicy(urn: $urn, input: $input) }""", "variables": { + "urn": new_urn, "input": { - "type": "METADATA", - "name": "Test Metadata Policy", + "type": "PLATFORM", + "state": "INACTIVE", + "name": "Updated Platform Policy", "description": "My Metadaata Policy", - "state": "ACTIVE", - "resources": {"type": "dataset", "allResources": True}, - "privileges": ["EDIT_ENTITY_TAGS"], + "privileges": ["MANAGE_POLICIES"], "actors": { "users": ["urn:li:corpuser:datahub"], "resourceOwners": False, "allUsers": False, "allGroups": False, }, - } + }, }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["createPolicy"] - - new_urn = res_data["data"]["createPolicy"] + assert res_data["data"]["updatePolicy"] + assert res_data["data"]["updatePolicy"] == new_urn - # Sleep for eventual consistency - time.sleep(3) + # Remove policy + json = { + "query": """mutation deletePolicy($urn: String!) {\n + deletePolicy(urn: $urn) }""", + "variables": {"urn": new_urn}, + } + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response.raise_for_status() + res_data = response.json() - # Now verify the policy has been added. + # Now verify the policy has been removed. json = { "query": """query listPolicies($input: ListPoliciesInput!) {\n listPolicies(input: $input) {\n @@ -785,7 +823,7 @@ def test_frontend_create_policy(frontend_session): } }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -793,11 +831,12 @@ def test_frontend_create_policy(frontend_session): assert res_data["data"] assert res_data["data"]["listPolicies"] - # Verify that the URN appears in the list + # Verify that the URN is no longer in the list result = filter( - lambda x: x["urn"] == new_urn, res_data["data"]["listPolicies"]["policies"] + lambda x: x["urn"] == new_urn, + res_data["data"]["listPolicies"]["policies"], ) - assert len(list(result)) == 1 + assert len(list(result)) == 0 @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) @@ -831,7 +870,7 @@ def test_frontend_app_config(frontend_session): }""" } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -872,7 +911,7 @@ def test_frontend_me_query(frontend_session): }""" } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -915,7 +954,7 @@ def test_list_users(frontend_session): } }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -955,7 +994,7 @@ def test_list_groups(frontend_session): } }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) 
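# raise_for_status() only catches transport-level failures here; GraphQL
# errors come back with HTTP 200, which is why the JSON payload itself is
# asserted on after this call.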
response.raise_for_status() res_data = response.json() @@ -986,7 +1025,7 @@ def test_add_remove_members_from_group(frontend_session): }""", "variables": {"urn": "urn:li:corpuser:jdoe"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -1007,7 +1046,7 @@ def test_add_remove_members_from_group(frontend_session): }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() # Sleep for edge store to be updated. Not ideal! @@ -1025,7 +1064,7 @@ def test_add_remove_members_from_group(frontend_session): }""", "variables": {"urn": "urn:li:corpuser:jdoe"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -1047,7 +1086,7 @@ def test_add_remove_members_from_group(frontend_session): }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() # Sleep for edge store to be updated. Not ideal! @@ -1065,7 +1104,7 @@ def test_add_remove_members_from_group(frontend_session): }""", "variables": {"urn": "urn:li:corpuser:jdoe"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -1075,9 +1114,7 @@ def test_add_remove_members_from_group(frontend_session): assert res_data["data"]["corpUser"]["relationships"]["total"] == 0 -@pytest.mark.dependency( - depends=["test_healthchecks", "test_run_ingestion"] -) +@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_update_corp_group_properties(frontend_session): group_urn = "urn:li:corpGroup:bfoo" @@ -1087,20 +1124,20 @@ def test_update_corp_group_properties(frontend_session): "query": """mutation updateCorpGroupProperties($urn: String!, $input: CorpGroupUpdateInput!) 
{\n updateCorpGroupProperties(urn: $urn, input: $input) { urn } }""", "variables": { - "urn": group_urn, - "input": { - "description": "My test description", - "slack": "test_group_slack", - "email": "test_group_email@email.com", - }, + "urn": group_urn, + "input": { + "description": "My test description", + "slack": "test_group_slack", + "email": "test_group_email@email.com", + }, }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() print(res_data) - assert "error" not in res_data + assert "errors" not in res_data assert res_data["data"]["updateCorpGroupProperties"] is not None # Verify the description has been updated @@ -1117,19 +1154,19 @@ def test_update_corp_group_properties(frontend_session): }""", "variables": {"urn": group_urn}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data - assert "error" not in res_data + assert "errors" not in res_data assert res_data["data"] assert res_data["data"]["corpGroup"] assert res_data["data"]["corpGroup"]["editableProperties"] assert res_data["data"]["corpGroup"]["editableProperties"] == { - "description": "My test description", - "slack": "test_group_slack", - "email": "test_group_email@email.com" + "description": "My test description", + "slack": "test_group_slack", + "email": "test_group_email@email.com", } # Reset the editable properties @@ -1137,20 +1174,21 @@ def test_update_corp_group_properties(frontend_session): "query": """mutation updateCorpGroupProperties($urn: String!, $input: UpdateCorpGroupPropertiesInput!) {\n updateCorpGroupProperties(urn: $urn, input: $input) }""", "variables": { - "urn": group_urn, - "input": { - "description": "", - "slack": "", - "email": "" - }, + "urn": group_urn, + "input": {"description": "", "slack": "", "email": ""}, }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() + @pytest.mark.dependency( - depends=["test_healthchecks", "test_run_ingestion", "test_update_corp_group_properties"] + depends=[ + "test_healthchecks", + "test_run_ingestion", + "test_update_corp_group_properties", + ] ) def test_update_corp_group_description(frontend_session): @@ -1161,18 +1199,15 @@ def test_update_corp_group_description(frontend_session): "query": """mutation updateDescription($input: DescriptionUpdateInput!) 
{\n updateDescription(input: $input) }""", "variables": { - "input": { - "description": "My test description", - "resourceUrn": group_urn - }, + "input": {"description": "My test description", "resourceUrn": group_urn}, }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() print(res_data) - assert "error" not in res_data + assert "errors" not in res_data assert res_data["data"]["updateDescription"] is True # Verify the description has been updated @@ -1187,32 +1222,33 @@ def test_update_corp_group_description(frontend_session): }""", "variables": {"urn": group_urn}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data - assert "error" not in res_data + assert "errors" not in res_data assert res_data["data"] assert res_data["data"]["corpGroup"] assert res_data["data"]["corpGroup"]["editableProperties"] - assert res_data["data"]["corpGroup"]["editableProperties"]["description"] == "My test description" + assert ( + res_data["data"]["corpGroup"]["editableProperties"]["description"] + == "My test description" + ) # Reset Corp Group Description json = { "query": """mutation updateDescription($input: DescriptionUpdateInput!) {\n updateDescription(input: $input) }""", "variables": { - "input": { - "description": "", - "resourceUrn": group_urn - }, + "input": {"description": "", "resourceUrn": group_urn}, }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() + @pytest.mark.dependency( depends=[ "test_healthchecks", @@ -1229,7 +1265,7 @@ def test_remove_user(frontend_session): "variables": {"urn": "urn:li:corpuser:jdoe"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() json = { @@ -1243,12 +1279,12 @@ def test_remove_user(frontend_session): }""", "variables": {"urn": "urn:li:corpuser:jdoe"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data - assert "error" not in res_data + assert "errors" not in res_data assert res_data["data"] assert res_data["data"]["corpUser"] assert res_data["data"]["corpUser"]["properties"] is None @@ -1270,7 +1306,7 @@ def test_remove_group(frontend_session): "variables": {"urn": "urn:li:corpGroup:bfoo"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() json = { @@ -1284,7 +1320,7 @@ def test_remove_group(frontend_session): }""", "variables": {"urn": "urn:li:corpGroup:bfoo"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -1316,7 +1352,7 @@ def test_create_group(frontend_session): }, } - response 
= frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() json = { @@ -1330,7 +1366,7 @@ def test_create_group(frontend_session): }""", "variables": {"urn": "urn:li:corpGroup:test-id"}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -1357,7 +1393,7 @@ def test_home_page_recommendations(frontend_session): }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() print(res_data) @@ -1365,7 +1401,7 @@ def test_home_page_recommendations(frontend_session): assert res_data assert res_data["data"] assert res_data["data"]["listRecommendations"] - assert "error" not in res_data + assert "errors" not in res_data assert ( len(res_data["data"]["listRecommendations"]["modules"]) > min_expected_recommendation_modules @@ -1378,7 +1414,7 @@ def test_search_results_recommendations(frontend_session): # This test simply ensures that the recommendations endpoint does not return an error. json = { "query": """query listRecommendations($input: ListRecommendationsInput!) {\n - listRecommendations(input: $input) { modules { title } }""", + listRecommendations(input: $input) { modules { title } } }""", "variables": { "input": { "userUrn": "urn:li:corpuser:datahub", @@ -1391,12 +1427,12 @@ def test_search_results_recommendations(frontend_session): }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data - assert "error" not in res_data + assert "errors" not in res_data @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) @@ -1418,14 +1454,14 @@ def test_generate_personal_access_token(frontend_session): }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] assert res_data["data"]["getAccessToken"]["accessToken"] is not None - assert "error" not in res_data + assert "errors" not in res_data # Test unauthenticated case json = { @@ -1441,13 +1477,14 @@ def test_generate_personal_access_token(frontend_session): }, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() assert res_data assert "errors" in res_data # Assert the request fails + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_native_user_endpoints(frontend_session): # Sign up tests @@ -1461,16 +1498,19 @@ def test_native_user_endpoints(frontend_session): }""" } - get_invite_token_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_invite_token_json) + get_invite_token_response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=get_invite_token_json + ) get_invite_token_response.raise_for_status() 
get_invite_token_res_data = get_invite_token_response.json() assert get_invite_token_res_data assert get_invite_token_res_data["data"] - invite_token = get_invite_token_res_data["data"]["getNativeUserInviteToken"]["inviteToken"] + invite_token = get_invite_token_res_data["data"]["getNativeUserInviteToken"][ + "inviteToken" + ] assert invite_token is not None - assert "error" not in get_invite_token_res_data - + assert "errors" not in get_invite_token_res_data # Pass the invite token when creating the user sign_up_json = { @@ -1478,15 +1518,19 @@ def test_native_user_endpoints(frontend_session): "email": "test@email.com", "password": "password", "title": "Date Engineer", - "inviteToken": invite_token + "inviteToken": invite_token, } - sign_up_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/signUp", json=sign_up_json) + sign_up_response = frontend_session.post( + f"{get_frontend_url()}/signUp", json=sign_up_json + ) assert sign_up_response - assert "error" not in sign_up_response + assert "errors" not in sign_up_response # Creating the same user again fails - same_user_sign_up_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/signUp", json=sign_up_json) + same_user_sign_up_response = frontend_session.post( + f"{get_frontend_url()}/signUp", json=sign_up_json + ) assert not same_user_sign_up_response # Test that a bad invite token leads to failed sign up @@ -1495,14 +1539,15 @@ def test_native_user_endpoints(frontend_session): "email": "test2@email.com", "password": "password", "title": "Date Engineer", - "inviteToken": "invite_token" + "inviteToken": "invite_token", } - bad_sign_up_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/signUp", json=bad_sign_up_json) + bad_sign_up_response = frontend_session.post( + f"{get_frontend_url()}/signUp", json=bad_sign_up_json + ) assert not bad_sign_up_response frontend_session.cookies.clear() - # Reset credentials tests # Log in as root again @@ -1510,7 +1555,9 @@ def test_native_user_endpoints(frontend_session): "Content-Type": "application/json", } root_login_data = '{"username":"datahub", "password":"datahub"}' - frontend_session.post(f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=root_login_data) + frontend_session.post( + f"{get_frontend_url()}/logIn", headers=headers, data=root_login_data + ) # Test creating the password reset token create_reset_token_json = { @@ -1519,68 +1566,84 @@ def test_native_user_endpoints(frontend_session): resetToken\n }\n }""", - "variables": { - "input": { - "userUrn": "urn:li:corpuser:test@email.com" - } - }, + "variables": {"input": {"userUrn": "urn:li:corpuser:test@email.com"}}, } - create_reset_token_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=create_reset_token_json) + create_reset_token_response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=create_reset_token_json + ) create_reset_token_response.raise_for_status() create_reset_token_res_data = create_reset_token_response.json() assert create_reset_token_res_data assert create_reset_token_res_data["data"] - reset_token = create_reset_token_res_data["data"]["createNativeUserResetToken"]["resetToken"] + reset_token = create_reset_token_res_data["data"]["createNativeUserResetToken"][ + "resetToken" + ] assert reset_token is not None - assert "error" not in create_reset_token_res_data + assert "errors" not in create_reset_token_res_data # Pass the reset token when resetting credentials reset_credentials_json = { "email": "test@email.com", "password": "password", - "resetToken": 
reset_token + "resetToken": reset_token, } - reset_credentials_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/resetNativeUserCredentials", json=reset_credentials_json) + reset_credentials_response = frontend_session.post( + f"{get_frontend_url()}/resetNativeUserCredentials", json=reset_credentials_json + ) assert reset_credentials_response - assert "error" not in reset_credentials_response + assert "errors" not in reset_credentials_response # Test that a bad reset token leads to failed response bad_user_reset_credentials_json = { "email": "test@email.com", "password": "password", - "resetToken": "reset_token" + "resetToken": "reset_token", } - bad_reset_credentials_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/resetNativeUserCredentials", json=bad_user_reset_credentials_json) + bad_reset_credentials_response = frontend_session.post( + f"{get_frontend_url()}/resetNativeUserCredentials", + json=bad_user_reset_credentials_json, + ) assert not bad_reset_credentials_response # Test that only a native user can reset their password jaas_user_reset_credentials_json = { "email": "datahub", "password": "password", - "resetToken": reset_token + "resetToken": reset_token, } - jaas_user_reset_credentials_response = frontend_session.post(f"{FRONTEND_ENDPOINT}/resetNativeUserCredentials", json=jaas_user_reset_credentials_json) + jaas_user_reset_credentials_response = frontend_session.post( + f"{get_frontend_url()}/resetNativeUserCredentials", + json=jaas_user_reset_credentials_json, + ) assert not jaas_user_reset_credentials_response - # Tests that unauthenticated users can't invite users or send reset password links native_user_frontend_session = requests.Session() native_user_login_data = '{"username":"test@email.com", "password":"password"}' - native_user_frontend_session.post(f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=native_user_login_data) + native_user_frontend_session.post( + f"{get_frontend_url()}/logIn", headers=headers, data=native_user_login_data + ) - unauthenticated_get_invite_token_response = native_user_frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_invite_token_json) + unauthenticated_get_invite_token_response = native_user_frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=get_invite_token_json + ) unauthenticated_get_invite_token_response.raise_for_status() - unauthenticated_get_invite_token_res_data = unauthenticated_get_invite_token_response.json() + unauthenticated_get_invite_token_res_data = ( + unauthenticated_get_invite_token_response.json() + ) assert unauthenticated_get_invite_token_res_data assert "errors" in unauthenticated_get_invite_token_res_data assert unauthenticated_get_invite_token_res_data["data"] - assert unauthenticated_get_invite_token_res_data["data"]["getNativeUserInviteToken"] is None + assert ( + unauthenticated_get_invite_token_res_data["data"]["getNativeUserInviteToken"] + is None + ) unauthenticated_create_reset_token_json = { "query": """mutation createNativeUserResetToken($input: CreateNativeUserResetTokenInput!) 
{\n @@ -1588,18 +1651,37 @@ def test_native_user_endpoints(frontend_session): resetToken\n }\n }""", - "variables": { - "input": { - "userUrn": "urn:li:corpuser:test@email.com" - } - }, + "variables": {"input": {"userUrn": "urn:li:corpuser:test@email.com"}}, } - unauthenticated_create_reset_token_response = native_user_frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=unauthenticated_create_reset_token_json) + unauthenticated_create_reset_token_response = native_user_frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", + json=unauthenticated_create_reset_token_json, + ) unauthenticated_create_reset_token_response.raise_for_status() - unauthenticated_create_reset_token_res_data = unauthenticated_create_reset_token_response.json() + unauthenticated_create_reset_token_res_data = ( + unauthenticated_create_reset_token_response.json() + ) assert unauthenticated_create_reset_token_res_data assert "errors" in unauthenticated_create_reset_token_res_data assert unauthenticated_create_reset_token_res_data["data"] - assert unauthenticated_create_reset_token_res_data["data"]["createNativeUserResetToken"] is None + assert ( + unauthenticated_create_reset_token_res_data["data"][ + "createNativeUserResetToken" + ] + is None + ) + + # cleanup steps + json = { + "query": """mutation removeUser($urn: String!) {\n + removeUser(urn: $urn) }""", + "variables": {"urn": "urn:li:corpuser:test@email.com"}, + } + + remove_user_response = native_user_frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json + ) + remove_user_response.raise_for_status() + assert "errors" not in remove_user_response diff --git a/smoke-test/test_rapid.py b/smoke-test/test_rapid.py index 0219154bdb51d..fae575a9bac27 100644 --- a/smoke-test/test_rapid.py +++ b/smoke-test/test_rapid.py @@ -1,18 +1,14 @@ import time -import urllib -from typing import Any, Dict, Optional, cast import pytest import requests - from datahub.cli.docker import check_local_docker_containers -from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.source.state.checkpoint import Checkpoint -from tests.utils import ingest_file_via_rest + +from tests.utils import get_frontend_url, ingest_file_via_rest bootstrap_small = "test_resources/bootstrap_single.json" bootstrap_small_2 = "test_resources/bootstrap_single2.json" -FRONTEND_ENDPOINT = "http://localhost:9002" + @pytest.fixture(scope="session") def wait_for_healthchecks(): @@ -20,6 +16,7 @@ def wait_for_healthchecks(): assert not check_local_docker_containers() yield + @pytest.fixture(scope="session") def frontend_session(wait_for_healthchecks): session = requests.Session() @@ -28,19 +25,18 @@ def frontend_session(wait_for_healthchecks): "Content-Type": "application/json", } data = '{"username":"datahub", "password":"datahub"}' - response = session.post( - f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=data - ) + response = session.post(f"{get_frontend_url()}/logIn", headers=headers, data=data) response.raise_for_status() yield session + def test_ingestion_via_rest_rapid(frontend_session, wait_for_healthchecks): ingest_file_via_rest(bootstrap_small) ingest_file_via_rest(bootstrap_small_2) - urn = f"urn:li:dataset:(urn:li:dataPlatform:testPlatform,testDataset,PROD)" + urn = "urn:li:dataset:(urn:li:dataPlatform:testPlatform,testDataset,PROD)" json = { - "query": """query getDataset($urn: String!) {\n + "query": """query getDataset($urn: String!) 
{\n dataset(urn: $urn) {\n urn\n name\n @@ -70,15 +66,11 @@ def test_ingestion_via_rest_rapid(frontend_session, wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": urn - } - } + "variables": {"urn": urn}, + } # time.sleep(2) - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() diff --git a/smoke-test/tests/assertions/assertions_test.py b/smoke-test/tests/assertions/assertions_test.py index 99ae7110de0bd..66db987e62ceb 100644 --- a/smoke-test/tests/assertions/assertions_test.py +++ b/smoke-test/tests/assertions/assertions_test.py @@ -4,7 +4,6 @@ import pytest import requests - from datahub.cli.docker import check_local_docker_containers from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -25,11 +24,7 @@ PartitionSpecClass, PartitionTypeClass, ) -from tests.utils import ingest_file_via_rest -from tests.utils import delete_urns_from_file - - -GMS_ENDPOINT = "http://localhost:8080" +from tests.utils import delete_urns_from_file, get_gms_url, ingest_file_via_rest restli_default_headers = { "X-RestLi-Protocol-Version": "2.0.0", @@ -253,6 +248,7 @@ def test_healthchecks(wait_for_healthchecks): def test_run_ingestion(generate_test_data): ingest_file_via_rest(generate_test_data) + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_gms_get_latest_assertions_results_by_partition(): urn = make_dataset_urn("postgres", "foo") @@ -294,7 +290,7 @@ def test_gms_get_latest_assertions_results_by_partition(): } ) response = requests.post( - f"{GMS_ENDPOINT}/analytics?action=getTimeseriesStats", + f"{get_gms_url()}/analytics?action=getTimeseriesStats", data=query, headers=restli_default_headers, ) @@ -325,7 +321,7 @@ def test_gms_get_assertions_on_dataset(): """lists all assertion urns including those which may not have executed""" urn = make_dataset_urn("postgres", "foo") response = requests.get( - f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(urn)}&types=Asserts" + f"{get_gms_url()}/relationships?direction=INCOMING&urn={urllib.parse.quote(urn)}&types=Asserts" ) response.raise_for_status() @@ -339,7 +335,7 @@ def test_gms_get_assertions_on_dataset_field(): dataset_urn = make_dataset_urn("postgres", "foo") field_urn = make_schema_field_urn(dataset_urn, "col1") response = requests.get( - f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(field_urn)}&types=Asserts" + f"{get_gms_url()}/relationships?direction=INCOMING&urn={urllib.parse.quote(field_urn)}&types=Asserts" ) response.raise_for_status() @@ -351,7 +347,7 @@ def test_gms_get_assertions_on_dataset_field(): def test_gms_get_assertion_info(): assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b" response = requests.get( - f"{GMS_ENDPOINT}/aspects/{urllib.parse.quote(assertion_urn)}\ + f"{get_gms_url()}/aspects/{urllib.parse.quote(assertion_urn)}\ ?aspect=assertionInfo&version=0", headers=restli_default_headers, ) @@ -364,4 +360,4 @@ def test_gms_get_assertion_info(): assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["type"] == "DATASET" assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"][ "scope" - ] \ No newline at end of file + ] diff --git a/smoke-test/tests/cli/datahub_graph_test.py b/smoke-test/tests/cli/datahub_graph_test.py index 
a4b01e452217f..3728f68f8208f 100644 --- a/smoke-test/tests/cli/datahub_graph_test.py +++ b/smoke-test/tests/cli/datahub_graph_test.py @@ -1,14 +1,7 @@ -import time - import pytest from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph -from datahub.metadata.schema_classes import SchemaMetadataClass, KafkaSchemaClass -from tests.utils import ( - FRONTEND_ENDPOINT, - GMS_ENDPOINT, - delete_urns_from_file, - ingest_file_via_rest, -) +from datahub.metadata.schema_classes import KafkaSchemaClass, SchemaMetadataClass +from tests.utils import delete_urns_from_file, ingest_file_via_rest @pytest.fixture(scope="module", autouse=False) @@ -38,5 +31,7 @@ def test_get_aspect_v2(frontend_session, ingest_cleanup_data): assert schema_metadata.platform == "urn:li:dataPlatform:kafka" assert isinstance(schema_metadata.platformSchema, KafkaSchemaClass) k_schema: KafkaSchemaClass = schema_metadata.platformSchema - assert k_schema.documentSchema == "{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}" - + assert ( + k_schema.documentSchema + == '{"type":"record","name":"SampleKafkaSchema","namespace":"com.linkedin.dataset","doc":"Sample Kafka dataset","fields":[{"name":"field_foo","type":["string"]},{"name":"field_bar","type":["boolean"]}]}' + ) diff --git a/smoke-test/tests/conftest.py b/smoke-test/tests/conftest.py index 7d0f3c159864b..946d282a2b050 100644 --- a/smoke-test/tests/conftest.py +++ b/smoke-test/tests/conftest.py @@ -1,22 +1,22 @@ import os -import time import pytest import requests -import urllib from datahub.cli.docker import check_local_docker_containers -from datahub.ingestion.run.pipeline import Pipeline -from tests.utils import FRONTEND_ENDPOINT + +from tests.utils import get_frontend_url # Disable telemetry os.putenv("DATAHUB_TELEMETRY_ENABLED", "false") + @pytest.fixture(scope="session") def wait_for_healthchecks(): # Simply assert that everything is healthy, but don't wait. assert not check_local_docker_containers() yield + @pytest.fixture(scope="session") def frontend_session(wait_for_healthchecks): session = requests.Session() @@ -25,15 +25,14 @@ def frontend_session(wait_for_healthchecks): "Content-Type": "application/json", } data = '{"username":"datahub", "password":"datahub"}' - response = session.post( - f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=data - ) + response = session.post(f"{get_frontend_url()}/logIn", headers=headers, data=data) response.raise_for_status() yield session + # TODO: Determine whether we need this or not. @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. 
- pass \ No newline at end of file + pass diff --git a/smoke-test/tests/containers/containers_test.py b/smoke-test/tests/containers/containers_test.py index daa6b2bbc7bd8..575e3def6cf23 100644 --- a/smoke-test/tests/containers/containers_test.py +++ b/smoke-test/tests/containers/containers_test.py @@ -1,9 +1,6 @@ import pytest -import time -from tests.utils import FRONTEND_ENDPOINT -from tests.utils import GMS_ENDPOINT -from tests.utils import ingest_file_via_rest -from tests.utils import delete_urns_from_file +from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest + @pytest.fixture(scope="module", autouse=False) def ingest_cleanup_data(request): @@ -13,11 +10,13 @@ def ingest_cleanup_data(request): print("removing containers test data") delete_urns_from_file("tests/containers/data.json") + @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. pass + @pytest.mark.dependency(depends=["test_healthchecks"]) def test_get_full_container(frontend_session, ingest_cleanup_data): @@ -96,13 +95,11 @@ def test_get_full_container(frontend_session, ingest_cleanup_data): }\n }\n }""", - "variables": { - "urn": container_urn - } + "variables": {"urn": container_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_container_json + f"{get_frontend_url()}/api/v2/graphql", json=get_container_json ) response.raise_for_status() res_data = response.json() @@ -119,12 +116,15 @@ def test_get_full_container(frontend_session, ingest_cleanup_data): assert container["properties"]["name"] == container_name assert container["properties"]["description"] == container_description assert container["subTypes"]["typeNames"][0] == "Schema" - assert container["editableProperties"]["description"] == editable_container_description + assert ( + container["editableProperties"]["description"] == editable_container_description + ) assert container["ownership"] is None assert container["institutionalMemory"] is None assert container["tags"] is None assert container["glossaryTerms"] is None + @pytest.mark.dependency(depends=["test_healthchecks", "test_get_full_container"]) def test_get_parent_container(frontend_session, ingest_cleanup_data): @@ -143,13 +143,11 @@ def test_get_parent_container(frontend_session, ingest_cleanup_data): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=get_dataset_json ) response.raise_for_status() res_data = response.json() @@ -162,6 +160,7 @@ def test_get_parent_container(frontend_session, ingest_cleanup_data): dataset = res_data["data"]["dataset"] assert dataset["container"]["properties"]["name"] == "datahub_schema" + @pytest.mark.dependency(depends=["test_healthchecks", "test_get_full_container"]) def test_update_container(frontend_session, ingest_cleanup_data): @@ -175,14 +174,14 @@ def test_update_container(frontend_session, ingest_cleanup_data): }""", "variables": { "input": { - "tagUrn": new_tag, - "resourceUrn": container_urn, + "tagUrn": new_tag, + "resourceUrn": container_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_tag_json + f"{get_frontend_url()}/api/v2/graphql", json=add_tag_json ) response.raise_for_status() res_data = response.json() @@ -199,14 +198,14 @@ def 
test_update_container(frontend_session, ingest_cleanup_data): }""", "variables": { "input": { - "termUrn": new_term, - "resourceUrn": container_urn, + "termUrn": new_term, + "resourceUrn": container_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_term_json + f"{get_frontend_url()}/api/v2/graphql", json=add_term_json ) response.raise_for_status() res_data = response.json() @@ -223,15 +222,15 @@ def test_update_container(frontend_session, ingest_cleanup_data): }""", "variables": { "input": { - "ownerUrn": new_owner, - "resourceUrn": container_urn, - "ownerEntityType": "CORP_USER" + "ownerUrn": new_owner, + "resourceUrn": container_urn, + "ownerEntityType": "CORP_USER", } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_owner_json + f"{get_frontend_url()}/api/v2/graphql", json=add_owner_json ) response.raise_for_status() res_data = response.json() @@ -248,15 +247,15 @@ def test_update_container(frontend_session, ingest_cleanup_data): }""", "variables": { "input": { - "linkUrl": new_link, - "resourceUrn": container_urn, - "label": "Label" + "linkUrl": new_link, + "resourceUrn": container_urn, + "label": "Label", } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_link_json + f"{get_frontend_url()}/api/v2/graphql", json=add_link_json ) response.raise_for_status() res_data = response.json() @@ -273,14 +272,14 @@ def test_update_container(frontend_session, ingest_cleanup_data): }""", "variables": { "input": { - "description": new_description, - "resourceUrn": container_urn, + "description": new_description, + "resourceUrn": container_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=update_description_json + f"{get_frontend_url()}/api/v2/graphql", json=update_description_json ) response.raise_for_status() res_data = response.json() @@ -290,7 +289,7 @@ def test_update_container(frontend_session, ingest_cleanup_data): assert res_data["data"]["updateDescription"] is True # Now fetch the container to ensure it was updated - # Get the container + # Get the container get_container_json = { "query": """query container($urn: String!) 
{\n container(urn: $urn) {\n @@ -327,13 +326,11 @@ def test_update_container(frontend_session, ingest_cleanup_data): }\n }\n }""", - "variables": { - "urn": container_urn - } + "variables": {"urn": container_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_container_json + f"{get_frontend_url()}/api/v2/graphql", json=get_container_json ) response.raise_for_status() res_data = response.json() @@ -348,4 +345,4 @@ def test_update_container(frontend_session, ingest_cleanup_data): assert container["ownership"]["owners"][0]["owner"]["urn"] == new_owner assert container["institutionalMemory"]["elements"][0]["url"] == new_link assert container["tags"]["tags"][0]["tag"]["urn"] == new_tag - assert container["glossaryTerms"]["terms"][0]["term"]["urn"] == new_term \ No newline at end of file + assert container["glossaryTerms"]["terms"][0]["term"]["urn"] == new_term diff --git a/smoke-test/tests/deprecation/deprecation_test.py b/smoke-test/tests/deprecation/deprecation_test.py index 1173ca8d6327d..ffa2c13196207 100644 --- a/smoke-test/tests/deprecation/deprecation_test.py +++ b/smoke-test/tests/deprecation/deprecation_test.py @@ -1,9 +1,6 @@ import pytest -import time -from tests.utils import FRONTEND_ENDPOINT -from tests.utils import GMS_ENDPOINT -from tests.utils import ingest_file_via_rest -from tests.utils import delete_urns_from_file +from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest + @pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(request): @@ -13,14 +10,18 @@ def ingest_cleanup_data(request): print("removing deprecation test data") delete_urns_from_file("tests/deprecation/data.json") + @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. pass + @pytest.mark.dependency(depends=["test_healthchecks"]) def test_update_deprecation_all_fields(frontend_session): - dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)" + dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)" + ) dataset_json = { "query": """query getDataset($urn: String!) 
{\n @@ -33,14 +34,12 @@ def test_update_deprecation_all_fields(frontend_session): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } # Fetch tags response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -56,16 +55,16 @@ def test_update_deprecation_all_fields(frontend_session): }""", "variables": { "input": { - "urn": dataset_urn, - "deprecated": True, - "note": "My test note", - "decommissionTime": 0 + "urn": dataset_urn, + "deprecated": True, + "note": "My test note", + "decommissionTime": 0, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=update_deprecation_json + f"{get_frontend_url()}/api/v2/graphql", json=update_deprecation_json ) response.raise_for_status() res_data = response.json() @@ -76,7 +75,7 @@ def test_update_deprecation_all_fields(frontend_session): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -85,30 +84,30 @@ def test_update_deprecation_all_fields(frontend_session): assert res_data["data"] assert res_data["data"]["dataset"] assert res_data["data"]["dataset"]["deprecation"] == { - 'deprecated': True, - 'decommissionTime': 0, - 'note': 'My test note', - 'actor': 'urn:li:corpuser:datahub' + "deprecated": True, + "decommissionTime": 0, + "note": "My test note", + "actor": "urn:li:corpuser:datahub", } -@pytest.mark.dependency(depends=["test_healthchecks", "test_update_deprecation_all_fields"]) + +@pytest.mark.dependency( + depends=["test_healthchecks", "test_update_deprecation_all_fields"] +) def test_update_deprecation_partial_fields(frontend_session, ingest_cleanup_data): - dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)" + dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)" + ) update_deprecation_json = { "query": """mutation updateDeprecation($input: UpdateDeprecationInput!) 
{\n updateDeprecation(input: $input) }""", - "variables": { - "input": { - "urn": dataset_urn, - "deprecated": False - } - } + "variables": {"input": {"urn": dataset_urn, "deprecated": False}}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=update_deprecation_json + f"{get_frontend_url()}/api/v2/graphql", json=update_deprecation_json ) response.raise_for_status() res_data = response.json() @@ -129,13 +128,11 @@ def test_update_deprecation_partial_fields(frontend_session, ingest_cleanup_data }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -144,8 +141,8 @@ def test_update_deprecation_partial_fields(frontend_session, ingest_cleanup_data assert res_data["data"] assert res_data["data"]["dataset"] assert res_data["data"]["dataset"]["deprecation"] == { - 'deprecated': False, - 'note': '', - 'actor': 'urn:li:corpuser:datahub', - 'decommissionTime': None - } \ No newline at end of file + "deprecated": False, + "note": "", + "actor": "urn:li:corpuser:datahub", + "decommissionTime": None, + } diff --git a/smoke-test/tests/domains/domains_test.py b/smoke-test/tests/domains/domains_test.py index 11b41cb4240a8..0e935c244ed09 100644 --- a/smoke-test/tests/domains/domains_test.py +++ b/smoke-test/tests/domains/domains_test.py @@ -1,9 +1,13 @@ -import pytest import time -from tests.utils import FRONTEND_ENDPOINT -from tests.utils import GMS_ENDPOINT -from tests.utils import ingest_file_via_rest -from tests.utils import delete_urns_from_file + +import pytest +from tests.utils import ( + delete_urns_from_file, + get_frontend_url, + get_gms_url, + ingest_file_via_rest, +) + @pytest.fixture(scope="module", autouse=False) def ingest_cleanup_data(request): @@ -13,11 +17,13 @@ def ingest_cleanup_data(request): print("removing domains test data") delete_urns_from_file("tests/domains/data.json") + @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. 
pass + @pytest.mark.dependency(depends=["test_healthchecks"]) def test_create_list_get_domain(frontend_session): @@ -36,16 +42,11 @@ def test_create_list_get_domain(frontend_session): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } + "variables": {"input": {"start": "0", "count": "20"}}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=list_domains_json + f"{get_frontend_url()}/api/v2/graphql", json=list_domains_json ) response.raise_for_status() res_data = response.json() @@ -68,16 +69,16 @@ def test_create_list_get_domain(frontend_session): createDomain(input: $input) }""", "variables": { - "input": { - "id": domain_id, - "name": domain_name, - "description": domain_description - } - } + "input": { + "id": domain_id, + "name": domain_name, + "description": domain_description, + } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=create_domain_json + f"{get_frontend_url()}/api/v2/graphql", json=create_domain_json ) response.raise_for_status() res_data = response.json() @@ -94,7 +95,7 @@ def test_create_list_get_domain(frontend_session): # Get new count of Domains response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=list_domains_json + f"{get_frontend_url()}/api/v2/graphql", json=list_domains_json ) response.raise_for_status() res_data = response.json() @@ -109,7 +110,6 @@ def test_create_list_get_domain(frontend_session): print(after_count) assert after_count == before_count + 1 - # Get the domain value back get_domain_json = { "query": """query domain($urn: String!) {\n @@ -122,13 +122,11 @@ def test_create_list_get_domain(frontend_session): }\n }\n }""", - "variables": { - "urn": domain_urn - } + "variables": {"urn": domain_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_domain_json + f"{get_frontend_url()}/api/v2/graphql", json=get_domain_json ) response.raise_for_status() res_data = response.json() @@ -139,18 +137,16 @@ def test_create_list_get_domain(frontend_session): assert "errors" not in res_data domain = res_data["data"]["domain"] - assert domain["urn"] == f'urn:li:domain:{domain_id}' + assert domain["urn"] == f"urn:li:domain:{domain_id}" assert domain["id"] == domain_id assert domain["properties"]["name"] == domain_name assert domain["properties"]["description"] == domain_description - delete_json = { - "urn": domain_urn - } + delete_json = {"urn": domain_urn} # Cleanup: Delete the domain response = frontend_session.post( - f"{GMS_ENDPOINT}/entities?action=delete", json=delete_json + f"{get_gms_url()}/entities?action=delete", json=delete_json ) response.raise_for_status() @@ -160,20 +156,20 @@ def test_create_list_get_domain(frontend_session): def test_set_unset_domain(frontend_session, ingest_cleanup_data): # Set and Unset a Domain for a dataset. Note that this doesn't test for adding domains to charts, dashboards, charts, & jobs. - dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)" + dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)" + ) domain_urn = "urn:li:domain:engineering" # First unset to be sure. unset_domain_json = { "query": """mutation unsetDomain($entityUrn: String!) 
{\n unsetDomain(entityUrn: $entityUrn)}""", - "variables": { - "entityUrn": dataset_urn - } + "variables": {"entityUrn": dataset_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=unset_domain_json + f"{get_frontend_url()}/api/v2/graphql", json=unset_domain_json ) response.raise_for_status() res_data = response.json() @@ -187,14 +183,11 @@ def test_set_unset_domain(frontend_session, ingest_cleanup_data): set_domain_json = { "query": """mutation setDomain($entityUrn: String!, $domainUrn: String!) {\n setDomain(entityUrn: $entityUrn, domainUrn: $domainUrn)}""", - "variables": { - "entityUrn": dataset_urn, - "domainUrn": domain_urn - } + "variables": {"entityUrn": dataset_urn, "domainUrn": domain_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=set_domain_json + f"{get_frontend_url()}/api/v2/graphql", json=set_domain_json ) response.raise_for_status() res_data = response.json() @@ -204,7 +197,7 @@ def test_set_unset_domain(frontend_session, ingest_cleanup_data): assert res_data["data"]["setDomain"] is True assert "errors" not in res_data - # Now, fetch the dataset's domain and confirm it was set.GMS_ENDPOINT + # Now, fetch the dataset's domain and confirm it was set. get_dataset_json = { "query": """query dataset($urn: String!) {\n dataset(urn: $urn) {\n @@ -217,13 +210,11 @@ def test_set_unset_domain(frontend_session, ingest_cleanup_data): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=get_dataset_json ) response.raise_for_status() res_data = response.json() diff --git a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py index 8a149eccaaea2..47a0f0e2e5f2e 100644 --- a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py +++ b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py @@ -1,7 +1,8 @@ -import pytest import time -import requests -from tests.utils import FRONTEND_ENDPOINT + +import pytest +from tests.utils import get_frontend_url + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_create_list_get_remove_secret(frontend_session): @@ -19,17 +20,10 @@ def test_create_list_get_remove_secret(frontend_session): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } + "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -45,17 +39,10 @@ def test_create_list_get_remove_secret(frontend_session): "query": """mutation createSecret($input: CreateSecretInput!) 
{\n createSecret(input: $input) }""", - "variables": { - "input": { - "name": "SMOKE_TEST", - "value": "mytestvalue" - } - } + "variables": {"input": {"name": "SMOKE_TEST", "value": "mytestvalue"}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -82,17 +69,10 @@ def test_create_list_get_remove_secret(frontend_session): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } + "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -113,16 +93,10 @@ def test_create_list_get_remove_secret(frontend_session): value\n }\n }""", - "variables": { - "input": { - "secrets": ["SMOKE_TEST"] - } - } + "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -141,14 +115,10 @@ def test_create_list_get_remove_secret(frontend_session): "query": """mutation deleteSecret($urn: String!) {\n deleteSecret(urn: $urn) }""", - "variables": { - "urn": secret_urn - } + "variables": {"urn": secret_urn}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -168,16 +138,10 @@ def test_create_list_get_remove_secret(frontend_session): value\n }\n }""", - "variables": { - "input": { - "secrets": ["SMOKE_TEST"] - } - } + "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -190,6 +154,7 @@ def test_create_list_get_remove_secret(frontend_session): secret_value_arr = [x for x in secret_values if x["name"] == "SMOKE_TEST"] assert len(secret_value_arr) == 0 + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_create_list_get_remove_ingestion_source(frontend_session): @@ -205,17 +170,10 @@ def test_create_list_get_remove_ingestion_source(frontend_session): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } + "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -232,26 +190,21 @@ def test_create_list_get_remove_ingestion_source(frontend_session): createIngestionSource(input: $input) }""", "variables": { - "input": { - "name": "My Test Ingestion Source", - "type": "mysql", - "description": "My ingestion source description", - "schedule": { - "interval": "* * * * *", - "timezone": "UTC" - }, - "config": { - "recipe": "MY_TEST_RECIPE", - "version": "0.8.18", - "executorId": "mytestexecutor" - } - } - } + "input": { + "name": "My Test 
Ingestion Source", + "type": "mysql", + "description": "My ingestion source description", + "schedule": {"interval": "* * * * *", "timezone": "UTC"}, + "config": { + "recipe": "MY_TEST_RECIPE", + "version": "0.8.18", + "executorId": "mytestexecutor", + }, + } + }, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -277,17 +230,10 @@ def test_create_list_get_remove_ingestion_source(frontend_session): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } + "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -318,14 +264,10 @@ def test_create_list_get_remove_ingestion_source(frontend_session): }\n }\n }""", - "variables": { - "urn": ingestion_source_urn - } + "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -349,14 +291,10 @@ def test_create_list_get_remove_ingestion_source(frontend_session): "query": """mutation deleteIngestionSource($urn: String!) {\n deleteIngestionSource(urn: $urn) }""", - "variables": { - "urn": ingestion_source_urn - } + "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -381,17 +319,10 @@ def test_create_list_get_remove_ingestion_source(frontend_session): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } + "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -403,7 +334,14 @@ def test_create_list_get_remove_ingestion_source(frontend_session): final_count = res_data["data"]["listIngestionSources"]["total"] assert final_count == after_count - 1 -@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion", "test_create_list_get_remove_ingestion_source"]) + +@pytest.mark.dependency( + depends=[ + "test_healthchecks", + "test_run_ingestion", + "test_create_list_get_remove_ingestion_source", + ] +) def test_create_list_get_ingestion_execution_request(frontend_session): # Create new ingestion source json = { @@ -411,26 +349,21 @@ def test_create_list_get_ingestion_execution_request(frontend_session): createIngestionSource(input: $input) }""", "variables": { - "input": { - "name": "My Test Ingestion Source", - "type": "mysql", - "description": "My ingestion source description", - "schedule": { - "interval": "* * * * *", - "timezone": "UTC" - }, - "config": { - "recipe": "MY_TEST_RECIPE", - "version": "0.8.18", - "executorId": "mytestexecutor" - } - } - } + "input": { + "name": "My Test Ingestion Source", + "type": "mysql", + "description": "My ingestion source description", + "schedule": 
{"interval": "* * * * *", "timezone": "UTC"}, + "config": { + "recipe": "MY_TEST_RECIPE", + "version": "0.8.18", + "executorId": "mytestexecutor", + }, + } + }, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -446,16 +379,10 @@ def test_create_list_get_ingestion_execution_request(frontend_session): "query": """mutation createIngestionExecutionRequest($input: CreateIngestionExecutionRequestInput!) {\n createIngestionExecutionRequest(input: $input) }""", - "variables": { - "input": { - "ingestionSourceUrn": ingestion_source_urn - } - } + "variables": {"input": {"ingestionSourceUrn": ingestion_source_urn}}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -483,14 +410,10 @@ def test_create_list_get_ingestion_execution_request(frontend_session): }\n }\n }""", - "variables": { - "urn": ingestion_source_urn - } + "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -501,7 +424,10 @@ def test_create_list_get_ingestion_execution_request(frontend_session): ingestion_source = res_data["data"]["ingestionSource"] assert ingestion_source["executions"]["total"] == 1 - assert ingestion_source["executions"]["executionRequests"][0]["urn"] == execution_request_urn + assert ( + ingestion_source["executions"]["executionRequests"][0]["urn"] + == execution_request_urn + ) # Get the ingestion request back via direct lookup json = { @@ -522,14 +448,10 @@ def test_create_list_get_ingestion_execution_request(frontend_session): }\n }\n }""", - "variables": { - "urn": execution_request_urn - } + "variables": {"urn": execution_request_urn}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -544,10 +466,10 @@ def test_create_list_get_ingestion_execution_request(frontend_session): # Verify input assert execution_request["input"]["task"] == "RUN_INGEST" assert len(execution_request["input"]["arguments"]) == 2 - assert execution_request["input"]["arguments"][0]["key"] == 'recipe' - assert execution_request["input"]["arguments"][0]["value"] == 'MY_TEST_RECIPE' - assert execution_request["input"]["arguments"][1]["key"] == 'version' - assert execution_request["input"]["arguments"][1]["value"] == '0.8.18' + assert execution_request["input"]["arguments"][0]["key"] == "recipe" + assert execution_request["input"]["arguments"][0]["value"] == "MY_TEST_RECIPE" + assert execution_request["input"]["arguments"][1]["key"] == "version" + assert execution_request["input"]["arguments"][1]["value"] == "0.8.18" # Verify no result assert execution_request["result"] is None @@ -557,14 +479,10 @@ def test_create_list_get_ingestion_execution_request(frontend_session): "query": """mutation deleteIngestionSource($urn: String!) 
{\n deleteIngestionSource(urn: $urn) }""", - "variables": { - "urn": ingestion_source_urn - } + "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -572,4 +490,3 @@ def test_create_list_get_ingestion_execution_request(frontend_session): assert res_data["data"] assert res_data["data"]["deleteIngestionSource"] is not None assert "errors" not in res_data - diff --git a/smoke-test/tests/tags-and-terms/tags_and_terms_test.py b/smoke-test/tests/tags-and-terms/tags_and_terms_test.py index f043d10b0692a..a8e315801fdc0 100644 --- a/smoke-test/tests/tags-and-terms/tags_and_terms_test.py +++ b/smoke-test/tests/tags-and-terms/tags_and_terms_test.py @@ -1,7 +1,6 @@ import pytest -from tests.utils import FRONTEND_ENDPOINT -from tests.utils import ingest_file_via_rest -from tests.utils import delete_urns_from_file +from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest + @pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(request): @@ -11,12 +10,11 @@ def ingest_cleanup_data(request): print("removing test data") delete_urns_from_file("tests/tags-and-terms/data.json") + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) -def test_add_tag(frontend_session,wait_for_healthchecks): +def test_add_tag(frontend_session, wait_for_healthchecks): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-tags-terms-sample-kafka" - ) + dataset_name = "test-tags-terms-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -34,14 +32,12 @@ def test_add_tag(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } # Fetch tags response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -57,14 +53,14 @@ def test_add_tag(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "tagUrn": "urn:li:tag:Legacy", - "resourceUrn": dataset_urn, + "tagUrn": "urn:li:tag:Legacy", + "resourceUrn": dataset_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_json + f"{get_frontend_url()}/api/v2/graphql", json=add_json ) response.raise_for_status() res_data = response.json() @@ -75,7 +71,7 @@ def test_add_tag(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -83,7 +79,17 @@ def test_add_tag(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["globalTags"] == {'tags': [{'tag': {'description': 'Indicates the dataset is no longer supported', 'name': 'Legacy', 'urn': 'urn:li:tag:Legacy'}}]} + assert res_data["data"]["dataset"]["globalTags"] == { + "tags": [ + { + "tag": { + "description": "Indicates the dataset is no longer supported", + "name": "Legacy", + "urn": "urn:li:tag:Legacy", + } + } + ] + } remove_json = { "query": """mutation removeTag($input: TagAssociationInput!) 
{\n @@ -91,14 +97,14 @@ def test_add_tag(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "tagUrn": "urn:li:tag:Legacy", - "resourceUrn": dataset_urn, + "tagUrn": "urn:li:tag:Legacy", + "resourceUrn": dataset_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=remove_json + f"{get_frontend_url()}/api/v2/graphql", json=remove_json ) response.raise_for_status() res_data = response.json() @@ -111,7 +117,7 @@ def test_add_tag(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -119,10 +125,11 @@ def test_add_tag(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["globalTags"] == {'tags': [] } + assert res_data["data"]["dataset"]["globalTags"] == {"tags": []} + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) -def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): +def test_add_tag_to_chart(frontend_session, wait_for_healthchecks): chart_urn = "urn:li:chart:(looker,test-tags-terms-sample-chart)" chart_json = { @@ -139,14 +146,12 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": chart_urn - } + "variables": {"urn": chart_urn}, } # Fetch tags response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=chart_json + f"{get_frontend_url()}/api/v2/graphql", json=chart_json ) response.raise_for_status() res_data = response.json() @@ -162,14 +167,14 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "tagUrn": "urn:li:tag:Legacy", - "resourceUrn": chart_urn, + "tagUrn": "urn:li:tag:Legacy", + "resourceUrn": chart_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_json + f"{get_frontend_url()}/api/v2/graphql", json=add_json ) response.raise_for_status() res_data = response.json() @@ -180,7 +185,7 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=chart_json + f"{get_frontend_url()}/api/v2/graphql", json=chart_json ) response.raise_for_status() res_data = response.json() @@ -188,7 +193,17 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["chart"] - assert res_data["data"]["chart"]["globalTags"] == {'tags': [{'tag': {'description': 'Indicates the dataset is no longer supported', 'name': 'Legacy', 'urn': 'urn:li:tag:Legacy'}}]} + assert res_data["data"]["chart"]["globalTags"] == { + "tags": [ + { + "tag": { + "description": "Indicates the dataset is no longer supported", + "name": "Legacy", + "urn": "urn:li:tag:Legacy", + } + } + ] + } remove_json = { "query": """mutation removeTag($input: TagAssociationInput!) 
{\n @@ -196,14 +211,14 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "tagUrn": "urn:li:tag:Legacy", - "resourceUrn": chart_urn, + "tagUrn": "urn:li:tag:Legacy", + "resourceUrn": chart_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=remove_json + f"{get_frontend_url()}/api/v2/graphql", json=remove_json ) response.raise_for_status() res_data = response.json() @@ -214,7 +229,7 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=chart_json + f"{get_frontend_url()}/api/v2/graphql", json=chart_json ) response.raise_for_status() res_data = response.json() @@ -222,14 +237,13 @@ def test_add_tag_to_chart(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["chart"] - assert res_data["data"]["chart"]["globalTags"] == {'tags': [] } + assert res_data["data"]["chart"]["globalTags"] == {"tags": []} + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) -def test_add_term(frontend_session,wait_for_healthchecks): +def test_add_term(frontend_session, wait_for_healthchecks): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-tags-terms-sample-kafka" - ) + dataset_name = "test-tags-terms-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -246,15 +260,12 @@ def test_add_term(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } - # Fetch the terms response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -270,14 +281,14 @@ def test_add_term(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "termUrn": "urn:li:glossaryTerm:SavingAccount", - "resourceUrn": dataset_urn, + "termUrn": "urn:li:glossaryTerm:SavingAccount", + "resourceUrn": dataset_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_json + f"{get_frontend_url()}/api/v2/graphql", json=add_json ) response.raise_for_status() res_data = response.json() @@ -290,7 +301,7 @@ def test_add_term(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -298,7 +309,16 @@ def test_add_term(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["glossaryTerms"] == {'terms': [{'term': {'name': 'SavingAccount', 'urn': 'urn:li:glossaryTerm:SavingAccount'}}]} + assert res_data["data"]["dataset"]["glossaryTerms"] == { + "terms": [ + { + "term": { + "name": "SavingAccount", + "urn": "urn:li:glossaryTerm:SavingAccount", + } + } + ] + } remove_json = { "query": """mutation removeTerm($input: TermAssociationInput!) 
{\n @@ -306,14 +326,14 @@ def test_add_term(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "termUrn": "urn:li:glossaryTerm:SavingAccount", - "resourceUrn": dataset_urn, + "termUrn": "urn:li:glossaryTerm:SavingAccount", + "resourceUrn": dataset_urn, } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=remove_json + f"{get_frontend_url()}/api/v2/graphql", json=remove_json ) response.raise_for_status() res_data = response.json() @@ -325,7 +345,7 @@ def test_add_term(frontend_session,wait_for_healthchecks): assert res_data["data"]["removeTerm"] is True # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_json + f"{get_frontend_url()}/api/v2/graphql", json=dataset_json ) response.raise_for_status() res_data = response.json() @@ -333,14 +353,13 @@ def test_add_term(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["glossaryTerms"] == {'terms': []} + assert res_data["data"]["dataset"]["glossaryTerms"] == {"terms": []} + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) -def test_update_schemafield(frontend_session,wait_for_healthchecks): +def test_update_schemafield(frontend_session, wait_for_healthchecks): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-tags-terms-sample-kafka" - ) + dataset_name = "test-tags-terms-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -367,12 +386,10 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } - dataset_schema_json_tags = { + dataset_schema_json_tags = { "query": """query getDataset($urn: String!) {\n dataset(urn: $urn) {\n urn\n @@ -396,12 +413,10 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } - dataset_schema_json_description = { + dataset_schema_json_description = { "query": """query getDataset($urn: String!) 
{\n dataset(urn: $urn) {\n urn\n @@ -417,14 +432,12 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "urn": dataset_urn - } + "variables": {"urn": dataset_urn}, } # dataset schema tags response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_tags + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_tags ) response.raise_for_status() res_data = response.json() @@ -440,16 +453,16 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "tagUrn": "urn:li:tag:Legacy", - "resourceUrn": dataset_urn, - "subResource": "[version=2.0].[type=boolean].field_bar", - "subResourceType": "DATASET_FIELD" + "tagUrn": "urn:li:tag:Legacy", + "resourceUrn": dataset_urn, + "subResource": "[version=2.0].[type=boolean].field_bar", + "subResourceType": "DATASET_FIELD", } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_json + f"{get_frontend_url()}/api/v2/graphql", json=add_json ) response.raise_for_status() res_data = response.json() @@ -460,7 +473,7 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): # Refetch the dataset schema response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_tags + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_tags ) response.raise_for_status() res_data = response.json() @@ -468,7 +481,23 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["editableSchemaMetadata"] == {'editableSchemaFieldInfo': [{'globalTags': {'tags': [{'tag': {'description': 'Indicates the dataset is no longer supported', 'name': 'Legacy', 'urn': 'urn:li:tag:Legacy'}}]}}]} + assert res_data["data"]["dataset"]["editableSchemaMetadata"] == { + "editableSchemaFieldInfo": [ + { + "globalTags": { + "tags": [ + { + "tag": { + "description": "Indicates the dataset is no longer supported", + "name": "Legacy", + "urn": "urn:li:tag:Legacy", + } + } + ] + } + } + ] + } remove_json = { "query": """mutation removeTag($input: TagAssociationInput!) 
{\n @@ -476,16 +505,16 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "tagUrn": "urn:li:tag:Legacy", - "resourceUrn": dataset_urn, - "subResource": "[version=2.0].[type=boolean].field_bar", - "subResourceType": "DATASET_FIELD" + "tagUrn": "urn:li:tag:Legacy", + "resourceUrn": dataset_urn, + "subResource": "[version=2.0].[type=boolean].field_bar", + "subResourceType": "DATASET_FIELD", } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=remove_json + f"{get_frontend_url()}/api/v2/graphql", json=remove_json ) response.raise_for_status() res_data = response.json() @@ -498,7 +527,7 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_tags + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_tags ) response.raise_for_status() res_data = response.json() @@ -506,7 +535,9 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["editableSchemaMetadata"] == {'editableSchemaFieldInfo': [{'globalTags': {'tags': []}}]} + assert res_data["data"]["dataset"]["editableSchemaMetadata"] == { + "editableSchemaFieldInfo": [{"globalTags": {"tags": []}}] + } add_json = { "query": """mutation addTerm($input: TermAssociationInput!) {\n @@ -514,16 +545,16 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "termUrn": "urn:li:glossaryTerm:SavingAccount", - "resourceUrn": dataset_urn, - "subResource": "[version=2.0].[type=boolean].field_bar", - "subResourceType": "DATASET_FIELD" + "termUrn": "urn:li:glossaryTerm:SavingAccount", + "resourceUrn": dataset_urn, + "subResource": "[version=2.0].[type=boolean].field_bar", + "subResourceType": "DATASET_FIELD", } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=add_json + f"{get_frontend_url()}/api/v2/graphql", json=add_json ) response.raise_for_status() res_data = response.json() @@ -534,7 +565,7 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): # Refetch the dataset schema response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_terms + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_terms ) response.raise_for_status() res_data = response.json() @@ -542,7 +573,22 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["editableSchemaMetadata"] == {'editableSchemaFieldInfo': [{'glossaryTerms': {'terms': [{'term': {'name': 'SavingAccount', 'urn': 'urn:li:glossaryTerm:SavingAccount'}}]}}]} + assert res_data["data"]["dataset"]["editableSchemaMetadata"] == { + "editableSchemaFieldInfo": [ + { + "glossaryTerms": { + "terms": [ + { + "term": { + "name": "SavingAccount", + "urn": "urn:li:glossaryTerm:SavingAccount", + } + } + ] + } + } + ] + } remove_json = { "query": """mutation removeTerm($input: TermAssociationInput!) 
{\n @@ -550,16 +596,16 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "termUrn": "urn:li:glossaryTerm:SavingAccount", - "resourceUrn": dataset_urn, - "subResource": "[version=2.0].[type=boolean].field_bar", - "subResourceType": "DATASET_FIELD" + "termUrn": "urn:li:glossaryTerm:SavingAccount", + "resourceUrn": dataset_urn, + "subResource": "[version=2.0].[type=boolean].field_bar", + "subResourceType": "DATASET_FIELD", } - } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=remove_json + f"{get_frontend_url()}/api/v2/graphql", json=remove_json ) response.raise_for_status() res_data = response.json() @@ -570,7 +616,7 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_terms + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_terms ) response.raise_for_status() res_data = response.json() @@ -578,11 +624,13 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["editableSchemaMetadata"] == {'editableSchemaFieldInfo': [{'glossaryTerms': {'terms': []}}]} + assert res_data["data"]["dataset"]["editableSchemaMetadata"] == { + "editableSchemaFieldInfo": [{"glossaryTerms": {"terms": []}}] + } # dataset schema tags response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_tags + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_tags ) response.raise_for_status() res_data = response.json() @@ -593,17 +641,17 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): }""", "variables": { "input": { - "description": "new description", - "resourceUrn": dataset_urn, - "subResource": "[version=2.0].[type=boolean].field_bar", - "subResourceType": "DATASET_FIELD" + "description": "new description", + "resourceUrn": dataset_urn, + "subResource": "[version=2.0].[type=boolean].field_bar", + "subResourceType": "DATASET_FIELD", } - } + }, } # fetch no description response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_description + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_description ) response.raise_for_status() res_data = response.json() @@ -611,10 +659,12 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["editableSchemaMetadata"] == {'editableSchemaFieldInfo': [{ 'description': None }]} + assert res_data["data"]["dataset"]["editableSchemaMetadata"] == { + "editableSchemaFieldInfo": [{"description": None}] + } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=update_description_json + f"{get_frontend_url()}/api/v2/graphql", json=update_description_json ) response.raise_for_status() res_data = response.json() @@ -625,7 +675,7 @@ def test_update_schemafield(frontend_session,wait_for_healthchecks): # Refetch the dataset response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=dataset_schema_json_description + f"{get_frontend_url()}/api/v2/graphql", json=dataset_schema_json_description ) response.raise_for_status() res_data = response.json() @@ -633,4 +683,6 @@ def 
test_update_schemafield(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["dataset"] - assert res_data["data"]["dataset"]["editableSchemaMetadata"] == {'editableSchemaFieldInfo': [{'description': 'new description'}]} + assert res_data["data"]["dataset"]["editableSchemaMetadata"] == { + "editableSchemaFieldInfo": [{"description": "new description"}] + } diff --git a/smoke-test/tests/test_stateful_ingestion.py b/smoke-test/tests/test_stateful_ingestion.py index 85c8477e0875d..1cf39e59fb7c8 100644 --- a/smoke-test/tests/test_stateful_ingestion.py +++ b/smoke-test/tests/test_stateful_ingestion.py @@ -3,12 +3,13 @@ from datahub.ingestion.api.committable import StatefulCommittable from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource -from datahub.ingestion.source.sql.sql_common import \ - BaseSQLAlchemyCheckpointState +from datahub.ingestion.source.sql.sql_common import BaseSQLAlchemyCheckpointState from datahub.ingestion.source.state.checkpoint import Checkpoint from sqlalchemy import create_engine from sqlalchemy.sql import text +from tests.utils import get_gms_url + def test_stateful_ingestion(wait_for_healthchecks): def create_mysql_engine(mysql_source_config_dict: Dict[str, Any]) -> Any: @@ -57,7 +58,7 @@ def get_current_checkpoint_from_pipeline( "remove_stale_metadata": True, "state_provider": { "type": "datahub", - "config": {"datahub_api": {"server": "http://localhost:8080"}}, + "config": {"datahub_api": {"server": get_gms_url()}}, }, }, } @@ -69,13 +70,13 @@ def get_current_checkpoint_from_pipeline( }, "sink": { "type": "datahub-rest", - "config": {"server": "http://localhost:8080"}, + "config": {"server": get_gms_url()}, }, "pipeline_name": "mysql_stateful_ingestion_smoke_test_pipeline", "reporting": [ { "type": "datahub", - "config": {"datahub_api": {"server": "http://localhost:8080"}}, + "config": {"datahub_api": {"server": get_gms_url()}}, } ], } diff --git a/smoke-test/tests/tests/tests_test.py b/smoke-test/tests/tests/tests_test.py index b677d0086189a..e0c460feb66ee 100644 --- a/smoke-test/tests/tests/tests_test.py +++ b/smoke-test/tests/tests/tests_test.py @@ -1,8 +1,7 @@ import pytest -import time -from tests.utils import FRONTEND_ENDPOINT -from tests.utils import ingest_file_via_rest -from tests.utils import delete_urns_from_file + +from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest + @pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(request): @@ -12,17 +11,20 @@ def ingest_cleanup_data(request): print("removing test data") delete_urns_from_file("tests/tests/data.json") + @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. 
pass + test_id = "test id" test_name = "test name" test_category = "test category" test_description = "test description" test_description = "test description" + def create_test(frontend_session): # Create new Test @@ -31,20 +33,18 @@ def create_test(frontend_session): createTest(input: $input) }""", "variables": { - "input": { - "id": test_id, - "name": test_name, - "category": test_category, - "description": test_description, - "definition": { - "json": "{}" - } - } - } + "input": { + "id": test_id, + "name": test_name, + "category": test_category, + "description": test_description, + "definition": {"json": "{}"}, + } + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=create_test_json + f"{get_frontend_url()}/api/v2/graphql", json=create_test_json ) response.raise_for_status() res_data = response.json() @@ -56,23 +56,23 @@ def create_test(frontend_session): return res_data["data"]["createTest"] + def delete_test(frontend_session, test_urn): delete_test_json = { "query": """mutation deleteTest($urn: String!) {\n deleteTest(urn: $urn) }""", - "variables": { - "urn": test_urn - } + "variables": {"urn": test_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=delete_test_json + f"{get_frontend_url()}/api/v2/graphql", json=delete_test_json ) response.raise_for_status() + @pytest.mark.dependency(depends=["test_healthchecks"]) -def test_create_test(frontend_session,wait_for_healthchecks): +def test_create_test(frontend_session, wait_for_healthchecks): test_urn = create_test(frontend_session) @@ -89,12 +89,10 @@ def test_create_test(frontend_session,wait_for_healthchecks): }\n } }""", - "variables": { - "urn": test_urn - } + "variables": {"urn": test_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_test_json + f"{get_frontend_url()}/api/v2/graphql", json=get_test_json ) response.raise_for_status() res_data = response.json() @@ -102,13 +100,13 @@ def test_create_test(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["test"] == { - "urn": test_urn, - "name": test_name, - "category": test_category, - "description": test_description, - "definition": { - "json": "{}", - } + "urn": test_urn, + "name": test_name, + "category": test_category, + "description": test_description, + "definition": { + "json": "{}", + }, } assert "errors" not in res_data @@ -117,7 +115,7 @@ def test_create_test(frontend_session,wait_for_healthchecks): # Ensure the test no longer exists response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_test_json + f"{get_frontend_url()}/api/v2/graphql", json=get_test_json ) response.raise_for_status() res_data = response.json() @@ -127,7 +125,7 @@ def test_create_test(frontend_session,wait_for_healthchecks): @pytest.mark.dependency(depends=["test_healthchecks", "test_create_test"]) -def test_update_test(frontend_session,wait_for_healthchecks): +def test_update_test(frontend_session, wait_for_healthchecks): test_urn = create_test(frontend_session) test_name = "new name" test_category = "new category" @@ -140,20 +138,18 @@ def test_update_test(frontend_session,wait_for_healthchecks): updateTest(urn: $urn, input: $input) }""", "variables": { - "urn": test_urn, - "input": { - "name": test_name, - "category": test_category, - "description": test_description, - "definition": { - "json": "{}" - } - } - } + "urn": test_urn, + "input": { + "name": test_name, + "category": test_category, + "description": 
test_description, + "definition": {"json": "{}"}, + }, + }, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=update_test_json + f"{get_frontend_url()}/api/v2/graphql", json=update_test_json ) response.raise_for_status() res_data = response.json() @@ -176,12 +172,10 @@ def test_update_test(frontend_session,wait_for_healthchecks): }\n } }""", - "variables": { - "urn": test_urn - } + "variables": {"urn": test_urn}, } response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=get_test_json + f"{get_frontend_url()}/api/v2/graphql", json=get_test_json ) response.raise_for_status() res_data = response.json() @@ -189,22 +183,23 @@ def test_update_test(frontend_session,wait_for_healthchecks): assert res_data assert res_data["data"] assert res_data["data"]["test"] == { - "urn": test_urn, - "name": test_name, - "category": test_category, - "description": test_description, - "definition": { - "json": "{}", - } + "urn": test_urn, + "name": test_name, + "category": test_category, + "description": test_description, + "definition": { + "json": "{}", + }, } assert "errors" not in res_data delete_test(frontend_session, test_urn) + @pytest.mark.dependency(depends=["test_healthchecks", "test_update_test"]) -def test_list_tests(frontend_session,wait_for_healthchecks): - list_tests_json = { - "query": """query listTests($input: ListTestsInput!) {\n +def test_list_tests(frontend_session, wait_for_healthchecks): + list_tests_json = { + "query": """query listTests($input: ListTestsInput!) {\n listTests(input: $input) {\n start\n count\n @@ -214,30 +209,27 @@ def test_list_tests(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": { - "input": { - "start": "0", - "count": "20" - } - } - } - - response = frontend_session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=list_tests_json - ) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["listTests"]["total"] >= 2 - assert len(res_data["data"]["listTests"]["tests"]) >= 2 - assert "errors" not in res_data + "variables": {"input": {"start": "0", "count": "20"}}, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=list_tests_json + ) + response.raise_for_status() + res_data = response.json() + + assert res_data + assert res_data["data"] + assert res_data["data"]["listTests"]["total"] >= 2 + assert len(res_data["data"]["listTests"]["tests"]) >= 2 + assert "errors" not in res_data @pytest.mark.dependency(depends=["test_healthchecks"]) -def test_get_test_results(frontend_session,wait_for_healthchecks): - urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tests-sample,PROD)" # Test urn +def test_get_test_results(frontend_session, wait_for_healthchecks): + urn = ( + "urn:li:dataset:(urn:li:dataPlatform:kafka,test-tests-sample,PROD)" # Test urn + ) json = { "query": """query getDataset($urn: String!) 
{\n dataset(urn: $urn) {\n @@ -258,9 +250,9 @@ def test_get_test_results(frontend_session,wait_for_healthchecks): }\n }\n }""", - "variables": {"urn": urn }, + "variables": {"urn": urn}, } - response = frontend_session.post(f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json) + response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() res_data = response.json() @@ -269,20 +261,6 @@ def test_get_test_results(frontend_session,wait_for_healthchecks): assert res_data["data"]["dataset"] assert res_data["data"]["dataset"]["urn"] == urn assert res_data["data"]["dataset"]["testResults"] == { - "failing": [ - { - "test": { - "urn": "urn:li:test:test-1" - }, - "type": "FAILURE" - } - ], - "passing": [ - { - "test": { - "urn": "urn:li:test:test-2" - }, - "type": "SUCCESS" - } - ] + "failing": [{"test": {"urn": "urn:li:test:test-1"}, "type": "FAILURE"}], + "passing": [{"test": {"urn": "urn:li:test:test-2"}, "type": "SUCCESS"}], } diff --git a/smoke-test/tests/tokens/revokable_access_token_test.py b/smoke-test/tests/tokens/revokable_access_token_test.py index 0c6733fbeaf1c..a28f33c4c3bd8 100644 --- a/smoke-test/tests/tokens/revokable_access_token_test.py +++ b/smoke-test/tests/tokens/revokable_access_token_test.py @@ -1,10 +1,9 @@ +from time import sleep + import pytest -import time import requests -from tests.utils import FRONTEND_ENDPOINT -from time import sleep -from tests.utils import ingest_file_via_rest -from datahub.cli.ingest_cli import get_session_and_host +from tests.utils import get_frontend_url, ingest_file_via_rest + @pytest.fixture(autouse=True) def test_setup(): @@ -28,7 +27,8 @@ def test_setup(): # Clean up res_data = listAccessTokens(admin_session) for metadata in res_data["data"]["listAccessTokens"]["tokens"]: - revokeAccessToken(admin_session, metadata["id"]) + revokeAccessToken(admin_session, metadata["id"]) + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_admin_can_create_list_and_revoke_tokens(): @@ -47,7 +47,10 @@ def test_admin_can_create_list_and_revoke_tokens(): assert res_data["data"] assert res_data["data"]["createAccessToken"] assert res_data["data"]["createAccessToken"]["accessToken"] - assert res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] == "urn:li:corpuser:datahub" + assert ( + res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] + == "urn:li:corpuser:datahub" + ) admin_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"] # Using a super account, list the previously created token. 
@@ -56,8 +59,14 @@ def test_admin_can_create_list_and_revoke_tokens(): assert res_data["data"] assert res_data["data"]["listAccessTokens"]["total"] is not None assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 1 - assert res_data["data"]["listAccessTokens"]["tokens"][1]["actorUrn"] == "urn:li:corpuser:datahub" - assert res_data["data"]["listAccessTokens"]["tokens"][1]["ownerUrn"] == "urn:li:corpuser:datahub" + assert ( + res_data["data"]["listAccessTokens"]["tokens"][0]["actorUrn"] + == "urn:li:corpuser:datahub" + ) + assert ( + res_data["data"]["listAccessTokens"]["tokens"][0]["ownerUrn"] + == "urn:li:corpuser:datahub" + ) # Check that the super account can revoke tokens that it created res_data = revokeAccessToken(admin_session, admin_tokenId) @@ -73,6 +82,7 @@ def test_admin_can_create_list_and_revoke_tokens(): assert res_data["data"]["listAccessTokens"]["total"] is not None assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0 + @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_admin_can_create_and_revoke_tokens_for_other_user(): admin_session = loginAs("datahub", "datahub") @@ -90,7 +100,10 @@ def test_admin_can_create_and_revoke_tokens_for_other_user(): assert res_data["data"] assert res_data["data"]["createAccessToken"] assert res_data["data"]["createAccessToken"]["accessToken"] - assert res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] == "urn:li:corpuser:user" + assert ( + res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] + == "urn:li:corpuser:user" + ) user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"] # Using a super account, list the previously created tokens. @@ -99,8 +112,14 @@ def test_admin_can_create_and_revoke_tokens_for_other_user(): assert res_data["data"] assert res_data["data"]["listAccessTokens"]["total"] is not None assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 1 - assert res_data["data"]["listAccessTokens"]["tokens"][0]["actorUrn"] == "urn:li:corpuser:user" - assert res_data["data"]["listAccessTokens"]["tokens"][0]["ownerUrn"] == "urn:li:corpuser:datahub" + assert ( + res_data["data"]["listAccessTokens"]["tokens"][0]["actorUrn"] + == "urn:li:corpuser:user" + ) + assert ( + res_data["data"]["listAccessTokens"]["tokens"][0]["ownerUrn"] + == "urn:li:corpuser:datahub" + ) # Check that the super account can revoke tokens that it created for another user res_data = revokeAccessToken(admin_session, user_tokenId) @@ -115,6 +134,8 @@ def test_admin_can_create_and_revoke_tokens_for_other_user(): assert res_data["data"] assert res_data["data"]["listAccessTokens"]["total"] is not None assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0 + + """ @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_non_admin_can_create_list_revoke_tokens(): @@ -215,8 +236,10 @@ def test_non_admin_can_not_generate_tokens_for_others(): assert res_data["errors"] assert res_data["errors"][0]["message"] == "Unauthorized to perform this action. Please contact your DataHub administrator." """ + + def generateAccessToken_v1(session, actorUrn): - # Create new token + # Create new token json = { "query": """query getAccessToken($input: GetAccessTokenInput!) 
{\n getAccessToken(input: $input) {\n @@ -224,22 +247,17 @@ def generateAccessToken_v1(session, actorUrn): }\n }""", "variables": { - "input": { - "type": "PERSONAL", - "actorUrn": actorUrn, - "duration": "ONE_HOUR" - } - } + "input": {"type": "PERSONAL", "actorUrn": actorUrn, "duration": "ONE_HOUR"} + }, } - response = session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() return response.json() + def generateAccessToken_v2(session, actorUrn): - # Create new token + # Create new token json = { "query": """mutation createAccessToken($input: CreateAccessTokenInput!) {\n createAccessToken(input: $input) {\n @@ -254,32 +272,31 @@ def generateAccessToken_v2(session, actorUrn): }\n }""", "variables": { - "input": { - "type": "PERSONAL", - "actorUrn": actorUrn, - "duration": "ONE_HOUR", - "name": "my token" - } - } + "input": { + "type": "PERSONAL", + "actorUrn": actorUrn, + "duration": "ONE_HOUR", + "name": "my token", + } + }, } - response = session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() sleep(5) return response.json() + def listAccessTokens(session, filters=[]): # Get count of existing tokens input = { - "start": "0", - "count": "20", + "start": "0", + "count": "20", } if filters: - input['filters'] = filters + input["filters"] = filters json = { "query": """query listAccessTokens($input: ListAccessTokenInput!) {\n @@ -295,45 +312,37 @@ def listAccessTokens(session, filters=[]): }\n }\n }""", - "variables": { - "input": input - } + "variables": {"input": input}, } - response = session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) response.raise_for_status() return response.json() + def revokeAccessToken(session, tokenId): # Revoke token json = { "query": """mutation revokeAccessToken($tokenId: String!) 
{\n revokeAccessToken(tokenId: $tokenId) }""", - "variables": { - "tokenId": tokenId - } + "variables": {"tokenId": tokenId}, } - response = session.post( - f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json - ) + response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) sleep(5) response.raise_for_status() return response.json() + def loginAs(username, password): session = requests.Session() headers = { "Content-Type": "application/json", } - data = '{"username":"' + username +'", "password":"' + password + '"}' - response = session.post( - f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=data - ) + data = '{"username":"' + username + '", "password":"' + password + '"}' + response = session.post(f"{get_frontend_url()}/logIn", headers=headers, data=data) response.raise_for_status() - return session \ No newline at end of file + return session diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index 354a98c8e473d..350b3a06c9931 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -1,12 +1,30 @@ import json +import os +from typing import Any import requests -from typing import Any from datahub.cli import cli_utils from datahub.ingestion.run.pipeline import Pipeline -GMS_ENDPOINT = "http://localhost:8080" -FRONTEND_ENDPOINT = "http://localhost:9002" + +def get_gms_url(): + return os.getenv("DATAHUB_GMS_URL") or "http://localhost:8080" + + +def get_frontend_url(): + return os.getenv("DATAHUB_FRONTEND_URL") or "http://localhost:9002" + + +def get_kafka_broker_url(): + return os.getenv("DATAHUB_KAFKA_URL") or "localhost:9092" + + +def get_sleep_info(): + return ( + int(os.environ.get("DATAHUB_TEST_SLEEP_BETWEEN") or 60), + int(os.environ.get("DATAHUB_TEST_SLEEP_TIMES") or 5), + ) + def ingest_file_via_rest(filename: str) -> Any: pipeline = Pipeline.create( { @@ -17,7 +35,7 @@ def ingest_file_via_rest(filename: str) -> Any: }, "sink": { "type": "datahub-rest", - "config": {"server": GMS_ENDPOINT}, + "config": {"server": get_gms_url()}, }, } ) @@ -39,19 +57,19 @@ def delete_urns_from_file(filename: str) -> None: with open(filename) as f: d = json.load(f) for entry in d: - is_mcp = 'entityUrn' in entry + is_mcp = "entityUrn" in entry urn = None # Kill Snapshot if is_mcp: - urn = entry['entityUrn'] + urn = entry["entityUrn"] else: - snapshot_union = entry['proposedSnapshot'] - snapshot = list(snapshot_union.values())[0] - urn = snapshot['urn'] + snapshot_union = entry["proposedSnapshot"] + snapshot = list(snapshot_union.values())[0] + urn = snapshot["urn"] payload_obj = {"urn": urn} cli_utils.post_delete_endpoint_with_session_and_url( session, - GMS_ENDPOINT + "/entities?action=delete", + get_gms_url() + "/entities?action=delete", payload_obj, )
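
With this change, every endpoint the smoke tests touch is resolved through the new helpers in smoke-test/tests/utils.py, which fall back to the local quickstart defaults whenever the corresponding DATAHUB_* variables are unset. A minimal sketch of how a CI job might point the suite at a non-local deployment before invoking pytest (the staging hostnames below are hypothetical, not part of this diff):

    import os

    # Hypothetical endpoints -- substitute any reachable DataHub deployment.
    os.environ["DATAHUB_GMS_URL"] = "http://datahub-gms.staging.example.com:8080"
    os.environ["DATAHUB_FRONTEND_URL"] = "http://datahub-frontend.staging.example.com:9002"
    os.environ["DATAHUB_KAFKA_URL"] = "datahub-kafka.staging.example.com:9092"
    # Retry tuning for slower environments: seconds between attempts, number of attempts.
    os.environ["DATAHUB_TEST_SLEEP_BETWEEN"] = "10"
    os.environ["DATAHUB_TEST_SLEEP_TIMES"] = "6"

    # Assumes the smoke-test directory is on sys.path and datahub is installed.
    from tests.utils import get_frontend_url, get_gms_url, get_kafka_broker_url, get_sleep_info

    assert get_gms_url() == "http://datahub-gms.staging.example.com:8080"
    assert get_kafka_broker_url() == "datahub-kafka.staging.example.com:9092"
    assert get_sleep_info() == (10, 6)  # ints, given the int() casts in get_sleep_info

Using `or` for the fallback (rather than passing a default to os.getenv) also treats a variable that is set but empty as unset, so a plain local run keeps working with zero configuration.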