Skip to content

Commit

Permalink
use opentelemetry to separate services from runtime traces (#1922)
Browse files Browse the repository at this point in the history
Summary:
This change completely separates service tracing from
runtime tracing by using OpenTelemetry directly
to manage service spans.
  • Loading branch information
muhamadazmy authored Sep 9, 2024
1 parent f6da847 commit b39cde2
Show file tree
Hide file tree
Showing 18 changed files with 588 additions and 489 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingress-dispatcher = { workspace = true }
restate-serde-util = { workspace = true }
restate-service-protocol = { workspace = true, features = ["awakeable-id"] }
restate-tracing-instrumentation = { workspace = true }
restate-types = { workspace = true }
restate-service-protocol = { workspace = true, features = [ "awakeable-id" ] }

# Encoding/Decoding
bytes = { workspace = true }
Expand Down
12 changes: 9 additions & 3 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,16 @@ where
};

// Prepare the tracing span
let (ingress_span, ingress_span_context) =
prepare_tracing_span(&invocation_id, &invocation_target, &req);
let runtime_span = tracing::info_span!(
"ingress",
restate.invocation.id = %invocation_id,
restate.invocation.target = %invocation_target.short()
);

let result = async move {
let ingress_span_context =
prepare_tracing_span(&invocation_id, &invocation_target, &req);

info!("Processing ingress request");

let (parts, body) = req.into_parts();
Expand Down Expand Up @@ -208,7 +214,7 @@ where
}
}
}
.instrument(ingress_span)
.instrument(runtime_span)
.await;

// Note that we only record (mostly) successful requests here. We might want to
Expand Down
55 changes: 22 additions & 33 deletions crates/ingress-http/src/handler/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,59 +11,48 @@
use super::ConnectInfo;

use http::Request;
use opentelemetry::global::ObjectSafeSpan;
use opentelemetry::trace::{SpanContext, TraceContextExt};
use restate_tracing_instrumentation as instrumentation;
use restate_types::identifiers::InvocationId;
use restate_types::invocation::{InvocationTarget, SpanRelation};
use tracing::{info_span, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

pub(crate) fn prepare_tracing_span<B>(
invocation_id: &InvocationId,
invocation_target: &InvocationTarget,
req: &Request<B>,
) -> (Span, SpanContext) {
) -> SpanContext {
let connect_info: &ConnectInfo = req
.extensions()
.get()
.expect("Should have been injected by the previous layer");
let (client_addr, client_port) = (connect_info.address(), connect_info.port());

// Create the ingress span and attach it to the next async block.
// This span is committed once the async block terminates, recording the execution time of the invocation.
// Another span is created later by the ServiceInvocationFactory, for the ServiceInvocation itself,
// which is used by the Restate services to correctly link to a single parent span
// to commit intermediate results of the processing.
let ingress_span = info_span!(
"ingress_invoke",
otel.name = format!("ingress_invoke {}", invocation_target),
rpc.system = "restate",
rpc.service = %invocation_target.service_name(),
rpc.method = %invocation_target.handler_name(),
restate.invocation.id = %invocation_id,
restate.invocation.target = %invocation_target,
client.socket.address = %client_addr,
client.socket.port = %client_port,
);

// Extract tracing context if any
let tracing_context: &opentelemetry::Context = req
.extensions()
.get()
.expect("Should have been injected by the previous layer");

// Attach this ingress_span to the parent parsed from the headers, if any.
span_relation(tracing_context.span().span_context()).attach_to_span(&ingress_span);
let inbound_span = tracing_context.span();

// We need the context to link it to the service invocation span
let ingress_span_context = ingress_span.context().span().span_context().clone();

(ingress_span, ingress_span_context)
}

fn span_relation(request_span: &SpanContext) -> SpanRelation {
if request_span.is_valid() {
SpanRelation::Parent(request_span.clone())
// if the inbound span is set (`traceparent`) we use that as
// parent to the ingress span.
let relation = if inbound_span.span_context().is_valid() {
SpanRelation::Parent(inbound_span.span_context().clone())
} else {
SpanRelation::None
}
};

let span = instrumentation::info_invocation_span!(
relation = relation,
prefix = "ingress",
id = invocation_id,
target = invocation_target,
tags = (
client.socket.address = client_addr.to_string(),
client.socket.port = client_port as i64
)
);

span.span_context().clone()
}
13 changes: 12 additions & 1 deletion crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,18 @@ where
}

/// Loop opening the request to deployment and consuming the stream
#[instrument(level = "debug", name = "invoker_invocation_task", fields(rpc.system = "restate", rpc.service = %self.invocation_target.service_name(), restate.invocation.id = %self.invocation_id, restate.invocation.target = %self.invocation_target), skip_all)]
#[instrument(
level = "debug",
name = "invoker_invocation_task",
fields(
rpc.system = "restate",
rpc.service = %self.invocation_target.service_name(),
restate.invocation.id = %self.invocation_id,
restate.invocation.target = %self.invocation_target,
otel.name = "invocation-task: run",
),
skip_all,
)]
pub async fn run(mut self, input_journal: InvokeInputJournal) {
let start = Instant::now();
// Execute the task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use std::future::poll_fn;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, trace, warn, Span};
use tracing::{debug, info, trace, warn};

/// Provides the value of the invocation id
const INVOCATION_ID_HEADER_NAME: HeaderName = HeaderName::from_static("x-restate-invocation-id");
Expand Down Expand Up @@ -124,13 +124,6 @@ where

let journal_size = journal_metadata.length;

// Attach parent and uri to the current span
let invocation_task_span = Span::current();
journal_metadata
.span_context
.as_parent()
.attach_to_span(&invocation_task_span);

info!(
invocation.id = %self.invocation_task.invocation_id,
deployment.address = %deployment.metadata.address_display(),
Expand Down
6 changes: 2 additions & 4 deletions crates/tracing-instrumentation/src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use arc_swap::ArcSwap;
use futures::future::BoxFuture;
use opentelemetry::trace::TraceError;
use opentelemetry::{Key, KeyValue, StringValue, Value};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::export::trace::{SpanData, SpanExporter};
use opentelemetry_sdk::Resource;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use std::collections::HashMap;
Expand All @@ -39,9 +39,7 @@ impl<T> ResourceModifyingSpanExporter<T> {
}
}

impl<T: opentelemetry_sdk::export::trace::SpanExporter + 'static>
opentelemetry_sdk::export::trace::SpanExporter for ResourceModifyingSpanExporter<T>
{
impl<T: SpanExporter + 'static> SpanExporter for ResourceModifyingSpanExporter<T> {
fn export(
&mut self,
batch: Vec<SpanData>,
Expand Down
Loading

0 comments on commit b39cde2

Please sign in to comment.