Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove context when update nonce #386

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions models/mysql/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr address.Address)
UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error
}

func (s mysqlAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error {
return s.DB.WithContext(ctx).Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted).
func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error {
return s.DB.Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted).
UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error
}

Expand Down
3 changes: 1 addition & 2 deletions models/mysql/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func testDelAddress(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
}

func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
ctx := context.Background()
addr := testutil.AddressProvider()(t)
nonce := uint64(10)

Expand All @@ -174,7 +173,7 @@ func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

err := r.AddressRepo().UpdateNonce(ctx, addr, nonce)
err := r.AddressRepo().UpdateNonce(addr, nonce)
assert.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion models/repo/address_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type AddressRepo interface {
ListAddress(ctx context.Context) ([]*types.Address, error)
ListActiveAddress(ctx context.Context) ([]*types.Address, error)
DelAddress(ctx context.Context, addr address.Address) error
UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error
UpdateNonce(addr address.Address, nonce uint64) error
UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error
UpdateSelectMsgNum(ctx context.Context, addr address.Address, num uint64) error
UpdateFeeParams(ctx context.Context, addr address.Address, gasOverEstimation, gasOverPremium float64, maxFee, gasFeeCap, baseFee big.Int) error
Expand Down
4 changes: 2 additions & 2 deletions models/sqlite/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (s sqliteAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addr
return result, nil
}

func (s sqliteAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error {
return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()).
func (s sqliteAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error {
return s.DB.Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()).
UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error
}

Expand Down
4 changes: 2 additions & 2 deletions models/sqlite/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ func TestAddress(t *testing.T) {

t.Run("UpdateNonce", func(t *testing.T) {
nonce := uint64(5)
assert.NoError(t, addressRepo.UpdateNonce(ctx, addrInfo.Addr, nonce))
assert.NoError(t, addressRepo.UpdateNonce(addrInfo.Addr, nonce))
r, err := addressRepo.GetAddress(ctx, addrInfo.Addr)
assert.NoError(t, err)
assert.Equal(t, nonce, r.Nonce)

// set nonce for a not exist address
err = addressRepo.UpdateNonce(ctx, randAddr, nonce)
err = addressRepo.UpdateNonce(randAddr, nonce)
assert.NoError(t, err)
_, err = addressRepo.GetAddress(ctx, randAddr)
assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error())
Expand Down
4 changes: 2 additions & 2 deletions service/address_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func (addressService *AddressService) SaveAddress(ctx context.Context, address *
return address.ID, err
}

func (addressService *AddressService) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error {
return addressService.repo.AddressRepo().UpdateNonce(ctx, addr, nonce)
func (addressService *AddressService) UpdateNonce(_ context.Context, addr address.Address, nonce uint64) error {
return addressService.repo.AddressRepo().UpdateNonce(addr, nonce)
}

func (addressService *AddressService) GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) {
Expand Down
81 changes: 68 additions & 13 deletions service/message_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,19 @@ func (w *work) startSelectMessage(
w.log.Errorf("select message failed: %v", err)
return
}
w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg),
len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start))

recordMetric(ctx, w.addr, selectResult)
if len(selectResult.SelectMsg) != 0 || len(selectResult.ToPushMsg) != 0 || len(selectResult.ErrMsg) != 0 {
w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg),
len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start))

if err := w.saveSelectedMessages(ctx, selectResult); err != nil {
w.log.Errorf("failed to save selected messages to db %v", err)
return
recordMetric(ctx, w.addr, selectResult)

if len(selectResult.SelectMsg) > 0 || len(selectResult.ErrMsg) > 0 {
if err := w.saveSelectedMessages(selectResult); err != nil {
w.log.Errorf("failed to save selected messages to db %v", err)
return
}
}
}

