Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDb: AWS SDK2; Add tryFlow to be used with RetryFlow #1896

Merged
merged 4 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion docs/src/main/paradox/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,24 @@ The table below shows direct dependencies of this module and the second tab show
@@dependencies { projectId="dynamodb" }


## Usage
## Setup

This connector requires a @javadoc[DynamoDbAsyncClient](software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient) instance to communicate with AWS DynamoDB.

It is your code's responsibility to call `close` to free any resources held by the client. In this example it will be called when the actor system is terminated.

Scala
: @@snip [snip](/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala) { #init-client }

Java
: @@snip [snip](/dynamodb/src/test/java/docs/javadsl/ExampleTest.java) { #init-client }

This connector is set up to use @extref:[Akka HTTP](akka-http:) as default HTTP client via the thin adapter library [AWS Akka-Http SPI implementation](https://github.com/matsluni/aws-spi-akka-http). By setting the `httpClient` explicitly (as above) the Akka actor system is reused, if not set explicitly a separate actor system will be created internally.

It is possible to configure the use of Netty instead, which is Amazon's default. Add an appropriate Netty version to the dependencies and configure @javadoc[`NettyNioAsyncHttpClient`](software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient).


## Sending requests and receiving responses

For simple operations you can issue a single request, and get back the result in a @scala[`Future`]@java[`CompletionStage`].

Expand All @@ -44,6 +61,32 @@ Scala
Java
: @@snip [snip](/dynamodb/src/test/java/docs/javadsl/ExampleTest.java) { #paginated }


## Handling failed requests

By default the stream is stopped if a request fails with server error, or the response can not be parsed.

To handle failed requests later in the stream use @scala[@scaladoc[DynamoDb.tryFlow](akka.stream.alpakka.dynamodb.scaladsl.DynamoDb$)]@java[@scaladoc[DynamoDb.tryFlow](akka.stream.alpakka.dynamodb.javadsl.DynamoDb$)]. The responses will be wrapped with a @scaladoc[Try](scala.util.Try).

This flow composes easily with a Akka Stream RetryFlow, that allows to selectively retry requests with a backoff.

For example to retry all of the failed requests, wrap the `DynamoDb.tryFlow` with `RetryFlow.withBackoff` like so:

Scala
: @@snip [snip](/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala) { #create-retry-flow }

Java
: @@snip [snip](/dynamodb/src/test/java/docs/javadsl/RetryTest.java) { #create-retry-flow }

And then use the new flow to send the requests through:

Scala
: @@snip [snip](/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala) { #use-retry-flow }

Java
: @@snip [snip](/dynamodb/src/test/java/docs/javadsl/RetryTest.java) { #use-retry-flow }


## Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.dynamodb.impl.javadsl

import java.util

import akka.NotUsed
import akka.japi.Pair
import akka.stream.alpakka.dynamodb.impl.scaladsl
import akka.stream.javadsl.{Flow, FlowWithContext, Keep}

import scala.concurrent.duration._
import scala.util.Try
import scala.collection.JavaConverters._

object RetryFlow {

/**
* Allows retrying individual elements in the stream with exponential backoff.
*
* The retry condition is controlled by the `retryWith` partial function. It takes an output element of the wrapped
* flow and should return one or more requests to be retried.
*
* A successful or failed response will be propagated downstream if it is not matched by the `retryFlow` function.
*
* If a successful response is matched and issued a retry, the response is still propagated downstream.
*
* The implementation of the RetryFlow assumes that `flow` follows one-in-out-out element semantics.
*
* The wrapped `flow` and `retryWith` takes an additional `State` parameter which can be used to correlate a request
* with a response.
*
* Backoff state is tracked separately per-element coming into the wrapped `flow`.
*
* @param parallelism controls the number of in-flight requests in the wrapped flow
* @param minBackoff minimum duration to backoff between issuing retries
* @param maxBackoff maximum duration to backoff between issuing retries
* @param randomFactor adds jitter to the retry delay. Use 0 for no jitter
* @param flow a flow to retry elements from
* @param retryWith retry condition decision partial function
*/
def withBackoff[In, Out, State, Mat](
parallelism: Int,
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
flow: Flow[Pair[In, State], Pair[Try[Out], State], Mat],
retryWith: java.util.function.Function[Pair[Try[Out], State], akka.japi.Option[util.Collection[Pair[In, State]]]]
): Flow[akka.japi.Pair[In, State], akka.japi.Pair[Try[Out], State], Mat] = {
val retryFlow = scaladsl.RetryFlow
.withBackoffAndContext(parallelism,
Duration.fromNanos(minBackoff.toNanos),
Duration.fromNanos(maxBackoff.toNanos),
randomFactor,
FlowWithContext.fromPairs(flow).asScala) {
case (t, s) =>
retryWith(Pair.create(t, s))
.map(coll => coll.asScala.toIndexedSeq.map(pair => (pair.first, pair.second)))
}
.asFlow

Flow
.create[akka.japi.Pair[In, State]]()
.map(func(p => (p.first, p.second)))
.viaMat(retryFlow, Keep.right[NotUsed, Mat])
.map(func(t => akka.japi.Pair.create(t._1, t._2)))
}

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}

}
Loading