Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(katana-rpc): event idx not respecting cursor's block idx #2578

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions crates/katana/rpc/rpc/src/utils/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
}
}

/// A partial cursor that points to a specific event within a transaction.
/// A partial cursor that points to a specific event WITHIN a transaction.
#[derive(Debug, Clone, PartialEq, Default)]
struct PartialCursor {
/// The transaction index within a block.
Expand Down Expand Up @@ -153,12 +153,11 @@
let block_range = cursor.block..=*block_range.end();

for block_num in block_range {
// collect all receipts at `block_num` block.

Check warning on line 156 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L156

Added line #L156 was not covered by tests
let block_hash = provider.block_hash_by_num(block_num)?.context("Missing block hash")?;
let receipts = provider.receipts_by_block(block_num.into())?.context("Missing receipts")?;

let body_index =
provider.block_body_indices(block_num.into())?.context("Missing block body index")?;

let tx_hashes = provider.transaction_hashes_in_range(body_index.into())?;

if block_num == cursor.block {
Expand All @@ -178,16 +177,26 @@
.enumerate()
.skip(total_tx_to_skip)
{
// we should only skip for the last txn pointed by the cursor.
// Determine the next event index to start processing.
let next_event =
// Check if the block AND tx we're currently processing is exactly the one pointed by the cursor.
//
// If yes, then we check whether (1) the event index pointed by the cursor is less than
// OR (2) exceed the total number of events in the current transaction.

Check warning on line 185 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L182-L185

Added lines #L182 - L185 were not covered by tests
if block_num == cursor.block && tx_idx == cursor.txn.idx {
match events.len().cmp(&cursor.txn.event) {
Ordering::Greater => {}
Ordering::Less | Ordering::Equal => continue,
// If its (1), then that means there are still some events left to process in

Check warning on line 187 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L187

Added line #L187 was not covered by tests
// the current transaction. Else if its (2), meaning the cursor is pointing to either the
// last event or out of bound, which we can just skip to the next transaction.
match cursor.txn.event.cmp(&events.len()) {
Ordering::Less => cursor.txn.event,
Ordering::Greater | Ordering::Equal => continue,

Check warning on line 192 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L192

Added line #L192 was not covered by tests
}
}
// If we're not processing the block and tx pointed by the cursor, then we start from 0
else {

Check warning on line 196 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L195-L196

Added lines #L195 - L196 were not covered by tests
0
};

Check warning on line 198 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L198

Added line #L198 was not covered by tests

// we should only skip for the last txn pointed by the cursor.
let next_event = if tx_idx == cursor.txn.idx { cursor.txn.event } else { 0 };
let partial_cursor = fetch_tx_events(
next_event,
Some(block_num),
Expand Down Expand Up @@ -267,8 +276,23 @@
}
}

// returns a cursor if it couldn't include all the events of the current transaction because
// the buffer is already full. otherwise none.
/// Fetches events from a transaction, applying filters and respecting chunk size limits.
///
/// Returns a cursor if it couldn't include all the events of the current transaction because
/// the buffer is already full. Otherwise, if it is able to include all the transactions,
/// returns None.
///
/// # Arguments
///
/// * `next_event_idx` - The index of the transaction in the current transaction to start from
/// * `block_number` - Block number of the current transaction
/// * `block_hash` - Block hash of the current transaction
/// * `tx_idx` - Index of the current transaction in the block
/// * `tx_hash` - Hash of the current transaction
/// * `events` - All events in the current transaction
/// * `filter` - The filter to apply on the events
/// * `chunk_size` - Maximum number of events that can be taken, based on user-specified chunk size
/// * `buffer` - Buffer to store the matched events

Check warning on line 295 in crates/katana/rpc/rpc/src/utils/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/rpc/rpc/src/utils/events.rs#L279-L295

Added lines #L279 - L295 were not covered by tests
#[allow(clippy::too_many_arguments)]
fn fetch_tx_events(
next_event_idx: usize,
Expand All @@ -295,6 +319,7 @@
transaction_hash: tx_hash,
from_address: e.from_address.into(),
})
// enumerate so that we can keep track of the event's index in the transaction
.enumerate()
.skip(next_event_idx)
.take(total_can_take)
Expand All @@ -304,10 +329,9 @@
let total_events_traversed = next_event_idx + total_can_take;

// get the index of the last matching event that we have reached. if there is not
// matching events (ie `filtered` is empty) we point the end of the chunk
// matching events (ie `filtered` is empty) we point to the end of the chunk
// we've covered thus far using the iterator..
let last_event_idx = filtered.last().map(|(idx, _)| *idx).unwrap_or(total_events_traversed);

buffer.extend(filtered.into_iter().map(|(_, event)| event));

if buffer.len() >= chunk_size {
Expand Down
Loading