Skip to content

Commit

Permalink
chore: remove mempool.postCheck (Finschia#158) (Finschia#217)
Browse files Browse the repository at this point in the history
* fix: error handling after check tx

* fix: typo

* chore: (mempool) remove postCheck and impl reserve

* chore: fix tests

* chore: revise log (remove checkTx.Code)

* chore: add `CONTRACT` for `mem.proxyAppConn.CheckTxAsync()`

* chore: revise numTxs, txsBytes for `ErrMempoolIsFull` in reserve()

* chore: revise to remove redundant `isFull()`

* fix: remove tx from cache when `app errors` or `failed to reserve`

* Revert "chore: revise to remove redundant `isFull()`"

This reverts commit 55990ec.
  • Loading branch information
egonspace committed Jul 8, 2021
1 parent 7ea6e35 commit 800cab3
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 100 deletions.
1 change: 0 additions & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (emptyMempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestCacheAfterUpdate(t *testing.T) {
updateTxs = append(updateTxs, tx)
}
err := mempool.Update(newTestBlock(int64(tcIndex), updateTxs),
abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
abciResponses(len(updateTxs), abci.CodeTypeOK), nil)
require.NoError(t, err)

for _, v := range tc.reAddIndices {
Expand Down
85 changes: 50 additions & 35 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type CListMempool struct {
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes

reserved int // the number of checking tx and it should be considered when checking mempool full
reservedBytes int64 // size of checking tx and it should be considered when checking mempool full
reservedMtx sync.Mutex

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
Expand All @@ -50,7 +54,6 @@ type CListMempool struct {
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx tmsync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
Expand Down Expand Up @@ -126,13 +129,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx. Only applies to the first created block.
// After that, Update overwrites the existing value.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.postCheck = f }
}

// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
Expand Down Expand Up @@ -287,6 +283,14 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
return ErrTxInCache
}

// reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()`
if err := mem.reserve(int64(txSize)); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))

Expand Down Expand Up @@ -396,6 +400,35 @@ func (mem *CListMempool) isFull(txSize int) error {
return nil
}

func (mem *CListMempool) reserve(txSize int64) error {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)

if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize + mem.reserved, mem.config.Size,
txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes,
}
}

mem.reserved++
mem.reservedBytes += txSize
return nil
}

func (mem *CListMempool) releaseReserve(txSize int64) {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

mem.reserved--
mem.reservedBytes -= txSize
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
Expand All @@ -408,20 +441,7 @@ func (mem *CListMempool) resCbFirstTime(
) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}

if r.CheckTx.Code == abci.CodeTypeOK {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
Expand All @@ -438,14 +458,17 @@ func (mem *CListMempool) resCbFirstTime(
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Info("Rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
mem.logger.Debug("rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r)
mem.metrics.FailedTxs.Add(1)
if !mem.config.KeepInvalidTxsInCache {
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}
}

// release `reserve` regardless it's OK or not (it might be good later)
mem.releaseReserve(int64(len(tx)))
default:
// ignore other messages
}
Expand All @@ -466,15 +489,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
memTx.tx,
tx))
}
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
if r.CheckTx.Code == abci.CodeTypeOK {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache)
}
Expand Down Expand Up @@ -567,12 +586,11 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
return txs
}

// Lock() must be help by the caller during execution.
// Lock() must be held by the caller during execution.
func (mem *CListMempool) Update(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// Set height
mem.height = block.Height
Expand All @@ -581,9 +599,6 @@ func (mem *CListMempool) Update(
if preCheck != nil {
mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
}

for i, tx := range block.Txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
Expand Down
39 changes: 16 additions & 23 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,30 +147,23 @@ func TestMempoolFilters(t *testing.T) {
emptyTxArr := []types.Tx{[]byte{}}

nopPreFilter := func(tx types.Tx) error { return nil }
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }

// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes
tests := []struct {
numTxsToCreate int
preFilter PreCheckFunc
postFilter PostCheckFunc
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, PreCheckMaxBytes(10), nopPostFilter, 0},
{10, PreCheckMaxBytes(22), nopPostFilter, 10},
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
{10, nopPreFilter, PostCheckMaxGas(0), 0},
{10, nopPreFilter, PostCheckMaxGas(1), 10},
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
{10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10},
{10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0},
{10, nopPreFilter, 10},
{10, PreCheckMaxBytes(10), 0},
{10, PreCheckMaxBytes(20), 0},
{10, PreCheckMaxBytes(22), 10},
{10, PreCheckMaxBytes(30), 10},
}
for tcIndex, tt := range tests {
err := mempool.Update(newTestBlock(1, emptyTxArr), abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
err := mempool.Update(newTestBlock(1, emptyTxArr),
abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter)
require.NoError(t, err)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
Expand All @@ -187,7 +180,7 @@ func TestMempoolUpdate(t *testing.T) {
// 1. Adds valid txs to the cache
{
err := mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}),
abciResponses(1, abci.CodeTypeOK), nil, nil)
abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
Expand All @@ -199,7 +192,7 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil, nil)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.Zero(t, mempool.Size())
}
Expand All @@ -208,7 +201,7 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil, nil)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil)
require.NoError(t, err)
assert.Zero(t, mempool.Size())

