Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Cursor not moving correctly after poll in get_filter_changes #546

Merged
merged 21 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f67b91f
add 1 to from_block after poll
Artemka374 Nov 27, 2023
05aa216
increase from_block only if logs are not empty
Artemka374 Nov 27, 2023
21804c8
set from_block to minimum from to_block and current latest block poll…
Artemka374 Nov 28, 2023
da0f697
Merge branch 'main' into afo-fix-get_filter_changes
montekki Nov 30, 2023
cea32d5
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Nov 30, 2023
5df7727
update cursor to be inclusive in all types of filters
Artemka374 Nov 30, 2023
f06b79d
remove multiple imports
Artemka374 Nov 30, 2023
91cdf89
refactor get_logs to accept definite range of blocks
Artemka374 Dec 1, 2023
f5b5a2c
fix tests
Artemka374 Dec 1, 2023
22891c7
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Dec 1, 2023
c3ed7ef
Merge remote-tracking branch 'origin/afo-fix-get_filter_changes' into…
Artemka374 Dec 1, 2023
0f293e8
update basic filter changes
Artemka374 Dec 1, 2023
2894e8c
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Dec 4, 2023
2aa73df
apply suggestions
Artemka374 Dec 4, 2023
1d952e4
apply suggestions
Artemka374 Dec 4, 2023
3ffec67
remove redundant import
Artemka374 Dec 4, 2023
8d17764
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Dec 4, 2023
7074c15
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Dec 5, 2023
8df660a
apply suggestions
Artemka374 Dec 6, 2023
5eab9f7
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Dec 6, 2023
ec0b9c0
Merge branch 'main' into afo-fix-get_filter_changes
Artemka374 Dec 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,33 @@
},
"query": "SELECT * from prover_jobs where id=$1"
},
"2044947d6d29f29cda508b2160c39f74a8bfd524afa2ffc20a98ae039bc86ed7": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "hash",
"ordinal": 1,
"type_info": "Bytea"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"Int8",
"Int8"
]
}
},
"query": "SELECT number, hash FROM miniblocks WHERE number >= $1 ORDER BY number ASC LIMIT $2"
},
"20b22fd457417e9a72f5941887448f9a11b97b449db4759da0b9d368ce93996b": {
"describe": {
"columns": [
Expand Down Expand Up @@ -9492,33 +9519,6 @@
},
"query": "\n SELECT id, circuit_input_blob_url FROM prover_jobs\n WHERE status='successful'\n AND circuit_input_blob_url is NOT NULL\n AND updated_at < NOW() - INTERVAL '30 days'\n LIMIT $1;\n "
},
"b479b7d3334f8d4566c294a44e2adb282fbc66a87be5c248c65211c2a8a07db0": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "hash",
"ordinal": 1,
"type_info": "Bytea"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"Int8",
"Int8"
]
}
},
"query": "SELECT number, hash FROM miniblocks WHERE number > $1 ORDER BY number ASC LIMIT $2"
},
"b4a3c902646725188f7c79ebac992cdce5896fc6fcc9f485c0cba9d90c4c982c": {
"describe": {
"columns": [
Expand Down
6 changes: 3 additions & 3 deletions core/lib/dal/src/blocks_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,15 @@ impl BlocksWeb3Dal<'_, '_> {
}))
}

