Skip to content

Commit

Permalink
fix(ingest): Closeable as a context manager (datahub-project#6067)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and cccs-tom committed Nov 18, 2022
1 parent 3500c00 commit d1e3355
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 26 deletions.
20 changes: 6 additions & 14 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import logging
import os
from json.decoder import JSONDecodeError
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import requests
from requests.adapters import HTTPAdapter, Retry
Expand All @@ -16,6 +15,7 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import _make_curl_command
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.ingestion.api.closeable import Closeable
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
Expand All @@ -25,7 +25,7 @@
logger = logging.getLogger(__name__)


class DataHubRestEmitter:
class DataHubRestEmitter(Closeable):
DEFAULT_CONNECT_TIMEOUT_SEC = 30 # 30 seconds should be plenty to connect
DEFAULT_READ_TIMEOUT_SEC = (
30 # Any ingest call taking longer than 30 seconds should be abandoned
Expand Down Expand Up @@ -142,17 +142,6 @@ def __init__(
timeout=(self._connect_timeout_sec, self._read_timeout_sec),
)

def __enter__(self) -> "DataHubRestEmitter":
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self._session.close()

def test_connection(self) -> dict:
response = self._session.get(f"{self._gms_server}/config")
if response.status_code == 200:
Expand Down Expand Up @@ -277,6 +266,9 @@ def __repr__(self) -> str:
f"DataHubRestEmitter: configured to talk to {self._gms_server}{token_str}"
)

def close(self) -> None:
self._session.close()


class DatahubRestEmitter(DataHubRestEmitter):
"""This class exists as a pass-through for backwards compatibility"""
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from datahub.ingestion.api.common import RecordEnvelope
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/closeable.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
from abc import abstractmethod
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Optional, Type


class Closeable:
class Closeable(AbstractContextManager):
@abstractmethod
def close(self) -> None:
pass

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/api/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PipelineContext:
def __init__(
self,
run_id: str,
datahub_api: Optional[DatahubClientConfig] = None,
datahub_api: Optional["DatahubClientConfig"] = None,
pipeline_name: Optional[str] = None,
dry_run: bool = False,
preview_mode: bool = False,
Expand Down
3 changes: 3 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def get_workunits(self) -> Iterable[WorkUnit]:
def get_report(self) -> SourceReport:
pass

def close(self) -> None:
pass


class TestableSource(Source):
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api import RecordEnvelope
from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.api.source import Extractor, WorkUnit
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
Expand Down
6 changes: 0 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ def __init__(self, config: Union[DatahubClientConfig, DataHubGraphConfig]) -> No
self.server_id = "missing"
logger.debug(f"Failed to get server id due to {e}")

def __enter__(self):
return self

def __exit__(self, type, value, tb):
return super().__exit__(type, value, tb)

def _get_generic(self, url: str) -> Dict:
try:
response = self._session.get(url)
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/integration/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:

schema-registry:
image: confluentinc/cp-schema-registry:7.2.0
restart: unless-stopped
env_file: schema-registry.env
container_name: test_schema_registry
depends_on:
Expand All @@ -31,5 +32,4 @@ services:
- "58081:8081"

volumes:
test_zkdata:

test_zkdata:

0 comments on commit d1e3355

Please sign in to comment.