From 070899fcb01790c0da7558160ca07862ee33dd9e Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 21 Jul 2023 13:07:33 +0200 Subject: [PATCH] Ignore multiple completions for awakeables. While debugging an e2e test we've found out that the runtime can panic in `Codec::write_completion` in case the user tries to complete an awakeable twice. This commit avoids this panic, but rather logs that the user is trying to complete an awakeable twice. --- src/service_protocol/src/codec.rs | 6 +++--- .../src/partition/effects/interpreter.rs | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/service_protocol/src/codec.rs b/src/service_protocol/src/codec.rs index 034123a9b0..0237822653 100644 --- a/src/service_protocol/src/codec.rs +++ b/src/service_protocol/src/codec.rs @@ -1,12 +1,12 @@ use super::pb::protocol; -use std::fmt::Debug; - -use std::mem; use bytes::{Buf, BufMut, Bytes, BytesMut}; use prost::Message; use restate_types::journal::raw::*; use restate_types::journal::{CompletionResult, Entry, EntryType}; +use std::fmt::Debug; +use std::mem; + /// This macro generates the pattern matching with arms per entry. /// For each entry it first executes `Message#decode` and then `try_into()`. /// It expects that for each `{...}Entry` there is a valid `TryFrom<{...}Message>` implementation with `Error = &'static str`. diff --git a/src/worker/src/partition/effects/interpreter.rs b/src/worker/src/partition/effects/interpreter.rs index 4076accbd5..fc152f5eab 100644 --- a/src/worker/src/partition/effects/interpreter.rs +++ b/src/worker/src/partition/effects/interpreter.rs @@ -12,12 +12,13 @@ use restate_types::identifiers::{EntryIndex, ServiceId, ServiceInvocationId}; use restate_types::invocation::ServiceInvocation; use restate_types::journal::enriched::{EnrichedEntryHeader, EnrichedRawEntry}; use restate_types::journal::raw::{ - PlainRawEntry, RawEntryCodec, RawEntryCodecError, RawEntryHeader, + EntryHeader, PlainRawEntry, RawEntryCodec, RawEntryCodecError, RawEntryHeader, }; -use restate_types::journal::{Completion, CompletionResult, JournalMetadata}; +use restate_types::journal::{Completion, CompletionResult, EntryType, JournalMetadata}; use restate_types::message::MessageIndex; use restate_types::time::MillisSinceEpoch; use std::marker::PhantomData; +use tracing::{debug, warn}; #[derive(Debug, thiserror::Error)] pub(crate) enum Error { @@ -675,6 +676,20 @@ impl Interpreter { .load_journal_entry(&service_invocation_id.service_id, entry_index) .await? { + if journal_entry.ty() == EntryType::Awakeable + && journal_entry.header.is_completed() == Some(true) + { + // We can ignore when we get an awakeable completion twice as they might be a result of + // some request being retried from the ingress to complete the awakeable. + // We'll use only the first completion, because changing the awakeable result + // after it has been completed for the first time can cause non-deterministic execution. + warn!( + restate.invocation.sid = %service_invocation_id, + restate.journal.index = entry_index, + "Trying to complete an awakeable already completed. Ignoring this completion"); + debug!("Discarded awakeable completion: {:?}", completion_result); + return Ok(false); + } Codec::write_completion(&mut journal_entry, completion_result)?; state_storage .store_journal_entry(