Skip to content

Commit

Permalink
Fix bug wrt cancellation and promise-related entries (#1921)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Sep 4, 2024
1 parent 05a1064 commit 0da2eb0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 65 deletions.
27 changes: 7 additions & 20 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,24 +1172,10 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
.get_journal(&invocation_id, journal_length)
.try_filter_map(|(journal_index, journal_entry)| async move {
if let JournalEntry::Entry(journal_entry) = journal_entry {
match journal_entry.header() {
EnrichedEntryHeader::Call { is_completed, .. } if !is_completed => {
return Ok(Some((journal_index, journal_entry)))
}
EnrichedEntryHeader::Awakeable { is_completed }
| EnrichedEntryHeader::GetState { is_completed }
if !is_completed =>
{
return Ok(Some((journal_index, journal_entry)))
}
EnrichedEntryHeader::Sleep { is_completed } if !is_completed => {
return Ok(Some((journal_index, journal_entry)))
}
header => {
assert!(
header.is_completed().unwrap_or(true),
"All non canceled journal entries must be completed."
);
if let Some(is_completed) = journal_entry.header().is_completed() {
if !is_completed {
// Every completable journal entry that hasn't been completed yet should be cancelled
return Ok(Some((journal_index, journal_entry)));
}
}
}
Expand All @@ -1202,15 +1188,15 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
let canceled_result = CompletionResult::from(&CANCELED_INVOCATION_ERROR);

let mut resume_invocation = false;

for (journal_index, journal_entry) in journal_entries_to_cancel {
let (header, entry) = journal_entry.into_inner();
match header {
// cancel uncompleted invocations
EnrichedEntryHeader::Call {
enrichment_result: Some(enrichment_result),
..
} => {
// For calls, we don't immediately complete the call entry with cancelled,
// but we let the cancellation result propagate from the callee.
self.handle_outgoing_message(
ctx,
OutboxMessage::InvocationTermination(InvocationTermination::cancel(
Expand Down Expand Up @@ -1256,6 +1242,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Ok(resume_invocation)
}

/// Cancels a generic completable journal entry
async fn cancel_journal_entry_with<State: JournalTable>(
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
Expand Down
105 changes: 60 additions & 45 deletions crates/worker/src/partition/state_machine/tests/kill_cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async fn cancel_invoked_invocation() -> Result<(), Error> {
let background_call_invocation_id = InvocationId::mock_random();
let finished_call_invocation_id = InvocationId::mock_random();

let invocation_target = InvocationTarget::mock_virtual_object();
let invocation_target = InvocationTarget::mock_workflow();
let invocation_id = InvocationId::generate(&invocation_target);

let _ = test_env
Expand Down Expand Up @@ -301,27 +301,9 @@ async fn cancel_invoked_invocation() -> Result<(), Error> {
);

// Entries are completed
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, 4)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, 5)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, 6)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
for idx in 4..=9 {
assert_entry_completed(&mut test_env, invocation_id, idx).await?;
}

assert_that!(
actions,
Expand All @@ -333,6 +315,9 @@ async fn cancel_invoked_invocation() -> Result<(), Error> {
contains(forward_canceled_completion_matcher(4)),
contains(forward_canceled_completion_matcher(5)),
contains(forward_canceled_completion_matcher(6)),
contains(forward_canceled_completion_matcher(7)),
contains(forward_canceled_completion_matcher(8)),
contains(forward_canceled_completion_matcher(9)),
contains(delete_timer_matcher(5)),
)
);
Expand All @@ -348,7 +333,7 @@ async fn cancel_suspended_invocation() -> Result<(), Error> {
let background_call_invocation_id = InvocationId::mock_random();
let finished_call_invocation_id = InvocationId::mock_random();

let invocation_target = InvocationTarget::mock_virtual_object();
let invocation_target = InvocationTarget::mock_workflow();
let invocation_id = InvocationId::generate(&invocation_target);

let _ = test_env
Expand Down Expand Up @@ -390,7 +375,7 @@ async fn cancel_suspended_invocation() -> Result<(), Error> {
&invocation_id,
&InvocationStatus::Suspended {
metadata: in_flight_meta,
waiting_for_completed_entries: HashSet::from([3, 4, 5, 6]),
waiting_for_completed_entries: HashSet::from([3, 4, 5, 6, 7, 8, 9]),
},
)
.await;
Expand Down Expand Up @@ -434,27 +419,9 @@ async fn cancel_suspended_invocation() -> Result<(), Error> {
);

// Entries are completed
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, 4)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, 5)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, 6)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
for idx in 4..=9 {
assert_entry_completed(&mut test_env, invocation_id, idx).await?;
}

assert_that!(
actions,
Expand Down Expand Up @@ -551,9 +518,57 @@ fn create_termination_journal(
},
Bytes::default(),
)),
JournalEntry::Entry(EnrichedRawEntry::new(
EnrichedEntryHeader::GetPromise {
is_completed: false,
},
service_protocol::GetPromiseEntryMessage {
key: "my-promise".to_string(),
..Default::default()
}
.encode_to_vec()
.into(),
)),
JournalEntry::Entry(EnrichedRawEntry::new(
EnrichedEntryHeader::PeekPromise {
is_completed: false,
},
service_protocol::PeekPromiseEntryMessage {
key: "my-promise".to_string(),
..Default::default()
}
.encode_to_vec()
.into(),
)),
JournalEntry::Entry(EnrichedRawEntry::new(
EnrichedEntryHeader::CompletePromise {
is_completed: false,
},
service_protocol::CompletePromiseEntryMessage {
key: "my-promise".to_string(),
..Default::default()
}
.encode_to_vec()
.into(),
)),
]
}

async fn assert_entry_completed(
test_env: &mut TestEnv,
invocation_id: InvocationId,
idx: EntryIndex,
) -> Result<(), Error> {
assert_that!(
test_env
.storage
.get_journal_entry(&invocation_id, idx)
.await?,
some(pat!(JournalEntry::Entry(entry_completed_matcher())))
);
Ok(())
}

fn canceled_completion_matcher(entry_index: EntryIndex) -> impl Matcher<ActualT = Completion> {
pat!(Completion {
entry_index: eq(entry_index),
Expand Down

0 comments on commit 0da2eb0

Please sign in to comment.