From 3a578449c52ec4a2c8a8b17b9bf2ccde8c9a6a56 Mon Sep 17 00:00:00 2001
From: Max Revitt
Date: Thu, 19 Sep 2024 11:49:28 +0100
Subject: [PATCH] wip: slice manager

---
 eth/stagedsync/stages/stages_zk.go    |   1 -
 zk/datastream/client/commands.go      |  26 ++++-
 zk/datastream/client/stream_client.go | 140 +++++++++++++++++++++++++-
 zk/stages/stage_batches.go            |  96 +++++++++---------
 4 files changed, 202 insertions(+), 61 deletions(-)

diff --git a/eth/stagedsync/stages/stages_zk.go b/eth/stagedsync/stages/stages_zk.go
index bca8e9f0962..4ac4583fa82 100644
--- a/eth/stagedsync/stages/stages_zk.go
+++ b/eth/stagedsync/stages/stages_zk.go
@@ -22,7 +22,6 @@ var (
 	L1SequencerSyncer           SyncStage = "L1SequencerSyncer"
 	L1VerificationsBatchNo      SyncStage = "L1VerificationsBatchNo"
 	Batches                     SyncStage = "Batches"
-	DsEntriesConsumed           SyncStage = "DsEntriesConsumed"
 	HighestHashableL2BlockNo    SyncStage = "HighestHashableL2BlockNo"
 	HighestSeenBatchNumber      SyncStage = "HighestSeenBatchNumber"
 	VerificationsStateRootCheck SyncStage = "VerificationStateRootCheck"
diff --git a/zk/datastream/client/commands.go b/zk/datastream/client/commands.go
index bb9cbafe825..e13c7404101 100644
--- a/zk/datastream/client/commands.go
+++ b/zk/datastream/client/commands.go
@@ -15,7 +15,7 @@ const (
 
 // sendHeaderCmd sends the header command to the server.
 func (c *StreamClient) sendHeaderCmd() error {
-	err := c.sendCommand(CmdHeader)
+	err := c.sendCommandAndStreamType(CmdHeader)
 	if err != nil {
 		return fmt.Errorf("%s %v", c.id, err)
 	}
@@ -26,7 +26,7 @@ func (c *StreamClient) sendHeaderCmd() error {
 // sendStartBookmarkCmd sends a start command to the server, indicating
 // that the client wishes to start streaming from the given bookmark
 func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
-	err := c.sendCommand(CmdStartBookmark)
+	err := c.sendCommandAndStreamType(CmdStartBookmark)
 	if err != nil {
 		return err
 	}
@@ -42,10 +42,26 @@ func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
 	return nil
 }
 
+// sendEntryCommand sends an entry command to the server, indicating
+// that the client wishes to retrieve the entry at the given entry number.
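+// The server is expected to answer with a result entry followed by the
+// requested file entry; GetEntry in stream_client.go performs the read side
+// of that exchange.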
+func (c *StreamClient) sendEntryCommand(entryNum uint64) error {
+	err := c.sendCommandAndStreamType(CmdEntry)
+	if err != nil {
+		return err
+	}
+
+	// Send entry number
+	if err := writeFullUint64ToConn(c.conn, entryNum); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // sendStartCmd sends a start command to the server, indicating
 // that the client wishes to start streaming from the given entry number.
 func (c *StreamClient) sendStartCmd(from uint64) error {
-	err := c.sendCommand(CmdStart)
+	err := c.sendCommandAndStreamType(CmdStart)
 	if err != nil {
 		return err
 	}
@@ -60,7 +76,7 @@ func (c *StreamClient) sendStartCmd(from uint64) error {
 	return nil
 }
 
 // sendHeaderCmd sends the header command to the server.
 func (c *StreamClient) sendStopCmd() error {
-	err := c.sendCommand(CmdStop)
+	err := c.sendCommandAndStreamType(CmdStop)
 	if err != nil {
 		return fmt.Errorf("%s %v", c.id, err)
 	}
@@ -68,7 +84,7 @@ func (c *StreamClient) sendStopCmd() error {
 	return nil
 }
 
-func (c *StreamClient) sendCommand(cmd Command) error {
+func (c *StreamClient) sendCommandAndStreamType(cmd Command) error {
 	// Send command
 	if err := writeFullUint64ToConn(c.conn, uint64(cmd)); err != nil {
 		return fmt.Errorf("%s %v", c.id, err)
 	}
diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go
index 9211e8fe264..d18a051fa0f 100644
--- a/zk/datastream/client/stream_client.go
+++ b/zk/datastream/client/stream_client.go
@@ -50,6 +50,7 @@ type StreamClient struct {
 	streaming       atomic.Bool
 	lastWrittenTime atomic.Int64
 	progress        atomic.Uint64
+	highestL2Block  atomic.Uint64
 
 	// stream status
 	status Status
@@ -70,6 +71,7 @@ const (
 	PtPadding = 0
 	PtHeader  = 1    // Just for the header page
 	PtData    = 2    // Data entry
+	PtDataRsp = 0xfe // PtDataRsp is the packet type for a command response with data
 	PtResult  = 0xff // Not stored/present in file (just for client command result)
 )
 
@@ -132,6 +134,10 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 {
 	return &c.progress
 }
 
+func (c *StreamClient) GetHighestL2BlockAtomic() *atomic.Uint64 {
+	return &c.highestL2Block
+}
+
 func (c *StreamClient) StartStreaming() {
 	c.streaming.Store(true)
 
@@ -160,12 +166,14 @@ func (c *StreamClient) StartStreaming() {
 		streamConnFailCount = 0
 		c.setStatusStable(true)
 
-		// get header
-		if _, err := c.GetHeader(); err != nil {
-			log.Warn(fmt.Sprintf("Failed to get header: %s", err))
-			continue
+		// find highest l2 block
+		_, err := c.FindHighestL2BlockNumber()
+		if err != nil {
+			log.Warn(fmt.Sprintf("Failed to find the highest l2 block number: %s", err))
 		}
 
+		// TODO: the above command drops the connection
+
 		if err := c.ReadAllEntries(); err != nil {
 			c.setStatusStable(false)
 
@@ -198,6 +206,10 @@ func (c *StreamClient) StartStreaming() {
 	}
 }
 
+func (c *StreamClient) Reconnect() {
+
+}
+
 func (c *StreamClient) setStatusStable(stable bool) {
 	c.statusMutex.Lock()
 	defer c.statusMutex.Unlock()
@@ -472,6 +484,25 @@ func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) {
 	return
 }
 
+func (c *StreamClient) readDataEntry() (*types.FileEntry, error) {
+	file, err := readBuffer(c.conn, types.FileEntryMinSize)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read header bytes %w", err)
+	}
+
+	fileLength := binary.BigEndian.Uint32(file[1:5])
+	if fileLength >= types.FileEntryMinSize {
+		// Read the rest of the entry - the length field covers the whole entry
+		buffer, err := readBuffer(c.conn, fileLength-types.FileEntryMinSize)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read data bytes %w", err)
+		}
+		file = append(file, buffer...)
+	}
+
+	return types.DecodeFileEntry(file)
+}
+
 // reads header bytes from socket and tries to parse them
 // returns the parsed HeaderEntry
 func (c *StreamClient) readHeaderEntry() (h *types.HeaderEntry, err error) {
@@ -578,6 +609,42 @@ func (c *StreamClient) GetHeader() (*types.HeaderEntry, error) {
 	return &c.Header, nil
 }
 
+// Command entry: Get entry at entryNo
+// If started, terminate the connection.
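+// GetEntry sends the entry command, validates the result packet returned by
+// the server and then reads the requested file entry. The entry is returned
+// as an interface{}; callers are expected to type-assert it to
+// *types.FileEntry (see FindHighestL2BlockNumber).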
+func (c *StreamClient) GetEntry(entryNo uint64) (interface{}, error) {
+	if err := c.sendEntryCommand(entryNo); err != nil {
+		return nil, fmt.Errorf("%s send entry command error: %w", c.id, err)
+	}
+
+	// Read packet
+	packet, err := readBuffer(c.conn, 1)
+	if err != nil {
+		return nil, fmt.Errorf("%s read buffer: %w", c.id, err)
+	}
+
+	// Check packet type
+	if packet[0] != PtResult {
+		return nil, fmt.Errorf("%s error expecting result packet type %d and received %d", c.id, PtResult, packet[0])
+	}
+
+	// Read server result entry for the command
+	r, err := c.readResultEntry(packet)
+	if err != nil {
+		return nil, fmt.Errorf("%s read result entry error: %w", c.id, err)
+	}
+	if err := r.GetError(); err != nil {
+		return nil, fmt.Errorf("%s got Result error code %d: %w", c.id, r.ErrorNum, err)
+	}
+
+	// Read data entry
+	fe, err := c.readDataEntry()
+	if err != nil {
+		return nil, fmt.Errorf("%s read data entry error: %w", c.id, err)
+	}
+
+	return fe, nil
+}
+
 // TODO: only used in correctness checker!
 func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function func(file *types.FileEntry) error) error {
 	// Get header from server
@@ -618,3 +685,68 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
 	return nil
 }
+
+// read stream size (TotalEntries) and iterate backwards to find highest l2 block number
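+// Each entry is fetched via GetEntry and the connection is restarted with
+// c.start() along the way (see the TODO in StartStreaming); the scan stops at
+// the first L2 block found and the block number is also stored in the
+// highestL2Block atomic so it can be read via GetHighestL2BlockAtomic.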
+func (c *StreamClient) FindHighestL2BlockNumber() (uint64, error) {
+	// Get header from server
+	if _, err := c.GetHeader(); err != nil {
+		return 0, fmt.Errorf("%s get header error: %w", c.id, err)
+	}
+
+	entryNo := c.Header.TotalEntries
+
+	// send entry command and iterate backwards until we get an L2 block
+	for entryNo > 0 {
+		fe, err := c.GetEntry(entryNo)
+		entryNo-- // always move backwards so a failing entry cannot be retried forever
+		if err != nil {
+			continue
+		}
+		if fe == nil {
+			continue
+		}
+
+		c.start() // if we get a result we must restart connection
+
+		// parse fe
+		if fileEntry, ok := fe.(*types.FileEntry); ok {
+			if fileEntry.EntryType == types.EntryTypeL2Block {
+				l2Block, err := types.UnmarshalL2Block(fileEntry.Data)
+				if err != nil {
+					return 0, fmt.Errorf("%s unmarshal L2 block error: %w", c.id, err)
+				}
+
+				// store highest block number
+				currentHighest := c.highestL2Block.Load()
+				if l2Block.L2BlockNumber > currentHighest {
+					c.highestL2Block.Store(l2Block.L2BlockNumber)
+				}
+
+				return l2Block.L2BlockNumber, nil
+			}
+		}
+	}
+
+	c.start()
+
+	return 0, fmt.Errorf("%s no L2 block found", c.id)
+}
diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go
index 020fbcc5331..e507fec2fc7 100644
--- a/zk/stages/stage_batches.go
+++ b/zk/stages/stage_batches.go
@@ -71,6 +71,7 @@ type HermezDb interface {
 
 type DatastreamClient interface {
 	GetProgressAtomic() *atomic.Uint64
+	GetHighestL2BlockAtomic() *atomic.Uint64
 	GetSliceManager() *slice_manager.SliceManager
 	GetStatus() client.Status
 	GetTotalEntries() uint64
@@ -138,11 +139,6 @@ func SpawnStageBatches(
 		return fmt.Errorf("get batch no by l2 block error: %v", err)
 	}
 
-	//// BISECT ////
-	if cfg.zkCfg.DebugLimit > 0 && stageProgressBlockNo > cfg.zkCfg.DebugLimit {
-		return nil
-	}
-
 	highestVerifiedBatch, err := stages.GetStageProgress(tx, stages.L1VerificationsBatchNo)
 	if err != nil {
 		return errors.New("could not retrieve l1 verifications batch no progress")
 	}
@@ -185,21 +181,13 @@ func SpawnStageBatches(
 	// return nil
 	//}
 
-	// can calculate how many entries we should recieve in a given run of the stage loop by
-	// retrieving the total entries in the stream minus the number of entries we've processed
-	dsEntriesConsumed, err := stages.GetStageProgress(tx, stages.DsEntriesConsumed)
-	if err != nil {
-		return fmt.Errorf("failed to get ds entries consumed, %w", err)
-	}
-	totalEntries := cfg.dsClient.GetTotalEntries()              // length of stream
-	entriesToConsume := int64(totalEntries - dsEntriesConsumed) // length to consume
-
 	lastHash := emptyHash
 	lastBlockRoot := emptyHash
 
 	log.Info(fmt.Sprintf("[%s] Reading blocks from the datastream slice manager.", logPrefix))
 
-	// loop consumes from slice manager which is populated by the stream_client reading continuously from the datastream
+	// use the stream client or debug/sync limit flags to find out the l2 block height to finish the stage
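+	// NOTE: when neither debug nor sync limit flags are set, getTargetL2Block
+	// blocks until the stream client has published a non-zero highest block
+	// number (see getTargetL2Block below)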
+	targetL2Block := getTargetL2Block(cfg)
 	offset := 0
 	for {
 		select {
@@ -207,23 +195,19 @@ func SpawnStageBatches(
 			log.Warn(fmt.Sprintf("[%s] Context done", logPrefix))
 			endLoop = true
 		default:
+			// STAGE FINISH CONTROL
+			if lastBlockHeight >= targetL2Block {
+				log.Info(fmt.Sprintf("[%s] Highest block reached, stopping stage", logPrefix))
+				endLoop = true
+				break
+			}
+
 			entries := dsSliceManager.ReadCurrentItemsWithOffset(offset)
 
 			// STAGE EXIT/WAIT CONTROL
 			// no items returned - either stream is disconnected,
 			// or we've consumed all available data and can move the stage loop on
-			entryCount := len(entries)
-
-			log.Info("ENTRIES", "entriesToConsume", entriesToConsume, "entryCount", entryCount)
-
-			if entriesToConsume <= 1000 && entryCount == 0 {
-				// initial sync - if we've read all the entries - i.e. we're at the tip
-				err = stages.SaveStageProgress(tx, stages.DsEntriesConsumed, totalEntries)
-				if err != nil {
-					return fmt.Errorf("save stage progress error: %v", err)
-				}
-				endLoop = true
-			} else if entryCount == 0 {
+			if len(entries) == 0 {
 				// subsequent sync - quickly check for instability, or end the stage so we can keep the node synced at the tip
 				if !cfg.dsClient.GetStatus().Stable {
 					log.Warn(fmt.Sprintf("[%s] Stream unstable, sleeping", logPrefix))
@@ -239,7 +223,6 @@ func SpawnStageBatches(
 				offset++
 				switch entry := entry.(type) {
 				case *types.BatchStart:
-					entriesToConsume -= 2 // one for the batch start, one for the bookmark
 					// check if the batch is invalid so that we can replicate this over in the stream
 					// when we re-populate it
 					if entry.BatchType == types.BatchTypeInvalid {
@@ -253,7 +236,6 @@ func SpawnStageBatches(
 						}
 					}
 				case *types.BatchEnd:
-					entriesToConsume--
 					if entry.StateRoot != lastBlockRoot {
 						log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot))
 					}
@@ -262,12 +244,15 @@ func SpawnStageBatches(
 						return err
 					}
 				case *types.FullL2Block:
-					entriesToConsume -= 3 + int64(len(entry.L2Txs)) // 1 each block start/block end, plus txs, plus 1 for the bookmark
-					if cfg.zkCfg.SyncLimit > 0 && entry.L2BlockNumber >= cfg.zkCfg.SyncLimit {
-						// stop the node going into a crazy loop
-						time.Sleep(2 * time.Second)
-						break EntriesLoop
+					/////// DEBUG BISECTION ///////
+					// if we're above StepAfter, and we're at a step, move the stages on
+					if cfg.zkCfg.DebugStep > 0 && cfg.zkCfg.DebugStepAfter > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugStepAfter {
+						if entry.L2BlockNumber%cfg.zkCfg.DebugStep == 0 {
+							fmt.Printf("[%s] Debug step reached, stopping stage\n", logPrefix)
+							endLoop = true
+						}
 					}
+					/////// END DEBUG BISECTION ///////
 
 					// handle batch boundary changes - we do this here instead of reading the batch start channel because
 					// channels can be read in random orders which then creates problems in detecting fork changes during
@@ -323,22 +308,6 @@ func SpawnStageBatches(
 					}
 					highestSeenBatchNo = entry.BatchNumber
 
-					/////// DEBUG BISECTION ///////
-					// exit stage when debug bisection flags set and we're at the limit block
-					if cfg.zkCfg.DebugLimit > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugLimit {
-						fmt.Printf("[%s] Debug limit reached, stopping stage\n", logPrefix)
-						endLoop = true
-					}
-
-					// if we're above StepAfter, and we're at a step, move the stages on
-					if cfg.zkCfg.DebugStep > 0 && cfg.zkCfg.DebugStepAfter > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugStepAfter {
-						if entry.L2BlockNumber%cfg.zkCfg.DebugStep == 0 {
-							fmt.Printf("[%s] Debug step reached, stopping stage\n", logPrefix)
-							endLoop = true
-						}
-					}
-					/////// END DEBUG BISECTION ///////
-
 					// store our finalized state if this batch matches the highest verified batch number on the L1
 					if entry.BatchNumber == highestVerifiedBatch {
 						rawdb.WriteForkchoiceFinalized(tx, entry.L2Blockhash)
@@ -375,7 +344,6 @@ func SpawnStageBatches(
 						break
 					}
 				case *types.GerUpdate:
-					entriesToConsume--
 					if entry.GlobalExitRoot == emptyHash {
 						log.Warn(fmt.Sprintf("[%s] Skipping GER update with empty root", logPrefix))
 						break EntriesLoop
@@ -451,6 +419,32 @@ func SpawnStageBatches(
 	return nil
 }
 
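+// getTargetL2Block returns the L2 block height at which the stage should stop:
+// the DebugLimit flag takes precedence, then SyncLimit, otherwise the highest
+// block number reported by the stream client (polled until it is known)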
+func getTargetL2Block(cfg BatchesCfg) uint64 {
+	targetL2Block := uint64(0)
+
+	// debug limit
+	if cfg.zkCfg.DebugLimit > 0 {
+		return cfg.zkCfg.DebugLimit
+	}
+
+	// sync limit
+	if cfg.zkCfg.SyncLimit > 0 {
+		return cfg.zkCfg.SyncLimit
+	}
+
+	// from stream client
+	for {
+		targetL2Block = cfg.dsClient.GetHighestL2BlockAtomic().Load()
+		if targetL2Block == 0 {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+		break
+	}
+
+	return targetL2Block
+}
+
 func saveStageProgress(tx kv.RwTx, logPrefix string, highestHashableL2BlockNo, highestSeenBatchNo, lastBlockHeight, lastForkId uint64) error {
 	var err error
 	// store the highest hashable block number