bump substreams client to sf.substreams.rpc.v2.Stream/Blocks #4556

Merged: 6 commits, Apr 28, 2023
3 changes: 3 additions & 0 deletions chain/substreams/src/data_source.rs
@@ -401,6 +401,9 @@ mod test {
             }),
             module_meta: vec![],
             package_meta: vec![],
+            sink_config: None,
+            network: "".into(),
+            sink_module: "".into(),
         }
     }

142 changes: 64 additions & 78 deletions chain/substreams/src/mapper.rs
@@ -1,13 +1,10 @@
 use crate::{Block, Chain, EntityChanges, TriggerData};
-use graph::blockchain::block_stream::SubstreamsError::{
-    MultipleModuleOutputError, UnexpectedStoreDeltaOutput,
-};
 use graph::blockchain::block_stream::{
     BlockStreamEvent, BlockWithTriggers, FirehoseCursor, SubstreamsError, SubstreamsMapper,
 };
 use graph::prelude::{async_trait, BlockHash, BlockNumber, BlockPtr, Logger};
-use graph::substreams::module_output::Data;
-use graph::substreams::{BlockScopedData, Clock, ForkStep};
+use graph::substreams::Clock;
+use graph::substreams_rpc::response::Message as SubstreamsMessage;
 use prost::Message;

 pub struct Mapper {}
@@ -17,89 +14,78 @@ impl SubstreamsMapper<Chain> for Mapper {
     async fn to_block_stream_event(
         &self,
         logger: &Logger,
-        block_scoped_data: &BlockScopedData,
+        message: Option<SubstreamsMessage>,
     ) -> Result<Option<BlockStreamEvent<Chain>>, SubstreamsError> {
-        let BlockScopedData {
-            outputs,
-            clock,
-            step,
-            cursor: _,
-        } = block_scoped_data;
-
-        let step = ForkStep::from_i32(*step).unwrap_or_else(|| {
-            panic!(
-                "unknown step i32 value {}, maybe you forgot update & re-regenerate the protobuf definitions?",
-                step
-            )
-        });
-
-        if outputs.is_empty() {
-            return Ok(None);
-        }
-
-        if outputs.len() > 1 {
-            return Err(MultipleModuleOutputError);
-        }
-
-        //todo: handle step
-        let module_output = &block_scoped_data.outputs[0];
-        let cursor = &block_scoped_data.cursor;
-
-        let clock = match clock {
-            Some(clock) => clock,
-            None => return Err(SubstreamsError::MissingClockError),
-        };
-
-        let Clock {
-            id: hash,
-            number,
-            timestamp: _,
-        } = clock;
-
-        let hash: BlockHash = hash.as_str().try_into()?;
-        let number: BlockNumber = *number as BlockNumber;
-
-        match module_output.data.as_ref() {
-            Some(Data::MapOutput(msg)) => {
-                let changes: EntityChanges = Message::decode(msg.value.as_slice())
-                    .map_err(SubstreamsError::DecodingError)?;
-
-                use ForkStep::*;
-                match step {
-                    StepIrreversible | StepNew => Ok(Some(BlockStreamEvent::ProcessBlock(
-                        // Even though the trigger processor for substreams doesn't care about TriggerData
-                        // there are a bunch of places in the runner that check if trigger data
-                        // empty and skip processing if so. This will prolly breakdown
-                        // close to head so we will need to improve things.
-
-                        // TODO(filipe): Fix once either trigger data can be empty
-                        // or we move the changes into trigger data.
-                        BlockWithTriggers::new(
-                            Block {
-                                hash,
-                                number,
-                                changes,
-                            },
-                            vec![TriggerData {}],
-                            logger,
-                        ),
-                        FirehoseCursor::from(cursor.clone()),
-                    ))),
-                    StepUndo => {
-                        let parent_ptr = BlockPtr { hash, number };
-
-                        Ok(Some(BlockStreamEvent::Revert(
-                            parent_ptr,
-                            FirehoseCursor::from(cursor.clone()),
-                        )))
-                    }
-                    StepUnknown => {
-                        panic!("unknown step should not happen in the Firehose response")
-                    }
-                }
-            }
-            Some(Data::DebugStoreDeltas(_)) => Err(UnexpectedStoreDeltaOutput),
-            _ => Err(SubstreamsError::ModuleOutputNotPresentOrUnexpected),
+        match message {
+            Some(SubstreamsMessage::BlockUndoSignal(undo)) => {
+                let valid_block = match undo.last_valid_block {
+                    Some(clock) => clock,
+                    None => return Err(SubstreamsError::InvalidUndoError),
+                };
+                let valid_ptr = BlockPtr {
+                    hash: valid_block.id.trim_start_matches("0x").try_into()?,
+                    number: valid_block.number as i32,
+                };
+                return Ok(Some(BlockStreamEvent::Revert(
+                    valid_ptr,
+                    FirehoseCursor::from(undo.last_valid_cursor.clone()),
+                )));
+            }
+
+            Some(SubstreamsMessage::BlockScopedData(block_scoped_data)) => {
+                let module_output = match &block_scoped_data.output {
+                    Some(out) => out,
+                    None => return Ok(None),
+                };
+
+                let clock = match block_scoped_data.clock {
+                    Some(clock) => clock,
+                    None => return Err(SubstreamsError::MissingClockError),
+                };
+
+                let cursor = &block_scoped_data.cursor;
+
+                let Clock {
+                    id: hash,
+                    number,
+                    timestamp: _,
+                } = clock;
+
+                let hash: BlockHash = hash.as_str().try_into()?;
+                let number: BlockNumber = number as BlockNumber;
+
+                let changes: EntityChanges = match module_output.map_output.as_ref() {
+                    Some(msg) => Message::decode(msg.value.as_slice())
+                        .map_err(SubstreamsError::DecodingError)?,
+                    None => EntityChanges {
+                        entity_changes: [].to_vec(),
+                    },
+                };
+
+                // Even though the trigger processor for substreams doesn't care about TriggerData
+                // there are a bunch of places in the runner that check if trigger data
+                // empty and skip processing if so. This will prolly breakdown
+                // close to head so we will need to improve things.
+
+                // TODO(filipe): Fix once either trigger data can be empty
+                // or we move the changes into trigger data.
+                Ok(Some(BlockStreamEvent::ProcessBlock(
+                    BlockWithTriggers::new(
+                        Block {
+                            hash,
+                            number,
+                            changes,
+                        },
+                        vec![TriggerData {}],
+                        logger,
+                    ),
+                    FirehoseCursor::from(cursor.clone()),
+                )))
+            }
+
+            // ignoring Progress messages and SessionInit
+            // We are only interested in Data and Undo signals
+            _ => Ok(None),
         }
     }
 }
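
To make the new control flow concrete, here is a minimal sketch (not part of this diff) of how a consumer could feed rpc v2 responses through the mapper. The Mapper, BlockStreamEvent, and SubstreamsError names come from the diff above; the driver function itself and its input shape are illustrative assumptions.

// Hypothetical driver, assuming the types shown in the diff above:
// graph::substreams_rpc::Response carries the oneof `message`, and
// Mapper::to_block_stream_event returns Ok(None) for ignored messages
// (SessionInit, ModulesProgress).
async fn drive_stream(
    mapper: &Mapper,
    logger: &Logger,
    responses: Vec<graph::substreams_rpc::Response>,
) -> Result<(), SubstreamsError> {
    for response in responses {
        match mapper.to_block_stream_event(logger, response.message).await? {
            // A new block: apply its entity changes, then persist the cursor.
            Some(BlockStreamEvent::ProcessBlock(block, cursor)) => {
                let _ = (block, cursor);
            }
            // A fork: roll back everything above `ptr`, then resume from `cursor`.
            Some(BlockStreamEvent::Revert(ptr, cursor)) => {
                let _ = (ptr, cursor);
            }
            // SessionInit and progress messages carry no block data.
            _ => {}
        }
    }
    Ok(())
}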
7 changes: 7 additions & 0 deletions graph/build.rs
@@ -18,4 +18,11 @@ fn main() {
         .out_dir("src/substreams")
         .compile(&["proto/substreams.proto"], &["proto"])
         .expect("Failed to compile Substreams proto(s)");
+
+    tonic_build::configure()
+        .protoc_arg("--experimental_allow_proto3_optional")
+        .extern_path(".sf.substreams.v1", "crate::substreams")

Contributor commented: this should be sf.substreams.rpc.v2 right?

Contributor Author replied: no, sf.substreams.v1 still exists for the packages, manifests, and such. Only the service layer in it has been deprecated and moved to sf.substreams.rpc.v2 (the v2 avoids confusion, even though the namespace changed). This extern_path directive tells the rpc v2 tonic build where to find the sf.substreams.v1 types.


+        .out_dir("src/substreams_rpc")
+        .compile(&["proto/substreams-rpc.proto"], &["proto"])
+        .expect("Failed to compile Substreams RPC proto(s)");
 }
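
For context on the extern_path call discussed above, a small sketch of its effect (the function here is hypothetical; the module paths follow the out_dir settings above): fields declared with sf.substreams.v1 types in substreams-rpc.proto compile into references to the already-generated crate::substreams module instead of duplicated types.

// Because of `.extern_path(".sf.substreams.v1", "crate::substreams")`, the
// v2 BlockScopedData's `clock` field is the v1 Clock type, so the two
// generated modules interoperate without any conversion code.
use graph::substreams::Clock; // generated from proto/substreams.proto (v1)
use graph::substreams_rpc::BlockScopedData; // generated from proto/substreams-rpc.proto (v2)

fn block_number(data: &BlockScopedData) -> Option<u64> {
    data.clock.as_ref().map(|clock: &Clock| clock.number)
}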
181 changes: 181 additions & 0 deletions graph/proto/substreams-rpc.proto
@@ -0,0 +1,181 @@
syntax = "proto3";

package sf.substreams.rpc.v2;

import "google/protobuf/any.proto";
import "substreams.proto";

service Stream {
rpc Blocks(Request) returns (stream Response);
}

message Request {
int64 start_block_num = 1;
string start_cursor = 2;
uint64 stop_block_num = 3;

// With final_blocks_only, you only receive blocks that are irreversible:
// 'final_block_height' will be equal to the current block and no 'undo_signal' will ever be sent.
bool final_blocks_only = 4;

// Substreams has two modes of executing your module(s): development mode and production
// mode. The mode impacts the execution of Substreams; important aspects
// of execution include:
// * The time required to reach the first byte.
// * The speed at which large ranges get executed.
// * The module logs and outputs sent back to the client.
//
// By default, the engine runs in development mode, with richer and deeper output. Differences
// between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in development mode.
// * The time required to reach the first byte in development mode is faster than in production mode.
//
// Specific attributes of development mode include:
// * The client will receive all of the executed module's logs.
// * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`).
// * Multiple modules' outputs are possible.
//
// With production mode, however, you trade off functionality for high speed, enabling forward
// parallel execution of modules ahead of time.
bool production_mode = 5;

string output_module = 6;

sf.substreams.v1.Modules modules = 7;

// Available only in developer mode
repeated string debug_initial_store_snapshot_for_modules = 10;
}


message Response {
oneof message {
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events.
BlockScopedData block_scoped_data = 3;
BlockUndoSignal block_undo_signal = 4;

// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotData debug_snapshot_data = 10;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotComplete debug_snapshot_complete = 11;

}
}


// BlockUndoSignal informs you that every bit of data
// with a block number above 'last_valid_block' has been reverted
// on-chain. Delete that data and restart from 'last_valid_cursor'.
message BlockUndoSignal {
sf.substreams.v1.BlockRef last_valid_block = 1;
string last_valid_cursor = 2;
}

message BlockScopedData {
MapModuleOutput output = 1;
sf.substreams.v1.Clock clock = 2;
string cursor = 3;

// Non-deterministic; allows substreams sinks to let go of their undo data.
uint64 final_block_height = 4;

repeated MapModuleOutput debug_map_outputs = 10;
repeated StoreModuleOutput debug_store_outputs = 11;
}

message SessionInit {
string trace_id = 1;
}

message InitialSnapshotComplete {
string cursor = 1;
}

message InitialSnapshotData {
string module_name = 1;
repeated StoreDelta deltas = 2;
uint64 sent_keys = 4;
uint64 total_keys = 3;
}

message MapModuleOutput {
string name = 1;
google.protobuf.Any map_output = 2;
// DebugOutputInfo is available in non-production mode only
OutputDebugInfo debug_info = 10;
}

// StoreModuleOutput messages are produced for store modules in development mode.
// It is not possible to retrieve store outputs in production, with parallelization
// enabled. If you need the deltas directly, write a pass-through mapper module
// that will send them down to you.
message StoreModuleOutput {
string name = 1;
repeated StoreDelta debug_store_deltas = 2;
OutputDebugInfo debug_info = 10;
}

message OutputDebugInfo {
repeated string logs = 1;
// LogsTruncated is a flag that tells you whether you received all the logs or whether they
// were truncated because you logged too much (the fixed limit is currently 128 KiB).
bool logs_truncated = 2;
bool cached = 3;
}

message ModulesProgress {
repeated ModuleProgress modules = 1;
}

message ModuleProgress {
string name = 1;

oneof type {
ProcessedRanges processed_ranges = 2;
InitialState initial_state = 3;
ProcessedBytes processed_bytes = 4;
Failed failed = 5;
}

message ProcessedRanges {
repeated BlockRange processed_ranges = 1;
}
message InitialState {
uint64 available_up_to_block = 2;
}
message ProcessedBytes {
uint64 total_bytes_read = 1;
uint64 total_bytes_written = 2;
uint64 bytes_read_delta = 3;
uint64 bytes_written_delta = 4;
uint64 nano_seconds_delta = 5;
}
message Failed {
string reason = 1;
repeated string logs = 2;
// FailureLogsTruncated is a flag that tells you whether you received all the logs or whether they
// were truncated because you logged too much (the fixed limit is currently 128 KiB).
bool logs_truncated = 3;
}
}

message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;
}

message StoreDelta {
enum Operation {
UNSET = 0;
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 1;
uint64 ordinal = 2;
string key = 3;
bytes old_value = 4;
bytes new_value = 5;
}
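
For orientation, a short sketch of calling the new Stream/Blocks endpoint through the client that tonic_build generates for the service above. The endpoint URL and output module name are assumptions; the Request fields and the Response oneof variants come directly from this proto.

use graph::substreams_rpc::{response::Message, stream_client::StreamClient, Request};

// Hypothetical consumer: open the stream and dispatch on the response oneof.
async fn run() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = StreamClient::connect("http://localhost:9000").await?;

    let request = Request {
        start_block_num: 0,
        stop_block_num: 0,
        final_blocks_only: false, // also receive undo signals for forked blocks
        production_mode: true,
        output_module: "map_entity_changes".to_string(), // hypothetical module name
        modules: None, // normally loaded from a substreams package
        ..Default::default()
    };

    let mut stream = client.blocks(request).await?.into_inner();
    while let Some(response) = stream.message().await? {
        match response.message {
            Some(Message::Session(init)) => println!("trace id: {}", init.trace_id),
            Some(Message::BlockScopedData(_data)) => { /* decode _data.output.map_output */ }
            Some(Message::BlockUndoSignal(_undo)) => { /* revert above _undo.last_valid_block */ }
            _ => {} // progress and debug snapshot messages
        }
    }
    Ok(())
}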
