Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make data stream more modular #152

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 184 additions & 42 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,71 @@ package server

import (
"bytes"
"encoding/binary"
"fmt"
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
libcommon "github.com/ledgerwatch/erigon-lib/common"
types2 "github.com/ledgerwatch/erigon/core/types"
eritypes "github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/erigon/zk/hermez_db"
)

type BookmarkType byte

var BlockBookmarkType BookmarkType = 0

var (
EntryTypeUpdateGer = datastreamer.EntryType(4)
EntryTypeL2BlockEnd = datastreamer.EntryType(3)
EntryTypeL2Tx = datastreamer.EntryType(2)
)
var entryTypeMappings = map[types.EntryType]datastreamer.EntryType{
types.EntryTypeStartL2Block: datastreamer.EntryType(1),
types.EntryTypeL2Tx: datastreamer.EntryType(2),
types.EntryTypeEndL2Block: datastreamer.EntryType(3),
types.EntryTypeBookmark: datastreamer.EntryType(176),
}

type DataStreamServer struct {
stream *datastreamer.StreamServer
chainId uint64
}

type DataStreamEntry interface {
EntryType() types.EntryType
Bytes(bigEndian bool) []byte
}

func NewDataStreamServer(stream *datastreamer.StreamServer, chainId uint64) *DataStreamServer {
return &DataStreamServer{
stream: stream,
chainId: chainId,
}
}

