Skip to content

Commit

Permalink
Merge v2022.04.05 (#2)
Browse files Browse the repository at this point in the history
* block by timestamp for stable (erigontech#3617)

* Add timings of forward stages to logs (erigontech#3621)

* save

* save

* deleted bor and starknet from doc (erigontech#3627)

* add nosqlite tag (erigontech#3653)

* add nosqlite tag

* save

* save (erigontech#3665)

* save (erigontech#3663)

* linter up (erigontech#3672) (erigontech#3673)

* linter up (erigontech#3672)

* save

* save

* Revert node DB cache (erigontech#3581) (erigontech#3674) (erigontech#3675)

Revert "Prevent frequent commits to the node DB in sentries (erigontech#2505)".
This reverts commit 65a9a26.

* [stable] Fixes to discovery nodedb (erigontech#3691)

* Update to erigon-lib stable

* Discovery: throttle node DB commits (erigontech#3581) (erigontech#3656)

UpdateFindFails/UpdateLastPingReceived/UpdateLastPongReceived events
are causing bursty DB commits (100 per minute).

This optimization throttles the disk writes to happen at most once in a few seconds,
because this info doesn't need to be persisted immediately.

This helps on HDD drives.

* Update erigon-lib

* Discovery: split node records to a sepatate DB table (erigontech#3581) (erigontech#3667)

Problem:
QuerySeeds will poke 150 random entries in the whole node DB and ignore hitting "field" entries.
In a bootstrap scenario it might hit hundreds of :lastping :lastpong entries,
and very few true "node record" entries.
After running for 15 minutes I've got totalEntryCount=1508 nodeRecordCount=114 entries.
There's a 1/16 chance of hitting a "node record" entry.
It means finding just about 10 nodes of 114 total on average from 150 attempts.

Solution:
Split "node record" entries to a separate table such that QuerySeeds doesn't do idle cycle hits.

* Discovery: add Context to Listen. (erigontech#3577)

Add explicit Context to ListenV4 and ListenV5.
This makes it possible to stop listening by an external signal.

* Discovery: refactor public key to node ID conversions. (erigontech#3634)

Encode and hash logic was duplicated in multiple places.
* Move encoding to p2p/discover/v4wire
* Move hashing to p2p/enode/idscheme

* Change newRandomLookup to create a proper random key on a curve.

* Discovery: speed up lookup tests (erigontech#3677)

* Update erigon-lib

Co-authored-by: Alexey Sharp <[email protected]>
Co-authored-by: battlmonstr <[email protected]>

* [stable] Fixes for state overrides in RPC (erigontech#3693)

* State override support (erigontech#3628)

* added stateOverride type

* solved import cycle

* refactoring

* imported wrong package

* fixed Call arguments

* typo

* override for traceCall

* Fix eth call (erigontech#3618)

* added isFake

* using isFake instead of checkNonce

* Revert "using isFake instead of checkNonce"

This reverts commit 6a202bb.

* Revert "added isFake"

This reverts commit 2c48024.

* only checking EOA if we are checking for Nonce

Co-authored-by: Enrique Jose  Avila Asapche <[email protected]>

* new bootnodes (erigontech#3591) (erigontech#3695)

Co-authored-by: Enrique Jose  Avila Asapche <[email protected]>

* Update skip analysis and preverified hashes (erigontech#3700) (erigontech#3704)

Co-authored-by: Alexey Sharp <[email protected]>

Co-authored-by: Alexey Sharp <[email protected]>

* Update version.go (erigontech#3701)

* rpcdaemon: fix TxContext in traceBlock (erigontech#3716)

Previously `txCtx` is not updated for every tx, which
leads to wrong tracing results.

* Mdbx: WriteMap fallback on error (erigontech#3714)

* save

* save

* Pool cost fix (erigontech#3725)

* save

* save

* Update to erigon-lib stable

Co-authored-by: Alex Sharp <[email protected]>

* mdbx v0.11.6 (erigontech#3771)

* mdbx fix after v0.11.6 (erigontech#3775)

* save

* save

* save

* [stable] Event log subscription (erigontech#3773)

* Logs sub (erigontech#3666)

* save

* Add onLogs

* Fix lint

* Add proper logs

* Update go.mod

* goimports

* Add unwind

* feat/rpcadaemon_logs_sub (erigontech#3751)

* Fixes to subscribe logs (erigontech#3769)

* Fixes to subscribe logs

* Add criteria to logs subscription

* Skeleton of RPC daemon event log distribution

* Simplify

* Send aggregated filter to Erigon

* Change API

* Print

* Fixes

* Fix topics filtering

* Fill txHash and blockHash

* Timing logs, fill tx index

* Print

* More print

* Print

* Asynchronous sending of log events to RPC daemon

* Remove prints

* Only extract logs if there are subscribers

* Check empty when RPC daemon is removed

Co-authored-by: Alex Sharp <[email protected]>
Co-authored-by: Alexey Sharp <[email protected]>

* Fix up

* Update to erigon-lib stable

* Update to erigon-lib stable

Co-authored-by: primal_concrete_sledge <[email protected]>
Co-authored-by: Alex Sharp <[email protected]>
Co-authored-by: Alexey Sharp <[email protected]>

* Update version.go (erigontech#3776)

* Update Skip analysis and preverified hashes (erigontech#3777) (erigontech#3778)

* Update skip analysis

* Add preverified hashes for mainnet and ropsten

* preverified hashes and bootnode for sepolia

Co-authored-by: Alexey Sharp <[email protected]>

Co-authored-by: Alexey Sharp <[email protected]>

* Integration: reset StageFinish also (erigontech#3783)

* docker hub - fetch git tags before build erigontech#3781

* fix nil pointer in fetch.go (erigontech#3802)

* Update preverified hashes and skip analysis (erigontech#3831) (erigontech#3832)

* Update skip_analysis

* Preverified hashes

Co-authored-by: Alexey Sharp <[email protected]>

Co-authored-by: Alexey Sharp <[email protected]>

* Fix 'all defaults' case for eth_estimateGas (erigontech#3790) (erigontech#3824)

* Fix 'all defaults' case for eth_estimateGas

* fix tests

Co-authored-by: Igor Mandrigin <[email protected]>

Co-authored-by: Igor Mandrigin <[email protected]>
Co-authored-by: Igor Mandrigin <[email protected]>

* Update version.go (erigontech#3829)

* Change libmdbx submodule origin  (erigontech#3894)

* save

* Restore testdata

Co-authored-by: Alexey Sharp <[email protected]>

* Update to erigon-lib stable (erigontech#3895)

Co-authored-by: Alexey Sharp <[email protected]>

* Update version.go (erigontech#3896)

* Update skip_analysis.go (erigontech#3897) (erigontech#3898)

* save (erigontech#3904)

* [stable] Fixes for header download (erigontech#3911)

* Rollback preverified hashes for mainnet

* Not remove header

* Set verified = true

* Fix verified extendUp and connect

* Skip already persisted links

* Prevent rewriting historical headers

* Not load links after highestInDb

* Restore preverified

* Fix tests

* Fix error handling

Co-authored-by: Alexey Sharp <[email protected]>

* save (erigontech#3916)

* Update libmdbx source (erigontech#3974)

Same change as already merged in `devel`

* Makefile (erigontech#3779): pass docker build arguments (erigontech#4239)

Dockerfile requires some --build-arg options.
Fix "docker" target to pass them.
Fix GIT_TAG to reflect the most recent tag related to HEAD, instead of an unrelated most recent tag.
Use it as the image VERSION.

Image tags need to be passed explicitly if needed:

    DOCKER_FLAGS='-t erigon:latest' make docker

* save (erigontech#4346)

* Gray Glacier bomb delay (erigontech#4444)

* Update version.go on stable branch (erigontech#4447)

* Update version.go

* Fix lint

Co-authored-by: Alexey Sharp <[email protected]>

* Clean up

* in transaction execution, subtract from account balance only after enough gaspool is ensured (erigontech#4450)

- noticed the difference when executing testdata#10 in go-ethereum and erigon

* Update skip_analysis.go (erigontech#4452)

* Adjust version

Co-authored-by: Enrique Jose  Avila Asapche <[email protected]>
Co-authored-by: Alex Sharov <[email protected]>
Co-authored-by: battlmonstr <[email protected]>
Co-authored-by: ledgerwatch <[email protected]>
Co-authored-by: Alexey Sharp <[email protected]>
Co-authored-by: can <[email protected]>
Co-authored-by: Alex Sharp <[email protected]>
Co-authored-by: primal_concrete_sledge <[email protected]>
Co-authored-by: Igor Mandrigin <[email protected]>
Co-authored-by: Igor Mandrigin <[email protected]>
Co-authored-by: Andrea Lanfranchi <[email protected]>
Co-authored-by: Andrew Ashikhmin <[email protected]>
Co-authored-by: sudeep <[email protected]>
  • Loading branch information
14 people authored Jun 16, 2022
1 parent bd1d09e commit be6de12
Show file tree
Hide file tree
Showing 43 changed files with 3,657 additions and 104 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
docker:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- run: git submodule update --init --recursive --force
- run: docker build .
- uses: actions/checkout@v3
with:
submodules: recursive
fetch-depth: 0 # fetch git tags for "git describe"
- run: make docker
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
url = https://github.com/ethereum/tests
[submodule "libmdbx"]
path = libmdbx
url = https://github.com/erthink/libmdbx
url = https://github.com/torquem-ch/libmdbx.git
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ EXPOSE 8545 8546 30303 30303/udp 30304 30304/udp 8080 9090 6060
# https://github.com/opencontainers/image-spec/blob/main/annotations.md
ARG BUILD_DATE
ARG VCS_REF
ARG VERSION
LABEL org.label-schema.build-date=$BUILD_DATE \
org.label-schema.name="Erigon" \
org.label-schema.description="Erigon Ethereum Client" \
Expand Down
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ GOTEST = GODEBUG=cgocheck=0 $(GO) test -tags nosqlite -trimpath ./... -p 2

GIT_COMMIT ?= $(shell git rev-list -1 HEAD)
GIT_BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD)
GIT_TAG ?= $(shell git describe --tags `git rev-list --tags="v*" --max-count=1`)
GIT_TAG ?= $(shell git describe --tags '--match=v*')

CGO_CFLAGS := $(shell $(GO) env CGO_CFLAGS) # don't loose default
CGO_CFLAGS += -DMDBX_FORCE_ASSERTIONS=0 # Enable MDBX's asserts by default in 'devel' branch and disable in 'stable'
Expand All @@ -26,7 +26,12 @@ go-version:
fi

docker:
DOCKER_BUILDKIT=1 docker build -t erigon:latest --build-arg git_commit='${GIT_COMMIT}' --build-arg git_branch='${GIT_BRANCH}' --build-arg git_tag='${GIT_TAG}' .
DOCKER_BUILDKIT=1 docker build \
--build-arg "BUILD_DATE=$(shell date -Iseconds)" \
--build-arg VCS_REF=${GIT_COMMIT} \
--build-arg VERSION=${GIT_TAG} \
${DOCKER_FLAGS} \
.

xdg_data_home := ~/.local/share
ifdef XDG_DATA_HOME
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ NB! <code>In-depth links are marked by the microscope sign (🔬) </code>
**Disclaimer: this software is currently a tech preview. We will do our best to keep it stable and make no breaking
changes but we don't guarantee anything. Things can and will break.**

<code>🔬 Alpha/Beta versions difference: [here](https://erigon.substack.com/p/erigon-2-three-upgrades?s=r)</code>

System Requirements
===================
Expand All @@ -45,7 +46,7 @@ folder `<datadir>/etl-tmp` to another disk).

RAM: 16GB, 64-bit architecture, [Golang version >= 1.16](https://golang.org/doc/install)

<code>🔬 more info on disk storage is [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space)) </code>
<code>🔬 more details on disk storage [here](https://erigon.substack.com/p/disk-footprint-changes-in-new-erigon?s=r) and [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space).</code>

Usage
=====
Expand Down
13 changes: 11 additions & 2 deletions cmd/erigon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"os"

"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon/params"
erigoncli "github.com/ledgerwatch/erigon/turbo/cli"
"github.com/ledgerwatch/erigon/turbo/node"
Expand All @@ -13,7 +13,16 @@ import (
)

func main() {
defer debug.LogPanic()
defer func() {
panicResult := recover()
if panicResult == nil {
return
}

log.Error("catch panic", "err", panicResult, "stack", dbg.Stack())
os.Exit(1)
}()

app := erigoncli.MakeApp(runErigon, erigoncli.DefaultFlags)
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.

stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders,
stages.TxPool, // TODO: enable TxPoolDB stage
stages.Finish)
)

execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, tmpDir)

Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type EthAPI interface {

// Sending related (see ./eth_call.go)
Call(ctx context.Context, args ethapi.CallArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *ethapi.StateOverrides) (hexutil.Bytes, error)
EstimateGas(ctx context.Context, args ethapi.CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Uint64, error)
EstimateGas(ctx context.Context, argsOrNil *ethapi.CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Uint64, error)
SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error)
SendTransaction(_ context.Context, txObject interface{}) (common.Hash, error)
Sign(ctx context.Context, _ common.Address, _ hexutil.Bytes) (hexutil.Bytes, error)
Expand Down
8 changes: 7 additions & 1 deletion cmd/rpcdaemon/commands/eth_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,13 @@ func HeaderByNumberOrHash(ctx context.Context, tx kv.Tx, blockNrOrHash rpc.Block
}

// EstimateGas implements eth_estimateGas. Returns an estimate of how much gas is necessary to allow the transaction to complete. The transaction will not be added to the blockchain.
func (api *APIImpl) EstimateGas(ctx context.Context, args ethapi.CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Uint64, error) {
func (api *APIImpl) EstimateGas(ctx context.Context, argsOrNil *ethapi.CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Uint64, error) {
var args ethapi.CallArgs
// if we actually get CallArgs here, we use them
if argsOrNil != nil {
args = *argsOrNil
}

bNrOrHash := rpc.BlockNumberOrHashWithNumber(rpc.PendingBlockNumber)
if blockNrOrHash != nil {
bNrOrHash = *blockNrOrHash
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestEstimateGas(t *testing.T) {
api := NewEthAPI(NewBaseApi(nil, stateCache, false), db, nil, nil, nil, 5000000)
var from = common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")
var to = common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e")
if _, err := api.EstimateGas(context.Background(), ethapi.CallArgs{
if _, err := api.EstimateGas(context.Background(), &ethapi.CallArgs{
From: &from,
To: &to,
}, nil); err != nil {
Expand Down
36 changes: 36 additions & 0 deletions cmd/rpcdaemon/commands/eth_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/filters"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3"
)
Expand Down Expand Up @@ -110,3 +111,38 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti

return rpcSub, nil
}

// SubscribeLogs send a notification each time a new log appears.
func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
if api.filters == nil {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
defer debug.LogPanic()
logs := make(chan *types.Log, 1)
defer close(logs)
id := api.filters.SubscribeLogs(logs, crit)
defer api.filters.UnsubscribeLogs(id)

for {
select {
case h := <-logs:
err := notifier.Notify(rpcSub.ID, h)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
}
case <-rpcSub.Err():
return
}
}
}()

return rpcSub, nil
}
8 changes: 7 additions & 1 deletion cmd/rpcdaemon/commands/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/tracers"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/internal/ethapi"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (api *PrivateDebugAPIImpl) traceBlock(ctx context.Context, blockNrOrHash rp
return rawdb.ReadHeader(tx, hash, number)
}

_, blockCtx, txCtx, ibs, reader, err := transactions.ComputeTxEnv(ctx, block, chainConfig, getHeader, contractHasTEVM, ethash.NewFaker(), tx, block.Hash(), 0)
_, blockCtx, _, ibs, reader, err := transactions.ComputeTxEnv(ctx, block, chainConfig, getHeader, contractHasTEVM, ethash.NewFaker(), tx, block.Hash(), 0)
if err != nil {
stream.WriteNil()
return err
Expand All @@ -82,6 +83,11 @@ func (api *PrivateDebugAPIImpl) traceBlock(ctx context.Context, blockNrOrHash rp
}
ibs.Prepare(tx.Hash(), block.Hash(), idx)
msg, _ := tx.AsMessage(*signer, block.BaseFee())
txCtx := vm.TxContext{
TxHash: tx.Hash(),
Origin: msg.From(),
GasPrice: msg.GasPrice().ToBig(),
}

transactions.TraceTx(ctx, msg, blockCtx, txCtx, ibs, config, chainConfig, stream)
_ = ibs.FinalizeTx(chainConfig.Rules(blockCtx.BlockNumber), reader)
Expand Down
121 changes: 121 additions & 0 deletions cmd/rpcdaemon/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ import (
"io"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/eth/filters"

"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/services"
Expand All @@ -28,6 +34,7 @@ type (
PendingLogsSubID SubscriptionID
PendingBlockSubID SubscriptionID
PendingTxsSubID SubscriptionID
LogsSubID uint64
)

type Filters struct {
Expand All @@ -39,6 +46,8 @@ type Filters struct {
pendingLogsSubs map[PendingLogsSubID]chan types.Logs
pendingBlockSubs map[PendingBlockSubID]chan *types.Block
pendingTxsSubs map[PendingTxsSubID]chan []types.Transaction
logsSubs *LogsFilterAggregator
logsRequestor atomic.Value
}

func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient) *Filters {
Expand All @@ -49,6 +58,7 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo
pendingTxsSubs: make(map[PendingTxsSubID]chan []types.Transaction),
pendingLogsSubs: make(map[PendingLogsSubID]chan types.Logs),
pendingBlockSubs: make(map[PendingBlockSubID]chan *types.Block),
logsSubs: NewLogsFilterAggregator(),
}

go func() {
Expand Down Expand Up @@ -83,6 +93,31 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo
}
}()

go func() {
if ethBackend == nil {
return
}
for {
select {
case <-ctx.Done():
return
default:
}
if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs, &ff.logsRequestor); err != nil {
select {
case <-ctx.Done():
return
default:
}
if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) {
time.Sleep(3 * time.Second)
continue
}
log.Warn("rpc filters: error subscribing to logs", "err", err)
}
}
}()

if txPool != nil {
go func() {
for {
Expand Down Expand Up @@ -337,6 +372,73 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) {
delete(ff.pendingTxsSubs, id)
}

func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteria) LogsSubID {
id, f := ff.logsSubs.insertLogsFilter(out)
f.addrs = map[common.Address]int{}
if len(crit.Addresses) == 0 {
f.allAddrs = 1
} else {
for _, addr := range crit.Addresses {
f.addrs[addr] = 1
}
}
f.topics = map[common.Hash]int{}
if len(crit.Topics) == 0 {
f.allTopics = 1
} else {
for _, topics := range crit.Topics {
for _, topic := range topics {
f.topics[topic] = 1
}
}
}
f.topicsOriginal = crit.Topics
ff.logsSubs.addLogsFilters(f)
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
}
for addr := range ff.logsSubs.aggLogsFilter.addrs {
lfr.Addresses = append(lfr.Addresses, gointerfaces.ConvertAddressToH160(addr))
}
for topic := range ff.logsSubs.aggLogsFilter.topics {
lfr.Topics = append(lfr.Topics, gointerfaces.ConvertHashToH256(topic))
}
ff.mu.Lock()
defer ff.mu.Unlock()
loaded := ff.logsRequestor.Load()
if loaded != nil {
if err := loaded.(func(*remote.LogsFilterRequest) error)(lfr); err != nil {
log.Warn("Could not update remote logs filter", "err", err)
ff.logsSubs.removeLogsFilter(id)
}
}
return id
}

func (ff *Filters) UnsubscribeLogs(id LogsSubID) {
ff.logsSubs.removeLogsFilter(id)
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
}
for addr := range ff.logsSubs.aggLogsFilter.addrs {
lfr.Addresses = append(lfr.Addresses, gointerfaces.ConvertAddressToH160(addr))
}
for topic := range ff.logsSubs.aggLogsFilter.topics {
lfr.Topics = append(lfr.Topics, gointerfaces.ConvertHashToH256(topic))
}
ff.mu.Lock()
defer ff.mu.Unlock()
loaded := ff.logsRequestor.Load()
if loaded != nil {
if err := loaded.(func(*remote.LogsFilterRequest) error)(lfr); err != nil {
log.Warn("Could not update remote logs filter", "err", err)
ff.logsSubs.removeLogsFilter(id)
}
}
}

func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) {
ff.mu.RLock()
defer ff.mu.RUnlock()
Expand Down Expand Up @@ -411,6 +513,25 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) {
}
}

func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) {
lg := &types.Log{
Address: gointerfaces.ConvertH160toAddress(reply.Address),
Data: reply.Data,
BlockNumber: reply.BlockNumber,
TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash),
TxIndex: uint(reply.TransactionIndex),
BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash),
Index: uint(reply.LogIndex),
Removed: reply.Removed,
}
t := make([]common.Hash, 0)
for _, v := range reply.Topics {
t = append(t, gointerfaces.ConvertH256ToHash(v))
}
lg.Topics = t
ff.logsSubs.distributeLog(reply)
}

func generateSubscriptionID() SubscriptionID {
var id [32]byte

Expand Down
Loading

0 comments on commit be6de12

Please sign in to comment.