Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic snowplow tracker with file logger for testing #190

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
16 changes: 16 additions & 0 deletions dbt_common/events/base_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ def msg_from_base_event(event: BaseEvent, level: Optional[EventLevel] = None):
return new_event


def msg_to_dict(msg: EventMsg) -> dict:
msg_dict = MessageToDict(
msg,
preserving_proto_field_name=True,
including_default_value_fields=True, # type: ignore
)
# We don't want an empty NodeInfo in output
if (
"data" in msg_dict
and "node_info" in msg_dict["data"]
and msg_dict["data"]["node_info"]["node_name"] == ""
):
del msg_dict["data"]["node_info"]
return msg_dict


# DynamicLevel requires that the level be supplied on the
# event construction call using the "info" function from functions.py
class DynamicLevel(BaseEvent):
Expand Down
32 changes: 32 additions & 0 deletions dbt_common/events/cookie.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pathlib import Path
import uuid
from typing import Any, Dict

import yaml

# the C version is faster, but it doesn't always exist
try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader


class Cookie:
def __init__(self, directory: Path) -> None:
self.id: str = str(uuid.uuid4())
self.path: Path = directory / ".user.yml"
self.save()

def as_dict(self) -> Dict[str, Any]:
return {"id": self.id}

def save(self) -> None:
with open(self.path, "w") as fh:
yaml.dump(self.as_dict(), fh)

def load(self) -> Dict[str, Any]:
with open(self.path, "r") as fh:
try:
return yaml.load(fh, Loader=SafeLoader)
except yaml.reader.ReaderError:
return {}
18 changes: 18 additions & 0 deletions dbt_common/events/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
from typing import List, Optional, Protocol, Tuple

from dbt_common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, TCallback
from dbt_common.events.functions import track, tracker_factory
from dbt_common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat
from dbt_common.events.tracker import TrackerConfig, Tracker
from dbt_common.events.user import User


class EventManager:
def __init__(self) -> None:
self.loggers: List[_Logger] = []
self.trackers: List[Tracker] = []
self.callbacks: List[TCallback] = []
self.user: Optional[User] = None

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
msg = msg_from_base_event(e, level=level)
Expand All @@ -28,6 +33,9 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
if logger.filter(msg): # type: ignore
logger.write_line(msg)

for tracker in self.trackers:
track(tracker, self.user, msg)

for callback in self.callbacks:
callback(msg)

Expand All @@ -37,9 +45,15 @@ def add_logger(self, config: LoggerConfig) -> None:
)
self.loggers.append(logger)

def add_tracker(self, config: TrackerConfig) -> None:
self.trackers.append(tracker_factory(config))

def add_callback(self, callback: TCallback) -> None:
self.callbacks.append(callback)

def add_user(self, user: User) -> None:
self.user = user

def flush(self) -> None:
for logger in self.loggers:
logger.flush()
Expand All @@ -48,13 +62,17 @@ def flush(self) -> None:
class IEventManager(Protocol):
callbacks: List[TCallback]
loggers: List[_Logger]
trackers: List[Tracker]

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
...

def add_logger(self, config: LoggerConfig) -> None:
...

def add_tracker(self, config: TrackerConfig) -> None:
...

def add_callback(self, callback: TCallback) -> None:
...

Expand Down
6 changes: 6 additions & 0 deletions dbt_common/events/event_manager_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ def add_logger_to_manager(logger) -> None:
_EVENT_MANAGER.add_logger(logger)


def add_tracker_to_manager(tracker) -> None:
global _EVENT_MANAGER
_EVENT_MANAGER.add_tracker(tracker)


def add_callback_to_manager(callback: TCallback) -> None:
global _EVENT_MANAGER
_EVENT_MANAGER.add_callback(callback)
Expand All @@ -32,4 +37,5 @@ def cleanup_event_logger() -> None:
# especially important for tests, since pytest replaces the stdout stream
# during test runs, and closes the stream after the test is over.
_EVENT_MANAGER.loggers.clear()
_EVENT_MANAGER.trackers.clear()
_EVENT_MANAGER.callbacks.clear()
104 changes: 77 additions & 27 deletions dbt_common/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
from pathlib import Path

