diff --git a/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs b/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs index eb52fa8e5a..5107649005 100644 --- a/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs +++ b/opentelemetry-contrib/src/trace/exporter/datadog/mod.rs @@ -278,7 +278,7 @@ impl DatadogPipelineBuilder { #[async_trait] impl trace::SpanExporter for DatadogExporter { /// Export spans to datadog-agent - async fn export(&self, batch: Vec) -> trace::ExportResult { + async fn export(&mut self, batch: Vec) -> trace::ExportResult { let data = self.version.encode(&self.service_name, batch)?; let req = Request::builder() .method(Method::POST) diff --git a/opentelemetry-jaeger/src/agent.rs b/opentelemetry-jaeger/src/agent.rs index 5e62bed6b8..bd6b72485b 100644 --- a/opentelemetry-jaeger/src/agent.rs +++ b/opentelemetry-jaeger/src/agent.rs @@ -6,7 +6,6 @@ use crate::thrift::{ use crate::transport::{TBufferChannel, TNoopChannel}; use std::fmt; use std::net::{ToSocketAddrs, UdpSocket}; -use std::sync::Mutex; use thrift::{ protocol::{TCompactInputProtocol, TCompactOutputProtocol}, transport::{ReadHalf, TIoChannel, WriteHalf}, @@ -37,10 +36,10 @@ pub(crate) struct AgentAsyncClientUDP { #[cfg(all(not(feature = "async-std"), not(feature = "tokio")))] conn: UdpSocket, #[cfg(feature = "tokio")] - conn: tokio::sync::Mutex, + conn: tokio::net::UdpSocket, #[cfg(all(feature = "async-std", not(feature = "tokio")))] - conn: async_std::sync::Mutex, - buffer_client: Mutex, + conn: async_std::net::UdpSocket, + buffer_client: BufferClient, } impl AgentAsyncClientUDP { @@ -59,33 +58,18 @@ impl AgentAsyncClientUDP { #[cfg(all(not(feature = "async-std"), not(feature = "tokio")))] conn, #[cfg(feature = "tokio")] - conn: tokio::sync::Mutex::new(tokio::net::UdpSocket::from_std(conn)?), + conn: tokio::net::UdpSocket::from_std(conn)?, #[cfg(all(feature = "async-std", not(feature = "tokio")))] - conn: async_std::sync::Mutex::new(async_std::net::UdpSocket::from(conn)), - buffer_client: Mutex::new(BufferClient { buffer, client }), + conn: async_std::net::UdpSocket::from(conn), + buffer_client: BufferClient { buffer, client }, }) } /// Emit standard Jaeger batch - pub(crate) async fn emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()> { + pub(crate) async fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> { // Write payload to buffer - let payload = self - .buffer_client - .lock() - .map_err(|err| { - thrift::Error::from(std::io::Error::new( - std::io::ErrorKind::Other, - err.to_string(), - )) - }) - .and_then(|mut buffer_client| { - // Write to tmp buffer - buffer_client.client.emit_batch(batch)?; - // extract written payload, clearing buffer - let payload = buffer_client.buffer.take_bytes(); - - Ok(payload) - })?; + self.buffer_client.client.emit_batch(batch)?; + let payload = self.buffer_client.buffer.take_bytes(); // Write async to socket, reading from buffer write_to_socket(self, payload).await?; @@ -95,24 +79,22 @@ impl AgentAsyncClientUDP { } #[cfg(all(not(feature = "async-std"), not(feature = "tokio")))] -async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec) -> thrift::Result<()> { +async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec) -> thrift::Result<()> { client.conn.send(&payload)?; Ok(()) } #[cfg(feature = "tokio")] -async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec) -> thrift::Result<()> { - let mut conn = client.conn.lock().await; - conn.send(&payload).await?; +async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec) -> thrift::Result<()> { + client.conn.send(&payload).await?; Ok(()) } #[cfg(all(feature = "async-std", not(feature = "tokio")))] -async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec) -> thrift::Result<()> { - let conn = client.conn.lock().await; - conn.send(&payload).await?; +async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec) -> thrift::Result<()> { + client.conn.send(&payload).await?; Ok(()) } diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index 79af7c4a40..3ad0e5dcf5 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -258,7 +258,7 @@ impl Into for Process { #[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Jaeger - async fn export(&self, batch: Vec) -> trace::ExportResult { + async fn export(&mut self, batch: Vec) -> trace::ExportResult { let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); let mut process = self.process.clone(); diff --git a/opentelemetry-jaeger/src/uploader.rs b/opentelemetry-jaeger/src/uploader.rs index 97dbcc4e7c..dc4cef7317 100644 --- a/opentelemetry-jaeger/src/uploader.rs +++ b/opentelemetry-jaeger/src/uploader.rs @@ -16,7 +16,7 @@ pub(crate) enum BatchUploader { impl BatchUploader { /// Emit a jaeger batch for the given uploader - pub(crate) async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult { + pub(crate) async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult { match self { BatchUploader::Agent(client) => { // TODO Implement retry behaviour diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 2427880375..ec28902d82 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -146,7 +146,7 @@ impl Exporter { #[async_trait] impl SpanExporter for Exporter { - async fn export(&self, batch: Vec) -> ExportResult { + async fn export(&mut self, batch: Vec) -> ExportResult { let request = ExportTraceServiceRequest { resource_spans: RepeatedField::from_vec(batch.into_iter().map(Into::into).collect()), unknown_fields: Default::default(), diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 8162aaa347..98b7029278 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -309,7 +309,7 @@ impl ZipkinPipelineBuilder { #[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Zipkin collector. - async fn export(&self, batch: Vec) -> trace::ExportResult { + async fn export(&mut self, batch: Vec) -> trace::ExportResult { let zipkin_spans = batch .into_iter() .map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span)) diff --git a/opentelemetry/src/api/trace/noop.rs b/opentelemetry/src/api/trace/noop.rs index 074bd20e9c..bf52e4cfcb 100644 --- a/opentelemetry/src/api/trace/noop.rs +++ b/opentelemetry/src/api/trace/noop.rs @@ -181,7 +181,7 @@ impl NoopSpanExporter { #[async_trait] impl SpanExporter for NoopSpanExporter { - async fn export(&self, _batch: Vec) -> ExportResult { + async fn export(&mut self, _batch: Vec) -> ExportResult { Ok(()) } } diff --git a/opentelemetry/src/exporter/trace/mod.rs b/opentelemetry/src/exporter/trace/mod.rs index c83ca3ca81..bde2349d62 100644 --- a/opentelemetry/src/exporter/trace/mod.rs +++ b/opentelemetry/src/exporter/trace/mod.rs @@ -23,45 +23,43 @@ pub type ExportResult = Result<(), Box) -> ExportResult; + /// + /// Any retry logic that is required by the exporter is the responsibility + /// of the exporter. + async fn export(&mut self, batch: Vec) -> ExportResult; /// Shuts down the exporter. Called when SDK is shut down. This is an /// opportunity for exporter to do any cleanup required. /// - /// `shutdown` should be called only once for each Exporter instance. After - /// the call to `shutdown`, subsequent calls to `SpanExport` are not allowed - /// and should return an error. + /// This function should be called only once for each `SpanExporter` + /// instance. After the call to `shutdown`, subsequent calls to `export` are + /// not allowed and should return an error. /// - /// Shutdown should not block indefinitely (e.g. if it attempts to flush the - /// data and the destination is unavailable). SDK authors can - /// decide if they want to make the shutdown timeout to be configurable. + /// This function should not block indefinitely (e.g. if it attempts to + /// flush the data and the destination is unavailable). SDK authors + /// can decide if they want to make the shutdown timeout + /// configurable. fn shutdown(&mut self) {} } /// A minimal interface necessary for export spans over HTTP. /// -/// Users sometime choose http clients that relay on certain runtime. This trait allows users to bring -/// their choice of http clients. +/// Users sometime choose http clients that relay on certain runtime. This trait +/// allows users to bring their choice of http clients. #[cfg(feature = "http")] #[cfg_attr(docsrs, doc(cfg(feature = "http")))] #[async_trait] diff --git a/opentelemetry/src/exporter/trace/stdout.rs b/opentelemetry/src/exporter/trace/stdout.rs index 4a85445aa2..1b257e8382 100644 --- a/opentelemetry/src/exporter/trace/stdout.rs +++ b/opentelemetry/src/exporter/trace/stdout.rs @@ -31,8 +31,7 @@ use crate::{ }; use async_trait::async_trait; use std::fmt::Debug; -use std::io::{self, stdout, Stdout, Write}; -use std::sync::Mutex; +use std::io::{stdout, Stdout, Write}; /// Pipeline builder #[derive(Debug)] @@ -108,7 +107,7 @@ where /// [`Stdout`]: std::io::Stdout #[derive(Debug)] pub struct Exporter { - writer: Mutex, + writer: W, pretty_print: bool, } @@ -116,7 +115,7 @@ impl Exporter { /// Create a new stdout `Exporter`. pub fn new(writer: W, pretty_print: bool) -> Self { Self { - writer: Mutex::new(writer), + writer, pretty_print, } } @@ -128,16 +127,12 @@ where W: Write + Debug + Send + 'static, { /// Export spans to stdout - async fn export(&self, batch: Vec) -> ExportResult { - let mut writer = self - .writer - .lock() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + async fn export(&mut self, batch: Vec) -> ExportResult { for span in batch { if self.pretty_print { - writer.write_all(format!("{:#?}\n", span).as_bytes())?; + self.writer.write_all(format!("{:#?}\n", span).as_bytes())?; } else { - writer.write_all(format!("{:?}\n", span).as_bytes())?; + self.writer.write_all(format!("{:?}\n", span).as_bytes())?; } } diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 3a0cf0203f..839743058a 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -58,15 +58,20 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; -/// `SpanProcessor`s allow hooks for span start and end method invocations. +/// `SpanProcessor` is an interface which allows hooks for span start and end +/// method invocations. The span processors are invoked only when is_recording +/// is true. pub trait SpanProcessor: Send + Sync + std::fmt::Debug { - /// `on_start` method is invoked when a `Span` is started. + /// `on_start` is called when a `Span` is started. This method is called + /// synchronously on the thread that started the span, therefore it should + /// not block or throw exceptions. fn on_start(&self, span: &Span, cx: &Context); - /// `on_end` method is invoked when a `Span` is ended. + /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is + /// already set). This method is called synchronously within the `Span::end` + /// API, therefore it should not block or throw an exception. fn on_end(&self, span: SpanData); - /// Shutdown is invoked when SDK shuts down. Use this call to cleanup any - /// processor data. No calls to `on_start` and `on_end` method is invoked - /// after `shutdown` call is made. + /// Shuts down the processor. Called when SDK is shut down. This is an + /// opportunity for processor to do any cleanup required. fn shutdown(&mut self); } @@ -95,12 +100,14 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// [`SpanProcessor`]: ../../api/trace/span_processor/trait.SpanProcessor.html #[derive(Debug)] pub struct SimpleSpanProcessor { - exporter: Box, + exporter: Mutex>, } impl SimpleSpanProcessor { pub(crate) fn new(exporter: Box) -> Self { - SimpleSpanProcessor { exporter } + SimpleSpanProcessor { + exporter: Mutex::new(exporter), + } } } @@ -110,12 +117,16 @@ impl SpanProcessor for SimpleSpanProcessor { } fn on_end(&self, span: SpanData) { - // TODO: Surface error through global error handler - let _result = executor::block_on(self.exporter.export(vec![span])); + if let Ok(mut exporter) = self.exporter.lock() { + // TODO: Surface error through global error handler + let _result = executor::block_on(exporter.export(vec![span])); + } } fn shutdown(&mut self) { - self.exporter.shutdown(); + if let Ok(mut exporter) = self.exporter.lock() { + exporter.shutdown(); + } } } @@ -433,14 +444,31 @@ where #[cfg(test)] mod tests { - use crate::exporter::trace::stdout; - use crate::sdk::trace::span_processor::{ - OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, - OTEL_BSP_SCHEDULE_DELAY_MILLIS, OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, + use super::{ + BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS, + OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; - use crate::sdk::trace::BatchSpanProcessor; + use crate::exporter::trace::stdout; + use crate::testing::trace::{new_test_export_span_data, new_test_exporter}; use std::time; + #[test] + fn simple_span_processor_on_end_calls_export() { + let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); + let processor = SimpleSpanProcessor::new(Box::new(exporter)); + processor.on_end(new_test_export_span_data()); + assert!(rx_export.try_recv().is_ok()); + } + + #[test] + fn simple_span_processor_shutdown_calls_shutdown() { + let (exporter, _rx_export, rx_shutdown) = new_test_exporter(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + processor.shutdown(); + assert!(rx_shutdown.try_recv().is_ok()); + } + #[test] fn test_build_batch_span_processor_from_env() { std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500"); diff --git a/opentelemetry/src/testing/trace.rs b/opentelemetry/src/testing/trace.rs index 14cfcb6cb7..874d0ce76d 100644 --- a/opentelemetry/src/testing/trace.rs +++ b/opentelemetry/src/testing/trace.rs @@ -1,7 +1,15 @@ use crate::{ - trace::{Span, SpanContext, StatusCode}, + exporter::trace::{self as exporter, ExportResult, SpanExporter}, + sdk::{ + trace::{Config, EvictedHashMap, EvictedQueue}, + InstrumentationLibrary, + }, + trace::{Span, SpanContext, SpanId, SpanKind, StatusCode}, KeyValue, }; +use async_trait::async_trait; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::time::SystemTime; #[derive(Debug)] pub struct TestSpan(pub SpanContext); @@ -25,3 +33,52 @@ impl Span for TestSpan { fn update_name(&self, _new_name: String) {} fn end_with_timestamp(&self, _timestamp: std::time::SystemTime) {} } + +pub fn new_test_export_span_data() -> exporter::SpanData { + let config = Config::default(); + exporter::SpanData { + span_context: SpanContext::empty_context(), + parent_span_id: SpanId::from_u64(0), + span_kind: SpanKind::Internal, + name: "opentelemetry".to_string(), + start_time: SystemTime::now(), + end_time: SystemTime::now(), + attributes: EvictedHashMap::new(config.max_attributes_per_span, 0), + message_events: EvictedQueue::new(config.max_events_per_span), + links: EvictedQueue::new(config.max_links_per_span), + status_code: StatusCode::Unset, + status_message: "".to_string(), + resource: config.resource, + instrumentation_lib: InstrumentationLibrary::default(), + } +} + +#[derive(Debug)] +pub struct TestSpanExporter { + tx_export: Sender, + tx_shutdown: Sender<()>, +} + +#[async_trait] +impl SpanExporter for TestSpanExporter { + async fn export(&mut self, batch: Vec) -> ExportResult { + for span_data in batch { + self.tx_export.send(span_data)?; + } + Ok(()) + } + + fn shutdown(&mut self) { + self.tx_shutdown.send(()).unwrap(); + } +} + +pub fn new_test_exporter() -> (TestSpanExporter, Receiver, Receiver<()>) { + let (tx_export, rx_export) = channel(); + let (tx_shutdown, rx_shutdown) = channel(); + let exporter = TestSpanExporter { + tx_export, + tx_shutdown, + }; + (exporter, rx_export, rx_shutdown) +}