Skip to content

Commit

Permalink
Merge pull request #3565 from filecoin-project/feat/lotus-pcr-aggrega…
Browse files Browse the repository at this point in the history
…te-tipsets

lotus-pcr: add tipset aggregation
  • Loading branch information
Jakub Sztandera authored Sep 4, 2020
2 parents 52ce80f + d6691fe commit c573310
Showing 1 changed file with 38 additions and 11 deletions.
49 changes: 38 additions & 11 deletions cmd/lotus-pcr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ var runCmd = &cli.Command{
Name: "max-message-queue",
EnvVars: []string{"LOTUS_PCR_MAX_MESSAGE_QUEUE"},
Usage: "set the maximum number of messages that can be queue in the mpool",
Value: 3000,
Value: 300,
},
&cli.IntFlag{
Name: "aggregate-tipsets",
EnvVars: []string{"LOTUS_PCR_AGGREGATE_TIPSETS"},
Usage: "number of tipsets to process before sending messages",
Value: 1,
},
&cli.BoolFlag{
Name: "dry-run",
Expand Down Expand Up @@ -194,6 +200,7 @@ var runCmd = &cli.Command{
dryRun := cctx.Bool("dry-run")
preCommitEnabled := cctx.Bool("pre-commit")
proveCommitEnabled := cctx.Bool("prove-commit")
aggregateTipsets := cctx.Int("aggregate-tipsets")

rf := &refunder{
api: api,
Expand All @@ -204,16 +211,27 @@ var runCmd = &cli.Command{
proveCommitEnabled: proveCommitEnabled,
}

var refunds *MinersRefund = NewMinersRefund()
var rounds int = 0

for tipset := range tipsetsCh {
refunds, err := rf.ProcessTipset(ctx, tipset)
refunds, err = rf.ProcessTipset(ctx, tipset, refunds)
if err != nil {
return err
}

if err := rf.Refund(ctx, tipset, refunds); err != nil {
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}

if err := rf.Refund(ctx, tipset, refunds, rounds); err != nil {
return err
}

rounds = 0
refunds = NewMinersRefund()

if err := r.SetHeight(tipset.Height()); err != nil {
return err
}
Expand Down Expand Up @@ -247,13 +265,15 @@ var runCmd = &cli.Command{
}

type MinersRefund struct {
refunds map[address.Address]types.BigInt
count int
refunds map[address.Address]types.BigInt
count int
totalRefunds types.BigInt
}

func NewMinersRefund() *MinersRefund {
return &MinersRefund{
refunds: make(map[address.Address]types.BigInt),
refunds: make(map[address.Address]types.BigInt),
totalRefunds: types.NewInt(0),
}
}

Expand All @@ -263,6 +283,7 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {
}

m.count = m.count + 1
m.totalRefunds = types.BigAdd(m.totalRefunds, value)

m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
}
Expand All @@ -271,6 +292,10 @@ func (m *MinersRefund) Count() int {
return m.count
}

func (m *MinersRefund) TotalRefunds() types.BigInt {
return m.totalRefunds
}

func (m *MinersRefund) Miners() []address.Address {
miners := make([]address.Address, 0, len(m.refunds))
for addr := range m.refunds {
Expand Down Expand Up @@ -305,7 +330,7 @@ type refunder struct {
proveCommitEnabled bool
}

func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*MinersRefund, error) {
func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
cids := tipset.Cids()
if len(cids) == 0 {
log.Errorw("no cids in tipset", "height", tipset.Height(), "key", tipset.Key())
Expand All @@ -329,9 +354,8 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*Mi
return nil, nil
}

refunds := NewMinersRefund()

refundValue := types.NewInt(0)
tipsetRefunds := NewMinersRefund()
for i, msg := range msgs {
m := msg.Message

Expand Down Expand Up @@ -427,12 +451,15 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*Mi
log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue)

refunds.Track(m.From, refundValue)
tipsetRefunds.Track(m.From, refundValue)
}

log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "total_refunds", tipsetRefunds.TotalRefunds(), "messages_processed", tipsetRefunds.Count())

return refunds, nil
}

func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) error {
func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil
Expand Down Expand Up @@ -490,7 +517,7 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi
refundSum = types.BigAdd(refundSum, msg.Value)
}

log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
return nil
}

Expand Down

0 comments on commit c573310

Please sign in to comment.