diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index ba4a0cbeb20933..00eee29e212499 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -4,8 +4,7 @@ import logging import os from json.decoder import JSONDecodeError -from types import TracebackType -from typing import Any, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Union import requests from requests.adapters import HTTPAdapter, Retry @@ -16,6 +15,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.request_helper import _make_curl_command from datahub.emitter.serialization_helper import pre_json_transform +from datahub.ingestion.api.closeable import Closeable from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, MetadataChangeProposal, @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) -class DataHubRestEmitter: +class DataHubRestEmitter(Closeable): DEFAULT_CONNECT_TIMEOUT_SEC = 30 # 30 seconds should be plenty to connect DEFAULT_READ_TIMEOUT_SEC = ( 30 # Any ingest call taking longer than 30 seconds should be abandoned @@ -142,17 +142,6 @@ def __init__( timeout=(self._connect_timeout_sec, self._read_timeout_sec), ) - def __enter__(self) -> "DataHubRestEmitter": - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self._session.close() - def test_connection(self) -> dict: response = self._session.get(f"{self._gms_server}/config") if response.status_code == 200: @@ -277,6 +266,9 @@ def __repr__(self) -> str: f"DataHubRestEmitter: configured to talk to {self._gms_server}{token_str}" ) + def close(self) -> None: + self._session.close() + class DatahubRestEmitter(DataHubRestEmitter): """This class exists as a pass-through for backwards compatibility""" diff --git a/metadata-ingestion/src/datahub/ingestion/api/__init__.py b/metadata-ingestion/src/datahub/ingestion/api/__init__.py index 4ee23afa6f0652..e69de29bb2d1d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/__init__.py +++ b/metadata-ingestion/src/datahub/ingestion/api/__init__.py @@ -1 +0,0 @@ -from datahub.ingestion.api.common import RecordEnvelope diff --git a/metadata-ingestion/src/datahub/ingestion/api/closeable.py b/metadata-ingestion/src/datahub/ingestion/api/closeable.py index 59795bb2af131f..523174b9978b3c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/closeable.py +++ b/metadata-ingestion/src/datahub/ingestion/api/closeable.py @@ -1,7 +1,18 @@ from abc import abstractmethod +from contextlib import AbstractContextManager +from types import TracebackType +from typing import Optional, Type -class Closeable: +class Closeable(AbstractContextManager): @abstractmethod def close(self) -> None: pass + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.close() diff --git a/metadata-ingestion/src/datahub/ingestion/api/common.py b/metadata-ingestion/src/datahub/ingestion/api/common.py index e919be043fcf28..8e01e34ec2aa9f 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/common.py +++ b/metadata-ingestion/src/datahub/ingestion/api/common.py @@ -45,7 +45,7 @@ class PipelineContext: def __init__( self, run_id: str, - datahub_api: Optional[DatahubClientConfig] = None, + datahub_api: Optional["DatahubClientConfig"] = None, pipeline_name: Optional[str] = None, dry_run: bool = False, preview_mode: bool = False, diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 5f7444284460d9..70e5ce7db7e821 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -130,6 +130,9 @@ def get_workunits(self) -> Iterable[WorkUnit]: def get_report(self) -> SourceReport: pass + def close(self) -> None: + pass + class TestableSource(Source): @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py b/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py index fd74312b6439be..9c8fba9543c0b5 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py @@ -3,7 +3,7 @@ from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import get_sys_time from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api import RecordEnvelope +from datahub.ingestion.api.common import RecordEnvelope from datahub.ingestion.api.source import Extractor, WorkUnit from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index cd0ec7b8a78571..69cfdc4515b26a 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -83,12 +83,6 @@ def __init__(self, config: Union[DatahubClientConfig, DataHubGraphConfig]) -> No self.server_id = "missing" logger.debug(f"Failed to get server id due to {e}") - def __enter__(self): - return self - - def __exit__(self, type, value, tb): - return super().__exit__(type, value, tb) - def _get_generic(self, url: str) -> Dict: try: response = self._session.get(url) diff --git a/metadata-ingestion/tests/integration/kafka/docker-compose.yml b/metadata-ingestion/tests/integration/kafka/docker-compose.yml index 4f00dd1ea884b0..54ab09b9c9308b 100644 --- a/metadata-ingestion/tests/integration/kafka/docker-compose.yml +++ b/metadata-ingestion/tests/integration/kafka/docker-compose.yml @@ -22,6 +22,7 @@ services: schema-registry: image: confluentinc/cp-schema-registry:7.2.0 + restart: unless-stopped env_file: schema-registry.env container_name: test_schema_registry depends_on: @@ -31,5 +32,4 @@ services: - "58081:8081" volumes: - test_zkdata: - + test_zkdata: