Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update core/tonic, update metric options, remove core tracing #380

Merged
merged 2 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
664 changes: 329 additions & 335 deletions temporalio/bridge/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
tokio = "1.26"
tokio-stream = "0.1"
tonic = "0.8"
tonic = "0.9"
tracing = "0.1"
url = "2.2"

Expand Down
16 changes: 6 additions & 10 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)


@dataclass(frozen=True)
class TracingConfig:
"""Python representation of the Rust struct for tracing config."""

filter: str
opentelemetry: OpenTelemetryConfig


@dataclass(frozen=True)
class LoggingConfig:
"""Python representation of the Rust struct for logging config."""
Expand All @@ -48,6 +40,9 @@ class MetricsConfig:

opentelemetry: Optional[OpenTelemetryConfig]
prometheus: Optional[PrometheusConfig]
attach_service_name: bool
global_tags: Optional[Mapping[str, str]]
metric_prefix: Optional[str]


@dataclass(frozen=True)
Expand All @@ -57,20 +52,21 @@ class OpenTelemetryConfig:
url: str
headers: Mapping[str, str]
metric_periodicity_millis: Optional[int]
metric_temporality_delta: bool


@dataclass(frozen=True)
class PrometheusConfig:
"""Python representation of the Rust struct for Prometheus config."""

bind_address: str
counters_total_suffix: bool
unit_suffix: bool


@dataclass(frozen=True)
class TelemetryConfig:
"""Python representation of the Rust struct for telemetry config."""

