Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tracing methods asynchronously #86

Merged
merged 10 commits into from
Apr 20, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,11 @@
* @return The array of exception classes that will be ignored
*/
Class[] ignoredExceptions() default {};

/**
* Indicates that the method being traced performs an asynchronous task and returns an instance
* that would indicate when the task has completed.
* @return Whether or not the traced method is asynchronous
*/
boolean async() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@ package com.comcast.money.aspectj

import com.comcast.money.annotations.{ Timed, Traced }
import com.comcast.money.core._
import com.comcast.money.core.internal.MDCSupport
import com.comcast.money.core.async.{ AsyncNotifier, DirectExecutionContext }
import com.comcast.money.core.internal.{ MDCSupport, SpanLocal }
import com.comcast.money.core.logging.TraceLogging
import com.comcast.money.core.reflect.Reflections
import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
import org.aspectj.lang.reflect.MethodSignature
import org.aspectj.lang.{ JoinPoint, ProceedingJoinPoint }
import org.slf4j.MDC

import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }

@Aspect
class TraceAspect extends Reflections with TraceLogging {

val tracer: Tracer = Money.Environment.tracer
val asyncNotifier: AsyncNotifier = Money.Environment.asyncNotifier
val mdcSupport: MDCSupport = new MDCSupport()
implicit val executionContext: ExecutionContext = new DirectExecutionContext()

@Pointcut("execution(@com.comcast.money.annotations.Traced * *(..)) && @annotation(traceAnnotation)")
def traced(traceAnnotation: Traced) = {}
Expand All @@ -39,22 +46,52 @@ class TraceAspect extends Reflections with TraceLogging {

@Around("traced(traceAnnotation)")
def adviseMethodsWithTracing(joinPoint: ProceedingJoinPoint, traceAnnotation: Traced): AnyRef = {
val key: String = traceAnnotation.value
var result = true
val key = traceAnnotation.value()
val oldSpanName = mdcSupport.getSpanNameMDC
var result = true
var stopSpan = true

try {
tracer.startSpan(key)
mdcSupport.setSpanNameMDC(Some(key))
traceMethodArguments(joinPoint)
joinPoint.proceed

val returnValue = joinPoint.proceed()

if (traceAnnotation.async()) {
asyncNotifier.resolveHandler(returnValue).map {
handler =>
stopSpan = false
val span = SpanLocal.pop()
val mdc = Option(MDC.getCopyOfContextMap)

handler.whenComplete(returnValue, completed => {
mdcSupport.propogateMDC(mdc)

val result = completed match {
case Success(_) => true
case Failure(exception) =>
logException(exception)
exceptionMatches(exception, traceAnnotation.ignoredExceptions())
}

span.foreach(_.stop(result))
MDC.clear()
})
}.getOrElse(returnValue)
} else {
returnValue
}
} catch {
case t: Throwable =>
result = if (exceptionMatches(t, traceAnnotation.ignoredExceptions())) true else false
result = exceptionMatches(t, traceAnnotation.ignoredExceptions())
logException(t)
throw t
} finally {
if (stopSpan) {
tracer.stopSpan(result)
}
mdcSupport.setSpanNameMDC(oldSpanName)
tracer.stopSpan(result)
}
}

Expand Down
2 changes: 1 addition & 1 deletion money-aspectj/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ money {
log-exceptions = false

handling = {
async = true
async = false
handlers = [
{
class = "com.comcast.money.core.LogRecorderSpanHandler"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ package com.comcast.money.aspectj

import com.comcast.money.annotations.{ Timed, Traced, TracedData }
import com.comcast.money.core.internal.MDCSupport
import com.comcast.money.core.{ Tracer, Money, SpecHelpers }
import com.comcast.money.core.{ LogRecord, Money, SpecHelpers }
import org.aspectj.lang.ProceedingJoinPoint
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar
import org.slf4j.MDC

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import scala.util.{ Failure, Try }

class TraceAspectSpec extends WordSpec
with GivenWhenThen with OneInstancePerTest with BeforeAndAfterEach with Matchers with MockitoSugar with Eventually
Expand Down Expand Up @@ -76,6 +81,58 @@ class TraceAspectSpec extends WordSpec
throw new RuntimeException("not-ignored")
}

@Traced(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As opposed to adding an async flag, with aspectj anyway we can add a different joinpoint / pointcut on functions with annotations @Trace that return a promise (I think). Have you tried that? Unsure off the top of my head if that would work

Copy link
Collaborator Author

@HaloFour HaloFour Mar 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would probably work although might require some effort to avoid ambiguity with existing pointcut. The problem with that is that it locks it to supporting only specific types and there is a fairly wide ecosystem of these types out there since Java didn't add one offering a callback until Java 8. Promise[T] is quite specific to Scala and its uncommon to see in other Java libraries.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I am unclear if it would work. Can we have a test where, if someone sets async to be true, on something that is not async, that everything still works?

Imagine

@Traced(value="sync", async=true)
def plusOne(x: Int): Int = x + 1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, the tests are a little anemic right now. This PR is more a spike for the purpose of discussion.

I might need to approach the unit tests a little differently as they should validate that log records aren't written until the promise/future is complete and the current helper methods don't really lend themselves to that, short of waiting for a timeout to expire. I'm thinking of mocking the Tracer instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main test that I would like to try out is nested futures (futures with flatmaps).

When I worked with scala Futures in the past, the onComplete of a future was executed in odd ways.

Imagine you have 3 levels of functions / nested futures via flatmaps and maps. You create child spans at each level, so 3 levels deep in a span tree. The onComplete and context that was passed was "off".

Sorry I don't have more information, I have to check out your PR and try and re-create the issue I saw

Copy link
Member

@pauljamescleary pauljamescleary Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think your example is odd. Seems like you have a future that yields a future as a value. That doesn't seem like a valid use case. I am curious if some of the contrived tests I wrote are things that people would never write?

Certainly .flatMap { x => Future { x + 1 } } is valid

But Future { Future { x } } is not

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which example is that? I'm trying to duplicate (or at least reinterpret) the unit tests you had around traced futures. Some of the examples are contrived intentionally but I also wouldn't discount my flubbing things up. I've not had the luxury of dedicating my attention to this recently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was playing with going down a path of having TraceAspect replace the existing Span with a wrapper AsyncSpan which internally would use an AtomicInteger as a reference counter. Invoking a terminal method on Future like onComplete would bump that counter up. When that callback would complete it would call stop, which would decrease the counter, as would when the instrumented future completed. Only when the counter reached zero would the span actually be stopped. I don't think that's quite what you were talking about, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s a great idea!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that perhaps my old TraceAspect unit tests had test cases that were invalid.

value = "asyncMethodReturnsPromise",
async = true
)
def asyncMethodReturnsPromise(promise: Promise[String]): Promise[String] = promise

@Traced(
value = "asyncMethodReturnsNull",
async = true
)
def asyncMethodReturnsNull(): Promise[String] = null

@Traced(
value = "asyncMethodReturnsNonPromise",
async = true
)
def asyncMethodReturnsNonPromise(): AnyRef = new Object

@Traced(
value = "asyncMethodThrowingException",
async = true
)
def asyncMethodThrowingException(): AnyRef = {
Thread.sleep(50)
throw new RuntimeException()
}

@Traced(
value = "asyncMethodWithIgnoredException",
async = true,
ignoredExceptions = Array(classOf[IllegalArgumentException])
)
def asyncMethodWithIgnoredException(): AnyRef = {
throw new IllegalArgumentException("ignored")
}

@Traced(
value = "asyncMethodWithNonMatchingIgnoredException",
async = true,
ignoredExceptions = Array(classOf[IllegalArgumentException])
)
def asyncMethodWithNonMatchingIgnoredException(): AnyRef = {
throw new RuntimeException("not-ignored")
}

@Traced(
value = "asyncMethodWithIgnoredException",
async = true,
ignoredExceptions = Array(classOf[IllegalArgumentException])
)
def asyncMethodWithIgnoredException(promise: Promise[String]): Promise[String] = promise

@Timed("methodWithTiming")
def methodWithTiming() = {
Thread.sleep(50)
Expand All @@ -88,6 +145,7 @@ class TraceAspectSpec extends WordSpec

override def beforeEach() = {
reset(mockMdcSupport)
LogRecord.clear()
}

"TraceAspect" when {
Expand Down Expand Up @@ -115,7 +173,7 @@ class TraceAspectSpec extends WordSpec
expectLogMessageContaining("methodThrowingException")

And("a span-success is logged with a value of true")
expectLogMessageContaining("span-success=true")
expectLogMessageContaining("span-success=false")
}
"complete the trace with success for methods that throw ignored exceptions" in {
Given("a method that throws an ignored exception")
Expand Down Expand Up @@ -146,6 +204,141 @@ class TraceAspectSpec extends WordSpec
expectLogMessageContaining("span-success=false")
}
}
"advising methods by tracing them with async flag" should {
"handle async methods that return a future" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodReturnsPromise(promise)

Then("the method execution is logged")
dontExpectSpanInfoThat("is named asyncMethodReturnsPromise", _.name == "asyncMethodReturnsPromise")

When("the future completes")
promise.complete(Try("Success"))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsPromise", _.name == "asyncMethodReturnsPromise")
}
"handle async methods that return a failed future" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodReturnsPromise(promise)

Then("the method execution is logged")
dontExpectSpanInfoThat("is named asyncMethodReturnsPromise", _.name == "asyncMethodReturnsPromise")

When("the future fails")
promise.complete(Failure(new RuntimeException()))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsPromise and is not successful", span =>
span.name == "asyncMethodReturnsPromise" && !span.success())
}
"handle async methods that return a failed future with ignored exception" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodWithIgnoredException(promise)
Copy link
Collaborator

@ptravers ptravers Mar 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should future maybe be promise. I feel like I'm being lied to


Then("the method execution is logged")
dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException")

When("the future fails")
promise.complete(Failure(new IllegalArgumentException()))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException and is successful", span =>
span.name == "asyncMethodWithIgnoredException" && span.success())
}
"handle async methods that return a failed future with exception not in ignored list" in {
Given("a method that returns a future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodWithIgnoredException(promise)

Then("the method execution is logged")
dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException")

When("the future fails")
promise.complete(Failure(new RuntimeException()))
Await.ready(future.future, 500 millis)

Then("the async execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException and is not successful", span =>
span.name == "asyncMethodWithIgnoredException" && !span.success())
}
"handle async methods that return a null future" in {
Given("a method that returns null")

When("the method is invoked")
val future = asyncMethodReturnsNull()

Then("the method and future execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsNull", _.name == "asyncMethodReturnsNull")
}
"handle async methods that return a non-future" in {
Given("a method that returns a non-future")

When("the method is invoked")
val promise = Promise[String]()
val future = asyncMethodReturnsNonPromise()

Then("the method and future execution is logged")
expectSpanInfoThat("is named asyncMethodReturnsNonPromise", _.name == "asyncMethodReturnsNonPromise")
}
"complete the trace for methods that throw exceptions" in {
Given("a method that throws an exception")

When("the method is invoked")
a[RuntimeException] should be thrownBy {
asyncMethodThrowingException()
}

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodThrowingException", _.name == "asyncMethodThrowingException")

And("a span-success is logged with a value of false")
expectLogMessageContaining("span-success=false")
}
"complete the trace with success for methods that throw ignored exceptions" in {
Given("a method that throws an ignored exception")

When("the method is invoked")
an[IllegalArgumentException] should be thrownBy {
asyncMethodWithIgnoredException()
}

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException")

And("a span-success is logged with a value of true")
expectLogMessageContaining("span-success=true")
}
"complete the trace with failure for methods that throw exceptions that are not in ignored list" in {
Given("a method that throws an ignored exception")

When("the method is invoked")
a[RuntimeException] should be thrownBy {
asyncMethodWithNonMatchingIgnoredException()
}

Then("the method execution is logged")
expectSpanInfoThat("is named asyncMethodWithNonMatchingIgnoredException", _.name == "asyncMethodWithNonMatchingIgnoredException")

And("a span-success is logged with a value of false")
expectLogMessageContaining("span-success=false")
}

}
"advising methods that have parameters with the TracedData annotation" should {
"record the value of the parameter in the trace" in {
Given("a method that has arguments with the TraceData annotation")
Expand Down
8 changes: 8 additions & 0 deletions money-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@ money {
}
]
}

async-notifier {
handlers = [
{
class = "com.comcast.money.core.async.ScalaFutureNotificationHandler"
}
]
}
}
9 changes: 6 additions & 3 deletions money-core/src/main/scala/com/comcast/money/core/Money.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.comcast.money.core
import java.net.InetAddress