for _, msg := range selectResult.SelectMsg {
Expand Down Expand Up @@ -351,7 +356,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs)
addrInfo.Nonce = nonceInLatestTs
addrInfo.UpdatedAt = time.Now()
err := w.repo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce)
maxMsgNonce, err := w.getMaxMessageNonceFromDB(addrInfo.Addr)
if err == nil {
if maxMsgNonce > addrInfo.Nonce {
addrInfo.Nonce = maxMsgNonce + 1
w.log.Warnf("max message nonce in db %d", maxMsgNonce)
}
}
err = w.repo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce)
if err != nil {
return nil, fmt.Errorf("update nonce failed: %v", err)
}
Expand All @@ -362,14 +374,13 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
// calc the message needed
nonceGap := addrInfo.Nonce - nonceInLatestTs
if nonceGap >= maxAllowPendingMessage {
w.log.Errorf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap)
w.log.Warnf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap)
return &MsgSelectResult{
ToPushMsg: toPushMessage,
Address: addrInfo,
}, nil
}
wantCount := maxAllowPendingMessage - nonceGap
w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount)

// get unfill message
selectCount := mathutil.MinUint64(wantCount, 100)
Expand All @@ -379,12 +390,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
}

if len(messages) == 0 {
w.log.Infof("have no unfill message")
w.log.Debugf("have no unfill message")
return &MsgSelectResult{
ToPushMsg: toPushMessage,
Address: addrInfo,
}, nil
}
w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce,
nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount)

var errMsg []msgErrInfo
count := uint64(0)
Expand Down Expand Up @@ -477,13 +490,54 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce
nonceInLatestTs := actor.Nonce
// todo actor nonce maybe the latest ts. not need appliedNonce
if nonceInTs, ok := appliedNonce.Get(w.addr); ok {
w.log.Infof("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs)
w.log.Debugf("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs)
nonceInLatestTs = nonceInTs
}

return nonceInLatestTs, actor.Nonce, nil
}

func (w *work) getMaxMessageNonceFromDB(addr address.Address) (uint64, error) {
var maxNonce uint64
msgs, err := w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{
State: []types.MessageState{
types.FillMsg,
},
From: []address.Address{
addr,
},
Limit: 100,
})
if err == nil {
for _, msg := range msgs {
if maxNonce < msg.Nonce {
maxNonce = msg.Nonce
}
}
return maxNonce, nil
}

msgs, err = w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{
State: []types.MessageState{
types.OnChainMsg,
},
From: []address.Address{
addr,
},
Limit: 10,
})
if err != nil {
return 0, err
}
for _, msg := range msgs {
if maxNonce < msg.Nonce {
maxNonce = msg.Nonce
}
}

return maxNonce, nil
}

func (w *work) getFilledMessage(nonceInLatestTs uint64) []*venusTypes.SignedMessage {
filledMessage, err := w.repo.MessageRepo().ListFilledMessageByAddress(w.addr)
if err != nil {
Expand Down Expand Up @@ -580,7 +634,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s
return sigI.(*crypto.Signature), nil
}

func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelectResult) error {
func (w *work) saveSelectedMessages(selectResult *MsgSelectResult) error {
startSaveDB := time.Now()
w.log.Infof("start save messages to database")
err := w.repo.Transaction(func(txRepo repo.TxRepo) error {
Expand All @@ -590,9 +644,10 @@ func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelect
}

addrInfo := selectResult.Address
if err := txRepo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce); err != nil {
if err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce); err != nil {
return err
}
w.log.Infof("update nonce to %v", addrInfo.Nonce)
}

for _, m := range selectResult.ErrMsg {
Expand Down
2 changes: 1 addition & 1 deletion service/message_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func selectMsgWithAddress(ctx context.Context,
}
allSelectRes.ErrMsg = append(allSelectRes.ErrMsg, selectResult.ErrMsg...)

assert.NoError(t, work.saveSelectedMessages(ctx, selectResult))
assert.NoError(t, work.saveSelectedMessages(selectResult))
}

return allSelectRes
Expand Down
Loading