Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

segment the memory peerstore + granular locks #78

Merged
merged 12 commits into from
May 18, 2019
14 changes: 9 additions & 5 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,13 @@ type Peerstore interface {
KeyBook
PeerMetadata
Metrics
ProtoBook

// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) PeerInfo

GetProtocols(peer.ID) ([]string, error)
AddProtocols(peer.ID, ...string) error
SetProtocols(peer.ID, ...string) error
SupportsProtocols(peer.ID, ...string) ([]string, error)

// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
}
Expand Down Expand Up @@ -142,3 +138,11 @@ type KeyBook interface {
// PeersWithKeys returns all the peer IDs stored in the KeyBook
PeersWithKeys() peer.IDSlice
}

// ProtoBook tracks the protocols supported by peers
type ProtoBook interface {
GetProtocols(peer.ID) ([]string, error)
AddProtocols(peer.ID, ...string) error
SetProtocols(peer.ID, ...string) error
SupportsProtocols(peer.ID, ...string) ([]string, error)
}
117 changes: 8 additions & 109 deletions peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,30 @@ package peerstore
import (
"fmt"
"io"
"sync"

peer "github.com/libp2p/go-libp2p-peer"
)

var _ Peerstore = (*peerstore)(nil)

const maxInternedProtocols = 512
const maxInternedProtocolSize = 256

type peerstore struct {
Metrics

KeyBook
AddrBook
ProtoBook
PeerMetadata

// lock for protocol information, separate from datastore lock
protolock sync.RWMutex
internedProtocols map[string]string
}

// NewPeerstore creates a data structure that stores peer data, backed by the
// supplied implementations of KeyBook, AddrBook and PeerMetadata.
func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore {
func NewPeerstore(kb KeyBook, ab AddrBook, pb ProtoBook, md PeerMetadata) Peerstore {
return &peerstore{
KeyBook: kb,
AddrBook: ab,
PeerMetadata: md,
Metrics: NewMetrics(),
internedProtocols: make(map[string]string),
KeyBook: kb,
AddrBook: ab,
ProtoBook: pb,
PeerMetadata: md,
Metrics: NewMetrics(),
}
}

Expand All @@ -49,6 +42,7 @@ func (ps *peerstore) Close() (err error) {

weakClose("keybook", ps.KeyBook)
weakClose("addressbook", ps.AddrBook)
weakClose("protobook", ps.ProtoBook)
weakClose("peermetadata", ps.PeerMetadata)

if len(errs) > 0 {
Expand Down Expand Up @@ -80,101 +74,6 @@ func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo {
}
}

func (ps *peerstore) internProtocol(s string) string {
if len(s) > maxInternedProtocolSize {
return s
}

if interned, ok := ps.internedProtocols[s]; ok {
return interned
}

if len(ps.internedProtocols) >= maxInternedProtocols {
ps.internedProtocols = make(map[string]string, maxInternedProtocols)
}

ps.internedProtocols[s] = s
return s
}

func (ps *peerstore) SetProtocols(p peer.ID, protos ...string) error {
ps.protolock.Lock()
defer ps.protolock.Unlock()

protomap := make(map[string]struct{}, len(protos))
for _, proto := range protos {
protomap[ps.internProtocol(proto)] = struct{}{}
}

return ps.Put(p, "protocols", protomap)
}

func (ps *peerstore) AddProtocols(p peer.ID, protos ...string) error {
ps.protolock.Lock()
defer ps.protolock.Unlock()
protomap, err := ps.getProtocolMap(p)
if err != nil {
return err
}

for _, proto := range protos {
protomap[ps.internProtocol(proto)] = struct{}{}
}

return ps.Put(p, "protocols", protomap)
}

func (ps *peerstore) getProtocolMap(p peer.ID) (map[string]struct{}, error) {
iprotomap, err := ps.Get(p, "protocols")
switch err {
default:
return nil, err
case ErrNotFound:
return make(map[string]struct{}), nil
case nil:
cast, ok := iprotomap.(map[string]struct{})
if !ok {
return nil, fmt.Errorf("stored protocol set was not a map")
}

return cast, nil
}
}

func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) {
ps.protolock.RLock()
defer ps.protolock.RUnlock()
pmap, err := ps.getProtocolMap(p)
if err != nil {
return nil, err
}

out := make([]string, 0, len(pmap))
for k := range pmap {
out = append(out, k)
}

return out, nil
}

func (ps *peerstore) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
ps.protolock.RLock()
defer ps.protolock.RUnlock()
pmap, err := ps.getProtocolMap(p)
if err != nil {
return nil, err
}

out := make([]string, 0, len(protos))
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
out = append(out, proto)
}
}