func (srv *DataStreamServer) AddBookmark(t BookmarkType, marker uint64) error {
bookmark := types.Bookmark{Type: byte(t), From: marker}
_, err := srv.stream.AddStreamBookmark(bookmark.EncodeBigEndian())
return err
func (srv *DataStreamServer) CommitEntriesToStream(entries []DataStreamEntry, bigEndian bool) error {
for _, entry := range entries {
entryType := entry.EntryType()
if entryType == types.EntryTypeBookmark {
_, err := srv.stream.AddStreamBookmark(entry.Bytes(bigEndian))
if err != nil {
return err
}
} else {
mapped, ok := entryTypeMappings[entryType]
if !ok {
return fmt.Errorf("unsupported stream entry type: %v", entryType)
}
_, err := srv.stream.AddStreamEntry(mapped, entry.Bytes(bigEndian))
if err != nil {
return err
}
}
}
return nil
}

func (srv *DataStreamServer) CreateBookmarkEntry(t BookmarkType, marker uint64) *types.Bookmark {
return &types.Bookmark{Type: byte(t), From: marker}
}

func (srv *DataStreamServer) AddBlockStart(block *types2.Block, batchNumber uint64, forkId uint16, ger libcommon.Hash, deltaTimestamp uint32, l1InfoIndex uint32, l1BlockHash libcommon.Hash) error {
b := &types.StartL2Block{
func (srv *DataStreamServer) CreateBlockStartEntry(block *eritypes.Block, batchNumber uint64, forkId uint16, ger libcommon.Hash, deltaTimestamp uint32, l1InfoIndex uint32, l1BlockHash libcommon.Hash) *types.StartL2Block {
return &types.StartL2Block{
BatchNumber: batchNumber,
L2BlockNumber: block.NumberU64(),
Timestamp: int64(block.Time()),
Expand All @@ -49,49 +78,27 @@ func (srv *DataStreamServer) AddBlockStart(block *types2.Block, batchNumber uint
ForkId: forkId,
ChainId: uint32(srv.chainId),
}
_, err := srv.stream.AddStreamEntry(1, types.EncodeStartL2BlockBigEndian(b))
return err
}

func (srv *DataStreamServer) AddBlockEnd(blockNumber uint64, blockHash, stateRoot libcommon.Hash) error {
end := &types.EndL2Block{
func (srv *DataStreamServer) CreateBlockEndEntry(blockNumber uint64, blockHash, stateRoot libcommon.Hash) *types.EndL2Block {
return &types.EndL2Block{
L2BlockNumber: blockNumber,
L2Blockhash: blockHash,
StateRoot: stateRoot,
}
_, err := srv.stream.AddStreamEntry(3, types.EncodeEndL2BlockBigEndian(end))
return err
}

func (srv *DataStreamServer) AddGerUpdate(batchNumber uint64, ger libcommon.Hash, fork uint16, block *types2.Block) (uint64, error) {
update := types.GerUpdate{
BatchNumber: batchNumber,
Timestamp: block.Time(),
GlobalExitRoot: ger,
Coinbase: block.Coinbase(),
ForkId: fork,
ChainId: uint32(srv.chainId),
StateRoot: block.Root(),
}

return srv.stream.AddStreamEntry(EntryTypeUpdateGer, update.EncodeToBytesBigEndian())
}

func (srv *DataStreamServer) AddGerUpdateFromDb(ger *types.GerUpdate) (uint64, error) {
return srv.stream.AddStreamEntry(EntryTypeUpdateGer, ger.EncodeToBytesBigEndian())
}

func (srv *DataStreamServer) AddTransaction(
func (srv *DataStreamServer) CreateTransactionEntry(
effectiveGasPricePercentage uint8,
stateRoot libcommon.Hash,
fork uint16,
tx types2.Transaction,
) (uint64, error) {
tx eritypes.Transaction,
) (*types.L2Transaction, error) {
buf := make([]byte, 0)
writer := bytes.NewBuffer(buf)
err := tx.EncodeRLP(writer)
if err != nil {
return 0, err
return nil, err
}

encoded := writer.Bytes()
Expand All @@ -102,13 +109,148 @@ func (srv *DataStreamServer) AddTransaction(

length := len(encoded)

l2Tx := types.L2Transaction{
return &types.L2Transaction{
EffectiveGasPricePercentage: effectiveGasPricePercentage,
IsValid: 1, // TODO: SEQ: we don't store this value anywhere currently as a sync node
StateRoot: stateRoot,
EncodedLength: uint32(length),
Encoded: encoded,
}, nil
}

func (srv *DataStreamServer) CreateAndCommitEntriesToStream(
block *eritypes.Block,
reader *hermez_db.HermezDbReader,
lastBlock *eritypes.Block,
batchNumber uint64,
bigEndian bool,
) error {
entries, err := srv.CreateStreamEntries(block, reader, lastBlock, batchNumber)
if err != nil {
return err
}
return srv.CommitEntriesToStream(entries, bigEndian)
}

func (srv *DataStreamServer) CreateStreamEntries(
block *eritypes.Block,
reader *hermez_db.HermezDbReader,
lastBlock *eritypes.Block,
batchNumber uint64,
) ([]DataStreamEntry, error) {
fork, err := reader.GetForkId(batchNumber)
if err != nil {
return nil, err
}

var entries []DataStreamEntry

bookmark := srv.CreateBookmarkEntry(BlockBookmarkType, block.NumberU64())
entries = append(entries, bookmark)

deltaTimestamp := block.Time() - lastBlock.Time()

var ger libcommon.Hash
var l1BlockHash libcommon.Hash

l1Index, err := reader.GetBlockL1InfoTreeIndex(block.NumberU64())
if err != nil {
return nil, err
}

if block.NumberU64() == 1 {
// injected batch at the start of the network
injected, err := reader.GetL1InjectedBatch(0)
if err != nil {
return nil, err
}
ger = injected.LastGlobalExitRoot
l1BlockHash = injected.L1ParentHash

// block 1 in the stream has a delta timestamp of the block time itself
deltaTimestamp = block.Time()
} else {
// standard behaviour for non-injected or forced batches
if l1Index != 0 {
// read the index info itself
l1Info, err := reader.GetL1InfoTreeUpdate(l1Index)
if err != nil {
return nil, err
}
if l1Info != nil {
ger = l1Info.GER
l1BlockHash = l1Info.ParentHash
}
}
}

blockStart := srv.CreateBlockStartEntry(block, batchNumber, uint16(fork), ger, uint32(deltaTimestamp), uint32(l1Index), l1BlockHash)
entries = append(entries, blockStart)

for _, tx := range block.Transactions() {
effectiveGasPricePercentage, err := reader.GetEffectiveGasPricePercentage(tx.Hash())
if err != nil {
return nil, err
}
stateRoot, err := reader.GetStateRoot(block.NumberU64())
if err != nil {
return nil, err
}
transaction, err := srv.CreateTransactionEntry(effectiveGasPricePercentage, stateRoot, uint16(fork), tx)
if err != nil {
return nil, err
}
entries = append(entries, transaction)
}

return srv.stream.AddStreamEntry(EntryTypeL2Tx, types.EncodeL2TransactionBigEndian(l2Tx))
blockEnd := srv.CreateBlockEndEntry(block.NumberU64(), block.Root(), block.Root())
entries = append(entries, blockEnd)

return entries, nil
}

func (srv *DataStreamServer) CreateAndBuildStreamEntryBytes(
block *eritypes.Block,
reader *hermez_db.HermezDbReader,
lastBlock *eritypes.Block,
batchNumber uint64,
bigEndian bool,
) ([]byte, error) {
entries, err := srv.CreateStreamEntries(block, reader, lastBlock, batchNumber)
if err != nil {
return nil, err
}

var result []byte
for _, entry := range entries {
b := encodeEntryToBytes(entry, bigEndian)
result = append(result, b...)
}

return result, nil
}

const (
PACKET_TYPE_DATA = 2
// NOOP_ENTRY_NUMBER is used because we don't care about the entry number when feeding an atrificial
// stream to the executor, if this ever changes then we'll need to populate an actual number
NOOP_ENTRY_NUMBER = 0
)

func encodeEntryToBytes(entry DataStreamEntry, bigEndian bool) []byte {
data := entry.Bytes(bigEndian)
var totalLength = 1 + 4 + 4 + 8 + uint32(len(data))
buf := make([]byte, 1)
buf[0] = PACKET_TYPE_DATA
if bigEndian {
buf = binary.BigEndian.AppendUint32(buf, totalLength)
buf = binary.BigEndian.AppendUint32(buf, uint32(entry.EntryType()))
buf = binary.BigEndian.AppendUint64(buf, uint64(NOOP_ENTRY_NUMBER))
} else {
buf = binary.LittleEndian.AppendUint32(buf, totalLength)
buf = binary.LittleEndian.AppendUint32(buf, uint32(entry.EntryType()))
buf = binary.LittleEndian.AppendUint64(buf, uint64(NOOP_ENTRY_NUMBER))
}
buf = append(buf, data...)
return buf
}
27 changes: 24 additions & 3 deletions zk/datastream/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ const (
startL2BlockDataLengthPreEtrogForkId7 = 78
endL2BlockDataLength = 72

// EntryTypeL2Block represents a L2 block
EntryTypeStartL2Block EntryType = 1
EntryTypeEndL2Block EntryType = 3
EntryTypeStartL2Block = EntryType(1)
EntryTypeEndL2Block = EntryType(3)
)

// StartL2Block represents a zkEvm block
Expand All @@ -33,6 +32,17 @@ type StartL2Block struct {
ChainId uint32 // 4 bytes
}

func (s *StartL2Block) EntryType() EntryType {
return EntryTypeStartL2Block
}

func (s *StartL2Block) Bytes(bigEndian bool) []byte {
if bigEndian {
return EncodeStartL2BlockBigEndian(s)
}
return EncodeStartL2Block(s)
}

// decodes a StartL2Block from a byte array
func DecodeStartL2Block(data []byte) (*StartL2Block, error) {
if len(data) != startL2BlockDataLength {
Expand Down Expand Up @@ -145,6 +155,17 @@ type EndL2Block struct {
StateRoot common.Hash // 32 bytes
}

func (b *EndL2Block) EntryType() EntryType {
return EntryTypeEndL2Block
}

func (b *EndL2Block) Bytes(bigEndian bool) []byte {
if bigEndian {
return EncodeEndL2BlockBigEndian(b)
}
return EncodeEndL2Block(b)
}

// DecodeEndL2Block decodes a EndL2Block from a byte array
func DecodeEndL2Block(data []byte) (*EndL2Block, error) {
if len(data) != endL2BlockDataLength {
Expand Down
12 changes: 12 additions & 0 deletions zk/datastream/types/bookmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,25 @@ import (

const (
BookmarkTypeStart byte = 0
EntryTypeBookmark = EntryType(176)
)

type Bookmark struct {
Type byte
From uint64
}

func (b *Bookmark) EntryType() EntryType {
return EntryTypeBookmark
}

func (b *Bookmark) Bytes(bigEndian bool) []byte {
if bigEndian {
return b.EncodeBigEndian()
}
return b.Encode()
}

func NewL2BlockBookmark(fromBlock uint64) *Bookmark {
return &Bookmark{
Type: BookmarkTypeStart,
Expand Down
Loading
Loading