-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathf3.go
250 lines (216 loc) · 7.23 KB
/
f3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package lf3
import (
"context"
"errors"
"path/filepath"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-f3"
"github.com/filecoin-project/go-f3/blssig"
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
type F3API interface {
GetOrRenewParticipationTicket(ctx context.Context, minerID uint64, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error)
Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error)
GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error)
GetManifest(ctx context.Context) (*manifest.Manifest, error)
GetPowerTable(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error)
GetF3PowerTable(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error)
IsEnabled() bool
IsRunning() (bool, error)
Progress() (gpbft.Instant, error)
ListParticipants() ([]api.F3Participant, error)
}
type F3 struct {
inner *f3.F3
ec *ecWrapper
signer gpbft.Signer
leaser *leaser
}
var _ F3API = (*F3)(nil)
type F3Params struct {
fx.In
ManifestProvider manifest.ManifestProvider
PubSub *pubsub.PubSub
Host host.Host
ChainStore *store.ChainStore
Syncer *chain.Syncer
StateManager *stmgr.StateManager
Datastore dtypes.MetadataDS
Wallet api.Wallet
Config *Config
LockedRepo repo.LockedRepo
Net api.Net
}
var log = logging.Logger("f3")
func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {
ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))
ec := &ecWrapper{
ChainStore: params.ChainStore,
StateManager: params.StateManager,
Syncer: params.Syncer,
}
verif := blssig.VerifierWithKeyOnG1()
f3FsPath := filepath.Join(params.LockedRepo.Path(), "f3")
module, err := f3.New(mctx, params.ManifestProvider, ds,
params.Host, params.PubSub, verif, ec, f3FsPath)
if err != nil {
return nil, xerrors.Errorf("creating F3: %w", err)
}
nodeId, err := params.Net.ID(mctx)
if err != nil {
return nil, xerrors.Errorf("getting node ID: %w", err)
}
// maxLeasableInstances is the maximum number of leased F3 instances this node
// would give out.
const maxLeasableInstances = 5
status := func() (*manifest.Manifest, gpbft.Instant) {
return module.Manifest(), module.Progress()
}
fff := &F3{
inner: module,
ec: ec,
signer: &signer{params.Wallet},
leaser: newParticipationLeaser(nodeId, status, maxLeasableInstances),
}
// Start F3
lc.Append(fx.Hook{
OnStart: fff.inner.Start,
OnStop: fff.inner.Stop,
})
// Start signing F3 messages.
lCtx, cancel := context.WithCancel(mctx)
doneCh := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go func() {
defer close(doneCh)
fff.runSigningLoop(lCtx)
}()
return nil
},
OnStop: func(context.Context) error {
cancel()
<-doneCh
return nil
},
})
return fff, nil
}
func (fff *F3) runSigningLoop(ctx context.Context) {
participateOnce := func(ctx context.Context, mb *gpbft.MessageBuilder, minerID uint64) error {
signatureBuilder, err := mb.PrepareSigningInputs(gpbft.ActorID(minerID))
if errors.Is(err, gpbft.ErrNoPower) {
// we don't have any power in F3, continue
log.Debug("no power to participate in F3: %+v", err)
return nil
}
if err != nil {
return xerrors.Errorf("preparing signing inputs: %+v", err)
}
// if worker keys were stored not in the node, the signatureBuilder can be send there
// the sign can be called where the keys are stored and then
// {signatureBuilder, payloadSig, vrfSig} can be sent back to lotus for broadcast
payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, fff.signer)
if err != nil {
return xerrors.Errorf("signing message: %+v", err)
}
log.Debugf("miner with id %d is sending message in F3", minerID)
fff.inner.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
return nil
}
msgCh := fff.inner.MessagesToSign()
var mb *gpbft.MessageBuilder
alreadyParticipated := make(map[uint64]struct{})
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case <-fff.leaser.notifyParticipation:
if mb == nil {
continue
}
case mb = <-msgCh: // never closed
clear(alreadyParticipated)
}
participants := fff.leaser.getParticipantsByInstance(mb.NetworkName, mb.Payload.Instance)
for _, id := range participants {
if _, ok := alreadyParticipated[id]; ok {
continue
} else if err := participateOnce(ctx, mb, id); err != nil {
log.Errorf("while participating for miner f0%d: %+v", id, err)
} else {
alreadyParticipated[id] = struct{}{}
}
}
}
}
func (fff *F3) GetOrRenewParticipationTicket(_ context.Context, minerID uint64, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) {
return fff.leaser.getOrRenewParticipationTicket(minerID, previous, instances)
}
func (fff *F3) Participate(_ context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) {
return fff.leaser.participate(ticket)
}
func (fff *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) {
return fff.inner.GetCert(ctx, instance)
}
func (fff *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) {
return fff.inner.GetLatestCert(ctx)
}
func (fff *F3) GetManifest(ctx context.Context) (*manifest.Manifest, error) {
m := fff.inner.Manifest()
if m.InitialPowerTable.Defined() {
return m, nil
}
cert0, err := fff.inner.GetCert(ctx, 0)
if err != nil {
return m, nil
}
var mCopy = *m
m = &mCopy
m.InitialPowerTable = cert0.ECChain.Base().PowerTable
return m, nil
}
func (fff *F3) GetPowerTable(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) {
return fff.ec.getPowerTableLotusTSK(ctx, tsk)
}
func (fff *F3) GetF3PowerTable(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) {
return fff.inner.GetPowerTable(ctx, tsk.Bytes())
}
func (fff *F3) IsEnabled() bool {
return true
}
func (fff *F3) IsRunning() (bool, error) {
return fff.inner.IsRunning(), nil
}
func (fff *F3) Progress() (gpbft.Instant, error) {
return fff.inner.Progress(), nil
}
func (fff *F3) ListParticipants() ([]api.F3Participant, error) {
leases := fff.leaser.getValidLeases()
participants := make([]api.F3Participant, len(leases))
for i, lease := range leases {
participants[i] = api.F3Participant{
MinerID: lease.MinerID,
FromInstance: lease.FromInstance,
ValidityTerm: lease.ValidityTerm,
}
}
return participants, nil
}