-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
payment_result.go
299 lines (246 loc) · 8.3 KB
/
payment_result.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package htlcswitch
import (
"bytes"
"encoding/binary"
"errors"
"io"
"sync"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
)
var (
// networkResultStoreBucketKey is used for the root level bucket that
// stores the network result for each payment ID.
networkResultStoreBucketKey = []byte("network-result-store-bucket")
// ErrPaymentIDNotFound is an error returned if the given paymentID is
// not found.
ErrPaymentIDNotFound = errors.New("paymentID not found")
// ErrPaymentIDAlreadyExists is returned if we try to write a pending
// payment whose paymentID already exists.
ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
)
// PaymentResult wraps a decoded result received from the network after a
// payment attempt was made. This is what is eventually handed to the router
// for processing.
type PaymentResult struct {
// Preimage is set by the switch in case a sent HTLC was settled.
Preimage [32]byte
// Error is non-nil in case a HTLC send failed, and the HTLC is now
// irrevocably canceled. If the payment failed during forwarding, this
// error will be a *ForwardingError.
Error error
}
// networkResult is the raw result received from the network after a payment
// attempt has been made. Since the switch doesn't always have the necessary
// data to decode the raw message, we store it together with some meta data,
// and decode it when the router query for the final result.
type networkResult struct {
// msg is the received result. This should be of type UpdateFulfillHTLC
// or UpdateFailHTLC.
msg lnwire.Message
// unencrypted indicates whether the failure encoded in the message is
// unencrypted, and hence doesn't need to be decrypted.
unencrypted bool
// isResolution indicates whether this is a resolution message, in
// which the failure reason might not be included.
isResolution bool
}
// serializeNetworkResult serializes the networkResult.
func serializeNetworkResult(w io.Writer, n *networkResult) error {
return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution)
}
// deserializeNetworkResult deserializes the networkResult.
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
n := &networkResult{}
if err := channeldb.ReadElements(r,
&n.msg, &n.unencrypted, &n.isResolution,
); err != nil {
return nil, err
}
return n, nil
}
// networkResultStore is a persistent store that stores any results of HTLCs in
// flight on the network. Since payment results are inherently asynchronous, it
// is used as a common access point for senders of HTLCs, to know when a result
// is back. The Switch will checkpoint any received result to the store, and
// the store will keep results and notify the callers about them.
type networkResultStore struct {
backend kvdb.Backend
// results is a map from paymentIDs to channels where subscribers to
// payment results will be notified.
results map[uint64][]chan *networkResult
resultsMtx sync.Mutex
// attemptIDMtx is a multimutex used to make sure the database and
// result subscribers map is consistent for each attempt ID in case of
// concurrent callers.
attemptIDMtx *multimutex.Mutex[uint64]
}
func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
return &networkResultStore{
backend: db,
results: make(map[uint64][]chan *networkResult),
attemptIDMtx: multimutex.NewMutex[uint64](),
}
}
// storeResult stores the networkResult for the given attemptID, and notifies
// any subscribers.
func (store *networkResultStore) storeResult(attemptID uint64,
result *networkResult) error {
// We get a mutex for this attempt ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.attemptIDMtx.Lock(attemptID)
defer store.attemptIDMtx.Unlock(attemptID)
log.Debugf("Storing result for attemptID=%v", attemptID)
// Serialize the payment result.
var b bytes.Buffer
if err := serializeNetworkResult(&b, result); err != nil {
return err
}
var attemptIDBytes [8]byte
binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
networkResults, err := tx.CreateTopLevelBucket(
networkResultStoreBucketKey,
)
if err != nil {
return err
}
return networkResults.Put(attemptIDBytes[:], b.Bytes())
})
if err != nil {
return err
}
// Now that the result is stored in the database, we can notify any
// active subscribers.
store.resultsMtx.Lock()
for _, res := range store.results[attemptID] {
res <- result
}
delete(store.results, attemptID)
store.resultsMtx.Unlock()
return nil
}
// subscribeResult is used to get the HTLC attempt result for the given attempt
// ID. It returns a channel on which the result will be delivered when ready.
func (store *networkResultStore) subscribeResult(attemptID uint64) (
<-chan *networkResult, error) {
// We get a mutex for this payment ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.attemptIDMtx.Lock(attemptID)
defer store.attemptIDMtx.Unlock(attemptID)
log.Debugf("Subscribing to result for attemptID=%v", attemptID)
var (
result *networkResult
resultChan = make(chan *networkResult, 1)
)
err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
var err error
result, err = fetchResult(tx, attemptID)
switch {
// Result not yet available, we will notify once a result is
// available.
case err == ErrPaymentIDNotFound:
return nil
case err != nil:
return err
// The result was found, and will be returned immediately.
default:
return nil
}
}, func() {
result = nil
})
if err != nil {
return nil, err
}
// If the result was found, we can send it on the result channel
// imemdiately.
if result != nil {
resultChan <- result
return resultChan, nil
}
// Otherwise we store the result channel for when the result is
// available.
store.resultsMtx.Lock()
store.results[attemptID] = append(
store.results[attemptID], resultChan,
)
store.resultsMtx.Unlock()
return resultChan, nil
}
// getResult attempts to immediately fetch the result for the given pid from
// the store. If no result is available, ErrPaymentIDNotFound is returned.
func (store *networkResultStore) getResult(pid uint64) (
*networkResult, error) {
var result *networkResult
err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
var err error
result, err = fetchResult(tx, pid)
return err
}, func() {
result = nil
})
if err != nil {
return nil, err
}
return result, nil
}
func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
var attemptIDBytes [8]byte
binary.BigEndian.PutUint64(attemptIDBytes[:], pid)
networkResults := tx.ReadBucket(networkResultStoreBucketKey)
if networkResults == nil {
return nil, ErrPaymentIDNotFound
}
// Check whether a result is already available.
resultBytes := networkResults.Get(attemptIDBytes[:])
if resultBytes == nil {
return nil, ErrPaymentIDNotFound
}
// Decode the result we found.
r := bytes.NewReader(resultBytes)
return deserializeNetworkResult(r)
}
// cleanStore removes all entries from the store, except the payment IDs given.
// NOTE: Since every result not listed in the keep map will be deleted, care
// should be taken to ensure no new payment attempts are being made
// concurrently while this process is ongoing, as its result might end up being
// deleted.
func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
networkResults, err := tx.CreateTopLevelBucket(
networkResultStoreBucketKey,
)
if err != nil {
return err
}
// Iterate through the bucket, deleting all items not in the
// keep map.
var toClean [][]byte
if err := networkResults.ForEach(func(k, _ []byte) error {
pid := binary.BigEndian.Uint64(k)
if _, ok := keep[pid]; ok {
return nil
}
toClean = append(toClean, k)
return nil
}); err != nil {
return err
}
for _, k := range toClean {
err := networkResults.Delete(k)
if err != nil {
return err
}
}
if len(toClean) > 0 {
log.Infof("Removed %d stale entries from network "+
"result store", len(toClean))
}
return nil
}, func() {})
}