Akka/Pekko after loses thread context propagation #11755

Closed
wsargent opened this issue Jul 6, 2024 · 7 comments
Labels
bug Something isn't working needs triage New issue that requires triage

@wsargent

wsargent commented Jul 6, 2024

Describe the bug

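The after pattern (org.apache.pekko.pattern.after) loses the OpenTelemetry context: inside the delayed block, Span.current() no longer returns the span that was current when after was called. The result depends on the ExecutionContext passed to after. In the reproduction below, only the manually wrapped ExecutionContext (each Runnable wrapped with Context.current().wrap) keeps the span; global, parasitic, opportunistic, and the Pekko dispatcher all lose it.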

Steps to reproduce

https://github.com/wsargent/akka-after-loses-otel-thread-context

package org.example.application

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.slf4j.LoggerFactory

import scala.language.reflectiveCalls
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}

object Main {
  private val logger = LoggerFactory.getLogger(getClass)
  private val openTelemetry = GlobalOpenTelemetry.get()
  private val tracer = openTelemetry.getTracer("example")

  // https://tersesystems.com/blog/2024/06/20/executioncontext.parasitic-and-friends/
  private val opportunisticExecutionContext = (scala.concurrent.ExecutionContext: {def opportunistic: scala.concurrent.ExecutionContextExecutor}).opportunistic

  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem("example")

    implicit val ec = ExecutionContext.global
    val f = for {
      _ <- operation("wrapping")
      _ <- operation("global")
      _ <- operation("parasitic")
      _ <- operation("opportunistic")
      _ <- operation("dispatcher")
    } yield actorSystem.terminate()

    Await.result(f, 30.seconds)
  }

  def operation(mode: String)(implicit actorSystem: ActorSystem): Future[Done] = {
    traceSync(s"root $mode") {
      val expectedSpan = Span.current()
      logger.info(s"mode: We expect ${expectedSpan}")
      val afterExecutionContext = defineExecutionContext(mode)

      org.apache.pekko.pattern.after(1.second, actorSystem.scheduler) {
        val actualSpan = Span.current()
        Future.successful {
          if (!expectedSpan.equals(actualSpan)) {
            logger.error(s"$mode: Unexpected $actualSpan")
          } else {
            logger.info(s"$mode: Reached delayed with $actualSpan")
          }
          Done
        }
      }(afterExecutionContext)
    }
  }

  def traceSync[A](traceName: String)(block: => A): A = {
    val span = tracer.spanBuilder(traceName).startSpan()
    assert(span.isRecording, "No-op span, you must run this class with the java agent so it instruments correctly!")

    try {
      val scope = span.makeCurrent()
      try {
        block
      } finally {
        scope.close()
      }
    } finally {
      span.end()
    }
  }

  def defineExecutionContext(mode: String)(implicit system: ActorSystem): ExecutionContext = {
    val dispatcher = system.classicSystem.dispatcher

    mode match {
      case "wrapping" =>
        val context = Context.current()
        new ExecutionContext {
          override def execute(runnable: Runnable): Unit = dispatcher.execute(context.wrap(runnable))

          override def reportFailure(cause: Throwable): Unit = dispatcher.reportFailure(cause)
        }
      case "global" =>
        ExecutionContext.global
      case "parasitic" =>
        ExecutionContext.parasitic
      case "opportunistic" =>
        opportunisticExecutionContext
      case _ =>
        dispatcher
    }
  }

}

Expected behavior

The after pattern carries over the span context that was current when after was called, regardless of which ExecutionContext is passed to it.

Actual behavior

100.123.234.53 ❱ ./bin/akka-after-loses-thread-context
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
129   INFO  trace_id= span_id= [main] io.opentelemetry.javaagent.tooling.VersionLogger - opentelemetry-javaagent - version: 2.4.0
678   INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [main] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root wrapping, kind=INTERNAL, attributes=AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292847324051943, endEpochNanos=0}}
680   INFO  trace_id= span_id= [main] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root wrapping' : 8746ecc707c7eb642df61a1933b396e9 1a8c4711d7702a04 INTERNAL [tracer: example:] AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}
1711  INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [example-pekko.actor.default-dispatcher-5] org.example.application.Main$ - wrapping: Reached delayed with ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root wrapping, kind=INTERNAL, attributes=AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292847324051943, endEpochNanos=1720292847338923884}}
1715  INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=7246937d638eba1a [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=7246937d638eba1a, parentSpanContext=ImmutableSpanContext{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, name=root global, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292848374104426, endEpochNanos=0}}
1715  INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root global' : 8746ecc707c7eb642df61a1933b396e9 7246937d638eba1a INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
2730  ERROR trace_id= span_id= [scala-execution-context-global-26] org.example.application.Main$ - global: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
2731  INFO  trace_id=4a46e96a6cd3bb1fe64335281a6bb229 span_id=3f967afae96d6627 [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=4a46e96a6cd3bb1fe64335281a6bb229, spanId=3f967afae96d6627, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root parasitic, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292849389997315, endEpochNanos=0}}
2731  INFO  trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root parasitic' : 4a46e96a6cd3bb1fe64335281a6bb229 3f967afae96d6627 INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
3747  ERROR trace_id= span_id= [example-scheduler-1] org.example.application.Main$ - parasitic: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
3748  INFO  trace_id=f0c1bc85c8635a598f51244982df7750 span_id=5e10398ab14c17ee [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=f0c1bc85c8635a598f51244982df7750, spanId=5e10398ab14c17ee, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root opportunistic, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292850407256495, endEpochNanos=0}}
3748  INFO  trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root opportunistic' : f0c1bc85c8635a598f51244982df7750 5e10398ab14c17ee INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
4767  ERROR trace_id= span_id= [scala-execution-context-global-26] org.example.application.Main$ - opportunistic: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
4768  INFO  trace_id=077ce825f5698598d8ca24c433f812f5 span_id=0b8912756c432c1d [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=077ce825f5698598d8ca24c433f812f5, spanId=0b8912756c432c1d, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root dispatcher, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292851427164157, endEpochNanos=0}}
4768  INFO  trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root dispatcher' : 077ce825f5698598d8ca24c433f812f5 0b8912756c432c1d INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
5788  ERROR trace_id= span_id= [example-pekko.actor.default-dispatcher-5] org.example.application.Main$ - dispatcher: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
[INFO] [07/06/2024 12:07:32.469] [scala-execution-context-global-26] [CoordinatedShutdown(pekko://example)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

Javaagent or library instrumentation version

1.39.0

Environment

JDK:

openjdk version "17.0.7" 2023-04-18
OpenJDK Runtime Environment Temurin-17.0.7+7 (build 17.0.7+7)
OpenJDK 64-Bit Server VM Temurin-17.0.7+7 (build 17.0.7+7, mixed mode, sharing)

OS:

Linux devserver 6.5.0-41-generic #41~22.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jun 3 11:32:55 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Additional context

https://github.com/wsargent/opentelemetry-with-scala-futures

@wsargent wsargent added bug Something isn't working needs triage New issue that requires triage labels Jul 6, 2024
@wsargent (Author)

wsargent commented Jul 10, 2024

I think this is because the by-name overloads on Scheduler, such as scheduleOnce (which after uses), create a new Runnable around the function they are given. For example, the periodic schedule overload:

  final def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit)(
      implicit
      executor: ExecutionContext): Cancellable =
    schedule(initialDelay, interval, new Runnable { override def run(): Unit = f })

https://github.com/apache/pekko/blob/main/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala#L389
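The mechanism described above can be modeled with the Scala and Java standard libraries alone. In this minimal sketch, a plain ThreadLocal (the hypothetical CurrentContext object) stands in for OpenTelemetry's Context, and a single-threaded ScheduledExecutorService stands in for the Pekko scheduler thread; neither reflects how Pekko or the javaagent are actually implemented. It only shows why a by-name block that is turned into a new Runnable sees no context unless the context is captured when the task is scheduled and restored when it runs.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for io.opentelemetry.context.Context: a thread-local
// "current span name". This models the mechanism; it is not the OTel API.
object CurrentContext {
  private val local = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def get: Option[String] = local.get()

  // Runs `block` with `ctx` installed, then restores whatever was there before.
  def withValue[A](ctx: Option[String])(block: => A): A = {
    val previous = local.get()
    local.set(ctx)
    try block
    finally local.set(previous)
  }
}

object SchedulerModel {
  // Single daemon timer thread, loosely playing the role of the scheduler thread.
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "model-scheduler")
      t.setDaemon(true)
      t
    }
  })

  // Same shape as Scheduler.scheduleOnce(delay)(f): the by-name `f` becomes a
  // fresh Runnable that later runs on the timer thread, where the caller's
  // thread-local context was never set.
  def scheduleOnce(delayMillis: Long)(f: => Unit): Unit = {
    timer.schedule(new Runnable { override def run(): Unit = f }, delayMillis, TimeUnit.MILLISECONDS)
    ()
  }

  // Captures the caller's context when scheduling and restores it around `f`,
  // which is the idea behind wrapping the Runnable with Context.wrap.
  def scheduleOnceWithContext(delayMillis: Long)(f: => Unit): Unit = {
    val captured = CurrentContext.get
    scheduleOnce(delayMillis)(CurrentContext.withValue(captured)(f))
  }

  // Schedules a task while "root span" is current and returns what the task saw.
  def observe(propagate: Boolean): Option[String] = {
    val seen = new AtomicReference[Option[String]](None)
    val done = new CountDownLatch(1)
    val task: () => Unit = () => { seen.set(CurrentContext.get); done.countDown() }
    CurrentContext.withValue(Some("root span")) {
      if (propagate) scheduleOnceWithContext(10)(task())
      else scheduleOnce(10)(task())
    }
    require(done.await(5, TimeUnit.SECONDS), "scheduled task did not run")
    seen.get()
  }
}
```

Here SchedulerModel.observe(propagate = false) returns None, mirroring the "Unexpected PropagatedSpan" lines in the log, while SchedulerModel.observe(propagate = true) returns Some("root span").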

@Dogacel

Dogacel commented Sep 7, 2024

I think I am having the same issue with after. Our trace_ids randomly disappear, and I suspect some of those after calls are the cause.

> To make it work, manual wrapping must be used.

Can you elaborate on how you managed to do this with after?

@wsargent (Author)

wsargent commented Sep 7, 2024

@Dogacel check out wsargent/opentelemetry-with-scala-futures
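The exact code in wsargent/opentelemetry-with-scala-futures is not reproduced here. As one possible sketch of manual wrapping for after, assuming opentelemetry-api and Pekko are on the classpath, a hypothetical afterWithContext helper can capture Context.current() on the calling thread and make it current again while the delayed block is evaluated. It calls org.apache.pekko.pattern.after with the same (duration, scheduler)(value)(ec) shape used in the reproduction.

```scala
import io.opentelemetry.context.Context
import org.apache.pekko.actor.Scheduler

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object ContextPreservingPatterns {
  // Hypothetical helper, not part of Pekko or OpenTelemetry: behaves like
  // org.apache.pekko.pattern.after, but captures the OpenTelemetry Context that
  // is current on the calling thread and makes it current again while the
  // delayed `value` block is evaluated on the scheduler's ExecutionContext.
  def afterWithContext[T](duration: FiniteDuration, scheduler: Scheduler)(value: => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    val captured = Context.current()
    org.apache.pekko.pattern.after(duration, scheduler) {
      val scope = captured.makeCurrent()
      try value
      finally scope.close() // restore the previous context on this thread
    }(ec)
  }
}
```

Only the evaluation of value itself runs inside the restored scope; the scope is closed as soon as value returns its Future.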

@Dogacel

Dogacel commented Sep 8, 2024

> @Dogacel check out wsargent/opentelemetry-with-scala-futures

Thank you.

I wonder whether this could be fixed in the auto-instrumentation itself. It is hard to find every use of those scheduler methods in our code.

@Dogacel

Dogacel commented Sep 8, 2024

Side note: we might need to instrument all schedule and scheduleOnce calls, as well as CircuitBreaker calls.

https://github.com/search?q=repo%3Aapache%2Fpekko%20%22new%20Runnable%22&type=code

I am fairly new to OTel instrumentation, so I don't have a deep understanding of how we might want to do it, but I can try once I have some spare time.
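Until the instrumentation covers these call sites, a user-level workaround for scheduleOnce follows the same pattern as the wrapping ExecutionContext in the original report: capture the context at the call site and wrap the task with Context.wrap(Runnable). This is a sketch assuming opentelemetry-api and Pekko on the classpath; ContextPreservingScheduler and scheduleOnceWithContext are hypothetical names, not part of either library.

```scala
import io.opentelemetry.context.Context
import org.apache.pekko.actor.{Cancellable, Scheduler}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

object ContextPreservingScheduler {
  // Hypothetical helper: captures the caller's OpenTelemetry Context and wraps
  // the task with Context.wrap(Runnable), so that context is made current while
  // the task runs and the previous one is restored afterwards.
  def scheduleOnceWithContext(scheduler: Scheduler, delay: FiniteDuration)(f: => Unit)(
      implicit ec: ExecutionContext): Cancellable = {
    val task = new Runnable { override def run(): Unit = f }
    scheduler.scheduleOnce(delay, Context.current().wrap(task))
  }
}
```

Wrapping the individual Runnable rather than the ExecutionContext means any ExecutionContext (global, parasitic, or a dispatcher) can still be passed to scheduleOnce unchanged.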

@Dogacel

Dogacel commented Sep 8, 2024

For reference, I have created a unit test in this commit:

Dogacel@b6160e7

I am not entirely sure how I should implement the instrumentation itself.

@laurit (Contributor)

laurit commented Oct 2, 2024

Resolved with #12359 and #12373

@laurit laurit closed this as completed Oct 2, 2024