Skip to content

Commit

Permalink
Merge pull request #2207 from eqlabs/krisztian/0.14.2-backport-bugfixes
Browse files Browse the repository at this point in the history
chore: backport some fixes to 0.14.2
  • Loading branch information
kkovaacs authored Sep 3, 2024
2 parents c0137d6 + 36f44a7 commit d76289f
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 28 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ More expansive patch notes and explanations may be found in the specific [pathfi
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed

- Pathfinder sometimes returns an INVALID_CONTINUATION_TOKEN error when requesting events from the pending block and providing a continuation token.
- `starknet_getEvents` incorrectly returns pending events if `from_block` is greater than latest_block_number + 1.
- `starknet_getEvents` incorrectly does not return pending events if `from_block` is `pending` and `to_block` is missing.

### Added

- `--sync.l1-poll-interval` CLI option has been added to set the poll interval for L1 state. Defaults to 30s.

## [0.14.1] - 2024-07-29

### Fixed
Expand Down
1 change: 1 addition & 0 deletions crates/gateway-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ httpmock = { workspace = true }
lazy_static = { workspace = true }
pathfinder-crypto = { path = "../crypto" }
pretty_assertions_sorted = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
starknet-gateway-test-fixtures = { path = "../gateway-test-fixtures" }
test-log = { workspace = true, features = ["trace"] }
tracing-subscriber = { workspace = true }
Expand Down
26 changes: 14 additions & 12 deletions crates/gateway-client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,24 +449,26 @@ fn retry_condition(e: &SequencerError) -> bool {

match e {
SequencerError::ReqwestError(e) => {
if e.is_body() || e.is_connect() || e.is_timeout() {
if e.is_timeout() {
info!(reason=%e, "Request failed, retrying. Fetching the response or parts of it timed out. Try increasing request timeout by using the `--gateway.request-timeout` CLI option.");
return true;
}

if e.is_body() || e.is_connect() {
info!(reason=%e, "Request failed, retrying");
} else if e.is_status() {
match e.status() {
Some(
StatusCode::NOT_FOUND
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT,
) => {
match e.status().expect("status related error") {
StatusCode::NOT_FOUND
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
debug!(reason=%e, "Request failed, retrying");
}
Some(StatusCode::INTERNAL_SERVER_ERROR) => {
StatusCode::INTERNAL_SERVER_ERROR => {
error!(reason=%e, "Request failed, retrying");
}
Some(_) => warn!(reason=%e, "Request failed, retrying"),
None => unreachable!(),
_ => warn!(reason=%e, "Request failed, retrying"),
}
} else if e.is_decode() {
error!(reason=%e, "Request failed, retrying");
Expand Down
10 changes: 10 additions & 0 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ Examples:
)]
poll_interval: std::num::NonZeroU64,

#[arg(
long = "sync.l1-poll-interval",
long_help = "L1 state poll interval in seconds",
default_value = "30",
env = "PATHFINDER_L1_POLL_INTERVAL_SECONDS"
)]
l1_poll_interval: std::num::NonZeroU64,

#[arg(
long = "color",
long_help = "This flag controls when to use colors in the output logs.",
Expand Down Expand Up @@ -669,6 +677,7 @@ pub struct Config {
pub sqlite_wal: JournalMode,
pub max_rpc_connections: std::num::NonZeroUsize,
pub poll_interval: std::time::Duration,
pub l1_poll_interval: std::time::Duration,
pub color: Color,
pub p2p: P2PConfig,
pub debug: DebugConfig,
Expand Down Expand Up @@ -953,6 +962,7 @@ impl Config {
},
max_rpc_connections: cli.max_rpc_connections,
poll_interval: Duration::from_secs(cli.poll_interval.get()),
l1_poll_interval: Duration::from_secs(cli.l1_poll_interval.get()),
color: cli.color,
p2p: P2PConfig::parse_or_exit(cli.p2p),
debug: DebugConfig::parse(cli.debug),
Expand Down
9 changes: 8 additions & 1 deletion crates/pathfinder/src/bin/pathfinder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
});
let execution_storage = storage_manager
.create_read_only_pool(execution_storage_pool_size)
.context(r"")?;
.context(
r"Creating database connection pool for execution
Hint: This is usually caused by exceeding the file descriptor limit of your system.
Try increasing the file limit to using `ulimit` or similar tooling.",
)?;

let p2p_storage = storage_manager
.create_pool(NonZeroU32::new(1).unwrap())
Expand Down Expand Up @@ -534,6 +539,7 @@ fn start_sync(
gossiper: state::Gossiper,
gateway_public_key: pathfinder_common::PublicKey,
_p2p_client: Option<p2p::client::peer_agnostic::Client>,
_verify_tree_hashes: bool,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
start_feeder_gateway_sync(
storage,
Expand Down Expand Up @@ -569,6 +575,7 @@ fn start_feeder_gateway_sync(
sequencer: pathfinder_context.gateway,
state: sync_state.clone(),
head_poll_interval: config.poll_interval,
l1_poll_interval: config.l1_poll_interval,
pending_data: tx_pending,
block_validation_mode: state::l2::BlockValidationMode::Strict,
websocket_txs,
Expand Down
4 changes: 3 additions & 1 deletion crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct SyncContext<G, E> {
pub sequencer: G,
pub state: Arc<SyncState>,
pub head_poll_interval: Duration,
pub l1_poll_interval: Duration,
pub pending_data: WatchSender<PendingData>,
pub block_validation_mode: l2::BlockValidationMode,
pub websocket_txs: Option<TopicBroadcasters>,
Expand All @@ -95,7 +96,7 @@ where
ethereum: value.ethereum.clone(),
chain: value.chain,
core_address: value.core_address,
poll_interval: value.head_poll_interval,
poll_interval: value.l1_poll_interval,
}
}
}
Expand Down Expand Up @@ -181,6 +182,7 @@ where
sequencer,
state,
head_poll_interval,
l1_poll_interval: _,
pending_data,
block_validation_mode: _,
websocket_txs,
Expand Down
78 changes: 68 additions & 10 deletions crates/rpc/src/method/get_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ pub async fn get_events(
// The database query for 3 and 4 is combined into one step.
//
// 4 requires some additional logic to handle some edge cases:
// a) Query database
// b) if full page -> return page
// a) if from_block_number > pending_block_number -> return empty result
// b) Query database
// c) if full page -> return page
// check if there are matching events in the pending block
// and return a continuation token for the pending block
// c) else if empty / partially full -> append events from start of pending
// d) else if empty / partially full -> append events from start of pending
// if there are more pending events return a continuation token
// with the appropriate offset within the pending block

Expand Down Expand Up @@ -134,27 +135,43 @@ pub async fn get_events(
.transaction()
.context("Creating database transaction")?;

// Handle the trivial (1) and (2) cases.
// Handle the trivial (1), (2) and (4a) cases.
match (&request.from_block, &request.to_block) {
(Some(Pending), non_pending) if *non_pending != Some(Pending) => {
(Some(Pending), id) if !matches!(id, Some(Pending) | None) => {
return Ok(types::GetEventsResult {
events: Vec::new(),
continuation_token: None,
});
}
(Some(Pending), Some(Pending)) => {
(Some(Pending), Some(Pending) | None) => {
let pending = context
.pending_data
.get(&transaction)
.context("Querying pending data")?;
return get_pending_events(&request, &pending, continuation_token);
}
(Some(BlockId::Number(from_block)), Some(BlockId::Pending)) => {
let pending = context
.pending_data
.get(&transaction)
.context("Querying pending data")?;

// `from_block` is larger than or equal to pending block's number
if from_block >= &pending.number {
return Ok(types::GetEventsResult {
events: Vec::new(),
continuation_token: None,
});
}
}
_ => {}
}

let from_block = map_from_block_to_number(&transaction, request.from_block)?;
let to_block = map_to_block_to_number(&transaction, request.to_block)?;

// Handle cases (3) and (4) where `from_block` is non-pending.

let (from_block, requested_offset) = match continuation_token {
Some(token) => token.start_block_and_offset(from_block)?,
None => (from_block, 0),
Expand Down Expand Up @@ -460,10 +477,11 @@ impl std::fmt::Display for ContinuationToken {

impl ContinuationToken {
fn offset_in_block(&self, block_number: BlockNumber) -> Result<usize, GetEventsError> {
if self.block_number == block_number {
Ok(self.offset)
} else {
Err(GetEventsError::InvalidContinuationToken)
use std::cmp::Ordering;
match Ord::cmp(&self.block_number, &block_number) {
Ordering::Equal => Ok(self.offset),
Ordering::Less => Ok(0),
Ordering::Greater => Err(GetEventsError::InvalidContinuationToken),
}
}

Expand Down Expand Up @@ -959,6 +977,14 @@ mod tests {
assert_eq!(result.events, &all[3..4]);
assert_eq!(result.continuation_token, None);

// continuing from a page that does exist, should return all events (even from
// pending)
input.filter.chunk_size = 123;
input.filter.continuation_token = Some("0-0".to_string());
let result = get_events(context.clone(), input.clone()).await.unwrap();
assert_eq!(result.events, all);
assert_eq!(result.continuation_token, None);

// nonexistent page: offset too large
input.filter.chunk_size = 123; // Does not matter
input.filter.continuation_token = Some("3-3".to_string()); // Points to after the last event
Expand Down Expand Up @@ -1039,5 +1065,37 @@ mod tests {
.events;
assert_eq!(events, &all[1..2]);
}

#[tokio::test]
async fn from_block_past_pending() {
let context = RpcContext::for_tests_with_pending().await;

let input = GetEventsInput {
filter: EventFilter {
from_block: Some(BlockId::Number(BlockNumber::new_or_panic(4))),
to_block: Some(BlockId::Pending),
chunk_size: 100,
..Default::default()
},
};
let result = get_events(context, input).await.unwrap();
assert!(result.events.is_empty());
}

#[tokio::test]
async fn from_block_pending_to_block_none() {
let context = RpcContext::for_tests_with_pending().await;

let input = GetEventsInput {
filter: EventFilter {
from_block: Some(BlockId::Pending),
to_block: None,
chunk_size: 100,
..Default::default()
},
};
let result = get_events(context, input).await.unwrap();
assert!(!result.events.is_empty());
}
}
}
17 changes: 13 additions & 4 deletions crates/rpc/src/v03/method/get_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,11 @@ impl std::fmt::Display for ContinuationToken {

impl ContinuationToken {
fn offset_in_block(&self, block_number: BlockNumber) -> Result<usize, GetEventsError> {
if self.block_number == block_number {
Ok(self.offset)
} else {
Err(GetEventsError::InvalidContinuationToken)
use std::cmp::Ordering;
match Ord::cmp(&self.block_number, &block_number) {
Ordering::Equal => Ok(self.offset),
Ordering::Less => Ok(0),
Ordering::Greater => Err(GetEventsError::InvalidContinuationToken),
}
}

Expand Down Expand Up @@ -959,6 +960,14 @@ mod tests {
assert_eq!(result.events, &all[3..4]);
assert_eq!(result.continuation_token, None);

// continuing from a page that does exist, should return all events (even from
// pending)
input.filter.chunk_size = 123;
input.filter.continuation_token = Some("0-0".to_string());
let result = get_events(context.clone(), input.clone()).await.unwrap();
assert_eq!(result.events, all);
assert_eq!(result.continuation_token, None);

// nonexistent page: offset too large
input.filter.chunk_size = 123; // Does not matter
input.filter.continuation_token = Some("3-3".to_string()); // Points to after the last event
Expand Down

0 comments on commit d76289f

Please sign in to comment.