Skip to content

Commit

Permalink
wip: slice manager
Browse files Browse the repository at this point in the history
  • Loading branch information
revitteth committed Sep 19, 2024
1 parent 081a82d commit 3a57844
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 61 deletions.
1 change: 0 additions & 1 deletion eth/stagedsync/stages/stages_zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ var (
L1SequencerSyncer SyncStage = "L1SequencerSyncer"
L1VerificationsBatchNo SyncStage = "L1VerificationsBatchNo"
Batches SyncStage = "Batches"
DsEntriesConsumed SyncStage = "DsEntriesConsumed"
HighestHashableL2BlockNo SyncStage = "HighestHashableL2BlockNo"
HighestSeenBatchNumber SyncStage = "HighestSeenBatchNumber"
VerificationsStateRootCheck SyncStage = "VerificationStateRootCheck"
Expand Down
26 changes: 21 additions & 5 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendHeaderCmd() error {
err := c.sendCommand(CmdHeader)
err := c.sendCommandAndStreamType(CmdHeader)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}
Expand All @@ -26,7 +26,7 @@ func (c *StreamClient) sendHeaderCmd() error {
// sendStartBookmarkCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given bookmark
func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
err := c.sendCommand(CmdStartBookmark)
err := c.sendCommandAndStreamType(CmdStartBookmark)
if err != nil {
return err
}
Expand All @@ -42,10 +42,26 @@ func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
return nil
}

// sendEntryCommand sends an entry command to the server, indicating
// that the client wishes to stream the given entry number.
func (c *StreamClient) sendEntryCommand(entryNum uint64) error {
err := c.sendCommandAndStreamType(CmdEntry)
if err != nil {
return err
}

// Send entry number
if err := writeFullUint64ToConn(c.conn, entryNum); err != nil {
return err
}

return nil
}

// sendStartCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given entry number.
func (c *StreamClient) sendStartCmd(from uint64) error {
err := c.sendCommand(CmdStart)
err := c.sendCommandAndStreamType(CmdStart)
if err != nil {
return err
}
Expand All @@ -60,15 +76,15 @@ func (c *StreamClient) sendStartCmd(from uint64) error {

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendStopCmd() error {
err := c.sendCommand(CmdStop)
err := c.sendCommandAndStreamType(CmdStop)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
}

func (c *StreamClient) sendCommand(cmd Command) error {
func (c *StreamClient) sendCommandAndStreamType(cmd Command) error {
// Send command
if err := writeFullUint64ToConn(c.conn, uint64(cmd)); err != nil {
return fmt.Errorf("%s %v", c.id, err)
Expand Down
140 changes: 136 additions & 4 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type StreamClient struct {
streaming atomic.Bool
lastWrittenTime atomic.Int64
progress atomic.Uint64
highestL2Block atomic.Uint64

// stream status
status Status
Expand All @@ -70,6 +71,7 @@ const (
PtPadding = 0
PtHeader = 1 // Just for the header page
PtData = 2 // Data entry
PtDataRsp = 0xfe // PtDataRsp is packet type for command response with data
PtResult = 0xff // Not stored/present in file (just for client command result)
)

Expand Down Expand Up @@ -132,6 +134,10 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 {
return &c.progress
}

func (c *StreamClient) GetHighestL2BlockAtomic() *atomic.Uint64 {
return &c.highestL2Block
}

func (c *StreamClient) StartStreaming() {
c.streaming.Store(true)

Expand Down Expand Up @@ -160,12 +166,14 @@ func (c *StreamClient) StartStreaming() {
streamConnFailCount = 0
c.setStatusStable(true)

// get header
if _, err := c.GetHeader(); err != nil {
log.Warn(fmt.Sprintf("Failed to get header: %s", err))
continue
// find highest l2 block
_, err := c.FindHighestL2BlockNumber()
if err != nil {
log.Warn(fmt.Sprintf("Failed to find the highest l2 block number: %s", err))
}

// TODO: the above command drops the connection

if err := c.ReadAllEntries(); err != nil {
c.setStatusStable(false)

Expand Down Expand Up @@ -198,6 +206,10 @@ func (c *StreamClient) StartStreaming() {
}
}

func (c *StreamClient) Reconnect() {

}

func (c *StreamClient) setStatusStable(stable bool) {
c.statusMutex.Lock()
defer c.statusMutex.Unlock()
Expand Down Expand Up @@ -472,6 +484,25 @@ func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) {
return
}

func (c *StreamClient) readDataEntry() (*types.FileEntry, error) {
file, err := readBuffer(c.conn, types.FileEntryMinSize)
if err != nil {
return nil, fmt.Errorf("failed to read header bytes %w", err)
}

fileLength := binary.BigEndian.Uint32(file[1:5])
if fileLength >= types.FileEntryMinSize {
// Read the rest of fixed size fields
buffer, err := readBuffer(c.conn, fileLength-types.FileEntryMinSize)
if err != nil {
return nil, fmt.Errorf("failed to read header bytes %w", err)
}
file = append(file, buffer...)
}

return types.DecodeFileEntry(file)
}

// reads header bytes from socket and tries to parse them
// returns the parsed HeaderEntry
func (c *StreamClient) readHeaderEntry() (h *types.HeaderEntry, err error) {
Expand Down Expand Up @@ -578,6 +609,42 @@ func (c *StreamClient) GetHeader() (*types.HeaderEntry, error) {
return &c.Header, nil
}

// Command entry: Get entry at entryNo
// If started, terminate the connection.
func (c *StreamClient) GetEntry(entryNo uint64) (interface{}, error) {
if err := c.sendEntryCommand(entryNo); err != nil {
return nil, fmt.Errorf("%s send header error: %w", c.id, err)
}

// Read packet
packet, err := readBuffer(c.conn, 1)
if err != nil {
return nil, fmt.Errorf("%s read buffer: %w", c.id, err)
}

// Check packet type
if packet[0] != PtResult {
return nil, fmt.Errorf("%s error expecting result packet type %d and received %d", c.id, PtResult, packet[0])
}

// Read server result entry for the command
r, err := c.readResultEntry(packet)
if err != nil {
return nil, fmt.Errorf("%s read result entry error: %w", c.id, err)
}
if err := r.GetError(); err != nil {
return nil, fmt.Errorf("%s got Result error code %d: %w", c.id, r.ErrorNum, err)
}

// Read data entry
fe, err := c.readDataEntry()
if err != nil {
return nil, fmt.Errorf("%s read header entry error: %w", c.id, err)
}

return fe, nil
}

// TODO: only used in correctness checker!
func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function func(file *types.FileEntry) error) error {
// Get header from server
Expand Down Expand Up @@ -618,3 +685,68 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu

return nil
}

// read stream size (TotalEntries) and iterate backwards to find highest l2 block number
func (c *StreamClient) FindHighestL2BlockNumber() (uint64, error) {
// Get header from server
if _, err := c.GetHeader(); err != nil {
return 0, fmt.Errorf("%s get header error: %w", c.id, err)
}

entryNo := c.Header.TotalEntries

// send entry command and iterate backwards until we get an L2 block
for entryNo > 0 {
fe, err := c.GetEntry(entryNo)
if err != nil {
continue
}
entryNo--

if fe == nil {
continue
}

c.start() // if we get a result we must restart connection

// parse fe
if fileEntry, ok := fe.(*types.FileEntry); ok {
if fileEntry.EntryType == types.EntryTypeL2Block {
l2Block, err := types.UnmarshalL2Block(fileEntry.Data)
if err != nil {
return 0, fmt.Errorf("%s unmarshal L2 block error: %w", c.id, err)
}

// store highest block number
currentHighest := c.highestL2Block.Load()
if l2Block.L2BlockNumber > currentHighest {
c.highestL2Block.Store(l2Block.L2BlockNumber)
}

return l2Block.L2BlockNumber, nil
}
}

fe = nil

// check if it's a L2 block
// if fe.IsL2Block() {
// l2Block, err := types.UnmarshalL2Block(fe.Data)
// if err != nil {
// return 0, fmt.Errorf("%s unmarshal L2 block error: %w", c.id, err)
// }
//
// // store highest block number
// currentHighest := c.highestL2Block.Load()
// if l2Block.L2BlockNumber > currentHighest {
// c.highestL2Block.Store(l2Block.L2BlockNumber)
// }
//
// return l2Block.L2BlockNumber, nil
// }
}

c.start()

return 0, fmt.Errorf("%s no L2 block found", c.id)
}
Loading

0 comments on commit 3a57844

Please sign in to comment.