diff --git a/money-api/src/main/java/com/comcast/money/annotations/Traced.java b/money-api/src/main/java/com/comcast/money/annotations/Traced.java index a91a8b18..b88c43e5 100644 --- a/money-api/src/main/java/com/comcast/money/annotations/Traced.java +++ b/money-api/src/main/java/com/comcast/money/annotations/Traced.java @@ -75,4 +75,11 @@ * @return The array of exception classes that will be ignored */ Class[] ignoredExceptions() default {}; + + /** + * Indicates that the method being traced performs an asynchronous task and returns an instance + * that would indicate when the task has completed. + * @return Whether or not the traced method is asynchronous + */ + boolean async() default false; } diff --git a/money-aspectj/src/main/scala/com/comcast/money/aspectj/TraceAspect.scala b/money-aspectj/src/main/scala/com/comcast/money/aspectj/TraceAspect.scala index 839a7dc1..c4641cb3 100644 --- a/money-aspectj/src/main/scala/com/comcast/money/aspectj/TraceAspect.scala +++ b/money-aspectj/src/main/scala/com/comcast/money/aspectj/TraceAspect.scala @@ -18,17 +18,22 @@ package com.comcast.money.aspectj import com.comcast.money.annotations.{ Timed, Traced } import com.comcast.money.core._ -import com.comcast.money.core.internal.MDCSupport +import com.comcast.money.core.async.AsyncNotifier +import com.comcast.money.core.internal.{ MDCSupport, SpanLocal } import com.comcast.money.core.logging.TraceLogging import com.comcast.money.core.reflect.Reflections import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import org.aspectj.lang.reflect.MethodSignature import org.aspectj.lang.{ JoinPoint, ProceedingJoinPoint } +import org.slf4j.MDC + +import scala.util.{ Failure, Success } @Aspect class TraceAspect extends Reflections with TraceLogging { val tracer: Tracer = Money.Environment.tracer + val asyncNotifier: AsyncNotifier = Money.Environment.asyncNotifier val mdcSupport: MDCSupport = new MDCSupport() @Pointcut("execution(@com.comcast.money.annotations.Traced * *(..)) && @annotation(traceAnnotation)") @@ -39,22 +44,38 @@ class TraceAspect extends Reflections with TraceLogging { @Around("traced(traceAnnotation)") def adviseMethodsWithTracing(joinPoint: ProceedingJoinPoint, traceAnnotation: Traced): AnyRef = { - val key: String = traceAnnotation.value - var result = true + val key = traceAnnotation.value() val oldSpanName = mdcSupport.getSpanNameMDC + var spanResult: Option[Boolean] = Some(true) + try { tracer.startSpan(key) mdcSupport.setSpanNameMDC(Some(key)) traceMethodArguments(joinPoint) - joinPoint.proceed + + val returnValue = joinPoint.proceed() + + if (traceAnnotation.async()) { + traceAsyncResult(traceAnnotation, returnValue) match { + case Some(asyncResult) => + // Do not stop the span when the advice returns as the span will + // be stopped by the callback registered to the `AsyncNotificationHandler` + spanResult = None + asyncResult + case None => + returnValue + } + } else { + returnValue + } } catch { case t: Throwable => - result = if (exceptionMatches(t, traceAnnotation.ignoredExceptions())) true else false + spanResult = Some(exceptionMatches(t, traceAnnotation.ignoredExceptions())) logException(t) throw t } finally { + spanResult.foreach(tracer.stopSpan) mdcSupport.setSpanNameMDC(oldSpanName) - tracer.stopSpan(result) } } @@ -77,4 +98,42 @@ class TraceAspect extends Reflections with TraceLogging { } } } + + /** + * Binds the duration and result of the current span to the return value of the traced method + * + * @param traceAnnotation The `@Traced` annotation applied to the method + * @param returnValue The return value from the `@Traced` method + * @return An option with the result from the `AsyncNotificationHandler`, or `None` if no handler + * supports the return value + */ + private def traceAsyncResult(traceAnnotation: Traced, returnValue: AnyRef): Option[AnyRef] = + // attempt to resolve the AsyncNotificationHandler for the return value + asyncNotifier.resolveHandler(returnValue).map { + handler => + // pop the current span from the stack as it will not be stopped by the tracer + val span = SpanLocal.pop() + // capture the current MDC context to be applied on the callback thread + val mdc = Option(MDC.getCopyOfContextMap) + + // register callback to be invoked when the future is completed + handler.whenComplete(returnValue, completed => { + + // reapply the MDC onto the callback thread + mdcSupport.propogateMDC(mdc) + + // determine if the future completed successfully or exceptionally + val result = completed match { + case Success(_) => true + case Failure(exception) => + logException(exception) + exceptionMatches(exception, traceAnnotation.ignoredExceptions()) + } + + // stop the captured span with the success/failure flag + span.foreach(_.stop(result)) + // clear the MDC from the callback thread + MDC.clear() + }) + } } diff --git a/money-aspectj/src/test/resources/application.conf b/money-aspectj/src/test/resources/application.conf index d2f1780b..be5d38a8 100644 --- a/money-aspectj/src/test/resources/application.conf +++ b/money-aspectj/src/test/resources/application.conf @@ -19,7 +19,7 @@ money { log-exceptions = false handling = { - async = true + async = false handlers = [ { class = "com.comcast.money.core.LogRecorderSpanHandler" diff --git a/money-aspectj/src/test/scala/com/comcast/money/aspectj/TraceAspectSpec.scala b/money-aspectj/src/test/scala/com/comcast/money/aspectj/TraceAspectSpec.scala index 4256cb4e..b94cdcb9 100644 --- a/money-aspectj/src/test/scala/com/comcast/money/aspectj/TraceAspectSpec.scala +++ b/money-aspectj/src/test/scala/com/comcast/money/aspectj/TraceAspectSpec.scala @@ -18,12 +18,17 @@ package com.comcast.money.aspectj import com.comcast.money.annotations.{ Timed, Traced, TracedData } import com.comcast.money.core.internal.MDCSupport -import com.comcast.money.core.{ Tracer, Money, SpecHelpers } +import com.comcast.money.core.{ LogRecord, Money, SpecHelpers } import org.aspectj.lang.ProceedingJoinPoint import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.concurrent.Eventually import org.scalatest.mock.MockitoSugar +import org.slf4j.MDC + +import scala.concurrent.{ Await, Future, Promise } +import scala.concurrent.duration._ +import scala.util.{ Failure, Try } class TraceAspectSpec extends WordSpec with GivenWhenThen with OneInstancePerTest with BeforeAndAfterEach with Matchers with MockitoSugar with Eventually @@ -76,6 +81,58 @@ class TraceAspectSpec extends WordSpec throw new RuntimeException("not-ignored") } + @Traced( + value = "asyncMethodReturnsFuture", + async = true + ) + def asyncMethodReturnsFuture(future: Future[String]): Future[String] = future + + @Traced( + value = "asyncMethodReturnsNull", + async = true + ) + def asyncMethodReturnsNull(): Future[String] = null + + @Traced( + value = "asyncMethodReturnsNonFuture", + async = true + ) + def asyncMethodReturnsNonFuture(): AnyRef = new Object + + @Traced( + value = "asyncMethodThrowingException", + async = true + ) + def asyncMethodThrowingException(): AnyRef = { + Thread.sleep(50) + throw new RuntimeException() + } + + @Traced( + value = "asyncMethodWithIgnoredException", + async = true, + ignoredExceptions = Array(classOf[IllegalArgumentException]) + ) + def asyncMethodWithIgnoredException(): AnyRef = { + throw new IllegalArgumentException("ignored") + } + + @Traced( + value = "asyncMethodWithNonMatchingIgnoredException", + async = true, + ignoredExceptions = Array(classOf[IllegalArgumentException]) + ) + def asyncMethodWithNonMatchingIgnoredException(): AnyRef = { + throw new RuntimeException("not-ignored") + } + + @Traced( + value = "asyncMethodWithIgnoredException", + async = true, + ignoredExceptions = Array(classOf[IllegalArgumentException]) + ) + def asyncMethodWithIgnoredException(future: Future[String]): Future[String] = future + @Timed("methodWithTiming") def methodWithTiming() = { Thread.sleep(50) @@ -88,6 +145,7 @@ class TraceAspectSpec extends WordSpec override def beforeEach() = { reset(mockMdcSupport) + LogRecord.clear() } "TraceAspect" when { @@ -115,7 +173,7 @@ class TraceAspectSpec extends WordSpec expectLogMessageContaining("methodThrowingException") And("a span-success is logged with a value of true") - expectLogMessageContaining("span-success=true") + expectLogMessageContaining("span-success=false") } "complete the trace with success for methods that throw ignored exceptions" in { Given("a method that throws an ignored exception") @@ -146,6 +204,140 @@ class TraceAspectSpec extends WordSpec expectLogMessageContaining("span-success=false") } } + "advising methods by tracing them with async flag" should { + "handle async methods that return a future" in { + Given("a method that returns a future") + + When("the method is invoked") + val promise = Promise[String]() + val future = asyncMethodReturnsFuture(promise.future) + + Then("the method execution is logged") + dontExpectSpanInfoThat("is named asyncMethodReturnsFuture", _.name == "asyncMethodReturnsFuture") + + When("the future completes") + promise.complete(Try("Success")) + Await.ready(future, 500 millis) + + Then("the async execution is logged") + expectSpanInfoThat("is named asyncMethodReturnsFuture", _.name == "asyncMethodReturnsFuture") + } + "handle async methods that return a failed future" in { + Given("a method that returns a future") + + When("the method is invoked") + val promise = Promise[String]() + val future = asyncMethodReturnsFuture(promise.future) + + Then("the method execution is logged") + dontExpectSpanInfoThat("is named asyncMethodReturnsFuture", _.name == "asyncMethodReturnsFuture") + + When("the future fails") + promise.complete(Failure(new RuntimeException())) + Await.ready(future, 500 millis) + + Then("the async execution is logged") + expectSpanInfoThat("is named asyncMethodReturnsFuture and is not successful", span => + span.name == "asyncMethodReturnsFuture" && !span.success()) + } + "handle async methods that return a failed future with ignored exception" in { + Given("a method that returns a future") + + When("the method is invoked") + val promise = Promise[String]() + val future = asyncMethodWithIgnoredException(promise.future) + + Then("the method execution is logged") + dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException") + + When("the future fails") + promise.complete(Failure(new IllegalArgumentException())) + Await.ready(future, 500 millis) + + Then("the async execution is logged") + expectSpanInfoThat("is named asyncMethodWithIgnoredException and is successful", span => + span.name == "asyncMethodWithIgnoredException" && span.success()) + } + "handle async methods that return a failed future with exception not in ignored list" in { + Given("a method that returns a future") + + When("the method is invoked") + val promise = Promise[String]() + val future = asyncMethodWithIgnoredException(promise.future) + + Then("the method execution is logged") + dontExpectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException") + + When("the future fails") + promise.complete(Failure(new RuntimeException())) + Await.ready(future, 500 millis) + + Then("the async execution is logged") + expectSpanInfoThat("is named asyncMethodWithIgnoredException and is not successful", span => + span.name == "asyncMethodWithIgnoredException" && !span.success()) + } + "handle async methods that return a null future" in { + Given("a method that returns null") + + When("the method is invoked") + val _ = asyncMethodReturnsNull() + + Then("the method and future execution is logged") + expectSpanInfoThat("is named asyncMethodReturnsNull", _.name == "asyncMethodReturnsNull") + } + "handle async methods that return a non-future" in { + Given("a method that returns a non-future") + + When("the method is invoked") + val _ = asyncMethodReturnsNonFuture() + + Then("the method and future execution is logged") + expectSpanInfoThat("is named asyncMethodReturnsNonFuture", _.name == "asyncMethodReturnsNonFuture") + } + "complete the trace for methods that throw exceptions" in { + Given("a method that throws an exception") + + When("the method is invoked") + a[RuntimeException] should be thrownBy { + asyncMethodThrowingException() + } + + Then("the method execution is logged") + expectSpanInfoThat("is named asyncMethodThrowingException", _.name == "asyncMethodThrowingException") + + And("a span-success is logged with a value of false") + expectLogMessageContaining("span-success=false") + } + "complete the trace with success for methods that throw ignored exceptions" in { + Given("a method that throws an ignored exception") + + When("the method is invoked") + an[IllegalArgumentException] should be thrownBy { + asyncMethodWithIgnoredException() + } + + Then("the method execution is logged") + expectSpanInfoThat("is named asyncMethodWithIgnoredException", _.name == "asyncMethodWithIgnoredException") + + And("a span-success is logged with a value of true") + expectLogMessageContaining("span-success=true") + } + "complete the trace with failure for methods that throw exceptions that are not in ignored list" in { + Given("a method that throws an ignored exception") + + When("the method is invoked") + a[RuntimeException] should be thrownBy { + asyncMethodWithNonMatchingIgnoredException() + } + + Then("the method execution is logged") + expectSpanInfoThat("is named asyncMethodWithNonMatchingIgnoredException", _.name == "asyncMethodWithNonMatchingIgnoredException") + + And("a span-success is logged with a value of false") + expectLogMessageContaining("span-success=false") + } + + } "advising methods that have parameters with the TracedData annotation" should { "record the value of the parameter in the trace" in { Given("a method that has arguments with the TraceData annotation") diff --git a/money-core/src/main/resources/reference.conf b/money-core/src/main/resources/reference.conf index f4c59349..fb3f0ea5 100644 --- a/money-core/src/main/resources/reference.conf +++ b/money-core/src/main/resources/reference.conf @@ -44,4 +44,12 @@ money { } ] } + + async-notifier { + handlers = [ + { + class = "com.comcast.money.core.async.ScalaFutureNotificationHandler" + } + ] + } } diff --git a/money-core/src/main/scala/com/comcast/money/core/Money.scala b/money-core/src/main/scala/com/comcast/money/core/Money.scala index 9c64f3b4..900e85a3 100644 --- a/money-core/src/main/scala/com/comcast/money/core/Money.scala +++ b/money-core/src/main/scala/com/comcast/money/core/Money.scala @@ -19,6 +19,7 @@ package com.comcast.money.core import java.net.InetAddress import com.comcast.money.api.{ SpanFactory, SpanHandler } +import com.comcast.money.core.async.{ AsyncNotificationHandler, AsyncNotifier } import com.comcast.money.core.handlers.HandlerChain import com.typesafe.config.{ Config, ConfigFactory } @@ -29,7 +30,8 @@ case class Money( hostName: String, factory: SpanFactory, tracer: Tracer, - logExceptions: Boolean = false + logExceptions: Boolean = false, + asyncNotifier: AsyncNotifier = new AsyncNotifier(Seq()) ) object Money { @@ -48,7 +50,8 @@ object Money { override val spanFactory: SpanFactory = factory } val logExceptions = conf.getBoolean("log-exceptions") - Money(enabled, handler, applicationName, hostName, factory, tracer, logExceptions) + val asyncNotificationHandlerChain = AsyncNotifier(conf.getConfig("async-notifier")) + Money(enabled, handler, applicationName, hostName, factory, tracer, logExceptions, asyncNotificationHandlerChain) } else { disabled(applicationName, hostName) } @@ -56,5 +59,5 @@ object Money { } private def disabled(applicationName: String, hostName: String): Money = - Money(false, DisabledSpanHandler, applicationName, hostName, DisabledSpanFactory, DisabledTracer) + Money(enabled = false, DisabledSpanHandler, applicationName, hostName, DisabledSpanFactory, DisabledTracer) } diff --git a/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotificationHandler.scala b/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotificationHandler.scala new file mode 100644 index 00000000..fddaed7d --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotificationHandler.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import scala.util.Try + +trait AsyncNotificationHandler { + def supports(future: AnyRef): Boolean + def whenComplete(future: AnyRef, f: Try[_] => Unit): AnyRef +} diff --git a/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotificationHandlerFactory.scala b/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotificationHandlerFactory.scala new file mode 100644 index 00000000..45744f0b --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotificationHandlerFactory.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import com.typesafe.config.Config + +object AsyncNotificationHandlerFactory { + + def create(config: Config): AsyncNotificationHandler = { + val className = config.getString("class") + + val handlerInstance = Class.forName(className) + .newInstance() + .asInstanceOf[AsyncNotificationHandler] + + handlerInstance match { + case configurable: ConfigurableNotificationHandler => + configurable.configure(config) + configurable + + case _ => handlerInstance + } + } +} diff --git a/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotifier.scala b/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotifier.scala new file mode 100644 index 00000000..31c0c6ef --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/async/AsyncNotifier.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import com.typesafe.config.Config + +import scala.collection.JavaConversions._ + +case class AsyncNotifier(handlers: Seq[AsyncNotificationHandler]) { + def resolveHandler(future: AnyRef): Option[AsyncNotificationHandler] = + Option(future).flatMap(f => handlers.find(_.supports(f))) +} + +object AsyncNotifier { + import AsyncNotificationHandlerFactory.create + + def apply(config: Config): AsyncNotifier = { + val handlers = config.getConfigList("handlers").map(create) + new AsyncNotifier(handlers) + } +} diff --git a/money-core/src/main/scala/com/comcast/money/core/async/ConfigurableNotificationHandler.scala b/money-core/src/main/scala/com/comcast/money/core/async/ConfigurableNotificationHandler.scala new file mode 100644 index 00000000..4cb2e1c7 --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/async/ConfigurableNotificationHandler.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import com.typesafe.config.Config + +trait ConfigurableNotificationHandler extends AsyncNotificationHandler { + + def configure(config: Config): Unit +} diff --git a/money-core/src/main/scala/com/comcast/money/core/async/DirectExecutionContext.scala b/money-core/src/main/scala/com/comcast/money/core/async/DirectExecutionContext.scala new file mode 100644 index 00000000..ac5c3b6f --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/async/DirectExecutionContext.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import scala.concurrent.ExecutionContext + +class DirectExecutionContext extends ExecutionContext { + override def execute(runnable: Runnable): Unit = { + try { + runnable.run() + } catch { + case t: Throwable => + reportFailure(t) + } + } + override def reportFailure(cause: Throwable): Unit = ExecutionContext.defaultReporter +} diff --git a/money-core/src/main/scala/com/comcast/money/core/async/ScalaFutureNotificationHandler.scala b/money-core/src/main/scala/com/comcast/money/core/async/ScalaFutureNotificationHandler.scala new file mode 100644 index 00000000..d3b92de8 --- /dev/null +++ b/money-core/src/main/scala/com/comcast/money/core/async/ScalaFutureNotificationHandler.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Try } + +class ScalaFutureNotificationHandler extends AsyncNotificationHandler { + implicit val executionContext: ExecutionContext = new DirectExecutionContext + + override def supports(future: AnyRef): Boolean = + future != null && future.isInstanceOf[Future[_]] + + override def whenComplete(future: AnyRef, f: Try[_] => Unit): Future[_] = { + future.asInstanceOf[Future[_]].transform(result => { + f(Try(result)) + result + }, throwable => { + f(Failure(throwable)) + throwable + }) + } +} diff --git a/money-core/src/test/scala/com/comcast/money/core/SpecHelpers.scala b/money-core/src/test/scala/com/comcast/money/core/SpecHelpers.scala index 7d10482a..8266d2bb 100644 --- a/money-core/src/test/scala/com/comcast/money/core/SpecHelpers.scala +++ b/money-core/src/test/scala/com/comcast/money/core/SpecHelpers.scala @@ -16,7 +16,7 @@ package com.comcast.money.core -import com.comcast.money.api.SpanId +import com.comcast.money.api.{ SpanId, SpanInfo } import com.comcast.money.core.handlers.LoggingSpanHandler import com.typesafe.config.Config import org.scalatest.Matchers @@ -27,14 +27,21 @@ import scala.collection.{ Set, mutable } import scala.concurrent.duration._ object LogRecord { - + private val spans = new mutable.ArrayBuffer[SpanInfo] private val messages = new mutable.HashMap[String, mutable.Set[String]] with mutable.MultiMap[String, String] - def clear() = messages.clear() + def clear(): Unit = { + messages.clear() + spans.clear() + } + + def add(log: String, message: String): Unit = messages.addBinding(log, message) + + def add(spanInfo: SpanInfo): Unit = spans.append(spanInfo) - def add(log: String, message: String) = messages.addBinding(log, message) + def contains(log: String)(cond: String => Boolean): Boolean = messages.entryExists(log, cond) - def contains(log: String)(cond: String => Boolean) = messages.entryExists(log, cond) + def contains(cond: SpanInfo => Boolean): Boolean = spans.exists(cond) def log(name: String): Set[String] = messages.getOrElse(name, mutable.Set.empty) } @@ -46,15 +53,36 @@ class LogRecorderSpanHandler extends LoggingSpanHandler { logFunction = record } + override def handle(spanInfo: SpanInfo): Unit = { + LogRecord.add(spanInfo) + super.handle(spanInfo) + } + def record(message: String): Unit = LogRecord.add("log", message) } trait SpecHelpers extends Eventually { this: Matchers => - def awaitCond(condition: => Boolean, max: FiniteDuration = 2.seconds, interval: Duration = 100.millis, message: String = "failed waiting") = + def awaitCond(condition: => Boolean, max: FiniteDuration = 2.seconds, interval: Duration = 100.millis, message: String = "failed waiting"): Unit = { + implicit val patienceConfig: PatienceConfig = PatienceConfig(Span(max.toMillis, Millis), Span(interval.toMillis, Millis)) eventually { - condition shouldBe true - }(PatienceConfig(Span(max.toMillis, Millis), Span(interval.toMillis, Millis))) + assert(condition, message) + } + } + + def expectSpanInfoThat(message: String, condition: SpanInfo => Boolean, wait: FiniteDuration = 2.seconds): Unit = { + awaitCond( + LogRecord.contains(condition), wait, 100 milliseconds, + s"Expected span info that $message not found after $wait" + ) + } + + def dontExpectSpanInfoThat(message: String, condition: SpanInfo => Boolean, wait: FiniteDuration = 2.seconds): Unit = { + awaitCond( + !LogRecord.contains(condition), wait, 100 milliseconds, + s"Not expected span info that $message found after $wait" + ) + } def expectLogMessageContaining(contains: String, wait: FiniteDuration = 2.seconds) { awaitCond( diff --git a/money-core/src/test/scala/com/comcast/money/core/async/AsyncNotifierSpec.scala b/money-core/src/test/scala/com/comcast/money/core/async/AsyncNotifierSpec.scala new file mode 100644 index 00000000..ad94997a --- /dev/null +++ b/money-core/src/test/scala/com/comcast/money/core/async/AsyncNotifierSpec.scala @@ -0,0 +1,116 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import com.comcast.money.core.SpecHelpers +import com.comcast.money.core.concurrent.ConcurrentSupport +import com.typesafe.config.ConfigFactory +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{ Matchers, OneInstancePerTest, WordSpecLike } +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Future + +class AsyncNotifierSpec + extends WordSpecLike + with MockitoSugar with Matchers with ConcurrentSupport with OneInstancePerTest with SpecHelpers { + + "AsyncNotifier" should { + "create a sequence of AsyncNotificationServices" in { + val config = ConfigFactory.parseString( + """ + |{ + | handlers = [ + | { + | class = "com.comcast.money.core.async.NonConfiguredNotificationHandler" + | }, + | { + | class = "com.comcast.money.core.async.NonConfiguredNotificationHandler" + | } + | ] + |} + """.stripMargin + ) + + val result = AsyncNotifier(config) + result shouldBe a[AsyncNotifier] + result.handlers.size shouldBe 2 + } + "configures an AsyncNotificationHandler that implements ConfigurableNotificationHandler" in { + val config = ConfigFactory.parseString( + """ + |{ + | handlers = [ + | { + | class = "com.comcast.money.core.async.NonConfiguredNotificationHandler" + | }, + | { + | class = "com.comcast.money.core.async.ConfiguredNotificationHandler" + | } + | ] + |} + """.stripMargin + ) + + val result = AsyncNotifier(config) + result shouldBe a[AsyncNotifier] + result.handlers.size shouldBe 2 + + result.handlers.head shouldBe a[NonConfiguredNotificationHandler] + result.handlers.last shouldBe a[ConfiguredNotificationHandler] + + result.handlers.last.asInstanceOf[ConfiguredNotificationHandler].calledConfigure shouldBe true + } + "find AsyncNotificationHandler that supports Future" in { + val mockHandler = mock[AsyncNotificationHandler] + val future = mock[Future[String]] + + val asyncNotifier = AsyncNotifier(Seq(mockHandler)) + doReturn(true).when(mockHandler).supports(future) + + val result = asyncNotifier.resolveHandler(future) + + verify(mockHandler, times(1)).supports(future) + + result.isDefined shouldEqual true + result.get shouldEqual mockHandler + } + "not find any AsyncNotificationHandler for null" in { + val mockHandler = mock[AsyncNotificationHandler] + + val asyncNotifier = AsyncNotifier(Seq(mockHandler)) + doReturn(true).when(mockHandler).supports(any) + + val result = asyncNotifier.resolveHandler(null) + + result.isEmpty shouldEqual true + verify(mockHandler, never).supports(any) + } + "not find AsyncNotificationHandler when not supported" in { + val mockHandler = mock[AsyncNotificationHandler] + + val asyncNotifier = AsyncNotifier(Seq(mockHandler)) + doReturn(false).when(mockHandler).supports(any) + + val result = asyncNotifier.resolveHandler(new Object) + + result.isEmpty shouldEqual true + verify(mockHandler, times(1)).supports(any) + } + } +} diff --git a/money-core/src/test/scala/com/comcast/money/core/async/DirectExecutionContextSpec.scala b/money-core/src/test/scala/com/comcast/money/core/async/DirectExecutionContextSpec.scala new file mode 100644 index 00000000..2dd4bfda --- /dev/null +++ b/money-core/src/test/scala/com/comcast/money/core/async/DirectExecutionContextSpec.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import com.comcast.money.core.SpecHelpers +import com.comcast.money.core.concurrent.ConcurrentSupport +import org.scalatest.{ Matchers, OneInstancePerTest, WordSpecLike } +import org.scalatest.mock.MockitoSugar + +class DirectExecutionContextSpec + extends WordSpecLike + with MockitoSugar with Matchers with ConcurrentSupport with OneInstancePerTest with SpecHelpers { + + val underTest = new DirectExecutionContext() + + "DirectExecutionContext" should { + "execute the Runnable on the current thread" in { + val currentThreadId = Thread.currentThread().getId + var callbackThreadId: Long = 0 + + underTest.execute(new Runnable { + override def run(): Unit = { + callbackThreadId = Thread.currentThread().getId + } + }) + + callbackThreadId shouldEqual currentThreadId + } + } +} diff --git a/money-core/src/test/scala/com/comcast/money/core/async/ScalaFutureNotificationHandlerSpec.scala b/money-core/src/test/scala/com/comcast/money/core/async/ScalaFutureNotificationHandlerSpec.scala new file mode 100644 index 00000000..a80b0576 --- /dev/null +++ b/money-core/src/test/scala/com/comcast/money/core/async/ScalaFutureNotificationHandlerSpec.scala @@ -0,0 +1,116 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async + +import com.comcast.money.core.SpecHelpers +import com.comcast.money.core.concurrent.ConcurrentSupport +import org.scalatest.{ Matchers, OneInstancePerTest, WordSpecLike } +import org.scalatest.mock.MockitoSugar +import org.mockito.Mockito._ +import org.mockito.Matchers.{ any, eq => argEq } + +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.util.{ Failure, Try } + +class ScalaFutureNotificationHandlerSpec + extends WordSpecLike + with MockitoSugar with Matchers with ConcurrentSupport with OneInstancePerTest with SpecHelpers { + + val underTest = new ScalaFutureNotificationHandler() + implicit val executionContext: ExecutionContext = new DirectExecutionContext() + + "ScalaFutureNotificationHandler" should { + "support scala.concurrent.Future" in { + val future = Future.successful("success") + val result = underTest.supports(future) + + result shouldEqual true + } + "support descendents of scala.concurrent.Future" in { + val future = mock[MyFuture[String]] + val result = underTest.supports(future) + + result shouldEqual true + } + "does not support non-Futures" in { + val nonFuture = new Object() + val result = underTest.supports(nonFuture) + + result shouldEqual false + } + "does not support null" in { + val result = underTest.supports(null) + + result shouldEqual false + } + "calls transform method on the future" in { + val future = mock[Future[String]] + doReturn(future).when(future).transform(any(), any())(argEq(underTest.executionContext)) + + val result = underTest.whenComplete(future, _ => {}) + + verify(result, times(1)).transform(any(), any())(argEq(underTest.executionContext)) + } + "calls registered completion function for already completed future" in { + val future = Future.successful("success") + val func = mock[Try[_] => Unit] + + underTest.whenComplete(future, func) + + verify(func, times(1)).apply(argEq(Try("success"))) + } + "calls registered completion function for already exceptionally completed future" in { + val ex = new RuntimeException + val future = Future.failed(ex) + val func = mock[Try[_] => Unit] + val executionContext = new DirectExecutionContext() + + underTest.whenComplete(future, func) + + verify(func, times(1)).apply(argEq(Failure(ex))) + } + "calls registered completion function when the future completes successfully" in { + val promise = Promise[String]() + val future = promise.future + + val func = mock[Try[_] => Unit] + + underTest.whenComplete(future, func) + verify(func, never()).apply(any()) + + promise.complete(Try("success")) + + verify(func, times(1)).apply(argEq(Try("success"))) + } + "calls registered completion function when the future completes exceptionally" in { + val promise = Promise[String]() + val future = promise.future + + val func = mock[Try[_] => Unit] + + underTest.whenComplete(future, func) + verify(func, never()).apply(any()) + + val exception = new RuntimeException() + promise.complete(Failure(exception)) + + verify(func, times(1)).apply(argEq(Failure(exception))) + } + } + + private abstract class MyFuture[T] extends Future[T] {} +} diff --git a/money-core/src/test/scala/com/comcast/money/core/async/TestData.scala b/money-core/src/test/scala/com/comcast/money/core/async/TestData.scala new file mode 100644 index 00000000..9bc765f5 --- /dev/null +++ b/money-core/src/test/scala/com/comcast/money/core/async/TestData.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2012-2015 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.comcast.money.core.async +import com.typesafe.config.Config + +import scala.util.Try + +class ConfiguredNotificationHandler extends ConfigurableNotificationHandler { + var calledConfigure: Boolean = false + + override def configure(config: Config): Unit = calledConfigure = true + override def supports(future: AnyRef): Boolean = false + override def whenComplete(future: AnyRef, f: Try[_] => Unit): AnyRef = null +} + +class NonConfiguredNotificationHandler extends AsyncNotificationHandler { + override def supports(future: AnyRef): Boolean = false + override def whenComplete(future: AnyRef, f: Try[_] => Unit): AnyRef = null +}