Skip to content

Commit

Permalink
Ignore multiple completions for awakeables.
Browse files Browse the repository at this point in the history
While debugging an e2e test we've found out that the runtime can panic in `Codec::write_completion` in case the user tries to complete an awakeable twice. This commit avoids this panic, but rather logs that the user is trying to complete an awakeable twice.
  • Loading branch information
slinkydeveloper committed Jul 21, 2023
1 parent 58a825b commit 070899f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
6 changes: 3 additions & 3 deletions src/service_protocol/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::pb::protocol;
use std::fmt::Debug;

use std::mem;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use prost::Message;
use restate_types::journal::raw::*;
use restate_types::journal::{CompletionResult, Entry, EntryType};
use std::fmt::Debug;
use std::mem;

/// This macro generates the pattern matching with arms per entry.
/// For each entry it first executes `Message#decode` and then `try_into()`.
/// It expects that for each `{...}Entry` there is a valid `TryFrom<{...}Message>` implementation with `Error = &'static str`.
Expand Down
19 changes: 17 additions & 2 deletions src/worker/src/partition/effects/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ use restate_types::identifiers::{EntryIndex, ServiceId, ServiceInvocationId};
use restate_types::invocation::ServiceInvocation;
use restate_types::journal::enriched::{EnrichedEntryHeader, EnrichedRawEntry};
use restate_types::journal::raw::{
PlainRawEntry, RawEntryCodec, RawEntryCodecError, RawEntryHeader,
EntryHeader, PlainRawEntry, RawEntryCodec, RawEntryCodecError, RawEntryHeader,
};
use restate_types::journal::{Completion, CompletionResult, JournalMetadata};
use restate_types::journal::{Completion, CompletionResult, EntryType, JournalMetadata};
use restate_types::message::MessageIndex;
use restate_types::time::MillisSinceEpoch;
use std::marker::PhantomData;
use tracing::{debug, warn};

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
Expand Down Expand Up @@ -675,6 +676,20 @@ impl<Codec: RawEntryCodec> Interpreter<Codec> {
.load_journal_entry(&service_invocation_id.service_id, entry_index)
.await?
{
if journal_entry.ty() == EntryType::Awakeable
&& journal_entry.header.is_completed() == Some(true)
{
// We can ignore when we get an awakeable completion twice as they might be a result of
// some request being retried from the ingress to complete the awakeable.
// We'll use only the first completion, because changing the awakeable result
// after it has been completed for the first time can cause non-deterministic execution.
warn!(
restate.invocation.sid = %service_invocation_id,
restate.journal.index = entry_index,
"Trying to complete an awakeable already completed. Ignoring this completion");
debug!("Discarded awakeable completion: {:?}", completion_result);
return Ok(false);
}
Codec::write_completion(&mut journal_entry, completion_result)?;
state_storage
.store_journal_entry(
Expand Down

0 comments on commit 070899f

Please sign in to comment.