tracing: Optional[TracingConfig]
logging: Optional[LoggingConfig]
metrics: Optional[MetricsConfig]
global_tags: Mapping[str, str]
5 changes: 4 additions & 1 deletion temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ pub fn connect_client<'a>(
runtime_ref.runtime.future_into_py(py, async move {
Ok(ClientRef {
retry_client: opts
.connect_no_namespace(runtime.core.metric_meter().as_deref(), headers)
.connect_no_namespace(
runtime.core.telemetry().get_temporal_metric_meter(),
headers,
)
.await
.map_err(|err| {
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
Expand Down
158 changes: 100 additions & 58 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core_api::telemetry::metrics::CoreMeter;
use temporal_sdk_core_api::telemetry::{
Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions, TelemetryOptionsBuilder,
TraceExportConfig, TraceExporter,
Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
TelemetryOptions, TelemetryOptionsBuilder,
};
use url::Url;

Expand All @@ -27,16 +29,8 @@ pub(crate) struct Runtime {

#[derive(FromPyObject)]
pub struct TelemetryConfig {
tracing: Option<TracingConfig>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we are removing this rather than, say, deprecating it and treating it as unused?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed as part of temporalio/sdk-core#599; it didn't work anyway and, based on internal discussions, was a rarely used advanced core debug tracer that we wanted to remove rather than deprecate.

logging: Option<LoggingConfig>,
metrics: Option<MetricsConfig>,
global_tags: Option<HashMap<String, String>>
}

#[derive(FromPyObject)]
pub struct TracingConfig {
filter: String,
opentelemetry: OpenTelemetryConfig,
}

#[derive(FromPyObject)]
Expand All @@ -49,32 +43,44 @@ pub struct LoggingConfig {
/// Metrics settings received from the Python side of the bridge.
///
/// The `TryFrom<MetricsConfig>` conversion in this file expects exactly one of
/// `opentelemetry` or `prometheus` to be set: it returns a `PyValueError` when
/// both are present and when neither is.
pub struct MetricsConfig {
/// OpenTelemetry settings; when present, an OTLP metric exporter is built.
opentelemetry: Option<OpenTelemetryConfig>,
/// Prometheus settings; when present (and `opentelemetry` is absent), a
/// Prometheus metric exporter is started.
prometheus: Option<PrometheusConfig>,
/// Forwarded to `TelemetryOptionsBuilder::attach_service_name`.
attach_service_name: bool,
/// Optional tags passed to the `global_tags` setter of whichever exporter
/// options builder is used.
global_tags: Option<HashMap<String, String>>,
/// Optional prefix forwarded to `TelemetryOptionsBuilder::metric_prefix`;
/// when `None` the builder's own default is left untouched.
metric_prefix: Option<String>,
}

/// OpenTelemetry collector settings extracted from the matching Python object.
#[derive(FromPyObject)]
pub struct OpenTelemetryConfig {
/// Collector URL as a string; parsed with `Url::parse` during conversion,
/// and a parse failure is reported as a `PyValueError`.
url: String,
/// Headers handed unchanged to the OTel collector options builder.
headers: HashMap<String, String>,
/// Optional metric export period in milliseconds; converted with
/// `Duration::from_millis` when set, otherwise the builder default applies.
metric_periodicity_millis: Option<u64>,
/// When `true`, `MetricTemporality::Delta` is selected; when `false` the
/// builder's default temporality is left in place.
metric_temporality_delta: bool,
}

/// Prometheus exporter settings extracted from the matching Python object.
#[derive(FromPyObject)]
pub struct PrometheusConfig {
/// Socket address as a string; parsed with `SocketAddr::from_str` during
/// conversion, and a parse failure is reported as a `PyValueError`.
bind_address: String,
/// Forwarded to `PrometheusExporterOptionsBuilder::counters_total_suffix`.
counters_total_suffix: bool,
/// Forwarded to `PrometheusExporterOptionsBuilder::unit_suffix`.
unit_suffix: bool,
}

pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult<RuntimeRef> {
let mut core = CoreRuntime::new(
// We don't move telemetry config here because we need it for
// late-binding metrics
(&telemetry_config).try_into()?,
tokio::runtime::Builder::new_multi_thread(),
)
.map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err)))?;
// We late-bind the metrics after core runtime is created since it needs
// the Tokio handle
if let Some(metrics_conf) = telemetry_config.metrics {
let _guard = core.tokio_handle().enter();
core.telemetry_mut()
.attach_late_init_metrics(metrics_conf.try_into()?);
}
Ok(RuntimeRef {
runtime: Runtime {
core: Arc::new(
CoreRuntime::new(
telemetry_config.try_into()?,
tokio::runtime::Builder::new_multi_thread(),
)
.map_err(|err| {
PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err))
})?,
),
core: Arc::new(core),
},
})
}
Expand All @@ -94,61 +100,97 @@ impl Runtime {
}
}

impl TryFrom<TelemetryConfig> for TelemetryOptions {
impl TryFrom<&TelemetryConfig> for TelemetryOptions {
type Error = PyErr;

fn try_from(conf: TelemetryConfig) -> PyResult<Self> {
fn try_from(conf: &TelemetryConfig) -> PyResult<Self> {
let mut build = TelemetryOptionsBuilder::default();
if let Some(v) = conf.tracing {
build.tracing(TraceExportConfig {
filter: v.filter,
exporter: TraceExporter::Otel(v.opentelemetry.try_into()?),
});
}
if let Some(v) = conf.logging {
build.logging(if v.forward {
Logger::Forward { filter: v.filter }
} else {
Logger::Console { filter: v.filter }
});
}
if let Some(v) = conf.metrics {
build.metrics(if let Some(t) = v.opentelemetry {
if v.prometheus.is_some() {
return Err(PyValueError::new_err(
"Cannot have OpenTelemetry and Prometheus metrics",
));
if let Some(logging_conf) = &conf.logging {
build.logging(if logging_conf.forward {
Logger::Forward {
filter: logging_conf.filter.to_string(),
}
MetricsExporter::Otel(t.try_into()?)
} else if let Some(t) = v.prometheus {
MetricsExporter::Prometheus(SocketAddr::from_str(&t.bind_address).map_err(
|err| PyValueError::new_err(format!("Invalid Prometheus address: {}", err)),
)?)
} else {
return Err(PyValueError::new_err(
"Either OpenTelemetry or Prometheus config must be provided",
));
Logger::Console {
filter: logging_conf.filter.to_string(),
}
});
}
if let Some(v) = conf.global_tags {
build.global_tags(v);
if let Some(metrics_conf) = &conf.metrics {
// Note, actual metrics instance is late-bound in init_runtime
build.attach_service_name(metrics_conf.attach_service_name);
if let Some(prefix) = &metrics_conf.metric_prefix {
build.metric_prefix(prefix.to_string());
}
}
build
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))
}
}

impl TryFrom<OpenTelemetryConfig> for OtelCollectorOptions {
impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
type Error = PyErr;

fn try_from(conf: OpenTelemetryConfig) -> PyResult<Self> {
Ok(OtelCollectorOptions {
url: Url::parse(&conf.url)
.map_err(|err| PyValueError::new_err(format!("Invalid OTel URL: {}", err)))?,
headers: conf.headers,
metric_periodicity: conf.metric_periodicity_millis.map(Duration::from_millis),
})
fn try_from(conf: MetricsConfig) -> PyResult<Self> {
if let Some(otel_conf) = conf.opentelemetry {
if !conf.prometheus.is_none() {
return Err(PyValueError::new_err(
"Cannot have OpenTelemetry and Prometheus metrics",
));
}

// Build OTel exporter
let mut build = OtelCollectorOptionsBuilder::default();
build
.url(
Url::parse(&otel_conf.url).map_err(|err| {
PyValueError::new_err(format!("Invalid OTel URL: {}", err))
})?,
)
.headers(otel_conf.headers);
if let Some(period) = otel_conf.metric_periodicity_millis {
build.metric_periodicity(Duration::from_millis(period));
}
if otel_conf.metric_temporality_delta {
build.metric_temporality(MetricTemporality::Delta);
}
if let Some(global_tags) = conf.global_tags {
build.global_tags(global_tags);
}
let otel_options = build
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid OTel config: {}", err)))?;
Ok(Arc::new(build_otlp_metric_exporter(otel_options).map_err(
|err| PyValueError::new_err(format!("Failed building OTel exporter: {}", err)),
)?))
} else if let Some(prom_conf) = conf.prometheus {
// Start prom exporter
let mut build = PrometheusExporterOptionsBuilder::default();
build
.socket_addr(
SocketAddr::from_str(&prom_conf.bind_address).map_err(|err| {
PyValueError::new_err(format!("Invalid Prometheus address: {}", err))
})?,
)
.counters_total_suffix(prom_conf.counters_total_suffix)
.unit_suffix(prom_conf.unit_suffix);
if let Some(global_tags) = conf.global_tags {
build.global_tags(global_tags);
}
let prom_options = build.build().map_err(|err| {
PyValueError::new_err(format!("Invalid Prometheus config: {}", err))
})?;
Ok(start_prometheus_metric_exporter(prom_options)
.map_err(|err| {
PyValueError::new_err(format!("Failed starting Prometheus exporter: {}", err))
})?
.meter)
} else {
Err(PyValueError::new_err(
"Either OpenTelemetry or Prometheus config must be provided",
))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct WorkerConfig {
macro_rules! enter_sync {
($runtime:expr) => {
temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(
$runtime.core.trace_subscriber(),
$runtime.core.telemetry().trace_subscriber(),
);
let _guard = $runtime.core.tokio_handle().enter();
};
Expand Down
Loading