Skip to content

Commit

Permalink
set cache to 100_000 and disable libp2p resource manager
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Oct 17, 2023
1 parent 3eea530 commit 8b7490c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 41 deletions.
16 changes: 12 additions & 4 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Config struct {

func DefaultConfig() Config {
return Config{
ATXSize: 50_000,
ATXSize: 100_000,
MalfeasenceSize: 1_000,
}
}
Expand Down Expand Up @@ -237,7 +237,10 @@ func (db *CachedDB) GetEpochWeight(epoch types.EpochID) (uint64, []types.ATXID,
}

// IterateEpochATXHeaders iterates over ActivationTxs that target an epoch.
func (db *CachedDB) IterateEpochATXHeaders(epoch types.EpochID, iter func(*types.ActivationTxHeader) error) error {
func (db *CachedDB) IterateEpochATXHeaders(
epoch types.EpochID,
iter func(*types.ActivationTxHeader) error,
) error {
ids, err := atxs.GetIDsByEpoch(db, epoch-1)
if err != nil {
return err
Expand All @@ -254,7 +257,9 @@ func (db *CachedDB) IterateEpochATXHeaders(epoch types.EpochID, iter func(*types
return nil
}

func (db *CachedDB) IterateMalfeasanceProofs(iter func(types.NodeID, *types.MalfeasanceProof) error) error {
func (db *CachedDB) IterateMalfeasanceProofs(
iter func(types.NodeID, *types.MalfeasanceProof) error,
) error {
ids, err := identities.GetMalicious(db)
if err != nil {
return err
Expand Down Expand Up @@ -283,7 +288,10 @@ func (db *CachedDB) GetLastAtx(nodeID types.NodeID) (*types.ActivationTxHeader,
}

// GetEpochAtx gets the atx header of specified node ID published in the specified epoch.
func (db *CachedDB) GetEpochAtx(epoch types.EpochID, nodeID types.NodeID) (*types.ActivationTxHeader, error) {
func (db *CachedDB) GetEpochAtx(
epoch types.EpochID,
nodeID types.NodeID,
) (*types.ActivationTxHeader, error) {
vatx, err := atxs.GetByEpochAndNodeID(db, epoch, nodeID)
if err != nil {
return nil, fmt.Errorf("no epoch atx found: %w", err)
Expand Down
100 changes: 63 additions & 37 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,20 @@ import (
// DefaultConfig config.
func DefaultConfig() Config {
return Config{
Listen: "/ip4/0.0.0.0/tcp/7513",
Flood: false,
MinPeers: 20,
LowPeers: 40,
HighPeers: 100,
AutoscalePeers: true,
GracePeersShutdown: 30 * time.Second,
MaxMessageSize: 2 << 20,
AcceptQueue: tptu.AcceptQueueLength,
EnableHolepunching: true,
InboundFraction: 0.8,
OutboundFraction: 1.1,
RelayServer: RelayServer{TTL: 20 * time.Minute, Reservations: 512},
Listen: "/ip4/0.0.0.0/tcp/7513",
Flood: false,
MinPeers: 20,
LowPeers: 40,
HighPeers: 100,
AutoscalePeers: true,
GracePeersShutdown: 30 * time.Second,
MaxMessageSize: 2 << 20,
DisableResourceManager: true,
AcceptQueue: tptu.AcceptQueueLength,
EnableHolepunching: true,
InboundFraction: 0.8,
OutboundFraction: 1.1,
RelayServer: RelayServer{TTL: 20 * time.Minute, Reservations: 512},
IP4Blocklist: []string{
// localhost
"127.0.0.0/8",
Expand Down Expand Up @@ -114,7 +115,8 @@ type RelayServer struct {

func (cfg *Config) Validate() error {
if len(cfg.ForceReachability) > 0 {
if cfg.ForceReachability != PublicReachability && cfg.ForceReachability != PrivateReachability {
if cfg.ForceReachability != PublicReachability &&
cfg.ForceReachability != PrivateReachability {
return fmt.Errorf("p2p-reachability flag is invalid. should be one of %s, %s. got %s",
PublicReachability, PrivateReachability, cfg.ForceReachability,
)
Expand All @@ -130,7 +132,13 @@ func (cfg *Config) Validate() error {
}

// New initializes libp2p host configured for spacemesh.
func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ...Opt) (*Host, error) {
func New(
_ context.Context,
logger log.Log,
cfg Config,
prologue []byte,
opts ...Opt,
) (*Host, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -142,7 +150,11 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
}
lp2plog.SetPrimaryCore(logger.Core())
lp2plog.SetAllLoggers(lp2plog.LogLevel(cfg.LogLevel))
cm, err := connmgr.NewConnManager(cfg.LowPeers, cfg.HighPeers, connmgr.WithGracePeriod(cfg.GracePeersShutdown))
cm, err := connmgr.NewConnManager(
cfg.LowPeers,
cfg.HighPeers,
connmgr.WithGracePeriod(cfg.GracePeersShutdown),
)
if err != nil {
return nil, fmt.Errorf("p2p create conn mgr: %w", err)
}
Expand Down Expand Up @@ -182,23 +194,28 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
libp2p.Identity(key),
libp2p.ListenAddrStrings(cfg.Listen),
libp2p.UserAgent("go-spacemesh"),
libp2p.Transport(func(upgrader transport.Upgrader, rcmgr network.ResourceManager) (transport.Transport, error) {
opts := []tcp.Option{}
if cfg.DisableReusePort {
opts = append(opts, tcp.DisableReuseport())
}
if cfg.Metrics {
opts = append(opts, tcp.WithMetrics())
}
return tcp.NewTCPTransport(upgrader, rcmgr, opts...)
}),
libp2p.Security(noise.ID, func(id protocol.ID, privkey crypto.PrivKey, muxers []tptu.StreamMuxer) (*noise.SessionTransport, error) {
tp, err := noise.New(id, privkey, muxers)
if err != nil {
return nil, err
}
return tp.WithSessionOptions(noise.Prologue(prologue))
}),
libp2p.Transport(
func(upgrader transport.Upgrader, rcmgr network.ResourceManager) (transport.Transport, error) {
opts := []tcp.Option{}
if cfg.DisableReusePort {
opts = append(opts, tcp.DisableReuseport())
}
if cfg.Metrics {
opts = append(opts, tcp.WithMetrics())
}
return tcp.NewTCPTransport(upgrader, rcmgr, opts...)
},
),
libp2p.Security(
noise.ID,
func(id protocol.ID, privkey crypto.PrivKey, muxers []tptu.StreamMuxer) (*noise.SessionTransport, error) {
tp, err := noise.New(id, privkey, muxers)
if err != nil {
return nil, err
}
return tp.WithSessionOptions(noise.Prologue(prologue))
},
),
libp2p.Muxer("/yamux/1.0.0", &streamer),
libp2p.Peerstore(ps),
libp2p.BandwidthReporter(p2pmetrics.NewBandwidthCollector()),
Expand All @@ -214,9 +231,12 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
if err != nil {
panic(err) // validated in config
}
lopts = append(lopts, libp2p.AddrsFactory(func([]multiaddr.Multiaddr) []multiaddr.Multiaddr {
return []multiaddr.Multiaddr{addr}
}))
lopts = append(
lopts,
libp2p.AddrsFactory(func([]multiaddr.Multiaddr) []multiaddr.Multiaddr {
return []multiaddr.Multiaddr{addr}
}),
)
}
if cfg.EnableHolepunching {
lopts = append(lopts,
Expand Down Expand Up @@ -253,7 +273,13 @@ func New(_ context.Context, logger log.Log, cfg Config, prologue []byte, opts ..
logger.Zap().Info("local node identity", zap.Stringer("identity", h.ID()))
// TODO(dshulyak) this is small mess. refactor to avoid this patching
// both New and Upgrade should use options.
opts = append(opts, WithConfig(cfg), WithLog(logger), WithBootnodes(bootnodesMap), WithDirectNodes(directMap))
opts = append(
opts,
WithConfig(cfg),
WithLog(logger),
WithBootnodes(bootnodesMap),
WithDirectNodes(directMap),
)
return Upgrade(h, opts...)
}

Expand Down

0 comments on commit 8b7490c

Please sign in to comment.