Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tracing methods asynchronously #86

Merged
merged 10 commits into from
Apr 20, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.comcast.money.aspectj

import com.comcast.money.annotations.{ Timed, Traced }
import com.comcast.money.api.Span
import com.comcast.money.core._
import com.comcast.money.core.async.AsyncTracingService
import com.comcast.money.core.internal.{ MDCSupport, SpanLocal }
Expand All @@ -41,56 +42,70 @@ class TraceAspect extends Reflections with TraceLogging {

@Around("traced(traceAnnotation)")
def adviseMethodsWithTracing(joinPoint: ProceedingJoinPoint, traceAnnotation: Traced): AnyRef = {
val key: String = traceAnnotation.value
if (traceAnnotation.async()) {
traceMethodAsync(joinPoint, traceAnnotation)
} else {
traceMethod(joinPoint, traceAnnotation)
}
}

private def traceMethod(joinPoint: ProceedingJoinPoint, traceAnnotation: Traced): AnyRef = {
val key = traceAnnotation.value()
val oldSpanName = mdcSupport.getSpanNameMDC
var result = true
var stopSpan = true

try {
tracer.startSpan(key)
mdcSupport.setSpanNameMDC(Some(key))
traceMethodArguments(joinPoint)

joinPoint.proceed()
} catch {
case t: Throwable =>
result = exceptionMatches(t, traceAnnotation.ignoredExceptions())
logException(t)
throw t
} finally {
tracer.stopSpan(result)
mdcSupport.setSpanNameMDC(oldSpanName)
}
}

private def traceMethodAsync(joinPoint: ProceedingJoinPoint, traceAnnotation: Traced): AnyRef = {
val key = traceAnnotation.value()
val asyncKey = key + "-async"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we append -async to the name?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm creating two spans. I'm giving the second span which represents the async task the suffix -async. I could change that to do something else, or make it configurable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that is where I was confused. I don't understand why we need two separate spans.

We use Money heavily with scala futures, and all tracing is declaritive in the code. We want to give the user the ability to start / stop however we want.

It is a little unclear here with the @Traced annotation as to when the span actually starts. The way we have done this is that the span starts when the method is invoked and completes when the Future (Promise) ends. Something conceptually like this...

def doSomethingTraced(foo: String): Future[String] = {
  val span = startSpan("myName")
  val fut = doSomethingInTheFuture(foo)
  fut.onComplete { case Success(x) => span.stop(true); case Failure(e) => span.stop(false) }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. That is the approach I was taking in the first iteration and I have no problem going back to that model. If I do that then the code above could be reduced to:

@Traced(value = "myName", async = true)
def doSomethingTraced(foo: String): Future[String] =
    doSomethingInTheFuture(foo)

val oldSpanName = mdcSupport.getSpanNameMDC
var result = true
var asyncSpan: Option[Span] = None

try {
tracer.startSpan(key)
mdcSupport.setSpanNameMDC(Some(key))
traceMethodArguments(joinPoint)

if (traceAnnotation.async()) {
val methodTimeKey = "method-time"
tracer.startTimer(methodTimeKey)
var retval = joinPoint.proceed
tracer.stopTimer(methodTimeKey)

AsyncTracingService.findTracingService(retval).foreach {
service =>
stopSpan = false
val asyncSpan = SpanLocal.current.get
val mdc = Option(MDC.getCopyOfContextMap)

retval = service.whenDone(retval, (_, exception) => {
mdcSupport.propogateMDC(mdc)
result = true
if (exception != null) {
logException(exception)
result = exceptionMatches(exception, traceAnnotation.ignoredExceptions())
}
asyncSpan.stop(result)
MDC.clear()
})
}

retval
} else {
joinPoint.proceed
tracer.startSpan(key + "-async")
asyncSpan = SpanLocal.pop()

val future = joinPoint.proceed()

AsyncTracingService.findTracingService(future) match {
case Some(service) =>
service.whenDone(future, (_, t) => {
val asyncResult = t == null || exceptionMatches(t, traceAnnotation.ignoredExceptions())
logException(t)
asyncSpan.foreach(_.stop(asyncResult))
})
case _ =>
future
}
} catch {
case t: Throwable =>
result = if (exceptionMatches(t, traceAnnotation.ignoredExceptions())) true else false
result = exceptionMatches(t, traceAnnotation.ignoredExceptions())
logException(t)
throw t
} finally {
tracer.stopSpan(result)
mdcSupport.setSpanNameMDC(oldSpanName)
if (stopSpan) {
tracer.stopSpan(result)
} else {
SpanLocal.pop()
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion money-aspectj/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ money {
log-exceptions = false

handling = {
async = true
async = false
handlers = [
{
class = "com.comcast.money.core.LogRecorderSpanHandler"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar

import scala.concurrent.Promise
import scala.util.Try
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import scala.util.{ Failure, Try }

class TraceAspectSpec extends WordSpec
with GivenWhenThen with OneInstancePerTest with BeforeAndAfterEach with Matchers with MockitoSugar with Eventually
Expand Down Expand Up @@ -80,10 +81,56 @@ class TraceAspectSpec extends WordSpec
}

@Traced(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As opposed to adding an async flag, with aspectj anyway we can add a different joinpoint / pointcut on functions with annotations @Trace that return a promise (I think). Have you tried that? Unsure off the top of my head if that would work

Copy link
Collaborator Author

@HaloFour HaloFour Mar 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would probably work although might require some effort to avoid ambiguity with existing pointcut. The problem with that is that it locks it to supporting only specific types and there is a fairly wide ecosystem of these types out there since Java didn't add one offering a callback until Java 8. Promise[T] is quite specific to Scala and its uncommon to see in other Java libraries.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I am unclear if it would work. Can we have a test where, if someone sets async to be true, on something that is not async, that everything still works?

Imagine

@Traced(value="sync", async=true)
def plusOne(x: Int): Int = x + 1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, the tests are a little anemic right now. This PR is more a spike for the purpose of discussion.

I might need to approach the unit tests a little differently as they should validate that log records aren't written until the promise/future is complete and the current helper methods don't really lend themselves to that, short of waiting for a timeout to expire. I'm thinking of mocking the Tracer instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main test that I would like to try out is nested futures (futures with flatmaps).

When I worked with scala Futures in the past, the onComplete of a future was executed in odd ways.

Imagine you have 3 levels of functions / nested futures via flatmaps and maps. You create child spans at each level, so 3 levels deep in a span tree. The onComplete and context that was passed was "off".

Sorry I don't have more information, I have to check out your PR and try and re-create the issue I saw

Copy link
Member

@pauljamescleary pauljamescleary Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think your example is odd. Seems like you have a future that yields a future as a value. That doesn't seem like a valid use case. I am curious if some of the contrived tests I wrote are things that people would never write?

Certainly .flatMap { x => Future { x + 1 } } is valid

But Future { Future { x } } is not

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which example is that? I'm trying to duplicate (or at least reinterpret) the unit tests you had around traced futures. Some of the examples are contrived intentionally but I also wouldn't discount my flubbing things up. I've not had the luxury of dedicating my attention to this recently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was playing with going down a path of having TraceAspect replace the existing Span with a wrapper AsyncSpan which internally would use an AtomicInteger as a reference counter. Invoking a terminal method on Future like onComplete would bump that counter up. When that callback would complete it would call stop, which would decrease the counter, as would when the instrumented future completed. Only when the counter reached zero would the span actually be stopped. I don't think that's quite what you were talking about, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s a great idea!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that perhaps my old TraceAspect unit tests had test cases that were invalid.

value = "methodReturnsScalaPromise",
value = "asyncMethodReturnsPromise",
async = true
)
def methodReturnsScalaPromise(promise: Promise[String]): Promise[String] = promise
def asyncMethodReturnsPromise(promise: Promise[String]): Promise[String] = promise

@Traced(
value = "asyncMethodReturnsNull",
async = true
)
def asyncMethodReturnsNull(): Promise[String] = null

@Traced(
value = "asyncMethodReturnsNonPromise",
async = true
)
def asyncMethodReturnsNonPromise(): AnyRef = new Object

@Traced(
value = "asyncMethodThrowingException",
async = true
)
def asyncMethodThrowingException(): AnyRef = {
Thread.sleep(50)
throw new RuntimeException()
}

@Traced(
value = "asyncMethodWithIgnoredException",
async = true,
ignoredExceptions = Array(classOf[IllegalArgumentException])
)
def asyncMethodWithIgnoredException(): AnyRef = {
throw new IllegalArgumentException("ignored")
}

@Traced(
value = "asyncMethodWithNonMatchingIgnoredException",
async = true,
ignoredExceptions = Array(classOf[IllegalArgumentException])
)
def asyncMethodWithNonMatchingIgnoredException(): AnyRef = {
throw new RuntimeException("not-ignored")
}

@Traced(
value = "asyncMethodWithIgnoredException",
async = true,
ignoredExceptions = Array(classOf[IllegalArgumentException])
)
def asyncMethodWithIgnoredException(promise: Promise[String]): Promise[String] = promise

@Timed("methodWithTiming")
def methodWithTiming() = {
Expand All @@ -97,6 +144,7 @@ class TraceAspectSpec extends WordSpec

override def beforeEach() = {
reset(mockMdcSupport)
LogRecord.clear()
}

"TraceAspect" when {
Expand Down Expand Up @@ -124,7 +172,7 @@ class TraceAspectSpec extends WordSpec
expectLogMessageContaining("methodThrowingException")

And("a span-success is logged with a value of true")
expectLogMessageContaining("span-success=true")
expectLogMessageContaining("span-success=false")
}
"complete the trace with success for methods that throw ignored exceptions" in {
Given("a method that throws an ignored exception")
Expand Down Expand Up @@ -154,19 +202,150 @@ class TraceAspectSpec extends WordSpec
And("a span-success is logged with a value of false")
expectLogMessageContaining("span-success=false")
}
"handle async methods" in {
}
"advising methods by tracing them with async flag" should {
"handle async methods that return a future" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
methodReturnsScalaPromise(promise)
val future = asyncMethodReturnsPromise(promise)

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsPromise", _.name == "asyncMethodReturnsPromise")
dontExpectSpanInfoThat("is named asyncMethodReturnsPromise-async", _.name == "asyncMethodReturnsPromise-async")

When("the future completes")
promise.complete(Try("Success"))
expectLogMessageContaining("methodReturnsScalaPromise")
expectLogMessageContaining("method-time")
Await.ready(future.future, 500 millis)

LogRecord.log("log").foreach(System.out.println(_))
Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsPromise-async", _.name == "asyncMethodReturnsPromise-async")
}
"handle async methods that return a failed future" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodReturnsPromise(promise)

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsPromise", _.name == "asyncMethodReturnsPromise")
dontExpectSpanInfoThat("is named asyncMethodReturnsPromise-async", _.name == "asyncMethodReturnsPromise-async")

When("the future fails")
promise.complete(Failure(new RuntimeException()))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsPromise-async and is not successful", span =>
span.name == "asyncMethodReturnsPromise-async" && !span.success())
}
"handle async methods that return a failed future with ignored exception" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodWithIgnoredException(promise)
Copy link
Collaborator

@ptravers ptravers Mar 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should future maybe be promise. I feel like I'm being lied to


Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException")
dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException-async", _.name == "asyncMethodWithIgnoredException-async")

When("the future fails")
promise.complete(Failure(new IllegalArgumentException()))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException-async and is successful", span =>
span.name == "asyncMethodWithIgnoredException-async" && span.success())
}
"handle async methods that return a failed future with exception not in ignored list" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodWithIgnoredException(promise)

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException")
dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException-async", _.name == "asyncMethodWithIgnoredException-async")

When("the future fails")
promise.complete(Failure(new RuntimeException()))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException-async and is not successful", span =>
span.name == "asyncMethodWithIgnoredException-async" && !span.success())
}
"handle async methods that return a null future" in {
Given("a method that returns null")

When("the method is invoked")
val future = asyncMethodReturnsNull()

Then("the method and future execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsNull", _.name == "asyncMethodReturnsNull")
dontExpectSpanInfoThat("is named asyncMethodReturnsNull-async", _.name == "asyncMethodReturnsNull-async")
}
"handle async methods that return a non-future" in {
Given("a method that returns a non-future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodReturnsNonPromise()

Then("the method and future execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsNonPromise", _.name == "asyncMethodReturnsNonPromise")
dontExpectSpanInfoThat("is named asyncMethodReturnsNonPromise-async", _.name == "asyncMethodReturnsNonPromise-async")
}
"complete the trace for methods that throw exceptions" in {
Given("a method that throws an exception")

When("the method is invoked")
a[RuntimeException] should be thrownBy {
asyncMethodThrowingException()
}

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodThrowingException", _.name == "asyncMethodThrowingException")
dontExpectSpanInfoThat("is named asyncMethodThrowingException-async", _.name == "asyncMethodThrowingException-async")

And("a span-success is logged with a value of false")
expectLogMessageContaining("span-success=false")
}
"complete the trace with success for methods that throw ignored exceptions" in {
Given("a method that throws an ignored exception")

When("the method is invoked")
an[IllegalArgumentException] should be thrownBy {
asyncMethodWithIgnoredException()
}

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException")
dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException-async", _.name == "asyncMethodWithIgnoredException-async")

And("a span-success is logged with a value of true")
expectLogMessageContaining("span-success=true")
}
"complete the trace with failure for methods that throw exceptions that are not in ignored list" in {
Given("a method that throws an ignored exception")

When("the method is invoked")
a[RuntimeException] should be thrownBy {
asyncMethodWithNonMatchingIgnoredException()
}

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodWithNonMatchingIgnoredException", _.name == "asyncMethodWithNonMatchingIgnoredException")
dontExpectSpanInfoThat("is named asyncMethodWithNonMatchingIgnoredException-async", _.name == "asyncMethodWithNonMatchingIgnoredException-async")

And("a span-success is logged with a value of false")
expectLogMessageContaining("span-success=false")
}

}
"advising methods that have parameters with the TracedData annotation" should {
"record the value of the parameter in the trace" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object AsyncTracingService {
private lazy val services = loadServices()

def findTracingService(future: AnyRef): Option[AsyncTracingService] =
services.find(_.supports(future))
if (future != null) services.find(_.supports(future)) else None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpicky, but...

Option(future).map(f => services.find(_.supports(f))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpicky back at you, but: 😁

Option(future).flatMap(f => services.find(_.supports(f))


private def loadServices() =
ServiceLoader.load(classOf[AsyncTracingService], classOf[AsyncTracingService].getClassLoader).asScala.toList
Expand Down
Loading