Skip to content

Commit

Permalink
=htc small refactoring in OneHundredContinue support
Browse files Browse the repository at this point in the history
  • Loading branch information
sirthias committed Dec 19, 2014
1 parent 3790ec3 commit 735fdb4
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package akka.http.engine

import scala.annotation.tailrec
import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }

/**
* An actor publisher for producing a simple stream of singleton tokens
* the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message.
*/
private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] {
private var triggered = 0

def receive = {
case TokenSourceActor.Trigger
triggered += 1
tryDispatch()

case ActorPublisherMessage.Request(_)
tryDispatch()

case ActorPublisherMessage.Cancel
context.stop(self)
}

@tailrec private def tryDispatch(): Unit =
if (triggered > 0 && totalDemand > 0) {
onNext(token)
triggered -= 1
tryDispatch()
}
}

private[engine] object TokenSourceActor {
case object Trigger
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@

package akka.http.engine.server

import scala.util.control.NonFatal
import akka.actor.{ ActorRef, Props }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage.PushPullStage
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.stream.stage.PushPullStage
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.engine.parsing.ParserOutput._
import akka.http.engine.TokenSourceActor
import akka.http.model._
import akka.http.util._
import akka.http.Http

import scala.util.control.NonFatal

/**
* INTERNAL API
Expand All @@ -44,7 +42,7 @@ private[http] object HttpServer {
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
Props {
val actor = new OneHundredContinueSourceActor
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)
actor
}
Expand All @@ -56,7 +54,7 @@ private[http] object HttpServer {
val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef)))
rootParser.createShallowCopy(() oneHundredContinueRef).stage))

val requestPreparation =
Flow[RequestOutput]
Expand Down Expand Up @@ -129,9 +127,9 @@ private[http] object HttpServer {
override def initialCompletionHandling = CompletionHandling(
onComplete = (ctx, _) { ctx.complete(); SameState },
onError = {
case (ctx, _, error: Http.StreamException)
case (ctx, _, EntityStreamException(errorInfo))
// the application has forwarded a request entity stream error to the response stream
finishWithError(ctx, "request", StatusCodes.BadRequest, error.info)
finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo)
case (ctx, _, error)
ctx.error(error)
SameState
Expand All @@ -150,27 +148,63 @@ private[http] object HttpServer {
}
}
}
}

private[server] class ErrorsTo500ResponseRecovery(log: LoggingAdapter)
extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] {
import akka.stream.stage.Context

private[this] var errorResponse: ResponseRenderingContext = _

override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem)

override def onPull(ctx: Context[ResponseRenderingContext]) =
if (ctx.isFinishing) ctx.pushAndFinish(errorResponse)
else ctx.pull()

override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) =
error match {
case NonFatal(e)
log.error(e, "Internal server error, sending 500 response")
errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError),
closeAfterResponseCompletion = true)
ctx.absorbTermination()
case _ ctx.fail(error)
}
}
/**
* The `Expect: 100-continue` header has a special status in HTTP.
* It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
* (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
* accept the request and responds with
*
* - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
* (or if the `Expect` header contains other, unsupported expectations).
* - a `100 Continue` response,
* if it is ready to accept the request entity and the client should go ahead with sending it
* - a final response (like a 4xx to signal some client-side error
* (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
*
* Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
* entity. In this case it will receive another response after having completed request sending.
* So this special feature breaks the normal "one request - one response" logic of HTTP!
* It therefore requires special handling in all HTTP stacks (client- and server-side).
*
* For us this means:
*
* - on the server-side:
* After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
* it through to the application. Only when (and if) the application then requests data from the entity stream do we
* send out a `100 Continue` response and continue reading the request entity.
* The application can therefore determine itself whether it wants the client to send the request entity
* by deciding whether to look at the request entity data stream or not.
* If the application sends a response *without* having looked at the request entity the client receives this
* response *instead of* the `100 Continue` response and the server closes the connection afterwards.
*
* - on the client-side:
* If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
* we've received a `100 Continue` response.
*/
case object OneHundredContinue

final class ErrorsTo500ResponseRecovery(log: LoggingAdapter)
extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] {

import akka.stream.stage.Context

private[this] var errorResponse: ResponseRenderingContext = _

override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem)

override def onPull(ctx: Context[ResponseRenderingContext]) =
if (ctx.isFinishing) ctx.pushAndFinish(errorResponse)
else ctx.pull()

override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) =
error match {
case NonFatal(e)
log.error(e, "Internal server error, sending 500 response")
errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError),
closeAfterResponseCompletion = true)
ctx.absorbTermination()
case _ ctx.fail(error)
}
}
}

This file was deleted.

0 comments on commit 735fdb4

Please sign in to comment.