Skip to content

Commit

Permalink
Show GetPromise in CLI, together with promise name (#1919)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Sep 5, 2024
1 parent 0da2eb0 commit 9988eb9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
20 changes: 19 additions & 1 deletion cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ pub enum JournalEntryType {
SetState,
ClearState,
SideEffect,
/// GetPromise is the blocking promise API,
/// PeekPromise is the non-blocking variant (we don't need to show it)
GetPromise(Option<String>),
Other(String),
}

Expand All @@ -213,6 +216,7 @@ impl JournalEntryType {
| JournalEntryType::Call(_)
| JournalEntryType::Awakeable(_)
| JournalEntryType::GetState
| JournalEntryType::GetPromise(_)
)
}

Expand All @@ -224,6 +228,7 @@ impl JournalEntryType {
| JournalEntryType::OneWayCall(_)
| JournalEntryType::Awakeable(_)
| JournalEntryType::SideEffect
| JournalEntryType::GetPromise(_)
)
}
}
Expand All @@ -239,6 +244,7 @@ impl Display for JournalEntryType {
JournalEntryType::SetState => write!(f, "SetState"),
JournalEntryType::ClearState => write!(f, "ClearState"),
JournalEntryType::SideEffect => write!(f, "SideEffect"),
JournalEntryType::GetPromise(_) => write!(f, "Promise"),
JournalEntryType::Other(s) => write!(f, "{}", s),
}
}
Expand Down Expand Up @@ -957,12 +963,22 @@ struct JournalRowResult {
invoked_target: Option<String>,
sleep_wakeup_at: Option<RestateDateTime>,
name: Option<String>,
promise_name: Option<String>,
}

pub async fn get_invocation_journal(
client: &DataFusionHttpClient,
invocation_id: &str,
) -> Result<Vec<JournalEntry>> {
let has_restate_1_1_promise_name_column = client
.check_columns_exists("sys_journal", &["promise_name"])
.await?;
let select_promise_column = if has_restate_1_1_promise_name_column {
"sj.promise_name"
} else {
"CAST(NULL as STRING) AS promise_name"
};

// We are only looking for one...
// Let's get journal details.
let query = format!(
Expand All @@ -973,7 +989,8 @@ pub async fn get_invocation_journal(
sj.invoked_id,
sj.invoked_target,
sj.sleep_wakeup_at,
sj.name
sj.name,
{select_promise_column}
FROM sys_journal sj
WHERE
sj.id = '{}'
Expand Down Expand Up @@ -1008,6 +1025,7 @@ pub async fn get_invocation_journal(
"SetState" => JournalEntryType::SetState,
"ClearState" => JournalEntryType::ClearState,
"Run" => JournalEntryType::SideEffect,
"GetPromise" => JournalEntryType::GetPromise(row.promise_name),
t => JournalEntryType::Other(t.to_owned()),
};

Expand Down
3 changes: 3 additions & 0 deletions cli/src/ui/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ pub fn format_entry_type_details(entry_type: &JournalEntryType) -> String {
JournalEntryType::Awakeable(awakeable_id) => {
format!("{}", style(awakeable_id.to_string()).cyan())
}
JournalEntryType::GetPromise(Some(promise_name)) => {
format!("{}", style(promise_name).cyan())
}
_ => String::new(),
}
}
23 changes: 23 additions & 0 deletions crates/storage-query-datafusion/src/journal/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use crate::journal::schema::SysJournalBuilder;
use bytestring::ByteString;

use restate_service_protocol::codec::ProtobufRawEntryCodec;

Expand Down Expand Up @@ -74,6 +75,15 @@ pub(crate) fn append_journal_row(
));
}
}
EnrichedEntryHeader::GetPromise { .. }
| EnrichedEntryHeader::PeekPromise { .. }
| EnrichedEntryHeader::CompletePromise { .. } => {
if row.is_promise_name_defined() {
if let Some(promise_name) = get_promise_name(&entry) {
row.promise_name(promise_name);
}
}
}
EnrichedEntryHeader::Sleep { .. } => {
if row.is_sleep_wakeup_at_defined() {
if let Some(sleep_entry) = deserialize_sleep_entry(&entry) {
Expand Down Expand Up @@ -102,3 +112,16 @@ fn deserialize_sleep_entry(entry: &EnrichedRawEntry) -> Option<SleepEntry> {
_ => None,
}
}

fn get_promise_name(entry: &EnrichedRawEntry) -> Option<ByteString> {
let decoded_entry = entry
.deserialize_entry_ref::<ProtobufRawEntryCodec>()
.expect("journal entry must deserialize");

match decoded_entry {
Entry::GetPromise(entry) => Some(entry.key),
Entry::PeekPromise(entry) => Some(entry.key),
Entry::CompletePromise(entry) => Some(entry.key),
_ => None,
}
}
3 changes: 3 additions & 0 deletions crates/storage-query-datafusion/src/journal/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ define_table!(sys_journal(
/// If this entry represents a sleep, indicates wakeup time.
sleep_wakeup_at: DataType::Date64,

/// If this entry is a promise related entry (GetPromise, PeekPromise, CompletePromise), indicates the promise name.
promise_name: DataType::LargeUtf8,

/// Raw binary representation of the entry. Check the [service protocol](https://github.com/restatedev/service-protocol)
/// for more details to decode it.
raw: DataType::LargeBinary,
Expand Down

0 comments on commit 9988eb9

Please sign in to comment.