/// Returns hashes of blocks with numbers greater than `from_block` and the number of the last block.
pub async fn get_block_hashes_after(
/// Returns hashes of blocks with numbers starting from `from_block` and the number of the last block.
pub async fn get_block_hashes_since(
&mut self,
from_block: MiniblockNumber,
limit: usize,
) -> sqlx::Result<(Vec<H256>, Option<MiniblockNumber>)> {
let rows = sqlx::query!(
"SELECT number, hash FROM miniblocks \
WHERE number > $1 \
WHERE number >= $1 \
ORDER BY number ASC \
LIMIT $2",
from_block.0 as i64,
Expand Down
13 changes: 4 additions & 9 deletions core/lib/dal/src/events_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use zksync_types::{
};

use crate::{
instrument::InstrumentExt,
models::{storage_block::web3_block_number_to_sql, storage_event::StorageWeb3Log},
SqlxError, StorageProcessor,
instrument::InstrumentExt, models::storage_event::StorageWeb3Log, SqlxError, StorageProcessor,
};

#[derive(Debug)]
Expand Down Expand Up @@ -119,10 +117,8 @@ impl EventsWeb3Dal<'_, '_> {

let mut where_sql = format!("(miniblock_number >= {})", filter.from_block.0 as i64);

if let Some(to_block) = filter.to_block {
let block_sql = web3_block_number_to_sql(to_block);
where_sql += &format!(" AND (miniblock_number <= {})", block_sql);
}
where_sql += &format!(" AND (miniblock_number <= {})", filter.to_block.0 as i64);

if !filter.addresses.is_empty() {
where_sql += &format!(" AND (address = ANY(${}))", arg_index);
arg_index += 1;
Expand Down Expand Up @@ -172,7 +168,6 @@ impl EventsWeb3Dal<'_, '_> {

#[cfg(test)]
mod tests {
use zksync_types::api::BlockNumber;
use zksync_types::{Address, H256};

use super::*;
Expand All @@ -185,7 +180,7 @@ mod tests {
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(100),
to_block: Some(BlockNumber::Number(200.into())),
to_block: MiniblockNumber(200),
addresses: vec![Address::from_low_u64_be(123)],
topics: vec![(0, vec![H256::from_low_u64_be(456)])],
};
Expand Down
2 changes: 1 addition & 1 deletion core/lib/types/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ pub struct TransactionDetails {
#[derive(Debug, Clone)]
pub struct GetLogsFilter {
pub from_block: MiniblockNumber,
pub to_block: Option<BlockNumber>,
pub to_block: MiniblockNumber,
pub addresses: Vec<Address>,
pub topics: Vec<(u32, Vec<H256>)>,
}
Expand Down
45 changes: 29 additions & 16 deletions core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
.installed_filters
.lock()
.await
.add(TypedFilter::Blocks(last_block_number));
.add(TypedFilter::Blocks(last_block_number + 1));
method_latency.observe();
Ok(idx)
}
Expand Down Expand Up @@ -773,14 +773,19 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
.map_err(|err| internal_error(METHOD_NAME, err))?;
let (block_hashes, last_block_number) = conn
.blocks_web3_dal()
.get_block_hashes_after(*from_block, self.state.api_config.req_entities_limit)
.get_block_hashes_since(*from_block, self.state.api_config.req_entities_limit)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
*from_block = last_block_number.unwrap_or(*from_block);

*from_block = match last_block_number {
Some(last_block_number) => last_block_number + 1,
None => *from_block,
};

FilterChanges::Hashes(block_hashes)
}

TypedFilter::PendingTransactions(from_timestamp) => {
TypedFilter::PendingTransactions(from_timestamp_excluded) => {
let mut conn = self
.state
.connection_pool
Expand All @@ -790,12 +795,14 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
let (tx_hashes, last_timestamp) = conn
.transactions_web3_dal()
.get_pending_txs_hashes_after(
*from_timestamp,
*from_timestamp_excluded,
Some(self.state.api_config.req_entities_limit),
)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
*from_timestamp = last_timestamp.unwrap_or(*from_timestamp);

*from_timestamp_excluded = last_timestamp.unwrap_or(*from_timestamp_excluded);

FilterChanges::Hashes(tx_hashes)
}

Expand All @@ -816,16 +823,26 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
} else {
vec![]
};

let mut to_block = self
.state
.resolve_filter_block_number(filter.to_block)
.await?;

if matches!(filter.to_block, Some(BlockNumber::Number(_))) {
to_block = to_block.min(
self.state
.resolve_filter_block_number(Some(BlockNumber::Latest))
.await?,
);
}

let get_logs_filter = GetLogsFilter {
from_block: *from_block,
to_block: filter.to_block,
to_block,
addresses,
topics,
};
let to_block = self
.state
.resolve_filter_block_number(filter.to_block)
.await?;

let mut storage = self
.state
Expand Down Expand Up @@ -859,11 +876,7 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
.get_logs(get_logs_filter, i32::MAX as usize)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
*from_block = logs
.last()
.map(|log| MiniblockNumber(log.block_number.unwrap().as_u32()))
.unwrap_or(*from_block);
// FIXME: why is `from_block` not updated?
*from_block = to_block + 1;
FilterChanges::Logs(logs)
}
};
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl<G: L1GasPriceProvider> ZksNamespace<G> {
.get_logs(
GetLogsFilter {
from_block: first_miniblock_of_l1_batch,
to_block: Some(block_number.0.into()),
to_block: block_number,
addresses: vec![L1_MESSENGER_ADDRESS],
topics: vec![(2, vec![address_to_h256(&sender)]), (3, vec![msg])],
},
Expand Down
3 changes: 2 additions & 1 deletion core/lib/zksync_core/src/api_server/web3/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,10 @@ impl<E> RpcState<E> {
.enumerate()
.filter_map(|(idx, topics)| topics.map(|topics| (idx as u32 + 1, topics.0)))
.collect();

let get_logs_filter = GetLogsFilter {
from_block,
to_block: filter.to_block,
to_block,
addresses,
topics,
};
Expand Down
24 changes: 9 additions & 15 deletions core/lib/zksync_core/src/api_server/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ impl HttpTest for LogFilterChanges {
assert_logs_match(&topics_logs, &[events[1], events[3]]);

let new_all_logs = client.get_filter_changes(all_logs_filter_id).await?;
let FilterChanges::Logs(new_all_logs) = new_all_logs else {
let FilterChanges::Hashes(new_all_logs) = new_all_logs else {
panic!("Unexpected getFilterChanges output: {:?}", new_all_logs);
};
assert_eq!(new_all_logs, all_logs); // FIXME(#546): update test after behavior is fixed
assert!(new_all_logs.is_empty());
Ok(())
}
}
Expand Down Expand Up @@ -458,11 +458,10 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries {
};
assert_logs_match(&lower_bound_logs, &new_events);

// FIXME(#546): update test after behavior is fixed
let new_upper_bound_logs = client.get_filter_changes(upper_bound_filter_id).await?;
assert_eq!(new_upper_bound_logs, FilterChanges::Logs(upper_bound_logs));
assert_matches!(new_upper_bound_logs, FilterChanges::Hashes(hashes) if hashes.is_empty());
let new_bounded_logs = client.get_filter_changes(bounded_filter_id).await?;
assert_eq!(new_bounded_logs, FilterChanges::Logs(bounded_logs));
assert_matches!(new_bounded_logs, FilterChanges::Hashes(hashes) if hashes.is_empty());

// Add miniblock #3. It should not be picked up by the bounded and upper bound filters,
// and should be picked up by the lower bound filter.
Expand All @@ -472,27 +471,22 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries {
let new_events: Vec<_> = new_events.iter().collect();

let bounded_logs = client.get_filter_changes(bounded_filter_id).await?;
let FilterChanges::Logs(bounded_logs) = bounded_logs else {
let FilterChanges::Hashes(bounded_logs) = bounded_logs else {
panic!("Unexpected getFilterChanges output: {:?}", bounded_logs);
};
assert!(bounded_logs
.iter()
.all(|log| log.block_number.unwrap() < 3.into()));
assert!(bounded_logs.is_empty());

let upper_bound_logs = client.get_filter_changes(upper_bound_filter_id).await?;
let FilterChanges::Logs(upper_bound_logs) = upper_bound_logs else {
let FilterChanges::Hashes(upper_bound_logs) = upper_bound_logs else {
panic!("Unexpected getFilterChanges output: {:?}", upper_bound_logs);
};
assert!(upper_bound_logs
.iter()
.all(|log| log.block_number.unwrap() < 3.into()));
assert!(upper_bound_logs.is_empty());

let lower_bound_logs = client.get_filter_changes(lower_bound_filter_id).await?;
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
let FilterChanges::Logs(lower_bound_logs) = lower_bound_logs else {
panic!("Unexpected getFilterChanges output: {:?}", lower_bound_logs);
};
let start_idx = lower_bound_logs.len() - 4;
assert_logs_match(&lower_bound_logs[start_idx..], &new_events);
assert_logs_match(&lower_bound_logs, &new_events);
Ok(())
}
}
Expand Down