from dbt_common.events.event_manager_client import get_event_manager
from dbt_common.exceptions import EventCompilationError
from dbt_common.invocation import get_invocation_id
from dbt_common.helper_types import WarnErrorOptions
from dbt_common.utils.encoding import ForgivingJSONEncoder
from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt_common.events.logger import LoggerConfig, LineFormat
from dbt_common.exceptions import scrub_secrets, env_secrets
from dbt_common.events.types import Note
from functools import partial
import json
import os
from pathlib import Path
import sys
from typing import Callable, Dict, Optional, TextIO, Union
from google.protobuf.json_format import MessageToDict
from typing import Any, Callable, Dict, Optional, TextIO, Union

from dbt_common.helper_types import WarnErrorOptions
from dbt_common.invocation import get_invocation_id
from dbt_common.events.base_types import (
BaseEvent,
EventLevel,
EventMsg,
msg_to_dict as _msg_to_dict,
)
from dbt_common.events.cookie import Cookie
from dbt_common.events.event_manager_client import get_event_manager
from dbt_common.events.logger import LoggerConfig, LineFormat
from dbt_common.events.tracker import FileTracker, SnowplowTracker, Tracker, TrackerConfig
from dbt_common.events.types import Note, SendingEvent, SendEventFailure
from dbt_common.events.user import User
from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets
from dbt_common.utils.encoding import ForgivingJSONEncoder


LOG_VERSION = 3
metadata_vars: Optional[Dict[str, str]] = None
_METADATA_ENV_PREFIX = "DBT_ENV_CUSTOM_ENV_"
WARN_ERROR_OPTIONS = WarnErrorOptions(include=[], exclude=[])
WARN_ERROR = False


# This global, and the following two functions for capturing stdout logs are
# an unpleasant hack we intend to remove as part of API-ification. The GitHub
# issue #6350 was opened for that work.
Expand Down Expand Up @@ -92,26 +100,14 @@ def msg_to_json(msg: EventMsg) -> str:


def msg_to_dict(msg: EventMsg) -> dict:
msg_dict = dict()
try:
msg_dict = MessageToDict(
msg,
preserving_proto_field_name=True,
including_default_value_fields=True, # type: ignore
)
return _msg_to_dict(msg)
except Exception as exc:
event_type = type(msg).__name__
fire_event(
Note(msg=f"type {event_type} is not serializable. {str(exc)}"), level=EventLevel.WARN
)
# We don't want an empty NodeInfo in output
if (
"data" in msg_dict
and "node_info" in msg_dict["data"]
and msg_dict["data"]["node_info"]["node_name"] == ""
):
del msg_dict["data"]["node_info"]
return msg_dict
return {}


def warn_or_error(event, node=None) -> None:
Expand Down Expand Up @@ -153,3 +149,57 @@ def get_metadata_vars() -> Dict[str, str]:
def reset_metadata_vars() -> None:
global metadata_vars
metadata_vars = None


def tracker_factory(config: TrackerConfig) -> Tracker:
if all([config.invocation_id, config.endpoint, config.msg_schemas]):
return SnowplowTracker.from_config(config)
elif all([config.invocation_id, config.name, config.output_file_name]):
return FileTracker.from_config(config)
raise Exception("Invalid tracking configuration.")


def enable_tracking(tracker: Tracker, user: User):
cookie = _get_cookie(user)
user.enable_tracking(cookie)
tracker.enable_tracking(cookie)


def disable_tracking(tracker: Tracker, user: User):
user.disable_tracking()
tracker.disable_tracking()


def _get_cookie(user: User) -> Dict[str, Any]:
if cookie := user.cookie:
return cookie
return _set_cookie(user)


