Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process WASM blocks from substreams #4916

Merged
merged 1 commit into from
Dec 5, 2023
Merged

Conversation

mangas
Copy link
Contributor

@mangas mangas commented Oct 13, 2023

Add support for running WASM mappings on incoming substreams blocks.

  • The block stream will emit the encoded result as a byte array
  • The mapping will need to handle the decoding of the input

Example is provided here

@mangas mangas force-pushed the filipe/substreams-wasm-block branch 4 times, most recently from 670b89b to dce78e4 Compare November 7, 2023 15:59
@mangas mangas force-pushed the filipe/substreams-wasm-block branch 3 times, most recently from dc2fb4b to f3fea4f Compare November 13, 2023 15:13
@mangas mangas marked this pull request as ready for review November 13, 2023 15:30
@mangas mangas requested a review from lutter November 13, 2023 15:30
Copy link
Collaborator

@lutter lutter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The biggest thing that I'd like to see addressed is the code duplication between handle_process_block and handle_process_wasm_block.

Since this touches and refactors some very sensitive core logic, it also needs a run in the integration cluster.

_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
unimplemented!()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this unimplemented be turned into an error so that we don't get a panic if we hit this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this code is reached it is a problem. Substreams is itself a chain so anything that calls into this code should be on another chain

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment was more about that panics a lot of times are fairly invisible and easy to miss in our huge logs, errors are a bit easier to find. And since this already returns a Result, it wouldn't be hard to make this an error.

_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
unimplemented!()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - error would be preferable

_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
unimplemented!()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - error would be preferable

};

const NEAR_FILTER_MODULE_NAME: &str = "near_filter";
const SUBSTREAMS_TRIGGER_FILTER_BYTES: &[u8; 497306] = include_bytes!(
const SUBSTREAMS_TRIGGER_FILTER_BYTES: &[u8; 510162] = include_bytes!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment here what this is and what it is used for would be great. Also something about when/how this needs to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the binary produced by our other crate inside substreams, it will tell you when it needs to be updated :P

logger,
chain.metrics_registry.clone(),
),
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since most of the argument to SubstreamsBlockStream are the same in both branches, it would be clearer to just have something like let mapper = match .. { }; and then one statement SubstreamsBlockStream::new

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that before and couldn't get it to compile, I can't remember what was the issue but I couldn't convince the compiler some specific type was correct. I will give it another go, I maybe have made more changes in the meantime

@@ -77,7 +77,7 @@ impl blockchain::DataSource<Chain> for DataSource {

// runtime is not needed for substreams, it will cause the host creation to be skipped.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment doesn't seem accurate anymore

state = self
.instance
.hosts()
.first()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's because I am not very familiar with this code, but why do we only pay attention to the first host here? What are the other ones for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hosts exist map 1:1 to DataSources, on substreams we only allow one data source since it's triggered on every block so we basically only care about the first one.

The code around the runner has all these points where context is required to understand what is going. It's something I would like to refactor at some point in the near future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment

}
}

if matches!(action, Action::Restart) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to make this a match action { .. } with all possibilities spelled out so that when anything is added to Action, the compiler points us to the places that require updating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I think I see the issue, this is all old code, moved from elsewhere in the runner so it can be re-used so Idont want to change anything here

.start_section(PROCESS_WASM_BLOCK_SECTION_NAME);
self.handle_process_wasm_block(block_ptr, data, handler, cursor, cancel_handle)
.await?
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a future refactor: why aren't the branches for Some(Ok(event)) a methond on BlockStreamEvent so that these three branches could simple be Some(Ok(event)) => event.handle(stopwatch, cancel_handle) ?

@@ -880,197 +1077,247 @@ where
C: Blockchain,
T: RuntimeHostBuilder<C>,
{
async fn handle_process_block(
async fn handle_process_wasm_block(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a lot of code duplication between handle_process_wasm_block and handle_process_block which I fear will lead to subtle bugs as that logic evolves over time. Is there no way to avoid that duplication? I think handle_process_block would also be improved by introducing some helper structs that encapsulate some of the logic that is shared between the two; I know it's grown over time, but to the uninitiated, the code for handle_process_block is very hard to follow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my other comment, I think this whole section requires a refactor btu I would first like to understand better how substream will work so I decided to duplicate a lot of the code instead of doing the refactor now but this certainly needs improvement in the near future

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you then add an issue so this doesn't get completely forgotten?

@lutter
Copy link
Collaborator

lutter commented Dec 4, 2023

I've approved this though I have pretty big reservations about the code duplication, and the fact that that code is pretty much undocumented (what's the high-level intended behavior of that code?) and very hard to follow.

@mangas mangas force-pushed the filipe/substreams-wasm-block branch 3 times, most recently from 01fcdf9 to 4d1012b Compare December 5, 2023 12:47
@mangas mangas force-pushed the filipe/substreams-wasm-block branch from 4d1012b to bafda5b Compare December 5, 2023 14:01
@mangas mangas merged commit d06e1b5 into master Dec 5, 2023
7 checks passed
@mangas mangas deleted the filipe/substreams-wasm-block branch December 5, 2023 16:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants