Skip to content

Commit

Permalink
Cleanup akka-http client usages. (#3870)
Browse files Browse the repository at this point in the history
  • Loading branch information
markusthoemmes authored and dubee committed Jul 11, 2018
1 parent 50de78a commit 4a0b4cd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 133 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package whisk.core.database

import scala.concurrent.Future
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

Expand All @@ -26,32 +25,32 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.stream.scaladsl._
import akka.util.ByteString
import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.Logging
import whisk.http.PoolingRestClient
import whisk.http.PoolingRestClient._

import scala.concurrent.{ExecutionContext, Future}

/**
* This class only handles the basic communication to the proper endpoints
* ("JSON in, JSON out"). It is up to its clients to interpret the results.
* It is built on akka-http host-level connection pools; compared to single
* requests, it saves some time on each request because it doesn't need to look
* up the pool corresponding to the host. It is also easier to add an extra
* queueing mechanism.
* A client implementing the CouchDb API.
*
* This client only handles communication to the respective endpoints and works in a Json-in -> Json-out fashion. It's
* up to the client to interpret the results accordingly.
*/
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
implicit system: ActorSystem,
logging: Logging)
extends PoolingRestClient(protocol, host, port, 16 * 1024) {

implicit override val context = system.dispatchers.lookup("dispatchers.couch-dispatcher")
protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")

// Headers common to all requests.
val baseHeaders: List[HttpHeader] =
protected val baseHeaders: List[HttpHeader] =
List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))

def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
private def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))

// Properly encodes the potential slashes in each segment.
protected def uri(segments: Any*): Uri = {
Expand Down Expand Up @@ -162,11 +161,11 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev))

request(httpRequest) flatMap { response =>
if (response.status.isSuccess()) {
request(httpRequest).flatMap { response =>
if (response.status.isSuccess) {
response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r))
} else {
response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map(_ => Left(response.status))
response.discardEntityBytes().future.map(_ => Left(response.status))
}
}
}
Expand Down
117 changes: 43 additions & 74 deletions common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,26 @@

package whisk.http

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.HostConnectionPool
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling._
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl._
import akka.stream.scaladsl.Flow

import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
import spray.json._

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
* This class only handles the basic communication to the proper endpoints.
* It is up to its clients to interpret the results. It is built on akka-http
* host-level connection pools; compared to single requests, it saves some time
* on each request because it doesn't need to look up the pool corresponding
* to the host. It is also easier to add an extra queueing mechanism.
* Http client to talk to a known host.
*
* This class only handles the basic communication to the proper endpoints. It is up to its clients to interpret the
* results. It is built on akka-http host-level connection pools; compared to single requests, it saves some time
* on each request because it doesn't need to look up the pool corresponding to the host. It is also easier to add an
* extra queueing mechanism.
*/
class PoolingRestClient(
protocol: String,
Expand All @@ -52,8 +47,8 @@ class PoolingRestClient(
implicit system: ActorSystem) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")

implicit val context = system.dispatcher
implicit val materializer = ActorMaterializer()
protected implicit val context: ExecutionContext = system.dispatcher
protected implicit val materializer: ActorMaterializer = ActorMaterializer()

// Creates or retrieves a connection pool for the host.
private val pool = if (protocol == "http") {
Expand All @@ -62,82 +57,56 @@ class PoolingRestClient(
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
}

private val defaultHttpFlow = pool.mapMaterializedValue { x =>
poolPromise.success(x); x
}

private val poolPromise = Promise[HostConnectionPool]

// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
.via(httpFlow.getOrElse(defaultHttpFlow))
.via(httpFlow.getOrElse(pool))
.toMat(Sink.foreach({
case ((Success(response), p)) => p.success(response)
case ((Failure(error), p)) => p.failure(error)
}))(Keep.left)
.run

// Enqueue a request, and return a future capturing the corresponding response.
// WARNING: make sure that if the future response is not failed, its entity
// be drained entirely or the connection will be kept open until timeouts kick in.
def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
futureRequest flatMap { request =>
val promise = Promise[HttpResponse]

// When the future completes, we know whether the request made it
// through the queue.
requestQueue.offer(request -> promise).flatMap { buffered =>
buffered match {
case QueueOfferResult.Enqueued =>
promise.future

case QueueOfferResult.Dropped =>
Future.failed(new Exception("DB request queue is full."))

case QueueOfferResult.QueueClosed =>
Future.failed(new Exception("DB request queue was closed."))

case QueueOfferResult.Failure(f) =>
Future.failed(f)
}
}
/**
* Execute an HttpRequest on the underlying connection pool.
*
* WARNING: It is **very** important that the resulting entity is either drained or discarded fully, so the connection
* can be reused. Otherwise, the pool will dry up.
*
* @return a future holding the response from the server.
*/
def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = futureRequest.flatMap { request =>
val promise = Promise[HttpResponse]

// When the future completes, we know whether the request made it
// through the queue.
requestQueue.offer(request -> promise).flatMap {
case QueueOfferResult.Enqueued => promise.future
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
case QueueOfferResult.Failure(f) => Future.failed(f)
}
}

// Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
request(futureRequest) flatMap { response =>
if (response.status.isSuccess()) {
Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
Right(o)
}
/**
* Execute an HttpRequest on the underlying connection pool and return an unmarshalled result.
*
* @return either the unmarshalled result or a status code, if the status code is not a success (2xx class)
*/
def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] =
request(futureRequest).flatMap { response =>
if (response.status.isSuccess) {
Unmarshal(response.entity.withoutSizeLimit).to[T].map(Right.apply)
} else {
// This is important, as it drains the entity stream.
// Otherwise the connection stays open and the pool dries up.
response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
Left(response.status)
}
response.discardEntityBytes().future.map(_ => Left(response.status))
}
}
}

def shutdown(): Future[Unit] = {
materializer.shutdown()
// The code below shuts down the pool, but is apparently not tolerant
// to multiple clients shutting down the same pool (the second one just
// hangs). Given that shutdown is only relevant for tests (unused pools
// close themselves anyway after some time) and that they can call
// Http().shutdownAllConnectionPools(), this is not a major issue.
/* Reintroduce below if they ever make HostConnectionPool.shutdown()
* safe to call >1x.
* val poolOpt = poolPromise.future.value.map(_.toOption).flatten
* poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
*/
Future.successful(())
}
def shutdown(): Future[Unit] = Future.successful(materializer.shutdown())
}

object PoolingRestClient {
Expand Down

0 comments on commit 4a0b4cd

Please sign in to comment.