def _set_cookie(user: User) -> Dict[str, Any]:
"""
If the user points dbt to a profile directory which exists AND
contains a profiles.yml file, then we can set a cookie. If the
specified folder does not exist, or if there is not a profiles.yml
file in this folder, then an inconsistent cookie can be used. This
will change in every dbt invocation until the user points to a
profile dir file which contains a valid profiles.yml file.

See: https://github.com/dbt-labs/dbt-core/issues/1645
"""
if user.profile.exists():
cookie = Cookie(user.directory)
user.cookie = cookie.as_dict()
return user.cookie
return {}


def track(tracker: Tracker, user: User, msg: EventMsg) -> None:
if user.do_not_track:
return

fire_event(SendingEvent(kwargs=str(**msg_to_dict(msg))))
try:
tracker.track(msg)
except Exception:
fire_event(SendEventFailure())
121 changes: 121 additions & 0 deletions dbt_common/events/tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from dataclasses import dataclass
import logging
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Optional, Protocol, Self

import snowplow_tracker
from snowplow_tracker.typing import FailureCallback

from dbt_common.events.base_types import EventMsg, msg_to_dict
from dbt_common.events.format import timestamp_to_datetime_string


@dataclass
class TrackerConfig:
invocation_id: Optional[str] = None
msg_schemas: Optional[Dict[str, str]] = None
endpoint: Optional[str] = None
protocol: Optional[str] = "https"
on_failure: Optional[FailureCallback] = None
name: Optional[str] = None
output_file_name: Optional[str] = None
output_file_max_bytes: Optional[int] = 10 * 1024 * 1024 # 10 mb


class Tracker(Protocol):
@classmethod
def from_config(cls, config: TrackerConfig) -> Self:
...

def track(self, msg: EventMsg) -> None:
...

def enable_tracking(self, cookie: Dict[str, Any]) -> None:
...

def disable_tracking(self) -> None:
...


class FileTracker(Tracker):
def __init__(self, logger: logging.Logger, invocation_id: Optional[str]) -> None:
self.logger = logger
self.invocation_id = invocation_id

@classmethod
def from_config(cls, config: TrackerConfig) -> Self:
file_handler = RotatingFileHandler(
filename=config.output_file_name,
maxBytes=config.output_file_max_bytes, # type: ignore
backupCount=5,
encoding="utf8",
)
file_handler.setFormatter(logging.Formatter(fmt="%(message)s"))

logger = logging.getLogger(config.name)
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
logger.propagate = False
logger.addHandler(file_handler)
return cls(logger, config.invocation_id)

def track(self, msg: EventMsg) -> None:
ts: str = timestamp_to_datetime_string(msg.info.ts)
log_line = f"{ts} | {msg.info.msg}"
self.logger.debug(log_line)

def enable_tracking(self, cookie: Dict[str, Any]) -> None:
pass

def disable_tracking(self) -> None:
pass


class SnowplowTracker(Tracker):
def __init__(
self,
tracker: snowplow_tracker.Tracker,
msg_schemas: Dict[str, str],
invocation_id: Optional[str],
) -> None:
self.tracker = tracker
self.msg_schemas = msg_schemas
self.invocation_id = invocation_id

@classmethod
def from_config(cls, config: TrackerConfig) -> Self:
emitter = snowplow_tracker.Emitter(
config.endpoint,
config.protocol,
method="post",
batch_size=30,
on_failure=config.on_failure,
byte_limit=None,
request_timeout=5.0,
)
tracker = snowplow_tracker.Tracker(
emitters=emitter,
namespace="cf",
app_id="dbt",
)
return cls(tracker, config.msg_schemas, config.invocation_id)

def track(self, msg: EventMsg) -> None:
data = msg_to_dict(msg)
schema = self.msg_schemas.get(msg.info.name)
context = [snowplow_tracker.SelfDescribingJson(schema, data)]
event = snowplow_tracker.StructuredEvent(
category="dbt",
action=msg.info.name,
label=self.invocation_id,
context=context,
)
self.tracker.track(event)

def enable_tracking(self, cookie: Dict[str, Any]) -> None:
subject = snowplow_tracker.Subject()
subject.set_user_id(cookie.get("id"))
self.tracker.set_subject(subject)

def disable_tracking(self) -> None:
self.tracker.set_subject(None)
Loading
Loading