diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs
index f45307c87..85d2d5b2c 100644
--- a/core/src/telemetry/otel.rs
+++ b/core/src/telemetry/otel.rs
@@ -12,7 +12,7 @@ use super::{
 };
 use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS};
 use opentelemetry::{
-    self,
+    self, global,
     metrics::{Meter, MeterProvider as MeterProviderT},
     Key, KeyValue, Value,
 };
@@ -121,6 +121,9 @@ pub(super) fn augment_meter_provider_with_defaults(
 pub fn build_otlp_metric_exporter(
     opts: OtelCollectorOptions,
 ) -> Result<CoreOtelMeter, anyhow::Error> {
+    global::set_error_handler(|err| {
+        tracing::error!("{}", err);
+    })?;
     let exporter = match opts.protocol {
         OtlpProtocol::Grpc => {
             let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default()
diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs
index 48d1a6a5e..993ca0186 100644
--- a/tests/integ_tests/metrics_tests.rs
+++ b/tests/integ_tests/metrics_tests.rs
@@ -1,7 +1,13 @@
 use anyhow::anyhow;
 use assert_matches::assert_matches;
-use std::string::ToString;
-use std::{collections::HashMap, env, net::SocketAddr, sync::Arc, time::Duration};
+use std::{
+    collections::HashMap,
+    env,
+    net::SocketAddr,
+    string::ToString,
+    sync::{Arc, Mutex},
+    time::Duration,
+};
 use temporal_client::{
     WorkflowClientTrait, WorkflowOptions, WorkflowService, REQUEST_LATENCY_HISTOGRAM_NAME,
 };
@@ -48,6 +54,7 @@ use temporal_sdk_core_test_utils::{
     PROMETHEUS_QUERY_API,
 };
 use tokio::{join, sync::Barrier, task::AbortHandle};
+use tracing_subscriber::fmt::MakeWriter;
 use url::Url;
 
 static ANY_PORT: &str = "127.0.0.1:0";
@@ -900,3 +907,69 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() {
     // Metric shouldn't show up at all, since it's zero the whole time.
     assert!(!body.contains("temporal_sticky_cache_total_forced_eviction"));
 }
+
+struct CapturingWriter {
+    buf: Arc<Mutex<Vec<u8>>>,
+}
+
+impl MakeWriter<'_> for CapturingWriter {
+    type Writer = CapturingHandle;
+
+    fn make_writer(&self) -> Self::Writer {
+        CapturingHandle(self.buf.clone())
+    }
+}
+
+struct CapturingHandle(Arc<Mutex<Vec<u8>>>);
+
+impl std::io::Write for CapturingHandle {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let mut b = self.0.lock().unwrap();
+        b.extend_from_slice(buf);
+        Ok(buf.len())
+    }
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn otel_errors_logged_as_errors() {
+    // Set up a tracing subscriber to capture ERROR logs
+    let logs = Arc::new(Mutex::new(Vec::new()));
+    let writer = CapturingWriter { buf: logs.clone() };
+    let subscriber = tracing_subscriber::fmt().with_writer(writer).finish();
+    let _guard = tracing::subscriber::set_default(subscriber);
+
+    let opts = OtelCollectorOptionsBuilder::default()
+        .url("https://localhostt:9995/v1/metrics".parse().unwrap()) // Invalid endpoint
+        .build()
+        .unwrap();
+    let exporter = build_otlp_metric_exporter(opts).unwrap();
+
+    let telemopts = TelemetryOptionsBuilder::default()
+        .metrics(Arc::new(exporter) as Arc<dyn CoreMeter>)
+        .build()
+        .unwrap();
+
+    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt);
+    let _worker = starter.get_worker().await;
+
+    // Wait to allow the exporter to attempt sending metrics and fail.
+    tokio::time::sleep(Duration::from_secs(2)).await;
+
+    let logs = logs.lock().unwrap();
+    let log_str = String::from_utf8_lossy(&logs);
+
+    assert!(
+        log_str.contains("ERROR"),
+        "Expected ERROR log not found in logs: {}",
+        log_str
+    );
+    assert!(
+        log_str.contains("Metrics exporter otlp failed with the grpc server returns error"),
+        "Expected an OTel exporter error message in logs: {}",
+        log_str
+    );
+}
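
A minimal, standalone sketch of the mechanism the otel.rs change relies on: routing errors that OpenTelemetry reports through its global handler into `tracing`, so they surface as ERROR logs rather than being printed directly to stderr. This assumes an `opentelemetry` version that still exposes `global::set_error_handler` (the same API the diff uses); the wrapper function name here is illustrative, not part of the patch.

    use opentelemetry::global;

    /// Forward OpenTelemetry's internally reported errors (e.g. exporter failures)
    /// into the `tracing` pipeline so they show up as ERROR-level log events.
    fn install_otel_error_bridge() -> Result<(), opentelemetry::global::Error> {
        // Registration is fallible, so the error is propagated to the caller,
        // mirroring the `?` used in build_otlp_metric_exporter above.
        global::set_error_handler(|err| {
            // `err` implements `Display`, so it renders as a readable message.
            tracing::error!("{}", err);
        })
    }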