From 107a7f1a36099e23017cf4020ef9e7ddccd28738 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 11 Aug 2021 18:48:32 -0400 Subject: [PATCH 1/2] moved validation outside of the ProtocolMessenger --- dht.go | 2 +- fullrt/dht.go | 56 +++++++++++++++++++--------------------- internal/errors.go | 2 +- pb/protocol_messenger.go | 32 ++++++----------------- routing.go | 56 +++++++++++++++++++--------------------- 5 files changed, 64 insertions(+), 84 deletions(-) diff --git a/dht.go b/dht.go index c5fbeeb54..5ac1a3666 100644 --- a/dht.go +++ b/dht.go @@ -191,7 +191,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) dht.Validator = cfg.Validator dht.msgSender = net.NewMessageSenderImpl(h, dht.protocols) - dht.protoMessenger, err = pb.NewProtocolMessenger(dht.msgSender, pb.WithValidator(dht.Validator)) + dht.protoMessenger, err = pb.NewProtocolMessenger(dht.msgSender) if err != nil { return nil, err } diff --git a/fullrt/dht.go b/fullrt/dht.go index a6e96d4e5..f618cf820 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -127,7 +127,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful } ms := net.NewMessageSenderImpl(h, []protocol.ID{dhtcfg.ProtocolPrefix + "/kad/1.0.0"}) - protoMessenger, err := dht_pb.NewProtocolMessenger(ms, dht_pb.WithValidator(dhtcfg.Validator)) + protoMessenger, err := dht_pb.NewProtocolMessenger(ms) if err != nil { return nil, err } @@ -720,37 +720,10 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str }) rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key) - switch err { - case routing.ErrNotFound: - // in this case, they responded with nothing, - // still send a notification so listeners can know the - // request has completed 'successfully' - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.PeerResponse, - ID: p, - }) - return nil - case nil, internal.ErrInvalidRecord: - // in either of these cases, we want to keep going - default: + if err != nil { return err } - // TODO: What should happen if the record is invalid? - // Pre-existing code counted it towards the quorum, but should it? - if rec != nil && rec.GetValue() != nil { - rv := RecvdVal{ - Val: rec.GetValue(), - From: p, - } - - select { - case valCh <- rv: - case <-ctx.Done(): - return ctx.Err() - } - } - // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.PeerResponse, @@ -758,6 +731,31 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str Responses: peers, }) + if rec == nil { + return nil + } + + val := rec.GetValue() + if val == nil { + logger.Debug("received a nil record value") + return nil + } + if err := dht.Validator.Validate(key, val); err != nil { + // make sure record is valid + logger.Debugw("received invalid record (discarded)", "error", err) + return nil + } + + // the record is present and valid, send it out for processing + select { + case valCh <- RecvdVal{ + Val: val, + From: p, + }: + case <-ctx.Done(): + return ctx.Err() + } + return nil } diff --git a/internal/errors.go b/internal/errors.go index 3c32a83dc..4f8453cd2 100644 --- a/internal/errors.go +++ b/internal/errors.go @@ -2,4 +2,4 @@ package internal import "errors" -var ErrInvalidRecord = errors.New("received invalid record") +var ErrIncorrectRecord = errors.New("received incorrect record") diff --git a/pb/protocol_messenger.go b/pb/protocol_messenger.go index 7524f59b9..883a86872 100644 --- a/pb/protocol_messenger.go +++ b/pb/protocol_messenger.go @@ -6,12 +6,9 @@ import ( "errors" "fmt" + logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/routing" - - logging "github.com/ipfs/go-log" - record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" "github.com/multiformats/go-multihash" @@ -27,19 +24,11 @@ var logger = logging.Logger("dht") // Note: the ProtocolMessenger's MessageSender still needs to deal with some wire protocol details such as using // varint-delineated protobufs type ProtocolMessenger struct { - m MessageSender - validator record.Validator + m MessageSender } type ProtocolMessengerOption func(*ProtocolMessenger) error -func WithValidator(validator record.Validator) ProtocolMessengerOption { - return func(messenger *ProtocolMessenger) error { - messenger.validator = validator - return nil - } -} - // NewProtocolMessenger creates a new ProtocolMessenger that is used for sending DHT messages to peers and processing // their responses. func NewProtocolMessenger(msgSender MessageSender, opts ...ProtocolMessengerOption) (*ProtocolMessenger, error) { @@ -99,21 +88,16 @@ func (pm *ProtocolMessenger) GetValue(ctx context.Context, p peer.ID, key string // Success! We were given the value logger.Debug("got value") - // make sure record is valid. - err = pm.validator.Validate(string(rec.GetKey()), rec.GetValue()) - if err != nil { - logger.Debug("received invalid record (discarded)") - // return a sentinel to signify an invalid record was received - return nil, peers, internal.ErrInvalidRecord + // Check that record matches the one we are looking for (validation of the record does not happen here) + if !bytes.Equal([]byte(key), rec.GetKey()) { + logger.Debug("received incorrect record") + return nil, nil, internal.ErrIncorrectRecord } - return rec, peers, err - } - if len(peers) > 0 { - return nil, peers, nil + return rec, peers, err } - return nil, nil, routing.ErrNotFound + return nil, peers, nil } // GetClosestPeers asks a peer to return the K (a DHT-wide parameter) DHT server peers closest in XOR space to the id diff --git a/routing.go b/routing.go index ca269e514..2ec5bbcdc 100644 --- a/routing.go +++ b/routing.go @@ -297,37 +297,10 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st }) rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key) - switch err { - case routing.ErrNotFound: - // in this case, they responded with nothing, - // still send a notification so listeners can know the - // request has completed 'successfully' - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.PeerResponse, - ID: p, - }) - return nil, err - case nil, internal.ErrInvalidRecord: - // in either of these cases, we want to keep going - default: + if err != nil { return nil, err } - // TODO: What should happen if the record is invalid? - // Pre-existing code counted it towards the quorum, but should it? - if rec != nil && rec.GetValue() != nil { - rv := recvdVal{ - Val: rec.GetValue(), - From: p, - } - - select { - case valCh <- rv: - case <-ctx.Done(): - return nil, ctx.Err() - } - } - // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.PeerResponse, @@ -335,7 +308,32 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st Responses: peers, }) - return peers, err + if rec == nil { + return peers, nil + } + + val := rec.GetValue() + if val == nil { + logger.Debug("received a nil record value") + return peers, nil + } + if err := dht.Validator.Validate(key, val); err != nil { + // make sure record is valid + logger.Debugw("received invalid record (discarded)", "error", err) + return peers, nil + } + + // the record is present and valid, send it out for processing + select { + case valCh <- recvdVal{ + Val: val, + From: p, + }: + case <-ctx.Done(): + return nil, ctx.Err() + } + + return peers, nil }, func() bool { select { From 394a1526b8fcdec9ff7d90eabd1ef73bd0bbf92d Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 11 Aug 2021 18:48:59 -0400 Subject: [PATCH 2/2] fullrt: delete GetValues function --- fullrt/dht.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index f618cf820..825eabfab 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -596,27 +596,6 @@ func (dht *FullRT) searchValueQuorum(ctx context.Context, key string, valCh <-ch }) } -// GetValues gets nvals values corresponding to the given key. -func (dht *FullRT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { - if !dht.enableValues { - return nil, routing.ErrNotSupported - } - - queryCtx, cancel := context.WithCancel(ctx) - defer cancel() - valCh, _ := dht.getValues(queryCtx, key, nil) - - out := make([]RecvdVal, 0, nvals) - for val := range valCh { - out = append(out, val) - if len(out) == nvals { - cancel() - } - } - - return out, ctx.Err() -} - func (dht *FullRT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { loop: