Skip to content

Commit

Permalink
Implement pushing blocks data to the DB (ethereum#18)
Browse files Browse the repository at this point in the history
Co-authored-by: Bhakiyaraj Kalimuthu <[email protected]>
  • Loading branch information
2 people authored and avalonche committed Mar 9, 2023
1 parent b93e268 commit 10fa312
Show file tree
Hide file tree
Showing 16 changed files with 433 additions and 108 deletions.
25 changes: 12 additions & 13 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type IBuilder interface {
}

type Builder struct {
ds IDatabaseService
beaconClient IBeaconClient
relay IRelay
eth IEthereumService
Expand All @@ -55,12 +56,13 @@ type Builder struct {
bestBlockProfit *big.Int
}

func NewBuilder(sk *bls.SecretKey, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder {
func NewBuilder(sk *bls.SecretKey, ds IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder {
pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
pk := boostTypes.PublicKey{}
pk.FromSlice(pkBytes)

return &Builder{
ds: ds,
beaconClient: bc,
relay: relay,
eth: eth,
Expand All @@ -73,7 +75,7 @@ func NewBuilder(sk *bls.SecretKey, bc IBeaconClient, relay IRelay, builderSignin
}
}

func (b *Builder) onSealedBlock(executableData *beacon.ExecutableDataV1, block *types.Block, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error {
func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error {
b.bestMu.Lock()
defer b.bestMu.Unlock()

Expand All @@ -88,6 +90,7 @@ func (b *Builder) onSealedBlock(executableData *beacon.ExecutableDataV1, block *
}
}

executableData := beacon.BlockToExecutableData(block)
payload, err := executableDataToExecutionPayload(executableData)
if err != nil {
log.Error("could not format execution payload", "err", err)
Expand Down Expand Up @@ -133,6 +136,7 @@ func (b *Builder) onSealedBlock(executableData *beacon.ExecutableDataV1, block *

b.bestBlockProfit.Set(block.Profit)

go b.ds.ConsumeBuiltBlock(block, bundles, &blockBidMsg)
return nil
}

Expand Down Expand Up @@ -166,20 +170,15 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error {
return errors.New("parent block not found in blocktree")
}

firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error {
executableData, block := b.eth.BuildBlock(attrs)
if executableData == nil || block == nil {
log.Error("did not receive the payload")
return errors.New("did not receive the payload")
}

err := b.onSealedBlock(executableData, block, proposerPubkey, vd.FeeRecipient, attrs)
blockHook := func(block *types.Block, bundles []types.SimulatedBundle) {
err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs)
if err != nil {
log.Error("could not run block hook", "err", err)
return err
log.Error("could not run sealed block hook", "err", err)
}
}

return nil
firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error {
return b.eth.BuildBlock(attrs, blockHook)
})

return firstBlockResult
Expand Down
18 changes: 9 additions & 9 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestOnPayloadAttributes(t *testing.T) {
FeeRecipient: common.Address(feeRecipient),
StateRoot: common.Hash{0x07, 0x16},
ReceiptsRoot: common.Hash{0x08, 0x20},
LogsBloom: hexutil.MustDecode("0x000000000000000000000000000000"),
LogsBloom: types.Bloom{}.Bytes(),
Number: uint64(10),
GasLimit: uint64(50),
GasUsed: uint64(100),
Expand All @@ -56,13 +56,13 @@ func TestOnPayloadAttributes(t *testing.T) {

BaseFeePerGas: big.NewInt(16),

BlockHash: common.Hash{0x09, 0xff},
BlockHash: common.HexToHash("0xca4147f0d4150183ece9155068f34ee3c375448814e4ca557d482b1d40ee5407"),
Transactions: [][]byte{},
}

testBlock := &types.Block{
Profit: big.NewInt(10),
}
testBlock, err := beacon.ExecutableDataToBlock(*testExecutableData)
require.NoError(t, err)
testBlock.Profit = big.NewInt(10)

testPayloadAttributes := &BuilderPayloadAttributes{
Timestamp: hexutil.Uint64(104),
Expand All @@ -74,7 +74,7 @@ func TestOnPayloadAttributes(t *testing.T) {

testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock}

builder := NewBuilder(sk, &testBeacon, &testRelay, bDomain, testEthService)
builder := NewBuilder(sk, NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService)

builder.OnPayloadAttribute(testPayloadAttributes)

Expand All @@ -85,14 +85,14 @@ func TestOnPayloadAttributes(t *testing.T) {
expectedMessage := boostTypes.BidTrace{
Slot: uint64(25),
ParentHash: boostTypes.Hash{0x02, 0x03},
BlockHash: boostTypes.Hash{0x09, 0xff},
BuilderPubkey: builder.builderPublicKey,
ProposerPubkey: expectedProposerPubkey,
ProposerFeeRecipient: feeRecipient,
GasLimit: uint64(50),
GasUsed: uint64(100),
Value: boostTypes.U256Str{0x0a},
}
expectedMessage.BlockHash.FromSlice(hexutil.MustDecode("0xca4147f0d4150183ece9155068f34ee3c375448814e4ca557d482b1d40ee5407")[:])

require.Equal(t, expectedMessage, *testRelay.submittedMsg.Message)

Expand All @@ -109,13 +109,13 @@ func TestOnPayloadAttributes(t *testing.T) {
Timestamp: testExecutableData.Timestamp,
ExtraData: hexutil.MustDecode("0x0042fafc"),
BaseFeePerGas: boostTypes.U256Str{0x10},
BlockHash: boostTypes.Hash{0x09, 0xff},
BlockHash: expectedMessage.BlockHash,
Transactions: []hexutil.Bytes{},
}

require.Equal(t, expectedExecutionPayload, *testRelay.submittedMsg.ExecutionPayload)

expectedSignature, err := boostTypes.HexToSignature("0xb086abc231a515559128122a6618ad316a76195ad39aa28195c9e8921b98561ca4fd12e2e1ea8d50d8e22f7e36d42ee1084fef26672beceda7650a87061e412d7742705077ac3af3ca1a1c3494eccb22fe7c234fd547a285ba699ff87f0e7759")
expectedSignature, err := boostTypes.HexToSignature("0xad09f171b1da05636acfc86778c319af69e39c79515d44bdfed616ba2ef677ffd4d155d87b3363c6bae651ce1e92786216b75f1ac91dd65f3b1d1902bf8485e742170732dd82ffdf4decb0151eeb7926dd053efa9794b2ebed1a203e62bb13e9")

require.NoError(t, err)
require.Equal(t, expectedSignature, testRelay.submittedMsg.Signature)
Expand Down
136 changes: 136 additions & 0 deletions builder/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package builder

import (
"context"
"database/sql"
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
boostTypes "github.com/flashbots/go-boost-utils/types"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)

type IDatabaseService interface {
ConsumeBuiltBlock(block *types.Block, bundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace)
}

type NilDbService struct{}

func (NilDbService) ConsumeBuiltBlock(*types.Block, []types.SimulatedBundle, *boostTypes.BidTrace) {}

type DatabaseService struct {
db *sqlx.DB

insertBuiltBlockStmt *sqlx.NamedStmt
insertBlockBuiltBundleNoIdStmt *sqlx.NamedStmt
insertBlockBuiltBundleWithIdStmt *sqlx.NamedStmt
insertMissingBundleStmt *sqlx.NamedStmt
}

func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {
db, err := sqlx.Connect("postgres", postgresDSN)
if err != nil {
return nil, err
}

insertBuiltBlockStmt, err := db.PrepareNamed("insert into built_blocks (block_number, profit, slot, hash, gas_limit, gas_used, base_fee, parent_hash, proposer_pubkey, proposer_fee_recipient, builder_pubkey, timestamp, timestamp_datetime) values (:block_number, :profit, :slot, :hash, :gas_limit, :gas_used, :base_fee, :parent_hash, :proposer_pubkey, :proposer_fee_recipient, :builder_pubkey, :timestamp, to_timestamp(:timestamp)) returning block_id")
if err != nil {
return nil, err
}

insertBlockBuiltBundleNoIdStmt, err := db.PrepareNamed("insert into built_blocks_bundles (block_id, bundle_id) select :block_id, id from bundles where bundle_hash = :bundle_hash and param_block_number = :block_number returning bundle_id")
if err != nil {
return nil, err
}

insertBlockBuiltBundleWithIdStmt, err := db.PrepareNamed("insert into built_blocks_bundles (block_id, bundle_id) select :block_id, :bundle_id returning bundle_id")
if err != nil {
return nil, err
}

insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase) on conflict (bundle_hash, param_block_number) do nothing returning id")
if err != nil {
return nil, err
}

return &DatabaseService{
db: db,
insertBuiltBlockStmt: insertBuiltBlockStmt,
insertBlockBuiltBundleNoIdStmt: insertBlockBuiltBundleNoIdStmt,
insertBlockBuiltBundleWithIdStmt: insertBlockBuiltBundleWithIdStmt,
insertMissingBundleStmt: insertMissingBundleStmt,
}, nil
}

func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, bundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace) {
tx, err := ds.db.Beginx()

blockData := BuiltBlock{
BlockNumber: block.NumberU64(),
Profit: new(big.Rat).SetFrac(block.Profit, big.NewInt(1e18)).FloatString(18),
Slot: bidTrace.Slot,
Hash: block.Hash().String(),
GasLimit: block.GasLimit(),
GasUsed: block.GasUsed(),
BaseFee: block.BaseFee().Uint64(),
ParentHash: block.ParentHash().String(),
ProposerPubkey: bidTrace.ProposerPubkey.String(),
ProposerFeeRecipient: bidTrace.ProposerFeeRecipient.String(),
BuilderPubkey: bidTrace.BuilderPubkey.String(),
Timestamp: block.Time(),
}

ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()
var blockId uint64
if err = tx.NamedStmtContext(ctx, ds.insertBuiltBlockStmt).GetContext(ctx, &blockId, blockData); err != nil {
log.Error("could not insert built block", "err", err)
tx.Rollback()
return
}

for _, bundle := range bundles {
bundleData := BuiltBlockBundle{
BlockId: blockId,
BundleId: nil,
BlockNumber: blockData.BlockNumber,
BundleHash: bundle.OriginalBundle.Hash.String(),
}

var bundleId uint64
err := tx.NamedStmtContext(ctx, ds.insertBlockBuiltBundleNoIdStmt).GetContext(ctx, &bundleId, bundleData)
if err == nil {
continue
}

if err != sql.ErrNoRows {
log.Error("could not insert bundle", "err", err)
// Try anyway
}

missingBundleData := SimulatedBundleToDbBundle(&bundle)
err = ds.insertMissingBundleStmt.GetContext(ctx, &bundleId, missingBundleData) // not using the tx as it relies on the unique constraint!
if err == nil {
bundleData.BundleId = &bundleId
_, err = tx.NamedStmtContext(ctx, ds.insertBlockBuiltBundleWithIdStmt).ExecContext(ctx, bundleData)
if err != nil {
log.Error("could not insert built block bundle after inserting missing bundle", "err", err)
}
} else if err == sql.ErrNoRows /* conflict, someone else inserted the bundle before we could */ {
if err := tx.NamedStmtContext(ctx, ds.insertBlockBuiltBundleNoIdStmt).GetContext(ctx, &bundleId, bundleData); err != nil {
log.Error("could not insert bundle on retry", "err", err)
continue
}
} else {
log.Error("could not insert missing bundle", "err", err)
}
}

err = tx.Commit()
if err != nil {
log.Error("could not commit DB trasnaction", "err", err)
}
}
71 changes: 71 additions & 0 deletions builder/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package builder

import (
"math/big"
"os"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
boostTypes "github.com/flashbots/go-boost-utils/types"
"github.com/stretchr/testify/require"
)

func TestDatabaseBlockInsertion(t *testing.T) {
dsn := os.Getenv("FLASHBOTS_TEST_POSTGRES_DSN")
if dsn == "" {
return
}

ds, err := NewDatabaseService(dsn)
require.NoError(t, err)

_, err = ds.db.Exec("insert into bundles (id, param_block_number, bundle_hash) values (10, 20, '0x1078')")
require.NoError(t, err)

block := types.NewBlock(
&types.Header{
ParentHash: common.HexToHash("0xafafafa"),
Number: big.NewInt(132),
GasLimit: uint64(10000),
GasUsed: uint64(1000),
Time: 16000000,
BaseFee: big.NewInt(7),
}, nil, nil, nil, nil)
block.Profit = big.NewInt(10)

simBundle1 := types.SimulatedBundle{
MevGasPrice: big.NewInt(9),
TotalEth: big.NewInt(11),
EthSentToCoinbase: big.NewInt(10),
TotalGasUsed: uint64(100),
OriginalBundle: types.MevBundle{
Txs: types.Transactions{types.NewTransaction(uint64(50), common.Address{0x60}, big.NewInt(19), uint64(67), big.NewInt(43), []byte{})},
BlockNumber: big.NewInt(12),
MinTimestamp: uint64(1000000),
RevertingTxHashes: []common.Hash{common.Hash{0x10, 0x17}},
Hash: common.Hash{0x09, 0x78},
},
}
simBundle2 := types.SimulatedBundle{
MevGasPrice: big.NewInt(90),
TotalEth: big.NewInt(110),
EthSentToCoinbase: big.NewInt(100),
TotalGasUsed: uint64(1000),
OriginalBundle: types.MevBundle{
Txs: types.Transactions{types.NewTransaction(uint64(51), common.Address{0x61}, big.NewInt(109), uint64(167), big.NewInt(433), []byte{})},
BlockNumber: big.NewInt(20),
MinTimestamp: uint64(1000020),
RevertingTxHashes: []common.Hash{common.Hash{0x11, 0x17}},
Hash: common.Hash{0x10, 0x78},
},
}

bidTrace := &boostTypes.BidTrace{}

ds.ConsumeBuiltBlock(block, []types.SimulatedBundle{simBundle1, simBundle2}, bidTrace)

var dbBlock BuiltBlock
ds.db.Get(&dbBlock, "select * from built_blocks where hash = '0x24e6998e4d2b4fd85f7f0d27ac4b87aaf0ec18e156e4b6e194ab5dadee0cd004'")
t.Logf("block %v", dbBlock)
}
Loading

0 comments on commit 10fa312

Please sign in to comment.