Skip to content

Commit

Permalink
methods to read single txn by txnID (#4215)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored May 20, 2022
1 parent e328049 commit 7ecdf51
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 16 deletions.
3 changes: 1 addition & 2 deletions cmd/rpcdaemon/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ type BodyReader interface {

type TxnReader interface {
TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error)
//TxnByHashDeprecated(ctx context.Context, tx kv.Getter, txnHash common.Hash) (txn types.Transaction, blockHash common.Hash, blockNum, txnIndex uint64, err error)
//TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error)
}

type HeaderAndCanonicalReader interface {
HeaderReader
CanonicalReader
Expand Down
14 changes: 14 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,20 @@ func ReadStorageBody(db kv.Getter, hash common.Hash, number uint64) (types.BodyF
return *bodyForStorage, nil
}

func CanonicalTxnByID(db kv.Getter, id uint64) (types.Transaction, error) {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, id)
v, err := db.GetOne(kv.EthTx, txIdKey)
if err != nil {
return nil, err
}
txn, err := types.DecodeTransaction(rlp.NewStream(bytes.NewReader(v), uint64(len(v))))
if err != nil {
return nil, err
}
return txn, nil
}

func CanonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]types.Transaction, error) {
if amount == 0 {
return []types.Transaction{}, nil
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,10 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
if cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled {
if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {
canDeleteTo := snapshotsync.CanDeleteTo(s.ForwardProgress, cfg.blockRetire.Snapshots())
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil {
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 100); err != nil {
return nil
}
if err = PruneTable(tx, kv.Senders, canDeleteTo, ctx, 1_000); err != nil {
if err = PruneTable(tx, kv.Senders, canDeleteTo, ctx, 100); err != nil {
return err
}
}
Expand Down
130 changes: 118 additions & 12 deletions turbo/snapshotsync/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package snapshotsync
import (
"bytes"
"context"
"encoding/binary"
"fmt"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
Expand Down Expand Up @@ -88,6 +89,28 @@ func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash co
}
return *n, true, nil
}
func (back *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return nil, err
}
var k [8 + 32]byte
binary.BigEndian.PutUint64(k[:], blockNum)
copy(k[8:], canonicalHash[:])
b, err := rawdb.ReadBodyForStorageByKey(tx, k[:])
if err != nil {
return nil, err
}
if b == nil {
return nil, nil
}

txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i))
if err != nil {
return nil, err
}
return txn, nil
}

type RemoteBlockReader struct {
client remote.ETHBACKENDClient
Expand All @@ -108,6 +131,10 @@ func (back *RemoteBlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnH
return reply.BlockNumber, true, nil
}

func (back *RemoteBlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
panic("not implemented")
}

func (back *RemoteBlockReader) BlockWithSenders(ctx context.Context, _ kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) {
reply, err := back.client.Block(ctx, &remote.BlockRequest{BlockHash: gointerfaces.ConvertHashToH256(hash), BlockHeight: blockHeight})
if err != nil {
Expand Down Expand Up @@ -475,34 +502,42 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash,
}

func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, []byte, error) {
b, buf, err := back.bodyForStorageFromSnapshot(blockHeight, sn, buf)
if err != nil {
return nil, 0, 0, buf, err
}

body := new(types.Body)
body.Uncles = b.Uncles
var txsAmount uint32
if b.TxAmount >= 2 {
txsAmount = b.TxAmount - 2
}
return body, b.BaseTxId + 1, txsAmount, buf, nil // empty txs in the beginning and end of block
}

func (back *BlockReaderWithSnapshots) bodyForStorageFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.BodyForStorage, []byte, error) {
if sn.idxBodyNumber == nil {
return nil, 0, 0, buf, nil
return nil, buf, nil
}
bodyOffset := sn.idxBodyNumber.Lookup2(blockHeight - sn.idxBodyNumber.BaseDataID())

gg := sn.seg.MakeGetter()
gg.Reset(bodyOffset)
buf, _ = gg.Next(buf[:0])
if len(buf) == 0 {
return nil, 0, 0, buf, nil
return nil, nil, nil
}
b := &types.BodyForStorage{}
reader := bytes.NewReader(buf)
if err := rlp.Decode(reader, b); err != nil {
return nil, 0, 0, buf, err
return nil, buf, err
}

if b.BaseTxId < sn.idxBodyNumber.BaseDataID() {
return nil, 0, 0, buf, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.idxBodyNumber.BaseDataID(), sn.seg.FilePath())
return nil, buf, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.idxBodyNumber.BaseDataID(), sn.seg.FilePath())
}

body := new(types.Body)
body.Uncles = b.Uncles
var txsAmount uint32
if b.TxAmount >= 2 {
txsAmount = b.TxAmount - 2
}
return body, b.BaseTxId + 1, txsAmount, buf, nil // empty txs in the beginning and end of block
return b, buf, nil
}

func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) (txs []types.Transaction, senders []common.Address, err error) {
Expand Down Expand Up @@ -542,6 +577,21 @@ func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmoun
return txs, senders, nil
}

func (back *BlockReaderWithSnapshots) txnByID(txnID uint64, sn *TxnSegment, buf []byte) (txn types.Transaction, err error) {
offset := sn.IdxTxnHash.Lookup2(txnID - sn.IdxTxnHash.BaseDataID())
gg := sn.Seg.MakeGetter()
gg.Reset(offset)
buf, _ = gg.Next(buf[:0])
sender, txnRlp := buf[1:1+20], buf[1+20:]

txn, err = types.DecodeTransaction(rlp.NewStream(bytes.NewReader(txnRlp), uint64(len(txnRlp))))
if err != nil {
return
}
txn.SetSender(*(*common.Address)(sender)) // see: https://tip.golang.org/ref/spec#Conversions_from_slice_to_array_pointer
return
}

func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) {
for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
Expand Down Expand Up @@ -578,6 +628,62 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []
return
}

func (back *BlockReaderWithSnapshots) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
var b *types.BodyForStorage
ok, err := back.sn.ViewBodies(blockNum, func(segment *BodySegment) error {
b, _, err = back.bodyForStorageFromSnapshot(blockNum, segment, nil)
if err != nil {
return err
}
if b == nil {
return nil
}

return nil
})
if ok {
ok, err = back.sn.Txs.ViewSegment(blockNum, func(segment *TxnSegment) error {
// +1 because block has system-txn in the beginning of block
txn, err = back.txnByID(b.BaseTxId+1+uint64(i), segment, nil)
if err != nil {
return err
}
if txn == nil {
return nil
}
return nil
})
if err != nil {
return nil, err
}
if ok {
return txn, nil
}
return nil, nil
}

canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return nil, err
}
var k [8 + 32]byte
binary.BigEndian.PutUint64(k[:], blockNum)
copy(k[8:], canonicalHash[:])
b, err = rawdb.ReadBodyForStorageByKey(tx, k[:])
if err != nil {
return nil, err
}
if b == nil {
return nil, nil
}

txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i))
if err != nil {
return nil, err
}
return txn, nil
}

// TxnLookup - find blockNumber and txnID by txnHash
func (back *BlockReaderWithSnapshots) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
n, err := rawdb.ReadTxLookupEntry(tx, txnHash)
Expand Down

0 comments on commit 7ecdf51

Please sign in to comment.