Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

segment the memory peerstore + granular locks #78

Merged
merged 12 commits into from
May 18, 2019
14 changes: 9 additions & 5 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,13 @@ type Peerstore interface {
KeyBook
PeerMetadata
Metrics
ProtoBook

// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) PeerInfo

GetProtocols(peer.ID) ([]string, error)
AddProtocols(peer.ID, ...string) error
SetProtocols(peer.ID, ...string) error
SupportsProtocols(peer.ID, ...string) ([]string, error)

// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
}
Expand Down Expand Up @@ -142,3 +138,11 @@ type KeyBook interface {
// PeersWithKeys returns all the peer IDs stored in the KeyBook
PeersWithKeys() peer.IDSlice
}

// ProtoBook tracks the protocols supported by peers
type ProtoBook interface {
GetProtocols(peer.ID) ([]string, error)
AddProtocols(peer.ID, ...string) error
SetProtocols(peer.ID, ...string) error
SupportsProtocols(peer.ID, ...string) ([]string, error)
}
117 changes: 8 additions & 109 deletions peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,30 @@ package peerstore
import (
"fmt"
"io"
"sync"

peer "github.com/libp2p/go-libp2p-peer"
)

var _ Peerstore = (*peerstore)(nil)

const maxInternedProtocols = 512
const maxInternedProtocolSize = 256

type peerstore struct {
Metrics

KeyBook
AddrBook
ProtoBook
PeerMetadata

// lock for protocol information, separate from datastore lock
protolock sync.RWMutex
internedProtocols map[string]string
}

// NewPeerstore creates a data structure that stores peer data, backed by the
// supplied implementations of KeyBook, AddrBook and PeerMetadata.
func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore {
func NewPeerstore(kb KeyBook, ab AddrBook, pb ProtoBook, md PeerMetadata) Peerstore {
return &peerstore{
KeyBook: kb,
AddrBook: ab,
PeerMetadata: md,
Metrics: NewMetrics(),
internedProtocols: make(map[string]string),
KeyBook: kb,
AddrBook: ab,
ProtoBook: pb,
PeerMetadata: md,
Metrics: NewMetrics(),
}
}

Expand All @@ -49,6 +42,7 @@ func (ps *peerstore) Close() (err error) {

weakClose("keybook", ps.KeyBook)
weakClose("addressbook", ps.AddrBook)
weakClose("protobook", ps.ProtoBook)
weakClose("peermetadata", ps.PeerMetadata)

if len(errs) > 0 {
Expand Down Expand Up @@ -80,101 +74,6 @@ func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo {
}
}

func (ps *peerstore) internProtocol(s string) string {
if len(s) > maxInternedProtocolSize {
return s
}

if interned, ok := ps.internedProtocols[s]; ok {
return interned
}

if len(ps.internedProtocols) >= maxInternedProtocols {
ps.internedProtocols = make(map[string]string, maxInternedProtocols)
}

ps.internedProtocols[s] = s
return s
}

func (ps *peerstore) SetProtocols(p peer.ID, protos ...string) error {
ps.protolock.Lock()
defer ps.protolock.Unlock()

protomap := make(map[string]struct{}, len(protos))
for _, proto := range protos {
protomap[ps.internProtocol(proto)] = struct{}{}
}

return ps.Put(p, "protocols", protomap)
}

func (ps *peerstore) AddProtocols(p peer.ID, protos ...string) error {
ps.protolock.Lock()
defer ps.protolock.Unlock()
protomap, err := ps.getProtocolMap(p)
if err != nil {
return err
}

for _, proto := range protos {
protomap[ps.internProtocol(proto)] = struct{}{}
}

return ps.Put(p, "protocols", protomap)
}

func (ps *peerstore) getProtocolMap(p peer.ID) (map[string]struct{}, error) {
iprotomap, err := ps.Get(p, "protocols")
switch err {
default:
return nil, err
case ErrNotFound:
return make(map[string]struct{}), nil
case nil:
cast, ok := iprotomap.(map[string]struct{})
if !ok {
return nil, fmt.Errorf("stored protocol set was not a map")
}

return cast, nil
}
}

func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) {
ps.protolock.RLock()
defer ps.protolock.RUnlock()
pmap, err := ps.getProtocolMap(p)
if err != nil {
return nil, err
}

out := make([]string, 0, len(pmap))
for k := range pmap {
out = append(out, k)
}

return out, nil
}

func (ps *peerstore) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
ps.protolock.RLock()
defer ps.protolock.RUnlock()
pmap, err := ps.getProtocolMap(p)
if err != nil {
return nil, err
}

out := make([]string, 0, len(protos))
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
out = append(out, proto)
}
}

return out, nil
}

func PeerInfos(ps Peerstore, peers peer.IDSlice) []PeerInfo {
pi := make([]PeerInfo, len(peers))
for i, p := range peers {
Expand Down
4 changes: 3 additions & 1 deletion pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore.
return nil, err
}

ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata)
protoBook := NewProtoBook(peerMetadata)

ps := pstore.NewPeerstore(keyBook, addrBook, protoBook, peerMetadata)
return ps, nil
}

Expand Down
120 changes: 120 additions & 0 deletions pstoreds/protobook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package pstoreds

import (
"fmt"
"sync"

peer "github.com/libp2p/go-libp2p-peer"

pstore "github.com/libp2p/go-libp2p-peerstore"
)

type protoSegment struct {
sync.RWMutex
}

type protoSegments [256]*protoSegment

func (s *protoSegments) get(p peer.ID) *protoSegment {
return s[byte(p[len(p)-1])]
}

type dsProtoBook struct {
segments protoSegments
meta pstore.PeerMetadata
}

var _ pstore.ProtoBook = (*dsProtoBook)(nil)

func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook {
return &dsProtoBook{
meta: meta,
segments: func() (ret protoSegments) {
for i := range ret {
ret[i] = &protoSegment{}
}
return ret
}(),
}
}

func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error {
pb.segments.get(p).Lock()
defer pb.segments.get(p).Unlock()

protomap := make(map[string]struct{}, len(protos))
for _, proto := range protos {
protomap[proto] = struct{}{}
}

return pb.meta.Put(p, "protocols", protomap)
}

func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error {
pb.segments.get(p).Lock()
defer pb.segments.get(p).Unlock()

pmap, err := pb.getProtocolMap(p)
if err != nil {
return err
}

for _, proto := range protos {
pmap[proto] = struct{}{}
}

return pb.meta.Put(p, "protocols", pmap)
}

func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) {
pb.segments.get(p).RLock()
defer pb.segments.get(p).RUnlock()

pmap, err := pb.getProtocolMap(p)
if err != nil {
return nil, err
}

res := make([]string, 0, len(pmap))
for proto := range pmap {
res = append(res, proto)
}

return res, nil
}

func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
pb.segments.get(p).RLock()
defer pb.segments.get(p).RUnlock()

pmap, err := pb.getProtocolMap(p)
if err != nil {
return nil, err
}

res := make([]string, 0, len(protos))
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
res = append(res, proto)
}
}

return res, nil
}

func (pb *dsProtoBook) getProtocolMap(p peer.ID) (map[string]struct{}, error) {
iprotomap, err := pb.meta.Get(p, "protocols")
switch err {
default:
return nil, err
case pstore.ErrNotFound:
return make(map[string]struct{}), nil
case nil:
cast, ok := iprotomap.(map[string]struct{})
if !ok {
return nil, fmt.Errorf("stored protocol set was not a map")
}

return cast, nil
}
}
Loading