Skip to content

Commit

Permalink
fix(stage_batches): read highest l2 block from stream for stage target
Browse files Browse the repository at this point in the history
  • Loading branch information
revitteth committed Sep 20, 2024
1 parent 260d035 commit 54181ed
Show file tree
Hide file tree
Showing 12 changed files with 1,053 additions and 460 deletions.
10 changes: 6 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,10 +962,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
*/
latestForkId, err := stages.GetStageProgress(tx, stages.ForkId)
highestDownloadedBlock, err := stages.GetStageProgress(tx, stages.Batches)
if err != nil {
return nil, err
}
streamClient := initDataStreamClient(ctx, cfg.Zk, uint16(latestForkId))
streamClient := initDataStreamClient(ctx, cfg.Zk, uint16(latestForkId), highestDownloadedBlock)

backend.syncStages = stages2.NewDefaultZkStages(
backend.sentryCtx,
Expand Down Expand Up @@ -1050,9 +1051,10 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien
return em
}

// creates a datastream client with default parameters
func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient {
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16, highestDowloadedBlock uint64) *client.StreamClient {
c := client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId, highestDowloadedBlock)
c.Start() // fires off goroutine to download from the stream!
return c
}

func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
Expand Down
26 changes: 21 additions & 5 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendHeaderCmd() error {
err := c.sendCommand(CmdHeader)
err := c.sendCommandAndStreamType(CmdHeader)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}
Expand All @@ -26,7 +26,7 @@ func (c *StreamClient) sendHeaderCmd() error {
// sendStartBookmarkCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given bookmark
func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
err := c.sendCommand(CmdStartBookmark)
err := c.sendCommandAndStreamType(CmdStartBookmark)
if err != nil {
return err
}
Expand All @@ -42,10 +42,26 @@ func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
return nil
}

// sendEntryCommand sends an entry command to the server, indicating
// that the client wishes to stream the given entry number.
func (c *StreamClient) sendEntryCommand(entryNum uint64) error {
err := c.sendCommandAndStreamType(CmdEntry)
if err != nil {
return err
}

// Send entry number
if err := writeFullUint64ToConn(c.conn, entryNum); err != nil {
return err
}

return nil
}

// sendStartCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given entry number.
func (c *StreamClient) sendStartCmd(from uint64) error {
err := c.sendCommand(CmdStart)
err := c.sendCommandAndStreamType(CmdStart)
if err != nil {
return err
}
Expand All @@ -60,15 +76,15 @@ func (c *StreamClient) sendStartCmd(from uint64) error {

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendStopCmd() error {
err := c.sendCommand(CmdStop)
err := c.sendCommandAndStreamType(CmdStop)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
}

func (c *StreamClient) sendCommand(cmd Command) error {
func (c *StreamClient) sendCommandAndStreamType(cmd Command) error {
// Send command
if err := writeFullUint64ToConn(c.conn, uint64(cmd)); err != nil {
return fmt.Errorf("%s %v", c.id, err)
Expand Down
Loading

0 comments on commit 54181ed

Please sign in to comment.