diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala deleted file mode 100644 index ce6a9a9a880..00000000000 --- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package whisk.core.database - -import scala.concurrent.Future - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.HttpMethods -import akka.http.scaladsl.model.StatusCode - -import spray.json._ -import spray.json.DefaultJsonProtocol._ - -import whisk.common.Logging -import whisk.http.PoolingRestClient._ - -/** - * This class only handles the basic communication to the proper endpoints - * ("JSON in, JSON out"). It is up to its clients to interpret the results. - */ -class CloudantRestClient(host: String, port: Int, username: String, password: String, db: String)( - implicit system: ActorSystem, - logging: Logging) - extends CouchDbRestClient("https", host, port, username, password, db) { - - // https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C - def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = { - requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders)) - } -} diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala index 16e6cfafb21..b1e3b420f96 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala @@ -17,7 +17,6 @@ package whisk.core.database -import scala.concurrent.Future import java.net.URLEncoder import java.nio.charset.StandardCharsets @@ -26,32 +25,32 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.stream.scaladsl._ import akka.util.ByteString -import spray.json._ import spray.json.DefaultJsonProtocol._ +import spray.json._ import whisk.common.Logging import whisk.http.PoolingRestClient import whisk.http.PoolingRestClient._ +import scala.concurrent.{ExecutionContext, Future} + /** - * This class only handles the basic communication to the proper endpoints - * ("JSON in, JSON out"). It is up to its clients to interpret the results. - * It is built on akka-http host-level connection pools; compared to single - * requests, it saves some time on each request because it doesn't need to look - * up the pool corresponding to the host. It is also easier to add an extra - * queueing mechanism. + * A client implementing the CouchDb API. + * + * This client only handles communication to the respective endpoints and works in a Json-in -> Json-out fashion. It's + * up to the client to interpret the results accordingly. */ class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)( implicit system: ActorSystem, logging: Logging) extends PoolingRestClient(protocol, host, port, 16 * 1024) { - implicit override val context = system.dispatchers.lookup("dispatchers.couch-dispatcher") + protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher") // Headers common to all requests. - val baseHeaders: List[HttpHeader] = + protected val baseHeaders: List[HttpHeader] = List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`)) - def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev)))) + private def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev)))) // Properly encodes the potential slashes in each segment. protected def uri(segments: Any*): Uri = { @@ -162,11 +161,11 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = { val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev)) - request(httpRequest) flatMap { response => - if (response.status.isSuccess()) { + request(httpRequest).flatMap { response => + if (response.status.isSuccess) { response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r)) } else { - response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map(_ => Left(response.status)) + response.discardEntityBytes().future.map(_ => Left(response.status)) } } } diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala index b842e759bac..5e24b29fc13 100644 --- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala +++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala @@ -17,31 +17,26 @@ package whisk.http -import scala.concurrent.{Future, Promise} -import scala.util.{Failure, Success, Try} -import scala.concurrent.ExecutionContext - import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.Http.HostConnectionPool import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling._ -import akka.stream.ActorMaterializer -import akka.stream.OverflowStrategy -import akka.stream.QueueOfferResult -import akka.stream.scaladsl._ -import akka.stream.scaladsl.Flow - +import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} +import akka.stream.scaladsl.{Flow, _} import spray.json._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} + /** - * This class only handles the basic communication to the proper endpoints. - * It is up to its clients to interpret the results. It is built on akka-http - * host-level connection pools; compared to single requests, it saves some time - * on each request because it doesn't need to look up the pool corresponding - * to the host. It is also easier to add an extra queueing mechanism. + * Http client to talk to a known host. + * + * This class only handles the basic communication to the proper endpoints. It is up to its clients to interpret the + * results. It is built on akka-http host-level connection pools; compared to single requests, it saves some time + * on each request because it doesn't need to look up the pool corresponding to the host. It is also easier to add an + * extra queueing mechanism. */ class PoolingRestClient( protocol: String, @@ -52,8 +47,8 @@ class PoolingRestClient( implicit system: ActorSystem) { require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") - implicit val context = system.dispatcher - implicit val materializer = ActorMaterializer() + protected implicit val context: ExecutionContext = system.dispatcher + protected implicit val materializer: ActorMaterializer = ActorMaterializer() // Creates or retrieves a connection pool for the host. private val pool = if (protocol == "http") { @@ -62,82 +57,56 @@ class PoolingRestClient( Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port) } - private val defaultHttpFlow = pool.mapMaterializedValue { x => - poolPromise.success(x); x - } - - private val poolPromise = Promise[HostConnectionPool] - // Additional queue in case all connections are busy. Should hardly ever be // filled in practice but can be useful, e.g., in tests starting many // asynchronous requests in a very short period of time. private val requestQueue = Source .queue(queueSize, OverflowStrategy.dropNew) - .via(httpFlow.getOrElse(defaultHttpFlow)) + .via(httpFlow.getOrElse(pool)) .toMat(Sink.foreach({ case ((Success(response), p)) => p.success(response) case ((Failure(error), p)) => p.failure(error) }))(Keep.left) .run - // Enqueue a request, and return a future capturing the corresponding response. - // WARNING: make sure that if the future response is not failed, its entity - // be drained entirely or the connection will be kept open until timeouts kick in. - def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { - futureRequest flatMap { request => - val promise = Promise[HttpResponse] - - // When the future completes, we know whether the request made it - // through the queue. - requestQueue.offer(request -> promise).flatMap { buffered => - buffered match { - case QueueOfferResult.Enqueued => - promise.future - - case QueueOfferResult.Dropped => - Future.failed(new Exception("DB request queue is full.")) - - case QueueOfferResult.QueueClosed => - Future.failed(new Exception("DB request queue was closed.")) - - case QueueOfferResult.Failure(f) => - Future.failed(f) - } - } + /** + * Execute an HttpRequest on the underlying connection pool. + * + * WARNING: It is **very** important that the resulting entity is either drained or discarded fully, so the connection + * can be reused. Otherwise, the pool will dry up. + * + * @return a future holding the response from the server. + */ + def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = futureRequest.flatMap { request => + val promise = Promise[HttpResponse] + + // When the future completes, we know whether the request made it + // through the queue. + requestQueue.offer(request -> promise).flatMap { + case QueueOfferResult.Enqueued => promise.future + case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) + case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) + case QueueOfferResult.Failure(f) => Future.failed(f) } } - // Runs a request and returns either a JsObject, or a StatusCode if not 2xx. - def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { - request(futureRequest) flatMap { response => - if (response.status.isSuccess()) { - Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o => - Right(o) - } + /** + * Execute an HttpRequest on the underlying connection pool and return an unmarshalled result. + * + * @return either the unmarshalled result or a status code, if the status code is not a success (2xx class) + */ + def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = + request(futureRequest).flatMap { response => + if (response.status.isSuccess) { + Unmarshal(response.entity.withoutSizeLimit).to[T].map(Right.apply) } else { // This is important, as it drains the entity stream. // Otherwise the connection stays open and the pool dries up. - response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ => - Left(response.status) - } + response.discardEntityBytes().future.map(_ => Left(response.status)) } } - } - def shutdown(): Future[Unit] = { - materializer.shutdown() - // The code below shuts down the pool, but is apparently not tolerant - // to multiple clients shutting down the same pool (the second one just - // hangs). Given that shutdown is only relevant for tests (unused pools - // close themselves anyway after some time) and that they can call - // Http().shutdownAllConnectionPools(), this is not a major issue. - /* Reintroduce below if they ever make HostConnectionPool.shutdown() - * safe to call >1x. - * val poolOpt = poolPromise.future.value.map(_.toOption).flatten - * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(())) - */ - Future.successful(()) - } + def shutdown(): Future[Unit] = Future.successful(materializer.shutdown()) } object PoolingRestClient {