package crawler

import (
	"context"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"

	logging "github.com/ipfs/go-log/v2"

	//lint:ignore SA1019 TODO migrate away from gogo pb
	"github.com/libp2p/go-msgio/protoio"

	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var (
	logger = logging.Logger("dht-crawler")

	_ Crawler = (*DefaultCrawler)(nil)
)

type (
	// Crawler connects to hosts in the DHT to track routing tables of peers.
	Crawler interface {
		// Run crawls the DHT starting from the startingPeers, and calls either
		// handleSuccess or handleFail depending on whether a peer was
		// successfully contacted or not.
		Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail)
	}

	// DefaultCrawler provides a default implementation of Crawler.
	DefaultCrawler struct {
		parallelism          int
		connectTimeout       time.Duration
		queryTimeout         time.Duration
		host                 host.Host
		dhtRPC               *pb.ProtocolMessenger
		dialAddressExtendDur time.Duration
	}
)

// NewDefaultCrawler creates a new DefaultCrawler.
func NewDefaultCrawler(host host.Host, opts ...Option) (*DefaultCrawler, error) {
	o := new(options)
	if err := defaults(o); err != nil {
		return nil, err
	}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}

	pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
	if err != nil {
		return nil, err
	}

	return &DefaultCrawler{
		parallelism:          o.parallelism,
		connectTimeout:       o.connectTimeout,
		queryTimeout:         3 * o.connectTimeout, // per-peer query budget, derived from the connect timeout
		host:                 host,
		dhtRPC:               pm,
		dialAddressExtendDur: o.dialAddressExtendDur,
	}, nil
}

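// A minimal usage sketch (not from this file): it assumes a libp2p host built
// with github.com/libp2p/go-libp2p, and the seed list and callbacks below are
// illustrative placeholders, not part of this package.
//
//	h, err := libp2p.New()
//	if err != nil {
//		panic(err)
//	}
//	c, err := crawler.NewDefaultCrawler(h)
//	if err != nil {
//		panic(err)
//	}
//	var seeds []*peer.AddrInfo // e.g. converted from a bootstrap peer list
//	c.Run(context.Background(), seeds,
//		func(p peer.ID, rtPeers []*peer.AddrInfo) { /* p answered; rtPeers approximates its routing table */ },
//		func(p peer.ID, err error) { /* p could not be queried */ })
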
// messageSender handles sending wire protocol messages to a given peer.
type messageSender struct {
	h         host.Host
	protocols []protocol.ID
	timeout   time.Duration
}

// SendRequest sends a peer a message and waits for its response.
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
	tctx, cancel := context.WithTimeout(ctx, ms.timeout)
	defer cancel()

	s, err := ms.h.NewStream(tctx, p, ms.protocols...)
	if err != nil {
		return nil, err
	}
	// Ensure the stream is released on every return path.
	defer func() { _ = s.Close() }()

	w := protoio.NewDelimitedWriter(s)
	if err := w.WriteMsg(pmes); err != nil {
		_ = s.Reset()
		return nil, err
	}

	r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
	msg := new(pb.Message)
	if err := ctxReadMsg(tctx, r, msg); err != nil {
		_ = s.Reset()
		return nil, err
	}

	return msg, nil
}

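// A sketch of a single request/response exchange, assuming the pb package's
// NewMessage helper and a FIND_NODE query; targetKey is an illustrative
// placeholder.
//
//	req := pb.NewMessage(pb.Message_FIND_NODE, []byte(targetKey), 0)
//	resp, err := ms.SendRequest(ctx, p, req)
//	if err != nil {
//		// the peer did not answer within ms.timeout
//	}
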
// ctxReadMsg performs the blocking read in a separate goroutine so the caller
// can abandon it when the context is done. errc is buffered, so the goroutine
// can deliver its result and exit even if the context has already expired.
func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
	errc := make(chan error, 1)
	go func(r protoio.ReadCloser) {
		defer close(errc)
		errc <- r.ReadMsg(mes)
	}(rc)

	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// SendMessage sends a peer a message without waiting on a response.
func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
	if err != nil {
		return err
	}
	defer func() { _ = s.Close() }()

	w := protoio.NewDelimitedWriter(s)
	return w.WriteMsg(pmes)
}

// HandleQueryResult is a callback on a successful peer query.
type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)

// HandleQueryFail is a callback on a failed peer query.
type HandleQueryFail func(p peer.ID, err error)

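// A sketch of callbacks that build a crawl summary. Both callbacks are
// invoked from Run's main loop (a single goroutine), so no locking is needed
// here; the names are illustrative, not part of this package.
//
//	reachable := make(map[peer.ID][]*peer.AddrInfo)
//	failures := make(map[peer.ID]error)
//	onSuccess := func(p peer.ID, rtPeers []*peer.AddrInfo) { reachable[p] = rtPeers }
//	onFail := func(p peer.ID, err error) { failures[p] = err }
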
// Run crawls DHT peers from an initial seed of `startingPeers`.
func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
	jobs := make(chan peer.ID, 1)
	results := make(chan *queryResult, 1)

	// Start worker goroutines.
	var wg sync.WaitGroup
	wg.Add(c.parallelism)
	for i := 0; i < c.parallelism; i++ {
		go func() {
			defer wg.Done()
			for p := range jobs {
				qctx, cancel := context.WithTimeout(ctx, c.queryTimeout)
				res := c.queryPeer(qctx, p)
				cancel() // do not defer, cleanup after each job
				results <- res
			}
		}()
	}

	defer wg.Wait()
	defer close(jobs)

	// Seed the dial queue, extending any known addresses in the peerstore so
	// they remain dialable for the duration of the crawl.
	var toDial []*peer.AddrInfo
	peersSeen := make(map[peer.ID]struct{})

	numSkipped := 0
	for _, ai := range startingPeers {
		extendAddrs := c.host.Peerstore().Addrs(ai.ID)
		if len(ai.Addrs) > 0 {
			extendAddrs = append(extendAddrs, ai.Addrs...)
			c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
		}
		if len(extendAddrs) == 0 {
			numSkipped++
			continue
		}

		toDial = append(toDial, ai)
		peersSeen[ai.ID] = struct{}{}
	}

	if numSkipped > 0 {
		logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
	}

	numQueried := 0
	outstanding := 0

	for len(toDial) > 0 || outstanding > 0 {
		// jobCh stays nil (disabling its send case below) while the dial
		// queue is empty, so the select then blocks only on results.
		var jobCh chan peer.ID
		var nextPeerID peer.ID
		if len(toDial) > 0 {
			jobCh = jobs
			nextPeerID = toDial[0].ID
		}

		select {
		case res := <-results:
			if len(res.data) > 0 {
				logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
				rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
				for p, ai := range res.data {
					c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
					if _, ok := peersSeen[p]; !ok {
						peersSeen[p] = struct{}{}
						toDial = append(toDial, ai)
					}
					rtPeers = append(rtPeers, ai)
				}
				if handleSuccess != nil {
					handleSuccess(res.peer, rtPeers)
				}
			} else if handleFail != nil {
				handleFail(res.peer, res.err)
			}
			outstanding--
		case jobCh <- nextPeerID:
			outstanding++
			numQueried++
			toDial = toDial[1:]
			logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
		}
	}
}

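// Run returns once the dial queue drains. To bound a crawl instead, cancel
// the context: in-flight and subsequent queries then fail quickly and the
// loop winds down. A sketch (the deadline is illustrative):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
//	defer cancel()
//	c.Run(ctx, seeds, onSuccess, onFail)
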
type queryResult struct {
	peer peer.ID
	data map[peer.ID]*peer.AddrInfo
	err  error
}

// queryPeer connects to nextPeer and attempts to download a snapshot of its
// routing table by probing it across common prefix lengths.
func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
	if err != nil {
		logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
		return &queryResult{nextPeer, nil, err}
	}

	connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
	defer cancel()
	err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
	if err != nil {
		logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
		return &queryResult{nextPeer, nil, err}
	}

	// Ask the peer for its closest peers to a target at each common prefix
	// length (CPL) from 0 through 15. Each target is a random peer ID sharing
	// cpl leading bits with nextPeer, so together the queries sample every
	// bucket the peer is likely to have populated.
	localPeers := make(map[peer.ID]*peer.AddrInfo)
	var retErr error
	for cpl := 0; cpl <= 15; cpl++ {
		generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
		if err != nil {
			panic(err)
		}
		peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
		if err != nil {
			logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
			retErr = err
			break
		}
		for _, ai := range peers {
			if _, ok := localPeers[ai.ID]; !ok {
				localPeers[ai.ID] = ai
			}
		}
	}

	if retErr != nil {
		return &queryResult{nextPeer, nil, retErr}
	}

	return &queryResult{nextPeer, localPeers, retErr}
}