Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure ResultMessage is processed #4135

Merged
merged 3 commits into from
Nov 29, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ class ShardingContainerPoolBalancer(

/** State related to invocations and throttling */
protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
protected[loadBalancer] val blockingPromises =
TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
private val totalActivations = new LongAdder()
private val totalActivationMemory = new LongAdder()
Expand Down Expand Up @@ -262,9 +264,13 @@ class ShardingContainerPoolBalancer(

chosen
.map { invoker =>
val entry = setupActivation(msg, action, invoker)
setupActivation(msg, action, invoker)
sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
entry.promise.future
if (msg.blocking) {
blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while not likely, it's plausible the active ack is received before the promise is added to this map (which is why setup activation adds the promise before it's posted to any invoker).

} else {
Future.successful(Left(msg.activationId))
}
}
}
.getOrElse {
Expand Down Expand Up @@ -313,8 +319,7 @@ class ShardingContainerPoolBalancer(
action.limits.memory.megabytes.MB,
action.limits.concurrency.maxConcurrent,
action.fullyQualifiedName(true),
timeoutHandler,
Promise[Either[ActivationId, WhiskActivation]]())
timeoutHandler)
})
}

Expand Down Expand Up @@ -387,9 +392,7 @@ class ShardingContainerPoolBalancer(
// Resolve the promise to send the result back to the user
// The activation will be removed from `activations`-map later, when we receive the completion message, because the
// slot of the invoker is not yet free for new activations.
activations.get(aid).map { entry =>
entry.promise.trySuccess(response)
}
blockingPromises.remove(aid).map(_.trySuccess(response))
logging.info(this, s"received result ack for '$aid'")(tid)
}

Expand Down Expand Up @@ -422,13 +425,9 @@ class ShardingContainerPoolBalancer(
.foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
if (!forced) {
entry.timeoutHandler.cancel()
// If the action was blocking and the Resultmessage has been received before nothing will happen here.
// If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
// the controller will get the result out of the database.
// If the action was non-blocking, we will close the promise here.
entry.promise.trySuccess(Left(aid))
} else {
entry.promise.tryFailure(new Throwable("no completion ack received"))
// remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received")))
}

logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
Expand Down Expand Up @@ -717,13 +716,11 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeout
* @param namespaceId namespace that invoked the action
* @param invokerName invoker the action is scheduled to
* @param timeoutHandler times out completion of this activation, should be canceled on good paths
* @param promise the promise to be completed by the activation
*/
case class ActivationEntry(id: ActivationId,
namespaceId: UUID,
invokerName: InvokerInstanceId,
memory: ByteSize,
maxConcurrent: Int,
fullyQualifiedEntityName: FullyQualifiedEntityName,
timeoutHandler: Cancellable,
promise: Promise[Either[ActivationId, WhiskActivation]])
timeoutHandler: Cancellable)