import com.comcast.money.api.{ SpanFactory, SpanHandler }
import com.comcast.money.core.async.{ AsyncNotificationHandler, AsyncNotifier }
import com.comcast.money.core.handlers.HandlerChain
import com.typesafe.config.{ Config, ConfigFactory }

Expand All @@ -29,7 +30,8 @@ case class Money(
hostName: String,
factory: SpanFactory,
tracer: Tracer,
logExceptions: Boolean = false
logExceptions: Boolean = false,
asyncNotifier: AsyncNotifier = new AsyncNotifier(Seq())
)

object Money {
Expand All @@ -48,13 +50,14 @@ object Money {
override val spanFactory: SpanFactory = factory
}
val logExceptions = conf.getBoolean("log-exceptions")
Money(enabled, handler, applicationName, hostName, factory, tracer, logExceptions)
val asyncNotificationHandlerChain = AsyncNotifier(conf.getConfig("async-notifier"))
Money(enabled, handler, applicationName, hostName, factory, tracer, logExceptions, asyncNotificationHandlerChain)
} else {
disabled(applicationName, hostName)
}

}

private def disabled(applicationName: String, hostName: String): Money =
Money(false, DisabledSpanHandler, applicationName, hostName, DisabledSpanFactory, DisabledTracer)
Money(enabled = false, DisabledSpanHandler, applicationName, hostName, DisabledSpanFactory, DisabledTracer)
}
Loading