Expand Down Expand Up @@ -240,7 +233,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
_ = app.DeliverTx(abci.RequestDeliverTx{Tx: a})
_ = app.DeliverTx(abci.RequestDeliverTx{Tx: b})
err = mempool.Update(newTestBlock(1, []types.Tx{a, b}),
[]*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil)
[]*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil)
require.NoError(t, err)

// a must be added to the cache
Expand Down Expand Up @@ -296,7 +289,7 @@ func TestTxsAvailable(t *testing.T) {
// since there are still txs left
committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(newTestBlock(1, committedTxs),
abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
Expand All @@ -309,7 +302,7 @@ func TestTxsAvailable(t *testing.T) {
// now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) //nolint: gocritic
if err := mempool.Update(newTestBlock(2, committedTxs),
abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
Expand Down Expand Up @@ -369,7 +362,7 @@ func TestSerialReap(t *testing.T) {
txs = append(txs, txBytes)
}
if err := mempool.Update(newTestBlock(0, txs),
abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
}
Expand Down Expand Up @@ -541,7 +534,7 @@ func TestMempoolTxsBytes(t *testing.T) {

// 3. zero again after tx is removed by Update
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}),
abciResponses(1, abci.CodeTypeOK), nil, nil)
abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.EqualValues(t, 0, mempool.TxsBytes())

Expand Down Expand Up @@ -591,7 +584,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NotEmpty(t, res2.Data)

// Pretend like we committed nothing so txBytes gets rechecked and removed.
err = mempool.Update(newTestBlock(1, []types.Tx{}), abciResponses(0, abci.CodeTypeOK), nil, nil)
err = mempool.Update(newTestBlock(1, []types.Tx{}), abciResponses(0, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.EqualValues(t, 0, mempool.TxsBytes())

Expand Down
25 changes: 0 additions & 25 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Mempool interface {
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error

// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
Expand Down Expand Up @@ -84,11 +83,6 @@ type Mempool interface {
// transaction doesn't exceeded the block size.
type PreCheckFunc func(types.Tx) error

// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
Expand All @@ -113,22 +107,3 @@ func PreCheckMaxBytes(maxBytes int64) PreCheckFunc {
return nil
}
}

// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
// maxGas. Returns nil if maxGas is -1.
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return func(tx types.Tx, res *abci.ResponseCheckTx) error {
if maxGas == -1 {
return nil
}
if res.GasWanted < 0 {
return fmt.Errorf("gas wanted %d is negative",
res.GasWanted)
}
if res.GasWanted > maxGas {
return fmt.Errorf("gas wanted %d is greater than max gas %d",
res.GasWanted, maxGas)
}
return nil
}
}
1 change: 0 additions & 1 deletion mempool/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func (Mempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestReactorConcurrency(t *testing.T) {
for i := range txs {
deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0}
}
err := reactors[0].mempool.Update(newTestBlock(1, txs), deliverTxResponses, nil, nil)
err := reactors[0].mempool.Update(newTestBlock(1, txs), deliverTxResponses, nil)
assert.NoError(t, err)
}()

Expand All @@ -120,7 +120,7 @@ func TestReactorConcurrency(t *testing.T) {
reactors[1].mempool.Lock()
defer reactors[1].mempool.Unlock()
err := reactors[1].mempool.Update(newTestBlock(1, []types.Tx{}),
make([]*abci.ResponseDeliverTx, 0), nil, nil)
make([]*abci.ResponseDeliverTx, 0), nil)
assert.NoError(t, err)
}()

Expand Down
1 change: 0 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, config.P2P.RecvAsync, config.P2P.MempoolRecvBufSize, mempool)
Expand Down
2 changes: 0 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func TestCreateProposalBlock(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down Expand Up @@ -345,7 +344,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down
Loading

0 comments on commit 800cab3

Please sign in to comment.