return out, nil
}

func PeerInfos(ps Peerstore, peers peer.IDSlice) []PeerInfo {
pi := make([]PeerInfo, len(peers))
for i, p := range peers {
Expand Down
4 changes: 3 additions & 1 deletion pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore.
return nil, err
}

ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata)
protoBook := NewProtoBook(peerMetadata)

ps := pstore.NewPeerstore(keyBook, addrBook, protoBook, peerMetadata)
return ps, nil
}

Expand Down
118 changes: 118 additions & 0 deletions pstoreds/protobook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package pstoreds

import (
"fmt"
"sync"

peer "github.com/libp2p/go-libp2p-peer"

pstore "github.com/libp2p/go-libp2p-peerstore"
)

type dsProtoBook struct {
lks [256]sync.RWMutex
meta pstore.PeerMetadata
}

var _ pstore.ProtoBook = (*dsProtoBook)(nil)

func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook {
return &dsProtoBook{meta: meta}
}

func (pb *dsProtoBook) lock(p peer.ID) {
pb.lks[byte(p[len(p)-1])].Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a segment(p peer.ID) *segment function would be nice (could even drop all of these functions and just call pb.segment(p).Lock().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will do. I'll call it get similar to how we do it in the memory peerstore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

func (pb *dsProtoBook) unlock(p peer.ID) {
pb.lks[byte(p[len(p)-1])].Unlock()
}

func (pb *dsProtoBook) rlock(p peer.ID) {
pb.lks[byte(p[len(p)-1])].RLock()
}

func (pb *dsProtoBook) runlock(p peer.ID) {
pb.lks[byte(p[len(p)-1])].RUnlock()
}

func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error {
pb.lock(p)
defer pb.unlock(p)

protomap := make(map[string]struct{}, len(protos))
for _, proto := range protos {
protomap[proto] = struct{}{}
}

return pb.meta.Put(p, "protocols", protomap)
}

func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error {
pb.lock(p)
defer pb.unlock(p)

pmap, err := pb.getProtocolMap(p)
if err != nil {
return err
}

for _, proto := range protos {
pmap[proto] = struct{}{}
}

return pb.meta.Put(p, "protocols", pmap)
}

func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) {
pb.rlock(p)
defer pb.runlock(p)

pmap, err := pb.getProtocolMap(p)
if err != nil {
return nil, err
}

res := make([]string, 0, len(pmap))
for proto := range pmap {
res = append(res, proto)
}

return res, nil
}

func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
pb.rlock(p)
defer pb.runlock(p)

pmap, err := pb.getProtocolMap(p)
if err != nil {
return nil, err
}

res := make([]string, 0, len(protos))
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
res = append(res, proto)
}
}

return res, nil
}

func (pb *dsProtoBook) getProtocolMap(p peer.ID) (map[string]struct{}, error) {
iprotomap, err := pb.meta.Get(p, "protocols")
switch err {
default:
return nil, err
case pstore.ErrNotFound:
return make(map[string]struct{}), nil
case nil:
cast, ok := iprotomap.(map[string]struct{})
if !ok {
return nil, fmt.Errorf("stored protocol set was not a map")
}

return cast, nil
}
}
Loading