Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add stats and performance logging #360

Merged
merged 8 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
)

from airbyte import exceptions as exc
from airbyte._util.connector_info import ConnectorRuntimeInfo
from airbyte._util.hashing import one_way_hash
from airbyte._util.telemetry import (
EventState,
log_config_validation_result,
Expand Down Expand Up @@ -76,6 +78,15 @@ def name(self) -> str:
"""Get the name of the connector."""
return self._name

def _get_connector_runtime_info(self) -> ConnectorRuntimeInfo:
"""Get metadata for telemetry and performance logging."""
return ConnectorRuntimeInfo(
name=self.name,
version=self.connector_version,
executor_type=type(self.executor).__name__,
config_hash=self.config_hash,
)

def _print_info_message(
self,
message: str,
Expand Down Expand Up @@ -124,6 +135,22 @@ def _config(self) -> dict[str, Any]:
)
return self._config_dict

@property
def config_hash(self) -> str | None:
"""Get a hash of the current config.

Returns None if the config is not set.
"""
if self._config_dict is None:
return None

try:
return one_way_hash(self._config_dict)
except Exception:
# This can fail if there are unhashable values in the config,
# or unexpected data types. In this case, return None.
return None

def validate_config(self, config: dict[str, Any] | None = None) -> None:
"""Validate the config against the spec.

Expand Down Expand Up @@ -262,7 +289,11 @@ def connector_version(self) -> str | None:

Returns None if the version cannot be determined.
"""
return self.executor.get_installed_version()
try:
return self.executor.get_installed_version()
except Exception:
# Version not detected, so return None.
return None

def check(self) -> None:
"""Call check on the connector.
Expand Down
10 changes: 10 additions & 0 deletions airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ def __init__(

self.reported_version: str | None = self._manifest_dict.get("version", None)

def get_installed_version(
self,
*,
raise_on_error: bool = False,
recheck: bool = False,
) -> str | None:
"""Detect the version of the connector installed."""
_ = raise_on_error, recheck # Not used
return self.reported_version

def _validate_manifest(self, manifest_dict: dict) -> None:
"""Validate the manifest."""
manifest_text = yaml.safe_dump(manifest_dict)
Expand Down
25 changes: 24 additions & 1 deletion airbyte/_message_iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
from collections.abc import Iterator
from typing import IO, TYPE_CHECKING, cast

import pendulum
import pydantic
from typing_extensions import final

from airbyte_protocol.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
StreamDescriptor,
TraceType,
Type,
)

from airbyte.constants import AB_EXTRACTED_AT_COLUMN
from airbyte.progress import _new_stream_success_message


if TYPE_CHECKING:
Expand All @@ -28,6 +33,24 @@
from airbyte.results import ReadResult


def _new_stream_success_message(stream_name: str) -> AirbyteMessage:
"""Return a new stream success message."""
return AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
stream=stream_name,
emitted_at=pendulum.now().float_timestamp,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(
name=stream_name,
),
status=AirbyteStreamStatus.COMPLETE,
),
),
)


class AirbyteMessageIterator:
"""Abstract base class for Airbyte message iterables.

Expand Down
30 changes: 30 additions & 0 deletions airbyte/_util/connector_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Connector info classes for PyAirbyte.

Used for telemetry and logging.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class RuntimeInfoBase:
def to_dict(self) -> dict[str, Any]:
return {k: v for k, v in asdict(self).items() if v is not None}


@dataclass
class WriterRuntimeInfo(RuntimeInfoBase):
type: str
config_hash: str | None = None


@dataclass(kw_only=True)
class ConnectorRuntimeInfo(RuntimeInfoBase):
name: str
executor_type: str | None = None
version: str | None = None
config_hash: str | None = None
35 changes: 35 additions & 0 deletions airbyte/_util/hashing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Hashing utils for PyAirbyte."""

from __future__ import annotations

import hashlib
from collections.abc import Mapping


HASH_SEED = "PyAirbyte:"
"""Additional seed for randomizing one-way hashed strings."""


def one_way_hash(
obj: Mapping | list | object,
/,
) -> str:
"""Return a one-way hash of the given string.

To ensure a unique domain of hashes, we prepend a seed to the string before hashing.
"""
string_to_hash: str
if isinstance(obj, Mapping):
# Recursively sort and convert nested dictionaries to tuples of key-value pairs
string_to_hash = str(sorted((k, one_way_hash(v)) for k, v in obj.items()))

elif isinstance(obj, list):
# Recursively hash elements of the list
string_to_hash = str([one_way_hash(item) for item in obj])

else:
# Convert the object to a string
string_to_hash = str(obj)

return hashlib.sha256((HASH_SEED + str(string_to_hash)).encode()).hexdigest()
123 changes: 21 additions & 102 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,39 +32,31 @@
from __future__ import annotations

import datetime
import hashlib
import os
from contextlib import suppress
from dataclasses import asdict, dataclass
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import Any, cast

import requests
import ulid
import yaml

from airbyte import exceptions as exc
from airbyte._util import meta
from airbyte._util.connector_info import (
ConnectorRuntimeInfo,
WriterRuntimeInfo,
)
from airbyte._util.hashing import one_way_hash
from airbyte.version import get_version


if TYPE_CHECKING:
from airbyte._writers.base import AirbyteWriterInterface
from airbyte.caches.base import CacheBase
from airbyte.destinations.base import Destination
from airbyte.sources.base import Source


DEBUG = True
"""Enable debug mode for telemetry code."""


HASH_SEED = "PyAirbyte:"
"""Additional seed for randomizing one-way hashed strings."""


PYAIRBYTE_APP_TRACKING_KEY = (
os.environ.get("AIRBYTE_TRACKING_KEY", "") or "cukeSffc0G6gFQehKDhhzSurDzVSZ2OP"
)
Expand Down Expand Up @@ -185,83 +177,6 @@ class EventType(str, Enum):
CHECK = "check"


@dataclass
class CacheTelemetryInfo:
type: str

@classmethod
def from_cache(cls, cache: CacheBase | None) -> CacheTelemetryInfo:
if not cache:
return cls(type="streaming")

return cls(type=type(cache).__name__)


@dataclass
class SourceTelemetryInfo:
name: str
executor_type: str
version: str | None

@classmethod
def from_source(cls, source: Source | str) -> SourceTelemetryInfo:
if isinstance(source, str):
return cls(
name=str(source),
executor_type=UNKNOWN,
version=UNKNOWN,
)

# Else, `source` should be a `Source` object at this point
return cls(
name=source.name,
executor_type=type(source.executor).__name__,
version=source.executor.reported_version,
)


@dataclass
class DestinationTelemetryInfo:
name: str
executor_type: str
version: str | None

@classmethod
def from_destination(
cls,
destination: Destination | AirbyteWriterInterface | str | None,
) -> DestinationTelemetryInfo:
if not destination:
return cls(name=UNKNOWN, executor_type=UNKNOWN, version=UNKNOWN)

if isinstance(destination, str):
return cls(name=destination, executor_type=UNKNOWN, version=UNKNOWN)

if hasattr(destination, "executor"):
return cls(
name=destination.name,
executor_type=type(destination.executor).__name__,
version=destination.executor.reported_version,
)

return cls(
name=repr(destination),
executor_type=UNKNOWN,
version=UNKNOWN,
)


def one_way_hash(
string_to_hash: Any, # noqa: ANN401 # Allow Any type
/,
) -> str:
"""Return a one-way hash of the given string.

To ensure a unique domain of hashes, we prepend a seed to the string before hashing.
"""
return hashlib.sha256((HASH_SEED + str(string_to_hash)).encode()).hexdigest()


@lru_cache
def get_env_flags() -> dict[str, Any]:
flags: dict[str, bool | str] = {
Expand All @@ -283,9 +198,9 @@ def get_env_flags() -> dict[str, Any]:

def send_telemetry(
*,
source: Source | str | None,
destination: Destination | AirbyteWriterInterface | str | None,
cache: CacheBase | None,
source: ConnectorRuntimeInfo | None,
destination: ConnectorRuntimeInfo | None,
cache: WriterRuntimeInfo | None,
state: EventState,
event_type: EventType,
number_of_records: int | None = None,
Expand All @@ -297,8 +212,6 @@ def send_telemetry(

payload_props: dict[str, str | int | dict] = {
"session_id": PYAIRBYTE_SESSION_ID,
"cache": asdict(CacheTelemetryInfo.from_cache(cache)),
"destination": asdict(DestinationTelemetryInfo.from_destination(destination)),
"state": state,
"version": get_version(),
"python_version": meta.get_python_version(),
Expand All @@ -308,7 +221,13 @@ def send_telemetry(
}

if source:
payload_props["source"] = asdict(SourceTelemetryInfo.from_source(source))
payload_props["source"] = source.to_dict()

if destination:
payload_props["destination"] = destination.to_dict()

if cache:
payload_props["cache"] = cache.to_dict()

if exception:
if isinstance(exception, exc.AirbyteError):
Expand Down Expand Up @@ -345,8 +264,8 @@ def log_config_validation_result(
treated as a source name.
"""
send_telemetry(
source=name if not name.startswith("destination-") else None,
destination=name if name.startswith("destination-") else None,
source=ConnectorRuntimeInfo(name=name) if not name.startswith("destination-") else None,
destination=ConnectorRuntimeInfo(name=name) if name.startswith("destination-") else None,
cache=None,
state=state,
event_type=EventType.VALIDATE,
Expand All @@ -365,8 +284,8 @@ def log_connector_check_result(
treated as a source name.
"""
send_telemetry(
source=name if not name.startswith("destination-") else None,
destination=name if name.startswith("destination-") else None,
source=ConnectorRuntimeInfo(name=name) if not name.startswith("destination-") else None,
destination=ConnectorRuntimeInfo(name=name) if name.startswith("destination-") else None,
cache=None,
state=state,
event_type=EventType.CHECK,
Expand All @@ -381,7 +300,7 @@ def log_install_state(
) -> None:
"""Log an install event."""
send_telemetry(
source=name,
source=ConnectorRuntimeInfo(name=name),
destination=None,
cache=None,
state=state,
Expand Down
Loading
Loading