Cleanup akka-http client usages. #3870

Merged
merged 1 commit on Jul 11, 2018
CouchDbRestClient.scala (package whisk.core.database)
@@ -17,7 +17,6 @@

package whisk.core.database

import scala.concurrent.Future
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

@@ -26,32 +25,32 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.stream.scaladsl._
import akka.util.ByteString
import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.Logging
import whisk.http.PoolingRestClient
import whisk.http.PoolingRestClient._

import scala.concurrent.{ExecutionContext, Future}

/**
* This class only handles the basic communication to the proper endpoints
* ("JSON in, JSON out"). It is up to its clients to interpret the results.
* It is built on akka-http host-level connection pools; compared to single
* requests, it saves some time on each request because it doesn't need to look
* up the pool corresponding to the host. It is also easier to add an extra
* queueing mechanism.
* A client implementing the CouchDb API.
*
* This client only handles communication to the respective endpoints and works in a Json-in -> Json-out fashion. It's
* up to the client to interpret the results accordingly.
*/
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
implicit system: ActorSystem,
logging: Logging)
extends PoolingRestClient(protocol, host, port, 16 * 1024) {

implicit override val context = system.dispatchers.lookup("dispatchers.couch-dispatcher")
protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")

// Headers common to all requests.
val baseHeaders: List[HttpHeader] =
protected val baseHeaders: List[HttpHeader] =
List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))

def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
private def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))

// Properly encodes the potential slashes in each segment.
protected def uri(segments: Any*): Uri = {
@@ -162,11 +161,11 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev))

request(httpRequest) flatMap { response =>
if (response.status.isSuccess()) {
request(httpRequest).flatMap { response =>
if (response.status.isSuccess) {
response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r))
} else {
response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map(_ => Left(response.status))
response.discardEntityBytes().future.map(_ => Left(response.status))
}
}
}
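The change in the else branch above swaps an explicit drain of the entity bytes through `Sink.ignore` for `response.discardEntityBytes()`, which performs the same drain but states the intent directly and keeps the pooled connection reusable. Below is a minimal, self-contained sketch of that drain-or-discard pattern against a plain akka-http response; the object name, method name, and use of `singleRequest` are illustrative only and not part of the PR.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.Future

object EntityHandlingSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // On success, stream the entity through a sink; otherwise discard the bytes
  // so the connection can go back to the pool instead of idling until timeout.
  def fetch(uri: Uri): Future[Either[StatusCode, ByteString]] =
    Http().singleRequest(HttpRequest(uri = uri)).flatMap { response =>
      if (response.status.isSuccess) {
        response.entity.dataBytes
          .runWith(Sink.fold(ByteString.empty)(_ ++ _))
          .map(Right.apply)
      } else {
        // discardEntityBytes drains the stream, equivalent to running it into Sink.ignore.
        response.discardEntityBytes().future.map(_ => Left(response.status))
      }
    }
}
```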
common/scala/src/main/scala/whisk/http/PoolingRestClient.scala (117 changes: 43 additions & 74 deletions)
@@ -17,31 +17,26 @@

package whisk.http

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.HostConnectionPool
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling._
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl._
import akka.stream.scaladsl.Flow

import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
import spray.json._

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
* This class only handles the basic communication to the proper endpoints.
* It is up to its clients to interpret the results. It is built on akka-http
* host-level connection pools; compared to single requests, it saves some time
* on each request because it doesn't need to look up the pool corresponding
* to the host. It is also easier to add an extra queueing mechanism.
* Http client to talk to a known host.
*
* This class only handles the basic communication to the proper endpoints. It is up to its clients to interpret the
* results. It is built on akka-http host-level connection pools; compared to single requests, it saves some time
* on each request because it doesn't need to look up the pool corresponding to the host. It is also easier to add an
* extra queueing mechanism.
*/
class PoolingRestClient(
protocol: String,
@@ -52,8 +47,8 @@ class PoolingRestClient(
implicit system: ActorSystem) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")

implicit val context = system.dispatcher
implicit val materializer = ActorMaterializer()
protected implicit val context: ExecutionContext = system.dispatcher
protected implicit val materializer: ActorMaterializer = ActorMaterializer()

// Creates or retrieves a connection pool for the host.
private val pool = if (protocol == "http") {
@@ -62,82 +57,56 @@
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
}

private val defaultHttpFlow = pool.mapMaterializedValue { x =>
poolPromise.success(x); x
}

private val poolPromise = Promise[HostConnectionPool]

// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
.via(httpFlow.getOrElse(defaultHttpFlow))
.via(httpFlow.getOrElse(pool))
.toMat(Sink.foreach({
case ((Success(response), p)) => p.success(response)
case ((Failure(error), p)) => p.failure(error)
}))(Keep.left)
.run

// Enqueue a request, and return a future capturing the corresponding response.
// WARNING: make sure that if the future response is not failed, its entity
// be drained entirely or the connection will be kept open until timeouts kick in.
def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
futureRequest flatMap { request =>
val promise = Promise[HttpResponse]

// When the future completes, we know whether the request made it
// through the queue.
requestQueue.offer(request -> promise).flatMap { buffered =>
buffered match {
case QueueOfferResult.Enqueued =>
promise.future

case QueueOfferResult.Dropped =>
Future.failed(new Exception("DB request queue is full."))

case QueueOfferResult.QueueClosed =>
Future.failed(new Exception("DB request queue was closed."))

case QueueOfferResult.Failure(f) =>
Future.failed(f)
}
}
/**
* Execute an HttpRequest on the underlying connection pool.
*
* WARNING: It is **very** important that the resulting entity is either drained or discarded fully, so the connection
* can be reused. Otherwise, the pool will dry up.
*
* @return a future holding the response from the server.
*/
def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = futureRequest.flatMap { request =>
val promise = Promise[HttpResponse]

// When the future completes, we know whether the request made it
// through the queue.
requestQueue.offer(request -> promise).flatMap {
case QueueOfferResult.Enqueued => promise.future
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
case QueueOfferResult.Failure(f) => Future.failed(f)
}
}
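To make the warning concrete, here is a short, hedged usage sketch. It assumes `mkRequest(method, uri, headers = ...)` from the `PoolingRestClient` companion behaves as at the CouchDbRestClient call site above, and uses a placeholder host and port; unmarshalling the entity to a `String` consumes it fully, which is what keeps the pooled connection available.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import whisk.http.PoolingRestClient
import whisk.http.PoolingRestClient.mkRequest

import scala.concurrent.Future

object RequestSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Placeholder host and port; 16 * 1024 mirrors the queue size CouchDbRestClient uses.
  val client = new PoolingRestClient("http", "localhost", 5984, 16 * 1024)

  // Unmarshalling to String drains the whole entity, so the connection is released.
  def fetchBody(): Future[String] =
    client
      .request(mkRequest(HttpMethods.GET, Uri("/"), headers = List(Accept(MediaTypes.`application/json`))))
      .flatMap(response => Unmarshal(response.entity).to[String])
}
```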

// Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
request(futureRequest) flatMap { response =>
if (response.status.isSuccess()) {
Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
Right(o)
}
/**
* Execute an HttpRequest on the underlying connection pool and return an unmarshalled result.
*
* @return either the unmarshalled result or a status code, if the status code is not a success (2xx class)
*/
def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] =
request(futureRequest).flatMap { response =>
if (response.status.isSuccess) {
Unmarshal(response.entity.withoutSizeLimit).to[T].map(Right.apply)
} else {
// This is important, as it drains the entity stream.
// Otherwise the connection stays open and the pool dries up.
response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
Left(response.status)
}
response.discardEntityBytes().future.map(_ => Left(response.status))
}
}
}
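For JSON endpoints, `requestJson` folds the consume-or-discard handling into the client: a 2xx entity is unmarshalled, anything else is discarded and surfaced as `Left(status)`. A hedged usage sketch along the same lines as above, again assuming the `mkRequest` helper and a placeholder host and port:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.http.PoolingRestClient
import whisk.http.PoolingRestClient.mkRequest

import scala.concurrent.Future

object RequestJsonSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  val client = new PoolingRestClient("http", "localhost", 5984, 16 * 1024)

  // Left(status) already has its entity discarded by requestJson itself; Right holds
  // the unmarshalled JsObject (the RootJsonReader comes from DefaultJsonProtocol).
  val serverInfo: Future[Either[StatusCode, JsObject]] =
    client.requestJson[JsObject](
      mkRequest(HttpMethods.GET, Uri("/"), headers = List(Accept(MediaTypes.`application/json`))))
}
```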

def shutdown(): Future[Unit] = {
materializer.shutdown()
// The code below shuts down the pool, but is apparently not tolerant
// to multiple clients shutting down the same pool (the second one just
// hangs). Given that shutdown is only relevant for tests (unused pools
// close themselves anyway after some time) and that they can call
// Http().shutdownAllConnectionPools(), this is not a major issue.
/* Reintroduce below if they ever make HostConnectionPool.shutdown()
* safe to call >1x.
* val poolOpt = poolPromise.future.value.map(_.toOption).flatten
* poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
*/
Future.successful(())
}
def shutdown(): Future[Unit] = Future.successful(materializer.shutdown())
}

object PoolingRestClient {
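Taken together, the two files suggest the intended usage pattern: a thin, host-specific client subclasses `PoolingRestClient`, optionally pins its work to a dedicated dispatcher, and builds requests with the companion's `mkRequest` helper. The sketch below is hypothetical; the class name, endpoint path, and header list are invented for illustration, and `mkRequest`'s full signature is assumed from the call sites above.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.http.PoolingRestClient
import whisk.http.PoolingRestClient.mkRequest

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical subclass in the style of CouchDbRestClient: one client instance
// per target host, all requests funneled through the same cached connection pool.
class ExampleServiceClient(host: String, port: Int)(implicit system: ActorSystem)
    extends PoolingRestClient("https", host, port, 16 * 1024) {

  // CouchDbRestClient overrides the execution context with a dedicated dispatcher
  // ("dispatchers.couch-dispatcher"); the default dispatcher is used here so the
  // sketch runs without extra configuration.
  protected implicit override val context: ExecutionContext = system.dispatcher

  private val baseHeaders = List(Accept(MediaTypes.`application/json`))

  def status(): Future[Either[StatusCode, JsObject]] =
    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri("/status"), headers = baseHeaders))
}
```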