Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Resume incomplete snapshot in snapshot creator in more cases #2886

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/bin/snapshots_creator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ futures.workspace = true

[dev-dependencies]
rand.workspace = true
test-casing.workspace = true
42 changes: 22 additions & 20 deletions core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,44 +291,46 @@ impl SnapshotCreator {
.get_sealed_l1_batch_number()
.await?;
let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?;
let requested_l1_batch_number = if let Some(l1_batch_number) = config.l1_batch_number {
let (requested_l1_batch_number, existing_snapshot) = if let Some(l1_batch_number) =
config.l1_batch_number
{
anyhow::ensure!(
l1_batch_number <= sealed_l1_batch_number,
"Requested a snapshot for L1 batch #{l1_batch_number} that doesn't exist in Postgres (latest L1 batch: {sealed_l1_batch_number})"
);
l1_batch_number

let existing_snapshot = master_conn
.snapshots_dal()
.get_snapshot_metadata(l1_batch_number)
.await?;
(l1_batch_number, existing_snapshot)
} else {
// We subtract 1 so that after restore, EN node has at least one L1 batch to fetch.
anyhow::ensure!(
sealed_l1_batch_number != L1BatchNumber(0),
"Cannot create snapshot when only the genesis L1 batch is present in Postgres"
);
sealed_l1_batch_number - 1
};
let requested_l1_batch_number = sealed_l1_batch_number - 1;

let existing_snapshot = master_conn
.snapshots_dal()
.get_snapshot_metadata(requested_l1_batch_number)
.await?;
// Continue creating a pending snapshot if it exists, even if it doesn't correspond to the latest L1 batch.
// OTOH, a completed snapshot does not matter, unless it corresponds to `requested_l1_batch_number` (in which case it doesn't need to be created again).
let existing_snapshot = master_conn
.snapshots_dal()
.get_newest_snapshot_metadata()
.await?
.filter(|snapshot| {
!snapshot.is_complete() || snapshot.l1_batch_number == requested_l1_batch_number
});
(requested_l1_batch_number, existing_snapshot)
};
drop(master_conn);

match existing_snapshot {
Some(snapshot) if snapshot.is_complete() => {
tracing::info!("Snapshot for the requested L1 batch is complete: {snapshot:?}");
Ok(None)
}
Some(snapshot) if config.l1_batch_number.is_some() => {
Ok(Some(SnapshotProgress::from_existing_snapshot(&snapshot)))
}
Some(snapshot) => {
            // Unless creating a snapshot for a specific L1 batch is requested, we never continue an existing snapshot, even if it's incomplete.
            // This is to make running multiple snapshot creator instances in parallel easier to reason about.
tracing::warn!(
"Snapshot at expected L1 batch #{requested_l1_batch_number} exists, but is incomplete: {snapshot:?}. If you need to resume creating it, \
specify the L1 batch number in the snapshot creator config"
);
Ok(None)
}
Some(snapshot) => Ok(Some(SnapshotProgress::from_existing_snapshot(&snapshot))),
None => {
Self::initialize_snapshot_progress(
config,
Expand Down
99 changes: 77 additions & 22 deletions core/bin/snapshots_creator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use rand::{thread_rng, Rng};
use test_casing::test_casing;
use zksync_config::SnapshotsCreatorConfig;
use zksync_dal::{Connection, CoreDal};
use zksync_object_store::{MockObjectStore, ObjectStore};
Expand Down Expand Up @@ -64,6 +65,15 @@ impl HandleEvent for TestEventListener {
}
}

#[derive(Debug)]
struct UnreachableEventListener;

impl HandleEvent for UnreachableEventListener {
fn on_chunk_started(&self) -> TestBehavior {
unreachable!("should not be reached");
}
}

impl SnapshotCreator {
fn for_tests(blob_store: Arc<dyn ObjectStore>, pool: ConnectionPool<Core>) -> Self {
Self {
Expand All @@ -80,6 +90,13 @@ impl SnapshotCreator {
..self
}
}

fn panic_on_chunk_start(self) -> Self {
Self {
event_listener: Box::new(UnreachableEventListener),
..self
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -431,8 +448,9 @@ async fn persisting_snapshot_logs_for_v0_snapshot() {
assert_eq!(actual_logs, expected_outputs.storage_logs);
}

#[test_casing(2, [false, true])]
#[tokio::test]
async fn recovery_workflow() {
async fn recovery_workflow(specify_batch_after_recovery: bool) {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store = MockObjectStore::arc();
Expand Down Expand Up @@ -462,29 +480,9 @@ async fn recovery_workflow() {
let actual_deps: HashSet<_> = factory_deps.into_iter().collect();
assert_eq!(actual_deps, expected_outputs.deps);

// Check that the creator does nothing unless it's requested to create a new snapshot.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(2)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();
let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(
snapshot_metadata
.storage_logs_filepaths
.iter()
.all(Option::is_none),
"{snapshot_metadata:?}"
);

// Process 2 storage log chunks, then stop.
let recovery_config = SnapshotsCreatorConfig {
l1_batch_number: Some(snapshot_l1_batch_number),
l1_batch_number: specify_batch_after_recovery.then_some(snapshot_l1_batch_number),
..SEQUENTIAL_TEST_CONFIG
};
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
Expand All @@ -510,11 +508,68 @@ async fn recovery_workflow() {

// Process the remaining chunks.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(recovery_config.clone(), MIN_CHUNK_COUNT)
.await
.unwrap();

assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;

// Check that the snapshot is not created anew after it is completed.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.panic_on_chunk_start()
.run(recovery_config, MIN_CHUNK_COUNT)
.await
.unwrap();

let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
}

#[tokio::test]
async fn recovery_workflow_with_new_l1_batch() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(2)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();

let snapshot_l1_batch_number = L1BatchNumber(8);
let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(!snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");

let new_logs = gen_storage_logs(&mut thread_rng(), 50);
create_l1_batch(&mut conn, snapshot_l1_batch_number + 2, &new_logs).await;

// The old snapshot should be completed.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;

let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions infrastructure/zk/src/docker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Command} from 'commander';
import { Command } from 'commander';
import * as utils from 'utils';

const IMAGES = [
Expand Down Expand Up @@ -31,7 +31,7 @@ async function dockerCommand(
dockerOrg: string = 'matterlabs'
) {
// Generating all tags for containers. We need 2 tags here: SHA and SHA+TS
const {stdout: COMMIT_SHORT_SHA}: { stdout: string } = await utils.exec('git rev-parse --short HEAD');
const { stdout: COMMIT_SHORT_SHA }: { stdout: string } = await utils.exec('git rev-parse --short HEAD');
// COMMIT_SHORT_SHA returns with newline, so we need to trim it
const imageTagShaTS: string = process.env.IMAGE_TAG_SUFFIX
? process.env.IMAGE_TAG_SUFFIX
Expand Down Expand Up @@ -126,7 +126,7 @@ async function _build(image: string, tagList: string[], dockerOrg: string, platf
}
buildArgs += extraArgs;

console.log("Build args: ", buildArgs);
console.log('Build args: ', buildArgs);

const buildCommand =
`DOCKER_BUILDKIT=1 docker buildx build ${tagsToBuild}` +
Expand Down
Loading