diff --git a/config/consensus.go b/config/consensus.go
index c3592e40d9..25b5dc7858 100644
--- a/config/consensus.go
+++ b/config/consensus.go
@@ -633,6 +633,9 @@ var StateProofTopVoters int
// in a block must not exceed MaxTxnBytesPerBlock.
var MaxTxnBytesPerBlock int
+// MaxAppTxnForeignApps is the max number of foreign apps per txn across all consensus versions
+var MaxAppTxnForeignApps int
+
func checkSetMax(value int, curMax *int) {
if value > *curMax {
*curMax = value
@@ -681,6 +684,8 @@ func checkSetAllocBounds(p ConsensusParams) {
checkSetMax(p.MaxAppKeyLen, &MaxAppBytesKeyLen)
checkSetMax(int(p.StateProofTopVoters), &StateProofTopVoters)
checkSetMax(p.MaxTxnBytesPerBlock, &MaxTxnBytesPerBlock)
+
+ checkSetMax(p.MaxAppTxnForeignApps, &MaxAppTxnForeignApps)
}
// SaveConfigurableConsensus saves the configurable protocols file to the provided data directory.
diff --git a/config/localTemplate.go b/config/localTemplate.go
index 74c0b2e08a..7e92d5ed7a 100644
--- a/config/localTemplate.go
+++ b/config/localTemplate.go
@@ -42,7 +42,7 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
- Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31"`
+ Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31" version[32]:"32"`
// Archival nodes retain a full copy of the block history. Non-Archival nodes will delete old blocks and only retain what's need to properly validate blockchain messages (the precise number of recent blocks depends on the consensus parameters. Currently the last 1321 blocks are required). This means that non-Archival nodes require significantly less storage than Archival nodes. Relays (nodes with a valid NetAddress) are always Archival, regardless of this setting. This may change in the future. If setting this to true for the first time, the existing ledger may need to be deleted to get the historical values stored as the setting only effects current blocks forward. To do this, shutdown the node and delete all .sqlite files within the data/testnet-version directory, except the crash.sqlite file. Restart the node and wait for the node to sync.
Archival bool `version[0]:"false"`
@@ -231,7 +231,21 @@ type Local struct {
// TxBacklogReservedCapacityPerPeer determines how much dedicated serving capacity the TxBacklog gives each peer
TxBacklogReservedCapacityPerPeer int `version[27]:"20"`
- // EnableTxBacklogRateLimiting controls if a rate limiter and congestion manager shouild be attached to the tx backlog enqueue process
+ // TxBacklogAppTxRateLimiterMaxSize denotes a max size (number of entries) for the app tx rate limiter,
+ // calculated as "a thousand apps on a network of a thousand peers"
+ TxBacklogAppTxRateLimiterMaxSize int `version[32]:"1048576"`
+
+ // TxBacklogAppTxPerSecondRate determines a target per-app transactions per second rate for the app tx rate limiter
+ TxBacklogAppTxPerSecondRate int `version[32]:"100"`
+
+ // TxBacklogRateLimitingCongestionPct determines the backlog filling threshold percentage at which the app rate limiter
+ // kicks in, and below which the tx backlog rate limiter's congestion control is switched off.
+ TxBacklogRateLimitingCongestionPct int `version[32]:"50"`
+
+ // EnableTxBacklogAppRateLimiting controls if an app rate limiter should be attached to the tx backlog enqueue process
+ EnableTxBacklogAppRateLimiting bool `version[32]:"true"`
+
+ // EnableTxBacklogRateLimiting controls if a rate limiter and congestion manager should be attached to the tx backlog enqueue process
// if enabled, the over-all TXBacklog Size will be larger by MAX_PEERS*TxBacklogReservedCapacityPerPeer
EnableTxBacklogRateLimiting bool `version[27]:"false" version[30]:"true"`
diff --git a/config/local_defaults.go b/config/local_defaults.go
index 12689e4e9b..06a26f2c1c 100644
--- a/config/local_defaults.go
+++ b/config/local_defaults.go
@@ -20,7 +20,7 @@
package config
var defaultLocal = Local{
- Version: 31,
+ Version: 32,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AgreementIncomingBundlesQueueLength: 15,
@@ -81,6 +81,7 @@ var defaultLocal = Local{
EnableRequestLogger: false,
EnableRuntimeMetrics: false,
EnableTopAccountsReporting: false,
+ EnableTxBacklogAppRateLimiting: true,
EnableTxBacklogRateLimiting: true,
EnableTxnEvalTracer: false,
EnableUsageLog: false,
@@ -141,6 +142,9 @@ var defaultLocal = Local{
TrackerDBDir: "",
TransactionSyncDataExchangeRate: 0,
TransactionSyncSignificantMessageThreshold: 0,
+ TxBacklogAppTxPerSecondRate: 100,
+ TxBacklogAppTxRateLimiterMaxSize: 1048576,
+ TxBacklogRateLimitingCongestionPct: 50,
TxBacklogReservedCapacityPerPeer: 20,
TxBacklogServiceRateWindowSeconds: 10,
TxBacklogSize: 26000,
diff --git a/data/appRateLimiter.go b/data/appRateLimiter.go
new file mode 100644
index 0000000000..b7684409a8
--- /dev/null
+++ b/data/appRateLimiter.go
@@ -0,0 +1,322 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package data
+
+import (
+ "encoding/binary"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/crypto"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/transactions"
+ "github.com/algorand/go-algorand/protocol"
+ "github.com/algorand/go-algorand/util"
+ "github.com/algorand/go-deadlock"
+ "golang.org/x/crypto/blake2b"
+)
+
+const numBuckets = 128
+
+type keyType [8]byte
+
+// appRateLimiter implements a sliding window counter rate limiter for applications.
+// It is a sharded map with numBuckets buckets, each protected by its own mutex.
+// A bucket is selected by hashing the application index with a seed (see memhash64).
+// LRU is used to evict entries from each bucket, and "last use" is updated on each attempt, not admission.
+// This is mostly done to simplify the implementation and does not appear to affect correctness.
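+// The admission estimate per key is prev*(1 - elapsedFraction) + cur, i.e. the previous interval's
+// count is linearly discounted as the current interval progresses (see shouldDropKeys).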
+type appRateLimiter struct {
+ maxBucketSize int
+ serviceRatePerWindow uint64
+ serviceRateWindow time.Duration
+
+ // seed for hashing application index to bucket
+ seed uint64
+ // salt for hashing application index + origin address
+ salt [16]byte
+
+ buckets [numBuckets]appRateLimiterBucket
+
+ // evictions
+ // TODO: delete?
+ evictions uint64
+ evictionTime uint64
+}
+
+type appRateLimiterBucket struct {
+ entries map[keyType]*appRateLimiterEntry
+ lru *util.List[keyType]
+ mu deadlock.RWMutex // mutex protects both map and the list access
+}
+
+type appRateLimiterEntry struct {
+ prev atomic.Int64 // admitted requests in the previous interval
+ cur atomic.Int64 // admitted requests in the current interval
+ interval int64 // numeric representation of the current interval value
+ lruElement *util.ListNode[keyType]
+}
+
+// makeAppRateLimiter creates a new appRateLimiter from the parameters:
+// maxCacheSize is the maximum number of entries to keep in the cache (to keep it memory bounded)
+// maxAppPeerRate is the maximum number of admitted app call transactions per app per peer per second
+// serviceRateWindow is the sliding window duration the rate is enforced over
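+// For example, with the default local config (TxBacklogAppTxRateLimiterMaxSize=1048576,
+// TxBacklogAppTxPerSecondRate=100, TxBacklogServiceRateWindowSeconds=10) up to ~1000 app call
+// transactions per (app, origin) pair are admitted within each 10 second window.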
+func makeAppRateLimiter(maxCacheSize int, maxAppPeerRate uint64, serviceRateWindow time.Duration) *appRateLimiter {
+ // convert target per app rate to per window service rate
+ serviceRatePerWindow := maxAppPeerRate * uint64(serviceRateWindow/time.Second)
+ maxBucketSize := maxCacheSize / numBuckets
+ if maxBucketSize == 0 {
+ // the requested max size is less than the number of buckets, use buckets of size 1
+ maxBucketSize = 1
+ }
+ r := &appRateLimiter{
+ maxBucketSize: maxBucketSize,
+ serviceRatePerWindow: serviceRatePerWindow,
+ serviceRateWindow: serviceRateWindow,
+ seed: crypto.RandUint64(),
+ }
+ crypto.RandBytes(r.salt[:])
+
+ for i := 0; i < numBuckets; i++ {
+ r.buckets[i] = appRateLimiterBucket{entries: make(map[keyType]*appRateLimiterEntry), lru: util.NewList[keyType]()}
+ }
+ return r
+}
+
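+// entry returns the rate limiting entry for the given key in bucket b, creating it (and evicting
+// the least recently used entry if the bucket is full) when it does not exist yet.
+// The second return value reports whether the entry already existed.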
+func (r *appRateLimiter) entry(b *appRateLimiterBucket, key keyType, curInt int64) (*appRateLimiterEntry, bool) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if len(b.entries) >= r.maxBucketSize {
+ // evict the oldest entry
+ start := time.Now()
+
+ el := b.lru.Back()
+ delete(b.entries, el.Value)
+ b.lru.Remove(el)
+
+ atomic.AddUint64(&r.evictions, 1)
+ atomic.AddUint64(&r.evictionTime, uint64(time.Since(start)))
+ }
+
+ entry, ok := b.entries[key]
+ if ok {
+ el := entry.lruElement
+ // note, the entry is marked as recently used even before the rate limiting decision
+ // since it does not make sense to evict keys that are actively attempted
+ b.lru.MoveToFront(el)
+
+ // the same logic is applicable to the intervals: if a new interval is started, update the entry
+ // by moving the current value to the previous and resetting the current.
+ // this is done under a lock so that the interval is not updated concurrently.
+ // The rationale is that even if this request ends up dropped, the new interval has already started,
+ // so it is OK to roll the entry over now and have it ready for upcoming requests.
+ var newPrev int64 = 0
+ switch entry.interval {
+ case curInt:
+ // the interval is the same, do nothing
+ case curInt - 1:
+ // these are consecutive intervals, use the current value as the new previous
+ newPrev = entry.cur.Load()
+ fallthrough
+ default:
+ // non-contiguous intervals, reset the entry
+ entry.prev.Store(newPrev)
+ entry.cur.Store(0)
+ entry.interval = curInt
+ }
+ } else {
+ el := b.lru.PushFront(key)
+ entry = &appRateLimiterEntry{interval: curInt, lruElement: el}
+ b.entries[key] = entry
+ }
+ return entry, ok
+}
+
+// interval calculates the interval numeric representation based on the given time
+func (r *appRateLimiter) interval(nowNano int64) int64 {
+ return nowNano / int64(r.serviceRateWindow)
+}
+
+// fraction calculates the fraction of the interval that is elapsed since the given time
+func (r *appRateLimiter) fraction(nowNano int64) float64 {
+ return float64(nowNano%int64(r.serviceRateWindow)) / float64(r.serviceRateWindow)
+}
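+
+// For example, with serviceRateWindow = 10s, a timestamp at second 21 maps to interval 2 with
+// fraction 0.1, so shouldDropKeys weighs that entry's previous interval count by 0.9.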
+
+// shouldDrop returns true if the given transaction group should be dropped based on the
+// rate for the applications in the group: the entire group is dropped if a single application
+// exceeds the rate.
+func (r *appRateLimiter) shouldDrop(txgroup []transactions.SignedTxn, origin []byte) bool {
+ return r.shouldDropAt(txgroup, origin, time.Now().UnixNano())
+}
+
+// shouldDropAt is the same as shouldDrop but accepts the current time as a parameter
+// in order to make it testable
+func (r *appRateLimiter) shouldDropAt(txgroup []transactions.SignedTxn, origin []byte, nowNano int64) bool {
+ keysBuckets := txgroupToKeys(txgroup, origin, r.seed, r.salt, numBuckets)
+ defer putAppKeyBuf(keysBuckets)
+ if len(keysBuckets.keys) == 0 {
+ return false
+ }
+ return r.shouldDropKeys(keysBuckets.buckets, keysBuckets.keys, nowNano)
+}
+
+func (r *appRateLimiter) shouldDropKeys(buckets []int, keys []keyType, nowNano int64) bool {
+ curInt := r.interval(nowNano)
+ curFraction := r.fraction(nowNano)
+
+ for i, key := range keys {
+ // TODO: reuse last entry for matched keys and buckets?
+ b := buckets[i]
+ entry, has := r.entry(&r.buckets[b], key, curInt)
+ if !has {
+ // new entry, defaults are provided by entry() function
+ // admit and increment
+ entry.cur.Add(1)
+ continue
+ }
+
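+ // sliding window estimate: weigh the previous interval's count by the remaining fraction
+ // of the window and add one for the request currently being considered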
+ rate := int64(float64(entry.prev.Load())*(1-curFraction)) + entry.cur.Load() + 1
+ if rate > int64(r.serviceRatePerWindow) {
+ return true
+ }
+ entry.cur.Add(1)
+ }
+
+ return false
+}
+
+func (r *appRateLimiter) len() int {
+ var count int
+ for i := 0; i < numBuckets; i++ {
+ r.buckets[i].mu.RLock()
+ count += len(r.buckets[i].entries)
+ r.buckets[i].mu.RUnlock()
+ }
+ return count
+}
+
+var appKeyPool = sync.Pool{
+ New: func() interface{} {
+ return &appKeyBuf{
+ // max config.MaxTxGroupSize txns per group, each txn can reference up to MaxAppTxnForeignApps extra foreign apps
+ // at the moment of writing config.MaxTxGroupSize = 16, config.MaxAppTxnForeignApps = 8
+ keys: make([]keyType, 0, config.MaxTxGroupSize*(1+config.MaxAppTxnForeignApps)),
+ buckets: make([]int, 0, config.MaxTxGroupSize*(1+config.MaxAppTxnForeignApps)),
+ }
+ },
+}
+
+// appKeyBuf is a reusable storage for key and bucket slices
+type appKeyBuf struct {
+ keys []keyType
+ buckets []int
+}
+
+func getAppKeyBuf() *appKeyBuf {
+ buf := appKeyPool.Get().(*appKeyBuf)
+ buf.buckets = buf.buckets[:0]
+ buf.keys = buf.keys[:0]
+ return buf
+}
+
+func putAppKeyBuf(buf *appKeyBuf) {
+ appKeyPool.Put(buf)
+}
+
+// txgroupToKeys converts a txgroup into salted hash keys and bucket indices for every application
+// referenced by its app call transactions (including foreign apps), deduplicated within the group
+func txgroupToKeys(txgroup []transactions.SignedTxn, origin []byte, seed uint64, salt [16]byte, numBuckets int) *appKeyBuf {
+ keysBuckets := getAppKeyBuf()
+ // since blake2 is a crypto hash function it seems OK to shrink the 32-byte digest down to 8 bytes.
+ // Rationale: we expect thousands of apps sent from thousands of peers,
+ // so millions of unique (app, origin) pairs are required => 8 bytes should be enough.
+ // The 16-byte salt makes it harder for an adversary to censor some app by finding another app
+ // that collides with it and flooding the network with such transactions:
+ // h(app + relay_ip) = h(app2 + relay_ip).
+
+ // uint64 + 16 bytes of salt + up to 16 bytes of address
+ // salt and origin are fixed so pre-copy them into the buf
+ var buf [8 + 16 + 16]byte
+ copy(buf[8:], salt[:])
+ copied := copy(buf[8+16:], origin)
+ bufLen := 8 + 16 + copied
+
+ txnToDigest := func(appIdx basics.AppIndex) (key keyType) {
+ binary.LittleEndian.PutUint64(buf[:8], uint64(appIdx))
+ h := blake2b.Sum256(buf[:bufLen])
+ copy(key[:], h[:len(keyType{})])
+ return
+ }
+ txnToBucket := func(appIdx basics.AppIndex) int {
+ return int(memhash64(uint64(appIdx), seed) % uint64(numBuckets))
+ }
+ seen := make(map[basics.AppIndex]struct{}, len(txgroup)*(1+config.MaxAppTxnForeignApps))
+ valid := func(appIdx basics.AppIndex) bool {
+ if appIdx != 0 {
+ _, ok := seen[appIdx]
+ return !ok
+ }
+ return false
+ }
+ for i := range txgroup {
+ if txgroup[i].Txn.Type == protocol.ApplicationCallTx {
+ appIdx := txgroup[i].Txn.ApplicationID
+ if valid(appIdx) {
+ keysBuckets.buckets = append(keysBuckets.buckets, txnToBucket(appIdx))
+ keysBuckets.keys = append(keysBuckets.keys, txnToDigest(appIdx))
+ seen[appIdx] = struct{}{}
+ }
+ // hash appIdx into a bucket, do not use modulo without hashing first since it could
+ // assign two vanilla (and presumably popular) apps to the same bucket.
+ if len(txgroup[i].Txn.ForeignApps) > 0 {
+ for _, appIdx := range txgroup[i].Txn.ForeignApps {
+ if valid(appIdx) {
+ keysBuckets.buckets = append(keysBuckets.buckets, txnToBucket(appIdx))
+ keysBuckets.keys = append(keysBuckets.keys, txnToDigest(appIdx))
+ seen[appIdx] = struct{}{}
+ }
+ }
+ }
+ }
+ }
+ return keysBuckets
+}
+
+const (
+ // Constants for multiplication: four random odd 64-bit numbers.
+ m1 = 16877499708836156737
+ m2 = 2820277070424839065
+ m3 = 9497967016996688599
+ m4 = 15839092249703872147
+)
+
+// memhash64 is the uint64 hash function from the Go runtime
+// https://go-review.googlesource.com/c/go/+/59352/4/src/runtime/hash64.go#96
+func memhash64(val uint64, seed uint64) uint64 {
+ h := seed
+ h ^= val
+ h = rotl31(h*m1) * m2
+ h ^= h >> 29
+ h *= m3
+ h ^= h >> 32
+ return h
+}
+
+func rotl31(x uint64) uint64 {
+ return (x << 31) | (x >> (64 - 31))
+}
diff --git a/data/appRateLimiter_test.go b/data/appRateLimiter_test.go
new file mode 100644
index 0000000000..2da9fddfa0
--- /dev/null
+++ b/data/appRateLimiter_test.go
@@ -0,0 +1,526 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package data
+
+import (
+ "encoding/binary"
+ "testing"
+ "time"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/crypto"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/transactions"
+ "github.com/algorand/go-algorand/protocol"
+ "github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/crypto/blake2b"
+ "golang.org/x/exp/rand"
+)
+
+func TestAppRateLimiter_Make(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 1 * time.Second
+ rm := makeAppRateLimiter(10, rate, window)
+
+ require.Equal(t, 1, rm.maxBucketSize)
+ require.NotEmpty(t, rm.seed)
+ require.NotEmpty(t, rm.salt)
+ for i := 0; i < len(rm.buckets); i++ {
+ require.NotNil(t, rm.buckets[i].entries)
+ require.NotNil(t, rm.buckets[i].lru)
+ }
+}
+
+func TestAppRateLimiter_NoApps(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 1 * time.Second
+ rm := makeAppRateLimiter(10, rate, window)
+
+ txns := []transactions.SignedTxn{
+ {
+ Txn: transactions.Transaction{
+ Type: protocol.AssetConfigTx,
+ },
+ },
+ {
+ Txn: transactions.Transaction{
+ Type: protocol.PaymentTx,
+ },
+ },
+ }
+ drop := rm.shouldDrop(txns, nil)
+ require.False(t, drop)
+}
+
+func getAppTxnGroup(appIdx basics.AppIndex) []transactions.SignedTxn {
+ apptxn := transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: appIdx,
+ },
+ }
+
+ return []transactions.SignedTxn{{Txn: apptxn}}
+}
+
+func TestAppRateLimiter_Basics(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 1 * time.Second
+ rm := makeAppRateLimiter(512, rate, window)
+
+ txns := getAppTxnGroup(1)
+ now := time.Now().UnixNano()
+ drop := rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+
+ for i := len(txns); i < int(rate); i++ {
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+ }
+
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.True(t, drop)
+
+ require.Equal(t, 1, rm.len())
+
+ // check a single group cannot exceed the rate
+ apptxn2 := txns[0].Txn
+ apptxn2.ApplicationID = 2
+ txns = make([]transactions.SignedTxn, 0, rate+1)
+ for i := 0; i < int(rate+1); i++ {
+ txns = append(txns, transactions.SignedTxn{
+ Txn: apptxn2,
+ })
+ }
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+
+ // check multiple groups can exceed the rate (-1 comes from the previous check)
+ for i := 0; i < int(rate)-1; i++ {
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+ }
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.True(t, drop)
+
+ require.Equal(t, 2, rm.len())
+
+ // check foreign apps in the same group do not trigger the rate limit
+ apptxn3 := txns[0].Txn
+ apptxn3.ApplicationID = 3
+ for i := 0; i < int(rate); i++ {
+ apptxn3.ForeignApps = append(apptxn3.ForeignApps, 3)
+ }
+ txns = []transactions.SignedTxn{{Txn: apptxn3}}
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+
+ // check multiple groups with foreign apps can exceed the rate (-1 comes from the previous check)
+ for i := 0; i < int(rate)-1; i++ {
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+ }
+ drop = rm.shouldDropAt(txns, nil, now)
+ require.True(t, drop)
+
+ require.Equal(t, 3, rm.len())
+}
+
+// TestAppRateLimiter_Interval checks prev + cur rate approximation logic
+func TestAppRateLimiter_Interval(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 10 * time.Second
+ perSecondRate := uint64(window) / rate / uint64(time.Second)
+ rm := makeAppRateLimiter(512, perSecondRate, window)
+
+ txns := getAppTxnGroup(1)
+ now := time.Date(2023, 9, 11, 10, 10, 11, 0, time.UTC).UnixNano() // 11 sec => 1 sec into the interval
+
+ // fill 80% of the current interval
+ // switch to the next interval
+ // ensure only 30% of the rate is available: prev contributes 8 * 0.9 = 7.2 => 7 out of 10
+ // 0.9 is calculated as 1 - 0.1 (the fraction of the interval elapsed)
+ // since the check at second 21 is 1 sec (== 10% == 0.1) after the next interval's beginning
+ for i := 0; i < int(0.8*float64(rate)); i++ {
+ drop := rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+ }
+
+ next := now + int64(window)
+ for i := 0; i < int(0.3*float64(rate)); i++ {
+ drop := rm.shouldDropAt(txns, nil, next)
+ require.False(t, drop)
+ }
+
+ drop := rm.shouldDropAt(txns, nil, next)
+ require.True(t, drop)
+}
+
+// TestAppRateLimiter_IntervalAdmitted checks that the cur counter only accounts for admitted requests
+func TestAppRateLimiter_IntervalAdmitted(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 10 * time.Second
+ perSecondRate := uint64(window) / rate / uint64(time.Second)
+ rm := makeAppRateLimiter(512, perSecondRate, window)
+
+ txns := getAppTxnGroup(1)
+ bk := txgroupToKeys(getAppTxnGroup(basics.AppIndex(1)), nil, rm.seed, rm.salt, numBuckets)
+ require.Equal(t, 1, len(bk.buckets))
+ require.Equal(t, 1, len(bk.keys))
+ b := bk.buckets[0]
+ k := bk.keys[0]
+ now := time.Date(2023, 9, 11, 10, 10, 11, 0, time.UTC).UnixNano() // 11 sec => 1 sec into the interval
+
+ // fill a current interval with more than rate requests
+ // ensure the counter does not exceed the rate
+ for i := 0; i < int(rate); i++ {
+ drop := rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+ }
+ drop := rm.shouldDropAt(txns, nil, now)
+ require.True(t, drop)
+
+ entry := rm.buckets[b].entries[k]
+ require.NotNil(t, entry)
+ require.Equal(t, int64(rate), entry.cur.Load())
+}
+
+// TestAppRateLimiter_IntervalSkip checks that the rate is reset when there are no requests within some interval
+func TestAppRateLimiter_IntervalSkip(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 10 * time.Second
+ perSecondRate := uint64(window) / rate / uint64(time.Second)
+ rm := makeAppRateLimiter(512, perSecondRate, window)
+
+ txns := getAppTxnGroup(1)
+ now := time.Date(2023, 9, 11, 10, 10, 11, 0, time.UTC).UnixNano() // 11 sec => 1 sec into the interval
+
+ // fill 80% of the current interval
+ // switch to the next next interval
+ // ensure all capacity is available
+
+ for i := 0; i < int(0.8*float64(rate)); i++ {
+ drop := rm.shouldDropAt(txns, nil, now)
+ require.False(t, drop)
+ }
+
+ nextnext := now + int64(2*window)
+ for i := 0; i < int(rate); i++ {
+ drop := rm.shouldDropAt(txns, nil, nextnext)
+ require.False(t, drop)
+ }
+
+ drop := rm.shouldDropAt(txns, nil, nextnext)
+ require.True(t, drop)
+}
+
+func TestAppRateLimiter_IPAddr(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ rate := uint64(10)
+ window := 10 * time.Second
+ perSecondRate := uint64(window) / rate / uint64(time.Second)
+ rm := makeAppRateLimiter(512, perSecondRate, window)
+
+ txns := getAppTxnGroup(1)
+ now := time.Now().UnixNano()
+
+ for i := 0; i < int(rate); i++ {
+ drop := rm.shouldDropAt(txns, []byte{1}, now)
+ require.False(t, drop)
+ drop = rm.shouldDropAt(txns, []byte{2}, now)
+ require.False(t, drop)
+ }
+
+ drop := rm.shouldDropAt(txns, []byte{1}, now)
+ require.True(t, drop)
+ drop = rm.shouldDropAt(txns, []byte{2}, now)
+ require.True(t, drop)
+}
+
+// TestAppRateLimiter_MaxSize puts size+1 elements into a single bucket and ensures the total size is capped
+func TestAppRateLimiter_MaxSize(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ const bucketSize = 4
+ const size = bucketSize * numBuckets
+ const rate uint64 = 10
+ window := 10 * time.Second
+ rm := makeAppRateLimiter(size, rate, window)
+
+ for i := 1; i <= int(size)+1; i++ {
+ drop := rm.shouldDrop(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(i)})
+ require.False(t, drop)
+ }
+ bucket := int(memhash64(uint64(1), rm.seed) % numBuckets)
+ require.Equal(t, bucketSize, len(rm.buckets[bucket].entries))
+ var totalSize int
+ for i := 0; i < len(rm.buckets); i++ {
+ totalSize += len(rm.buckets[i].entries)
+ if i != bucket {
+ require.Equal(t, 0, len(rm.buckets[i].entries))
+ }
+ }
+ require.LessOrEqual(t, totalSize, int(size))
+}
+
+// TestAppRateLimiter_EvictOrder ensures that the least recently used entry is evicted
+func TestAppRateLimiter_EvictOrder(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ const bucketSize = 4
+ const size = bucketSize * numBuckets
+ const rate uint64 = 10
+ window := 10 * time.Second
+ rm := makeAppRateLimiter(size, rate, window)
+
+ keys := make([]keyType, 0, int(bucketSize)+1)
+ bucket := int(memhash64(uint64(1), rm.seed) % numBuckets)
+ for i := 0; i < bucketSize; i++ {
+ bk := txgroupToKeys(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(i)}, rm.seed, rm.salt, numBuckets)
+ require.Equal(t, 1, len(bk.buckets))
+ require.Equal(t, 1, len(bk.keys))
+ require.Equal(t, bucket, bk.buckets[0])
+ keys = append(keys, bk.keys[0])
+ drop := rm.shouldDrop(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(i)})
+ require.False(t, drop)
+ }
+ require.Equal(t, bucketSize, len(rm.buckets[bucket].entries))
+
+ // add one more and expect the first evicted
+ bk := txgroupToKeys(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(bucketSize)}, rm.seed, rm.salt, numBuckets)
+ require.Equal(t, 1, len(bk.buckets))
+ require.Equal(t, 1, len(bk.keys))
+ require.Equal(t, bucket, bk.buckets[0])
+ drop := rm.shouldDrop(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(bucketSize)})
+ require.False(t, drop)
+
+ require.Equal(t, bucketSize, len(rm.buckets[bucket].entries))
+ require.NotContains(t, rm.buckets[bucket].entries, keys[0])
+ for i := 1; i < len(keys); i++ {
+ require.Contains(t, rm.buckets[bucket].entries, keys[i])
+ }
+
+ var totalSize int
+ for i := 0; i < len(rm.buckets); i++ {
+ totalSize += len(rm.buckets[i].entries)
+ if i != bucket {
+ require.Equal(t, 0, len(rm.buckets[i].entries))
+ }
+ }
+ require.LessOrEqual(t, totalSize, int(size))
+}
+
+func BenchmarkBlake2(b *testing.B) {
+ var salt [16]byte
+ crypto.RandBytes(salt[:])
+ origin := make([]byte, 4)
+
+ var buf [8 + 16 + 16]byte // uint64 + 16 bytes of salt + up to 16 bytes of address
+
+ b.Run("blake2b-sum256", func(b *testing.B) {
+ total := 0
+ for i := 0; i < b.N; i++ {
+ binary.LittleEndian.PutUint64(buf[:8], rand.Uint64())
+ copy(buf[8:], salt[:])
+ copied := copy(buf[8+16:], origin)
+ h := blake2b.Sum256(buf[:8+16+copied])
+ total += len(h[:])
+ }
+ b.Logf("total1: %d", total) // to prevent optimizing out the loop
+ })
+
+ b.Run("blake2b-sum8", func(b *testing.B) {
+ total := 0
+ for i := 0; i < b.N; i++ {
+ d, err := blake2b.New(8, nil)
+ require.NoError(b, err)
+
+ binary.LittleEndian.PutUint64(buf[:8], rand.Uint64())
+ copy(buf[8:], salt[:])
+ copied := copy(buf[8+16:], origin)
+
+ _, err = d.Write(buf[:8+16+copied])
+ require.NoError(b, err)
+ h := d.Sum([]byte{})
+ total += len(h[:])
+ }
+ b.Logf("total2: %d", total)
+ })
+}
+
+func BenchmarkAppRateLimiter(b *testing.B) {
+ cfg := config.GetDefaultLocal()
+
+ b.Run("multi bucket no evict", func(b *testing.B) {
+ rm := makeAppRateLimiter(
+ cfg.TxBacklogAppTxRateLimiterMaxSize,
+ uint64(cfg.TxBacklogAppTxPerSecondRate),
+ time.Duration(cfg.TxBacklogServiceRateWindowSeconds)*time.Second,
+ )
+ dropped := 0
+ for i := 0; i < b.N; i++ {
+ if rm.shouldDrop(getAppTxnGroup(basics.AppIndex(i%512)), []byte{byte(i), byte(i % 256)}) {
+ dropped++
+ }
+ }
+ b.ReportMetric(float64(dropped)/float64(b.N), "%_drop")
+ if rm.evictions > 0 {
+ b.Logf("# evictions %d, time %d us", rm.evictions, rm.evictionTime/uint64(time.Microsecond))
+ }
+ })
+
+ b.Run("single bucket no evict", func(b *testing.B) {
+ rm := makeAppRateLimiter(
+ cfg.TxBacklogAppTxRateLimiterMaxSize,
+ uint64(cfg.TxBacklogAppTxPerSecondRate),
+ time.Duration(cfg.TxBacklogServiceRateWindowSeconds)*time.Second,
+ )
+ dropped := 0
+ for i := 0; i < b.N; i++ {
+ if rm.shouldDrop(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(i), byte(i % 256)}) {
+ dropped++
+ }
+ }
+ b.ReportMetric(float64(dropped)/float64(b.N), "%_drop")
+ if rm.evictions > 0 {
+ b.Logf("# evictions %d, time %d us", rm.evictions, rm.evictionTime/uint64(time.Microsecond))
+ }
+ })
+
+ b.Run("single bucket w evict", func(b *testing.B) {
+ rm := makeAppRateLimiter(
+ cfg.TxBacklogAppTxRateLimiterMaxSize,
+ uint64(cfg.TxBacklogAppTxPerSecondRate),
+ time.Duration(cfg.TxBacklogServiceRateWindowSeconds)*time.Second,
+ )
+ dropped := 0
+ for i := 0; i < b.N; i++ {
+ if rm.shouldDrop(getAppTxnGroup(basics.AppIndex(1)), []byte{byte(i), byte(i / 256), byte(i % 256)}) {
+ dropped++
+ }
+ }
+ b.ReportMetric(float64(dropped)/float64(b.N), "%_drop")
+ if rm.evictions > 0 {
+ b.Logf("# evictions %d, time %d us", rm.evictions, rm.evictionTime/uint64(time.Microsecond))
+ }
+ })
+}
+
+func TestAppRateLimiter_TxgroupToKeys(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ apptxn := transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: 0,
+ ForeignApps: []basics.AppIndex{0},
+ },
+ }
+ txgroup := []transactions.SignedTxn{{Txn: apptxn}}
+
+ kb := txgroupToKeys(txgroup, nil, 123, [16]byte{}, 1)
+ require.Equal(t, 0, len(kb.keys))
+ require.Equal(t, len(kb.keys), len(kb.buckets))
+ putAppKeyBuf(kb)
+
+ txgroup[0].Txn.ApplicationID = 1
+ kb = txgroupToKeys(txgroup, nil, 123, [16]byte{}, 1)
+ require.Equal(t, 1, len(kb.keys))
+ require.Equal(t, len(kb.keys), len(kb.buckets))
+ putAppKeyBuf(kb)
+
+ txgroup[0].Txn.ForeignApps = append(txgroup[0].Txn.ForeignApps, 1)
+ kb = txgroupToKeys(txgroup, nil, 123, [16]byte{}, 1)
+ require.Equal(t, 1, len(kb.keys))
+ require.Equal(t, len(kb.keys), len(kb.buckets))
+ putAppKeyBuf(kb)
+
+ txgroup[0].Txn.ForeignApps = append(txgroup[0].Txn.ForeignApps, 2)
+ kb = txgroupToKeys(txgroup, nil, 123, [16]byte{}, 1)
+ require.Equal(t, 2, len(kb.keys))
+ require.Equal(t, len(kb.keys), len(kb.buckets))
+ putAppKeyBuf(kb)
+
+ apptxn.ApplicationID = 2
+ txgroup = append(txgroup, transactions.SignedTxn{Txn: apptxn})
+ kb = txgroupToKeys(txgroup, nil, 123, [16]byte{}, 1)
+ require.Equal(t, 2, len(kb.keys))
+ require.Equal(t, len(kb.keys), len(kb.buckets))
+ putAppKeyBuf(kb)
+}
+
+func BenchmarkAppRateLimiter_TxgroupToKeys(b *testing.B) {
+ rnd := rand.New(rand.NewSource(123))
+
+ txgroups := make([][]transactions.SignedTxn, 0, b.N)
+ for i := 0; i < b.N; i++ {
+ txgroup := make([]transactions.SignedTxn, 0, config.MaxTxGroupSize)
+ for j := 0; j < config.MaxTxGroupSize; j++ {
+ apptxn := transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: basics.AppIndex(rnd.Uint64()),
+ ForeignApps: []basics.AppIndex{basics.AppIndex(rnd.Uint64()), basics.AppIndex(rnd.Uint64()), basics.AppIndex(rnd.Uint64()), basics.AppIndex(rnd.Uint64())},
+ },
+ }
+ txgroup = append(txgroup, transactions.SignedTxn{Txn: apptxn})
+ }
+ txgroups = append(txgroups, txgroup)
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ origin := make([]byte, 4)
+ _, err := rnd.Read(origin)
+ require.NoError(b, err)
+ require.NotEmpty(b, origin)
+
+ salt := [16]byte{}
+ _, err = rnd.Read(salt[:])
+ require.NoError(b, err)
+ require.NotEmpty(b, salt)
+
+ for i := 0; i < b.N; i++ {
+ kb := txgroupToKeys(txgroups[i], origin, 123, salt, numBuckets)
+ putAppKeyBuf(kb)
+ }
+}
diff --git a/data/txHandler.go b/data/txHandler.go
index a5f44a1d07..4689a497b4 100644
--- a/data/txHandler.go
+++ b/data/txHandler.go
@@ -56,6 +56,7 @@ var transactionMessageTxGroupExcessive = metrics.MakeCounter(metrics.Transaction
var transactionMessageTxGroupFull = metrics.MakeCounter(metrics.TransactionMessageTxGroupFull)
var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg)
var transactionMessagesDupCanonical = metrics.MakeCounter(metrics.TransactionMessagesDupCanonical)
+var transactionMessagesAppLimiterDrop = metrics.MakeCounter(metrics.TransactionMessagesAppLimiterDrop)
var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize)
var transactionGroupTxSyncHandled = metrics.MakeCounter(metrics.TransactionGroupTxSyncHandled)
@@ -111,23 +112,25 @@ type txBacklogMsg struct {
// TxHandler handles transaction messages
type TxHandler struct {
- txPool *pools.TransactionPool
- ledger *Ledger
- genesisID string
- genesisHash crypto.Digest
- txVerificationPool execpool.BacklogPool
- backlogQueue chan *txBacklogMsg
- postVerificationQueue chan *verify.VerificationResult
- backlogWg sync.WaitGroup
- net network.GossipNode
- msgCache *txSaltedCache
- txCanonicalCache *digestCache
- ctx context.Context
- ctxCancel context.CancelFunc
- streamVerifier *execpool.StreamToBatch
- streamVerifierChan chan execpool.InputJob
- streamVerifierDropped chan *verify.UnverifiedTxnSigJob
- erl *util.ElasticRateLimiter
+ txPool *pools.TransactionPool
+ ledger *Ledger
+ genesisID string
+ genesisHash crypto.Digest
+ txVerificationPool execpool.BacklogPool
+ backlogQueue chan *txBacklogMsg
+ backlogCongestionThreshold float64
+ postVerificationQueue chan *verify.VerificationResult
+ backlogWg sync.WaitGroup
+ net network.GossipNode
+ msgCache *txSaltedCache
+ txCanonicalCache *digestCache
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ streamVerifier *execpool.StreamToBatch
+ streamVerifierChan chan execpool.InputJob
+ streamVerifierDropped chan *verify.UnverifiedTxnSigJob
+ erl *util.ElasticRateLimiter
+ appLimiter *appRateLimiter
}
// TxHandlerOpts is TxHandler configuration options
@@ -178,14 +181,29 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
handler.txCanonicalCache = makeDigestCache(int(opts.Config.TxIncomingFilterMaxSize))
}
- if opts.Config.EnableTxBacklogRateLimiting {
- rateLimiter := util.NewElasticRateLimiter(
- txBacklogSize,
- opts.Config.TxBacklogReservedCapacityPerPeer,
- time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
- txBacklogDroppedCongestionManagement,
- )
- handler.erl = rateLimiter
+ if opts.Config.EnableTxBacklogRateLimiting || opts.Config.EnableTxBacklogAppRateLimiting {
+ if opts.Config.TxBacklogRateLimitingCongestionPct > 100 || opts.Config.TxBacklogRateLimitingCongestionPct < 0 {
+ return nil, fmt.Errorf("invalid value for TxBacklogRateLimitingCongestionPct: %d", opts.Config.TxBacklogRateLimitingCongestionPct)
+ }
+ if opts.Config.EnableTxBacklogAppRateLimiting && opts.Config.TxBacklogAppTxRateLimiterMaxSize == 0 {
+ return nil, fmt.Errorf("invalid value for TxBacklogAppTxRateLimiterMaxSize: %d. App rate limiter enabled with zero size", opts.Config.TxBacklogAppTxRateLimiterMaxSize)
+ }
+ handler.backlogCongestionThreshold = float64(opts.Config.TxBacklogRateLimitingCongestionPct) / 100
+ if opts.Config.EnableTxBacklogRateLimiting {
+ handler.erl = util.NewElasticRateLimiter(
+ txBacklogSize,
+ opts.Config.TxBacklogReservedCapacityPerPeer,
+ time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
+ txBacklogDroppedCongestionManagement,
+ )
+ }
+ if opts.Config.EnableTxBacklogAppRateLimiting {
+ handler.appLimiter = makeAppRateLimiter(
+ opts.Config.TxBacklogAppTxRateLimiterMaxSize,
+ uint64(opts.Config.TxBacklogAppTxPerSecondRate),
+ time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
+ )
+ }
}
// prepare the transaction stream verifier
@@ -578,7 +596,9 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
var err error
var capguard *util.ErlCapacityGuard
+ var congested bool
if handler.erl != nil {
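+ // the backlog is considered congested once it is filled beyond TxBacklogRateLimitingCongestionPct of its capacity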
+ congested = float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
@@ -591,7 +611,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
return network.OutgoingMessage{Action: network.Ignore}
}
// if the backlog Queue has 50% of its buffer back, turn congestion control off
- if float64(cap(handler.backlogQueue))*0.5 > float64(len(handler.backlogQueue)) {
+ if !congested {
handler.erl.DisableCongestionControl()
}
}
@@ -640,6 +660,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}
}
+ // rate limit per application in a group. Limiting any app in a group drops the entire message.
+ if handler.appLimiter != nil && congested && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) {
+ transactionMessagesAppLimiterDrop.Inc(nil)
+ return network.OutgoingMessage{Action: network.Ignore}
+ }
+
select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &rawmsg,
diff --git a/data/txHandler_test.go b/data/txHandler_test.go
index 486bd7c0f8..894fef9d4d 100644
--- a/data/txHandler_test.go
+++ b/data/txHandler_test.go
@@ -62,6 +62,9 @@ type mockSender struct{}
func (m mockSender) OnClose(func()) {}
+func (m mockSender) IPAddr() []byte { return nil }
+func (m mockSender) RoutingAddr() []byte { return nil }
+
// txHandlerConfig is a subset of tx handler related options from config.Local
type txHandlerConfig struct {
enableFilteringRawMsg bool
@@ -2503,3 +2506,178 @@ func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { //nolint:parallelt
require.False(t, inBad, "invalid transaction accepted")
}
}
+
+// TestTxHandlerAppRateLimiterERLEnabled checks that ERL and the app rate limiter are enabled by their
+// separate config values, and that the app limiter only kicks in after the backlog becomes congested.
+func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ ledgerName := fmt.Sprintf("%s-mem", t.Name())
+ const inMem = true
+ log := logging.TestingLog(t)
+ log.SetLevel(logging.Panic)
+
+ cfg := config.GetDefaultLocal()
+ cfg.TxBacklogAppTxRateLimiterMaxSize = 100
+ cfg.TxBacklogServiceRateWindowSeconds = 1
+ cfg.TxBacklogAppTxPerSecondRate = 3
+ cfg.TxBacklogReservedCapacityPerPeer = 2
+ cfg.TxBacklogSize = 1
+ cfg.IncomingConnectionsLimit = 1
+ ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg)
+ require.NoError(t, err)
+ defer ledger.Close()
+
+ l := ledger
+
+ func() {
+ cfg.EnableTxBacklogRateLimiting = false
+ cfg.EnableTxBacklogAppRateLimiting = false
+ handler, err := makeTestTxHandler(l, cfg)
+ require.NoError(t, err)
+ defer handler.txVerificationPool.Shutdown()
+ defer close(handler.streamVerifierDropped)
+
+ require.Nil(t, handler.erl)
+ require.Nil(t, handler.appLimiter)
+ }()
+
+ func() {
+ cfg.EnableTxBacklogRateLimiting = true
+ cfg.EnableTxBacklogAppRateLimiting = false
+ handler, err := makeTestTxHandler(l, cfg)
+ require.NoError(t, err)
+ defer handler.txVerificationPool.Shutdown()
+ defer close(handler.streamVerifierDropped)
+
+ require.NotNil(t, handler.erl)
+ require.Nil(t, handler.appLimiter)
+ }()
+
+ cfg.EnableTxBacklogRateLimiting = true
+ cfg.EnableTxBacklogAppRateLimiting = true
+ handler, err := makeTestTxHandler(l, cfg)
+ require.NoError(t, err)
+ defer handler.txVerificationPool.Shutdown()
+ defer close(handler.streamVerifierDropped)
+ require.NotNil(t, handler.erl)
+ require.NotNil(t, handler.appLimiter)
+
+ var addr basics.Address
+ crypto.RandBytes(addr[:])
+
+ tx := transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ Header: transactions.Header{
+ Sender: addr,
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
+ FirstValid: 0,
+ LastValid: basics.Round(proto.MaxTxnLife),
+ Note: make([]byte, 2),
+ },
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: 1,
+ },
+ }
+ signedTx := tx.Sign(keypair()) // some random key
+ blob := protocol.Encode(&signedTx)
+ sender := mockSender{}
+
+ // submit and ensure it is accepted
+ congested := float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
+ require.False(t, congested)
+
+ action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender})
+ require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
+ require.Equal(t, 1, len(handler.backlogQueue))
+
+ // repeat the same txn, we are still not congested
+ congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
+ require.False(t, congested)
+
+ signedTx = tx.Sign(keypair())
+ blob = protocol.Encode(&signedTx)
+ action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender})
+ require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
+ require.Equal(t, 2, len(handler.backlogQueue))
+ require.Equal(t, 0, handler.appLimiter.len()) // no rate limiting yet
+
+ congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
+ require.True(t, congested)
+
+ // submit it again and the app rate limiter should kick in
+ signedTx = tx.Sign(keypair())
+ blob = protocol.Encode(&signedTx)
+ action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender})
+ require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
+ require.Equal(t, 3, len(handler.backlogQueue))
+
+ require.Equal(t, 1, handler.appLimiter.len())
+}
+
+func TestTxHandlerAppRateLimiter(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ const numUsers = 10
+ log := logging.TestingLog(t)
+ log.SetLevel(logging.Panic)
+
+ // prepare the accounts
+ addresses, secrets, genesis := makeTestGenesisAccounts(t, numUsers)
+ genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
+ ledgerName := fmt.Sprintf("%s-mem", t.Name())
+ const inMem = true
+
+ cfg := config.GetDefaultLocal()
+ cfg.EnableTxBacklogRateLimiting = true
+ cfg.TxBacklogAppTxRateLimiterMaxSize = 100
+ cfg.TxBacklogServiceRateWindowSeconds = 1
+ cfg.TxBacklogAppTxPerSecondRate = 3
+ ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
+ require.NoError(t, err)
+ defer ledger.Close()
+
+ l := ledger
+ handler, err := makeTestTxHandler(l, cfg)
+ require.NoError(t, err)
+ defer handler.txVerificationPool.Shutdown()
+ defer close(handler.streamVerifierDropped)
+
+ tx := transactions.Transaction{
+ Type: protocol.ApplicationCallTx,
+ Header: transactions.Header{
+ Sender: addresses[0],
+ Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
+ FirstValid: 0,
+ LastValid: basics.Round(proto.MaxTxnLife),
+ Note: make([]byte, 2),
+ },
+ ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{
+ ApplicationID: 1,
+ },
+ }
+ signedTx := tx.Sign(secrets[1])
+ blob := protocol.Encode(&signedTx)
+
+ action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: mockSender{}})
+ require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
+ require.Equal(t, 1, len(handler.backlogQueue))
+
+ // trigger the rate limiter and ensure the txn is ignored
+ tx2 := tx
+ for i := 0; i < cfg.TxBacklogAppTxPerSecondRate*cfg.TxBacklogServiceRateWindowSeconds; i++ {
+ tx2.ForeignApps = append(tx2.ForeignApps, 1)
+ }
+ signedTx2 := tx.Sign(secrets[1])
+ blob2 := protocol.Encode(&signedTx2)
+
+ action = handler.processIncomingTxn(network.IncomingMessage{Data: blob2, Sender: mockSender{}})
+ require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
+ require.Equal(t, 1, len(handler.backlogQueue))
+
+ // backlogQueue has the first txn, but the second one is dropped
+ msg := <-handler.backlogQueue
+ require.Equal(t, msg.rawmsg.Data, blob, blob)
+}
diff --git a/installer/config.json.example b/installer/config.json.example
index fccf558c44..ce02380331 100644
--- a/installer/config.json.example
+++ b/installer/config.json.example
@@ -1,5 +1,5 @@
{
- "Version": 31,
+ "Version": 32,
"AccountUpdatesStatsInterval": 5000000000,
"AccountsRebuildSynchronousMode": 1,
"AgreementIncomingBundlesQueueLength": 15,
@@ -60,6 +60,7 @@
"EnableRequestLogger": false,
"EnableRuntimeMetrics": false,
"EnableTopAccountsReporting": false,
+ "EnableTxBacklogAppRateLimiting": true,
"EnableTxBacklogRateLimiting": true,
"EnableTxnEvalTracer": false,
"EnableUsageLog": false,
@@ -120,6 +121,9 @@
"TrackerDBDir": "",
"TransactionSyncDataExchangeRate": 0,
"TransactionSyncSignificantMessageThreshold": 0,
+ "TxBacklogAppTxPerSecondRate": 100,
+ "TxBacklogAppTxRateLimiterMaxSize": 1048576,
+ "TxBacklogRateLimitingCongestionPct": 50,
"TxBacklogReservedCapacityPerPeer": 20,
"TxBacklogServiceRateWindowSeconds": 10,
"TxBacklogSize": 26000,
diff --git a/network/p2pPeer.go b/network/p2pPeer.go
index 7d788180e6..343459d243 100644
--- a/network/p2pPeer.go
+++ b/network/p2pPeer.go
@@ -23,10 +23,12 @@ import (
"net"
"time"
+ "github.com/algorand/go-algorand/logging"
"github.com/algorand/websocket"
"github.com/libp2p/go-libp2p/core/network"
yamux "github.com/libp2p/go-yamux/v4"
+ mnet "github.com/multiformats/go-multiaddr/net"
)
type wsPeerConnP2PImpl struct {
@@ -82,3 +84,11 @@ func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error {
}
func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil }
+
+func (c *wsPeerConnP2PImpl) RemoteAddr() net.Addr {
+ netaddr, err := mnet.ToNetAddr(c.stream.Conn().RemoteMultiaddr())
+ if err != nil {
+ logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err)
+ }
+ return netaddr
+}
diff --git a/network/wsPeer.go b/network/wsPeer.go
index 9daf7b0ece..55dba8e568 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -120,6 +120,7 @@ var defaultSendMessageTags = map[protocol.Tag]bool{
// interface allows substituting debug implementation for *websocket.Conn
type wsPeerWebsocketConn interface {
+ RemoteAddr() net.Addr
RemoteAddrString() string
NextReader() (int, io.Reader, error)
WriteMessage(int, []byte) error
@@ -321,6 +322,12 @@ type HTTPPeer interface {
GetHTTPClient() *http.Client
}
+// IPAddressable is addressable with either an IPv4 or IPv6 address
+type IPAddressable interface {
+ IPAddr() []byte
+ RoutingAddr() []byte
+}
+
// UnicastPeer is another possible interface for the opaque Peer.
// It is possible that we can only initiate a connection to a peer over websockets.
type UnicastPeer interface {
@@ -369,6 +376,45 @@ func (wp *wsPeer) Version() string {
return wp.version
}
+func (wp *wsPeer) IPAddr() []byte {
+ remote := wp.conn.RemoteAddr()
+ if remote == nil {
+ return nil
+ }
+ ip := remote.(*net.TCPAddr).IP
+ result := ip.To4()
+ if result == nil {
+ result = ip.To16()
+ }
+ return result
+}
+
+// RoutingAddr returns the meaningful routing part of the address:
+// ipv4 for ipv4 addresses
+// top 8 bytes of ipv6 for ipv6 addresses
+// low 4 bytes for ipv4 embedded into ipv6
+// see http://www.tcpipguide.com/free/t_IPv6IPv4AddressEmbedding.htm for details.
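+// For example, the IPv4-mapped address ::ffff:203.0.113.5 yields 203.0.113.5, while a native
+// IPv6 address yields only its top 8 bytes (the /64 routing prefix).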
+func (wp *wsPeer) RoutingAddr() []byte {
+ isZeros := func(ip []byte) bool {
+ for i := 0; i < len(ip); i++ {
+ if ip[i] != 0 {
+ return false
+ }
+ }
+ return true
+ }
+
+ ip := wp.IPAddr()
+ if len(ip) != net.IPv6len {
+ return ip
+ }
+ // ipv6, check if it's ipv4 embedded
+ if isZeros(ip[0:10]) {
+ return ip[12:16]
+ }
+ return ip[0:8]
+}
+
// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
// (Implements UnicastPeer)
func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) error {
diff --git a/network/wsPeer_test.go b/network/wsPeer_test.go
index b6f3a4d2f0..59217047ce 100644
--- a/network/wsPeer_test.go
+++ b/network/wsPeer_test.go
@@ -22,6 +22,8 @@ import (
"go/ast"
"go/parser"
"go/token"
+ "io"
+ "net"
"path/filepath"
"sort"
"strings"
@@ -264,3 +266,48 @@ func getProtocolTags(t *testing.T) []string {
require.Len(t, declaredTags, len(protocol.TagList))
return declaredTags
}
+
+type tcpipMockConn struct{ addr net.TCPAddr }
+
+func (m *tcpipMockConn) RemoteAddr() net.Addr { return &m.addr }
+func (m *tcpipMockConn) RemoteAddrString() string { return "" }
+func (m *tcpipMockConn) NextReader() (int, io.Reader, error) { return 0, nil, nil }
+func (m *tcpipMockConn) WriteMessage(int, []byte) error { return nil }
+func (m *tcpipMockConn) CloseWithMessage([]byte, time.Time) error { return nil }
+func (m *tcpipMockConn) SetReadLimit(int64) {}
+func (m *tcpipMockConn) CloseWithoutFlush() error { return nil }
+func (m *tcpipMockConn) UnderlyingConn() net.Conn { return nil }
+
+func TestWsPeerIPAddr(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ conn := &tcpipMockConn{}
+ peer := wsPeer{
+ conn: conn,
+ }
+ // some raw IPv4 address
+ conn.addr.IP = []byte{127, 0, 0, 1}
+ require.Equal(t, []byte{127, 0, 0, 1}, peer.IPAddr())
+ require.Equal(t, []byte{127, 0, 0, 1}, peer.RoutingAddr())
+
+ // IPv4 constructed from net.IPv4
+ conn.addr.IP = net.IPv4(127, 0, 0, 2)
+ require.Equal(t, []byte{127, 0, 0, 2}, peer.IPAddr())
+ require.Equal(t, []byte{127, 0, 0, 2}, peer.RoutingAddr())
+
+ // some IPv6 address
+ conn.addr.IP = net.IPv6linklocalallrouters
+ require.Equal(t, []byte(net.IPv6linklocalallrouters), peer.IPAddr())
+ require.Equal(t, []byte(net.IPv6linklocalallrouters[0:8]), peer.RoutingAddr())
+
+ // embedded IPv4 into IPv6
+ conn.addr.IP = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 127, 0, 0, 3}
+ require.Equal(t, 16, len(conn.addr.IP))
+ require.Equal(t, []byte{127, 0, 0, 3}, peer.IPAddr())
+ require.Equal(t, []byte{127, 0, 0, 3}, peer.RoutingAddr())
+ conn.addr.IP = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 127, 0, 0, 4}
+ require.Equal(t, 16, len(conn.addr.IP))
+ require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 127, 0, 0, 4}, peer.IPAddr())
+ require.Equal(t, []byte{127, 0, 0, 4}, peer.RoutingAddr())
+}
diff --git a/test/testdata/configs/config-v32.json b/test/testdata/configs/config-v32.json
new file mode 100644
index 0000000000..ce02380331
--- /dev/null
+++ b/test/testdata/configs/config-v32.json
@@ -0,0 +1,139 @@
+{
+ "Version": 32,
+ "AccountUpdatesStatsInterval": 5000000000,
+ "AccountsRebuildSynchronousMode": 1,
+ "AgreementIncomingBundlesQueueLength": 15,
+ "AgreementIncomingProposalsQueueLength": 50,
+ "AgreementIncomingVotesQueueLength": 20000,
+ "AnnounceParticipationKey": true,
+ "Archival": false,
+ "BaseLoggerDebugLevel": 4,
+ "BlockDBDir": "",
+ "BlockServiceCustomFallbackEndpoints": "",
+ "BlockServiceMemCap": 500000000,
+ "BroadcastConnectionsLimit": -1,
+ "CadaverDirectory": "",
+ "CadaverSizeTarget": 0,
+ "CatchpointDir": "",
+ "CatchpointFileHistoryLength": 365,
+ "CatchpointInterval": 10000,
+ "CatchpointTracking": 0,
+ "CatchupBlockDownloadRetryAttempts": 1000,
+ "CatchupBlockValidateMode": 0,
+ "CatchupFailurePeerRefreshRate": 10,
+ "CatchupGossipBlockFetchTimeoutSec": 4,
+ "CatchupHTTPBlockFetchTimeoutSec": 4,
+ "CatchupLedgerDownloadRetryAttempts": 50,
+ "CatchupParallelBlocks": 16,
+ "ColdDataDir": "",
+ "ConnectionsRateLimitingCount": 60,
+ "ConnectionsRateLimitingWindowSeconds": 1,
+ "CrashDBDir": "",
+ "DNSBootstrapID": ".algorand.network?backup=.algorand.net&dedup=.algorand-.(network|net)",
+ "DNSSecurityFlags": 1,
+ "DeadlockDetection": 0,
+ "DeadlockDetectionThreshold": 30,
+ "DisableAPIAuth": false,
+ "DisableLedgerLRUCache": false,
+ "DisableLocalhostConnectionRateLimit": true,
+ "DisableNetworking": false,
+ "DisableOutgoingConnectionThrottling": false,
+ "EnableAccountUpdatesStats": false,
+ "EnableAgreementReporting": false,
+ "EnableAgreementTimeMetrics": false,
+ "EnableAssembleStats": false,
+ "EnableBlockService": false,
+ "EnableBlockServiceFallbackToArchiver": false,
+ "EnableCatchupFromArchiveServers": false,
+ "EnableDeveloperAPI": false,
+ "EnableExperimentalAPI": false,
+ "EnableFollowMode": false,
+ "EnableGossipBlockService": true,
+ "EnableIncomingMessageFilter": false,
+ "EnableLedgerService": false,
+ "EnableMetricReporting": false,
+ "EnableOutgoingNetworkMessageFiltering": true,
+ "EnableP2P": false,
+ "EnablePingHandler": true,
+ "EnableProcessBlockStats": false,
+ "EnableProfiler": false,
+ "EnableRequestLogger": false,
+ "EnableRuntimeMetrics": false,
+ "EnableTopAccountsReporting": false,
+ "EnableTxBacklogAppRateLimiting": true,
+ "EnableTxBacklogRateLimiting": true,
+ "EnableTxnEvalTracer": false,
+ "EnableUsageLog": false,
+ "EnableVerbosedTransactionSyncLogging": false,
+ "EndpointAddress": "127.0.0.1:0",
+ "FallbackDNSResolverAddress": "",
+ "ForceFetchTransactions": false,
+ "ForceRelayMessages": false,
+ "GossipFanout": 4,
+ "HeartbeatUpdateInterval": 600,
+ "HotDataDir": "",
+ "IncomingConnectionsLimit": 2400,
+ "IncomingMessageFilterBucketCount": 5,
+ "IncomingMessageFilterBucketSize": 512,
+ "LedgerSynchronousMode": 2,
+ "LogArchiveDir": "",
+ "LogArchiveMaxAge": "",
+ "LogArchiveName": "node.archive.log",
+ "LogFileDir": "",
+ "LogSizeLimit": 1073741824,
+ "MaxAPIBoxPerApplication": 100000,
+ "MaxAPIResourcesPerAccount": 100000,
+ "MaxAcctLookback": 4,
+ "MaxBlockHistoryLookback": 0,
+ "MaxCatchpointDownloadDuration": 43200000000000,
+ "MaxConnectionsPerIP": 15,
+ "MinCatchpointFileDownloadBytesPerSecond": 20480,
+ "NetAddress": "",
+ "NetworkMessageTraceServer": "",
+ "NetworkProtocolVersion": "",
+ "NodeExporterListenAddress": ":9100",
+ "NodeExporterPath": "./node_exporter",
+ "OptimizeAccountsDatabaseOnStartup": false,
+ "OutgoingMessageFilterBucketCount": 3,
+ "OutgoingMessageFilterBucketSize": 128,
+ "P2PPersistPeerID": false,
+ "P2PPrivateKeyLocation": "",
+ "ParticipationKeysRefreshInterval": 60000000000,
+ "PeerConnectionsUpdateInterval": 3600,
+ "PeerPingPeriodSeconds": 0,
+ "PriorityPeers": {},
+ "ProposalAssemblyTime": 500000000,
+ "PublicAddress": "",
+ "ReconnectTime": 60000000000,
+ "ReservedFDs": 256,
+ "RestConnectionsHardLimit": 2048,
+ "RestConnectionsSoftLimit": 1024,
+ "RestReadTimeoutSeconds": 15,
+ "RestWriteTimeoutSeconds": 120,
+ "RunHosted": false,
+ "StateproofDir": "",
+ "StorageEngine": "sqlite",
+ "SuggestedFeeBlockHistory": 3,
+ "SuggestedFeeSlidingWindowSize": 50,
+ "TLSCertFile": "",
+ "TLSKeyFile": "",
+ "TelemetryToLog": true,
+ "TrackerDBDir": "",
+ "TransactionSyncDataExchangeRate": 0,
+ "TransactionSyncSignificantMessageThreshold": 0,
+ "TxBacklogAppTxPerSecondRate": 100,
+ "TxBacklogAppTxRateLimiterMaxSize": 1048576,
+ "TxBacklogRateLimitingCongestionPct": 50,
+ "TxBacklogReservedCapacityPerPeer": 20,
+ "TxBacklogServiceRateWindowSeconds": 10,
+ "TxBacklogSize": 26000,
+ "TxIncomingFilterMaxSize": 500000,
+ "TxIncomingFilteringFlags": 1,
+ "TxPoolExponentialIncreaseFactor": 2,
+ "TxPoolSize": 75000,
+ "TxSyncIntervalSeconds": 60,
+ "TxSyncServeResponseSize": 1000000,
+ "TxSyncTimeoutSeconds": 30,
+ "UseXForwardedForAddressField": "",
+ "VerifiedTranscationsCacheSize": 150000
+}
diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go
index cb376eb0a3..cebece25fc 100644
--- a/util/metrics/metrics.go
+++ b/util/metrics/metrics.go
@@ -123,6 +123,8 @@ var (
TransactionMessagesDupRawMsg = MetricName{Name: "algod_transaction_messages_dropped_dup_raw", Description: "Number of dupe raw transaction messages dropped"}
// TransactionMessagesDupCanonical "Number of transaction messages dropped after canonical re-encoding"
TransactionMessagesDupCanonical = MetricName{Name: "algod_transaction_messages_dropped_dup_canonical", Description: "Number of transaction messages dropped after canonical re-encoding"}
+ // TransactionMessagesAppLimiterDrop "Number of transaction messages dropped after app limits check"
+ TransactionMessagesAppLimiterDrop = MetricName{Name: "algod_transaction_messages_dropped_app_limiter", Description: "Number of transaction messages dropped after app limits check"}
// TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue"
TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"}