Skip to content

Commit

Permalink
fix(vm-runner): add config value for the first processed batch (#2158)
Browse files Browse the repository at this point in the history
## What ❔

Adds a new config value that regulate which batch to consider as the
last processed on a fresh start.

Also renames config values to be more idiomatic due to several from
DevOps and other people :)

## Why ❔

We don't want to wait half a year for VM to process all testnet/mainnet
batches.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
itegulov authored Jun 6, 2024
1 parent b08a667 commit f666717
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 34 deletions.
11 changes: 7 additions & 4 deletions core/lib/config/src/configs/vm_runner.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use serde::Deserialize;
use zksync_basic_types::L1BatchNumber;

#[derive(Debug, Deserialize, Clone, PartialEq, Default)]
pub struct ProtectiveReadsWriterConfig {
/// Path to the RocksDB data directory that serves state cache.
#[serde(default = "ProtectiveReadsWriterConfig::default_protective_reads_db_path")]
pub protective_reads_db_path: String,
#[serde(default = "ProtectiveReadsWriterConfig::default_db_path")]
pub db_path: String,
/// How many max batches should be processed at the same time.
pub protective_reads_window_size: u32,
pub window_size: u32,
/// All batches before this one (inclusive) are always considered to be processed.
pub first_processed_batch: L1BatchNumber,
}

impl ProtectiveReadsWriterConfig {
fn default_protective_reads_db_path() -> String {
fn default_db_path() -> String {
"./db/protective_reads_writer".to_owned()
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions core/lib/dal/src/vm_runner_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ pub struct VmRunnerDal<'c, 'a> {
impl VmRunnerDal<'_, '_> {
pub async fn get_protective_reads_latest_processed_batch(
&mut self,
default_batch: L1BatchNumber,
) -> DalResult<L1BatchNumber> {
let row = sqlx::query!(
r#"
SELECT
COALESCE(MAX(l1_batch_number), 0) AS "last_processed_l1_batch!"
COALESCE(MAX(l1_batch_number), $1) AS "last_processed_l1_batch!"
FROM
vm_runner_protective_reads
"#
"#,
default_batch.0 as i32
)
.instrument("get_protective_reads_latest_processed_batch")
.report_latency()
Expand Down
5 changes: 3 additions & 2 deletions core/lib/protobuf_config/src/proto/config/vm_runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package zksync.config.vm_runner;

message ProtectiveReadsWriter {
optional string protective_reads_db_path = 1; // required; fs path
optional uint64 protective_reads_window_size = 2; // required
optional string db_path = 1; // required; fs path
optional uint64 window_size = 2; // required
optional uint64 first_processed_batch = 3; // required
}
17 changes: 9 additions & 8 deletions core/lib/protobuf_config/src/vm_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context;
use zksync_basic_types::L1BatchNumber;
use zksync_config::configs::{self};
use zksync_protobuf::{required, ProtoRepr};

Expand All @@ -9,19 +10,19 @@ impl ProtoRepr for proto::ProtectiveReadsWriter {

fn read(&self) -> anyhow::Result<Self::Type> {
Ok(Self::Type {
protective_reads_db_path: required(&self.protective_reads_db_path)
.context("protective_reads_db_path")?
.clone(),
protective_reads_window_size: *required(&self.protective_reads_window_size)
.context("protective_reads_window_size")?
as u32,
db_path: required(&self.db_path).context("db_path")?.clone(),
window_size: *required(&self.window_size).context("window_size")? as u32,
first_processed_batch: L1BatchNumber(
*required(&self.first_processed_batch).context("first_batch")? as u32,
),
})
}

fn build(this: &Self::Type) -> Self {
Self {
protective_reads_db_path: Some(this.protective_reads_db_path.clone()),
protective_reads_window_size: Some(this.protective_reads_window_size as u64),
db_path: Some(this.db_path.clone()),
window_size: Some(this.window_size as u64),
first_processed_batch: Some(this.first_processed_batch.0 as u64),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,16 @@ impl WiringLayer for ProtectiveReadsWriterLayer {
// One for `ConcurrentOutputHandlerFactoryTask`/`VmRunner` as they need occasional access
// to DB for querying last processed batch and last ready to be loaded batch.
//
// `self.protective_reads_writer_config` connections for `ProtectiveReadsOutputHandlerFactory`
// `window_size` connections for `ProtectiveReadsOutputHandlerFactory`
// as there can be multiple output handlers holding multi-second connections to write
// large amount of protective reads.
master_pool
.get_custom(
self.protective_reads_writer_config
.protective_reads_window_size
+ 2,
)
.get_custom(self.protective_reads_writer_config.window_size + 2)
.await?,
self.protective_reads_writer_config.protective_reads_db_path,
self.protective_reads_writer_config.db_path,
self.zksync_network_id,
self.protective_reads_writer_config
.protective_reads_window_size,
self.protective_reads_writer_config.first_processed_batch,
self.protective_reads_writer_config.window_size,
)
.await?;

Expand Down
9 changes: 7 additions & 2 deletions core/node/vm_runner/src/impls/protective_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ impl ProtectiveReadsWriter {
pool: ConnectionPool<Core>,
rocksdb_path: String,
chain_id: L2ChainId,
first_processed_batch: L1BatchNumber,
window_size: u32,
) -> anyhow::Result<(Self, ProtectiveReadsWriterTasks)> {
let io = ProtectiveReadsIo { window_size };
let io = ProtectiveReadsIo {
first_processed_batch,
window_size,
};
let (loader, loader_task) =
VmRunnerStorage::new(pool.clone(), rocksdb_path, io.clone(), chain_id).await?;
let output_handler_factory = ProtectiveReadsOutputHandlerFactory { pool: pool.clone() };
Expand Down Expand Up @@ -74,6 +78,7 @@ pub struct ProtectiveReadsWriterTasks {

#[derive(Debug, Clone)]
pub struct ProtectiveReadsIo {
first_processed_batch: L1BatchNumber,
window_size: u32,
}

Expand All @@ -89,7 +94,7 @@ impl VmRunnerIo for ProtectiveReadsIo {
) -> anyhow::Result<L1BatchNumber> {
Ok(conn
.vm_runner_dal()
.get_protective_reads_latest_processed_batch()
.get_protective_reads_latest_processed_batch(self.first_processed_batch)
.await?)
}

Expand Down
6 changes: 4 additions & 2 deletions etc/env/base/vm_runner.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

[vm_runner.protective_reads]
# Path to the directory that contains RocksDB with protective reads writer cache.
protective_reads_db_path = "./db/main/protective_reads"
db_path = "./db/main/protective_reads"
# Amount of batches that can be processed in parallel.
protective_reads_window_size = 3
window_size = 3
# All batches before this one (inclusive) are always considered to be processed.
first_processed_batch = 0
5 changes: 3 additions & 2 deletions etc/env/file_based/general.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -323,5 +323,6 @@ observability:
level: debug

protective_reads_writer:
protective_reads_db_path: "./db/main/protective_reads"
protective_reads_window_size: 3
db_path: "./db/main/protective_reads"
window_size: 3
first_processed_batch: 0

0 comments on commit f666717

Please sign in to comment.