OpenTelemetry init (#5426)
---------

Co-authored-by: Michał Bartoszkiewicz <[email protected]>
GitOrigin-RevId: a18273a568e7ca5c230702cd582e695861380d2f
2 people authored and Manul from Pathway committed Feb 2, 2024
1 parent 1ec6b44 commit 9eddc59
Showing 18 changed files with 1,262 additions and 60 deletions.
538 changes: 537 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
@@ -39,10 +39,14 @@ itertools = "0.12.0"
jemallocator = { version = "0.5.4", features = ["stats", "disable_initial_exec_tls"] }
log = { version = "0.4.20", features = ["std"] }
ndarray = { version = "0.15.6", features = ["serde"] }
nix = { version = "0.27.1", features = ["fs", "user"] }
nix = { version = "0.27.1", features = ["fs", "user", "resource"] }
num-integer = "0.1.45"
numpy = "0.20.0"
once_cell = "1.19.0"
opentelemetry = { version = "0.21.0", features = ["trace", "metrics"] }
opentelemetry-otlp = { version = "0.14.0", features = ["default", "tls", "tls-roots", "metrics"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.2", features = ["rt-tokio", "rt-tokio-current-thread"] }
ordered-float = { version = "4.2.0", features = ["serde"] }
pipe = "0.4.0"
postgres = { version = "0.19.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
@@ -61,10 +65,12 @@ serde_json = "1.0"
serde_with = "3.5.1"
smallvec = { version = "1.13.1", features = ["union", "const_generics"] }
syn = { version = "2.0.48", features = ["default", "full", "visit", "visit-mut"] } # Hack to keep features unified between normal and build deps
sysinfo = "0.30.5"
tempfile = "3.9.0"
thiserror = "1.0.56"
timely = { path = "./external/timely-dataflow/timely", features = ["bincode"] }
tokio = "1.35.1"
tokio = { version = "1.35.1", features = ["rt-multi-thread"] }
uuid = { version = "1.7.0", features = ["v4"] }
xxhash-rust = { version = "0.8.8", features = ["xxh3"] }

[target.'cfg(target_os = "linux")'.dependencies]
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -38,6 +38,9 @@ dependencies = [
"jmespath >= 1.0.1",
"Office365-REST-Python-Client >= 2.5.3",
"aiohttp_cors >= 0.7.0",
"opentelemetry-api >= 1.22.0",
"opentelemetry-sdk >= 1.22.0",
"opentelemetry-exporter-otlp-proto-grpc >= 1.22.0",
]

[project.optional-dependencies]
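
These three packages are the Python-side counterparts of the new Rust crates and are what the tracing module added later in this commit imports. A quick, hedged sanity check that they resolve at the pinned minimum versions (a sketch, not part of the commit):

# Sketch: verify the new OpenTelemetry runtime dependencies are importable.
from opentelemetry.version import __version__ as api_version
from opentelemetry.sdk.version import __version__ as sdk_version
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter  # noqa: F401

print(api_version, sdk_version)  # both expected to be >= 1.22.0 per pyproject.toml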
4 changes: 4 additions & 0 deletions python/pathway/__init__.py
@@ -65,6 +65,8 @@
schema_from_csv,
schema_from_dict,
schema_from_types,
set_license_key,
set_telemetry_server,
sql,
table_transformer,
this,
@@ -184,6 +186,8 @@
"join_outer",
"groupby",
"persistence",
"set_license_key",
"set_telemetry_server",
]


14 changes: 14 additions & 0 deletions python/pathway/engine.pyi
@@ -615,6 +615,9 @@ def run_with_new_graph(
monitoring_level: MonitoringLevel = MonitoringLevel.NONE,
with_http_server: bool = False,
persistence_config: PersistenceConfig | None = None,
license_key: str | None = None,
telemetry_server: str | None = None,
trace_parent: str | None = None,
) -> list[CapturedStream]: ...
def unsafe_make_pointer(arg) -> Pointer: ...

@@ -700,3 +703,14 @@ class SnapshotEvent:
class LocalBinarySnapshotWriter:
def __init__(self, path: str, persistent_id: str, worker_id: int): ...
def write(self, events: list[SnapshotEvent]): ...

class TelemetryConfig:
telemetry_enabled: bool
telemetry_server_endpoint: str | None
service_name: str | None
service_version: str | None
run_id: str
@staticmethod
def create(
*, license_key: str | None = None, telemetry_server: str | None = None
) -> TelemetryConfig: ...
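
TelemetryConfig is implemented on the Rust side and only surfaced here as a type stub. A hedged sketch of creating one from Python, assuming a pathway build that includes this commit; with no license key, telemetry is presumably left disabled:

# Sketch only; behavior without a license key is an assumption, not documented here.
from pathway.internals import api

config = api.TelemetryConfig.create(license_key=None, telemetry_server=None)
print(config.telemetry_enabled)  # presumably False when no license key is provided
print(config.run_id)             # per-run identifier attached to telemetry resources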
3 changes: 3 additions & 0 deletions python/pathway/internals/__init__.py
@@ -20,6 +20,7 @@
table_transformer,
unwrap,
)
from pathway.internals.config import set_license_key, set_telemetry_server
from pathway.internals.custom_reducers import BaseCustomAccumulator
from pathway.internals.datetime_types import DateTimeNaive, DateTimeUtc, Duration
from pathway.internals.decorators import (
@@ -143,4 +144,6 @@
"join_right",
"join_outer",
"groupby",
"set_license_key",
"set_telemetry_server",
]
13 changes: 12 additions & 1 deletion python/pathway/internals/config.py
@@ -57,6 +57,8 @@ class PathwayConfig:
persistence_mode: api.PersistenceMode = field(default_factory=_persistence_mode)
snapshot_access: api.SnapshotAccess | None = field(default_factory=_snapshot_access)
replay_storage: str | None = _env_field("PATHWAY_REPLAY_STORAGE")
license_key: str | None = _env_field("PATHWAY_LICENSE_KEY")
telemetry_server: str | None = _env_field("PATHWAY_TELEMETRY_SERVER")

@property
def replay_config(
@@ -86,4 +88,13 @@ def replay_config(

pathway_config = PathwayConfig()

__all__ = ["PathwayConfig", "pathway_config"]

def set_license_key(key: str) -> None:
pathway_config.license_key = key


def set_telemetry_server(endpoint: str) -> None:
pathway_config.telemetry_server = endpoint


__all__ = ["PathwayConfig", "pathway_config", "set_license_key", "set_telemetry_server"]
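
The two new setters mutate the process-wide pathway_config and mirror the PATHWAY_LICENSE_KEY and PATHWAY_TELEMETRY_SERVER environment variables. A minimal usage sketch; both values below are placeholders:

import pathway as pw

# Placeholders only; a real deployment would use its own key and OTLP endpoint.
pw.set_license_key("YOUR-LICENSE-KEY")
pw.set_telemetry_server("https://telemetry.example.com:4317")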
125 changes: 75 additions & 50 deletions python/pathway/internals/graph_runner/__init__.py
@@ -4,6 +4,7 @@

from collections.abc import Callable, Iterable

import pathway.internals.graph_runner.telemetry as telemetry
from pathway.internals import api, parse_graph as graph, table, trace
from pathway.internals.column_path import ColumnPath
from pathway.internals.config import pathway_config
@@ -31,6 +32,7 @@ class GraphRunner:
debug: bool
ignore_asserts: bool
runtime_typechecking: bool
telemetry_config: api.TelemetryConfig

def __init__(
self,
@@ -43,6 +45,7 @@ def __init__(
default_logging: bool = True,
persistence_config: PersistenceConfig | None = None,
runtime_typechecking: bool | None = None,
license_key: str | None = None,
) -> None:
self._graph = input_graph
self.debug = debug
@@ -57,6 +60,9 @@ def __init__(
self.runtime_typechecking = pathway_config.runtime_typechecking
else:
self.runtime_typechecking = runtime_typechecking
if license_key is None:
license_key = pathway_config.license_key
self.license_key = license_key

def run_tables(
self,
@@ -106,59 +112,78 @@ def _run(
output_tables: Iterable[table.Table] = (),
after_build: Callable[[ScopeState, OperatorStorageGraph], None] | None = None,
) -> list[api.CapturedStream]:
storage_graph = OperatorStorageGraph.from_scope_context(
context, self, output_tables
otel = telemetry.Telemetry.create(
license_key=self.license_key,
telemetry_server=pathway_config.telemetry_server,
)

def logic(
scope: api.Scope,
storage_graph: OperatorStorageGraph = storage_graph,
) -> list[tuple[api.Table, list[ColumnPath]]]:
state = ScopeState(scope)
storage_graph.build_scope(scope, state, self)
if after_build is not None:
after_build(state, storage_graph)
return storage_graph.get_output_tables(output_tables, state)

node_names = [
(operator.id, operator.label())
for operator in context.nodes
if isinstance(operator, ContextualizedIntermediateOperator)
]
monitoring_level = self.monitoring_level.to_internal()

with new_event_loop() as event_loop, monitor_stats(
monitoring_level, node_names, self.default_logging
) as stats_monitor:
if self.persistence_config:
self.persistence_config.on_before_run()
persistence_engine_config = self.persistence_config.engine_config
else:
persistence_engine_config = None

try:
return api.run_with_new_graph(
logic,
event_loop=event_loop,
ignore_asserts=self.ignore_asserts,
stats_monitor=stats_monitor,
monitoring_level=monitoring_level,
with_http_server=self.with_http_server,
persistence_config=persistence_engine_config,
)
except api.EngineErrorWithTrace as e:
error, frame = e.args
if frame is not None:
trace.add_pathway_trace_note(
error,
trace.Frame(
filename=frame.file_name,
line_number=frame.line_number,
line=frame.line,
function=frame.function,
),
with otel.tracer.start_as_current_span("graph_runner.run"):
trace_context, trace_parent = telemetry.get_current_context()

storage_graph = OperatorStorageGraph.from_scope_context(
context, self, output_tables
)

@otel.tracer.start_as_current_span(
"graph_runner.build",
context=trace_context,
attributes=dict(
graph=repr(self._graph),
debug=self.debug,
),
)
def logic(
scope: api.Scope,
storage_graph: OperatorStorageGraph = storage_graph,
) -> list[tuple[api.Table, list[ColumnPath]]]:
state = ScopeState(scope)
storage_graph.build_scope(scope, state, self)
if after_build is not None:
after_build(state, storage_graph)
return storage_graph.get_output_tables(output_tables, state)

node_names = [
(operator.id, operator.label())
for operator in context.nodes
if isinstance(operator, ContextualizedIntermediateOperator)
]
monitoring_level = self.monitoring_level.to_internal()

with new_event_loop() as event_loop, monitor_stats(
monitoring_level, node_names, self.default_logging
) as stats_monitor:
if self.persistence_config:
self.persistence_config.on_before_run()
persistence_engine_config = self.persistence_config.engine_config
else:
persistence_engine_config = None

try:
return api.run_with_new_graph(
logic,
event_loop=event_loop,
ignore_asserts=self.ignore_asserts,
stats_monitor=stats_monitor,
monitoring_level=monitoring_level,
with_http_server=self.with_http_server,
persistence_config=persistence_engine_config,
license_key=self.license_key,
telemetry_server=pathway_config.telemetry_server,
trace_parent=trace_parent,
)
raise error
except api.EngineErrorWithTrace as e:
error, frame = e.args
if frame is not None:
trace.add_pathway_trace_note(
error,
trace.Frame(
filename=frame.file_name,
line_number=frame.line_number,
line=frame.line,
function=frame.function,
),
)
raise error

def tree_shake_tables(
self, graph_scope: graph.Scope, tables: Iterable[table.Table]
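
_run() now wraps graph construction in a "graph_runner.run" span, captures the current W3C traceparent via telemetry.get_current_context(), and hands it to the engine through the new trace_parent argument so engine-side spans join the same trace. The hand-off pattern in isolation, using the plain OpenTelemetry API (a sketch, not Pathway code):

# Standalone illustration of the traceparent hand-off (plain OpenTelemetry API).
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("example")
propagator = TraceContextTextMapPropagator()

with tracer.start_as_current_span("graph_runner.run"):
    carrier: dict[str, str] = {}
    propagator.inject(carrier)            # writes the "traceparent" header into carrier
    trace_parent = carrier.get("traceparent")
    print(trace_parent)                   # the value forwarded to api.run_with_new_graph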
67 changes: 67 additions & 0 deletions python/pathway/internals/graph_runner/telemetry.py
@@ -0,0 +1,67 @@
from __future__ import annotations

import sys

from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.util.types import AttributeValue

from pathway.internals import api

propagator = TraceContextTextMapPropagator()


class Telemetry:
config: api.TelemetryConfig
tracer: trace.Tracer

def __init__(self, telemetry_config: api.TelemetryConfig) -> None:
self.config = telemetry_config
self.tracer = self._init_tracer()

@classmethod
def create(
cls, license_key: str | None = None, telemetry_server: str | None = None
) -> Telemetry:
config = api.TelemetryConfig.create(
license_key=license_key, telemetry_server=telemetry_server
)
return cls(config)

def _init_tracer(self) -> trace.Tracer:
if self.config.telemetry_enabled:
trace_provider = TracerProvider(
resource=Resource(
attributes={
SERVICE_NAME: self.config.service_name or "",
SERVICE_VERSION: self.config.service_version or "",
"run.id": self.config.run_id,
"python.version": sys.version,
"otel.scope.name": "python",
}
)
)
exporter = OTLPSpanExporter(endpoint=self.config.telemetry_server_endpoint)
trace_provider.add_span_processor(BatchSpanProcessor(exporter))
return trace_provider.get_tracer("pathway-tracer")
else:
return trace.NoOpTracer()


def get_current_context() -> tuple[Context, str | None]:
carrier: dict[str, str | list[str]] = {}
propagator.inject(carrier)
context = propagator.extract(carrier)
trace_parent = carrier.get("traceparent", None)
assert trace_parent is None or isinstance(trace_parent, str)
return context, trace_parent


def event(name: str, attributes: dict[str, AttributeValue]):
span = trace.get_current_span()
span.add_event(name, attributes)
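
A hedged sketch of exercising this module on its own, assuming a pathway build containing it; with no license key, telemetry is presumably disabled, so the NoOpTracer branch is taken, no spans are exported, and no traceparent is produced:

# Sketch only; the no-license-key behavior is an assumption.
from pathway.internals.graph_runner import telemetry

otel = telemetry.Telemetry.create(license_key=None, telemetry_server=None)
with otel.tracer.start_as_current_span("example.span"):
    ctx, trace_parent = telemetry.get_current_context()
    telemetry.event("example.event", {"detail": "placeholder"})
print(trace_parent)  # expected to be None on the no-op path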
4 changes: 4 additions & 0 deletions python/pathway/internals/run.py
@@ -17,6 +17,7 @@ def run(
default_logging: bool = True,
persistence_config: PersistenceConfig | None = None,
runtime_typechecking: bool | None = None,
license_key: str | None = None,
):
"""Runs the computation graph.
@@ -41,6 +42,7 @@
with_http_server=with_http_server,
default_logging=default_logging,
persistence_config=persistence_config,
license_key=license_key,
runtime_typechecking=runtime_typechecking,
).run_outputs()

@@ -54,6 +56,7 @@ def run_all(
default_logging: bool = True,
persistence_config: PersistenceConfig | None = None,
runtime_typechecking: bool | None = None,
license_key: str | None = None,
):
"""Runs the computation graph with disabled tree-shaking optimization.
@@ -79,4 +82,5 @@
default_logging=default_logging,
persistence_config=persistence_config,
runtime_typechecking=runtime_typechecking,
license_key=license_key,
).run_all()
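
run() and run_all() now accept license_key and, via GraphRunner, fall back to pathway_config.license_key (i.e. PATHWAY_LICENSE_KEY) when it is omitted. A minimal hedged call site with a placeholder key:

import pathway as pw

# Placeholder key; equivalent to setting PATHWAY_LICENSE_KEY beforehand.
pw.run(license_key="YOUR-LICENSE-KEY")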