diff --git a/models/mysql/address.go b/models/mysql/address.go index 61f4b870..804d225b 100644 --- a/models/mysql/address.go +++ b/models/mysql/address.go @@ -170,8 +170,8 @@ func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr address.Address) UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error } -func (s mysqlAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return s.DB.WithContext(ctx).Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). +func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error { + return s.DB.Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error } diff --git a/models/mysql/address_test.go b/models/mysql/address_test.go index e8d88958..18245be5 100644 --- a/models/mysql/address_test.go +++ b/models/mysql/address_test.go @@ -163,7 +163,6 @@ func testDelAddress(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { } func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { - ctx := context.Background() addr := testutil.AddressProvider()(t) nonce := uint64(10) @@ -174,7 +173,7 @@ func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := r.AddressRepo().UpdateNonce(ctx, addr, nonce) + err := r.AddressRepo().UpdateNonce(addr, nonce) assert.NoError(t, err) } diff --git a/models/repo/address_repo.go b/models/repo/address_repo.go index 8110b93a..5a2914c3 100644 --- a/models/repo/address_repo.go +++ b/models/repo/address_repo.go @@ -19,7 +19,7 @@ type AddressRepo interface { ListAddress(ctx context.Context) ([]*types.Address, error) ListActiveAddress(ctx context.Context) ([]*types.Address, error) DelAddress(ctx context.Context, addr address.Address) error - UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error + UpdateNonce(addr address.Address, nonce uint64) error UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error UpdateSelectMsgNum(ctx context.Context, addr address.Address, num uint64) error UpdateFeeParams(ctx context.Context, addr address.Address, gasOverEstimation, gasOverPremium float64, maxFee, gasFeeCap, baseFee big.Int) error diff --git a/models/sqlite/address.go b/models/sqlite/address.go index b05b8840..0867ea7d 100644 --- a/models/sqlite/address.go +++ b/models/sqlite/address.go @@ -164,8 +164,8 @@ func (s sqliteAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addr return result, nil } -func (s sqliteAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). +func (s sqliteAddressRepo) UpdateNonce(addr address.Address, nonce uint64) error { + return s.DB.Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error } diff --git a/models/sqlite/address_test.go b/models/sqlite/address_test.go index c59aacdd..84e84b35 100644 --- a/models/sqlite/address_test.go +++ b/models/sqlite/address_test.go @@ -116,13 +116,13 @@ func TestAddress(t *testing.T) { t.Run("UpdateNonce", func(t *testing.T) { nonce := uint64(5) - assert.NoError(t, addressRepo.UpdateNonce(ctx, addrInfo.Addr, nonce)) + assert.NoError(t, addressRepo.UpdateNonce(addrInfo.Addr, nonce)) r, err := addressRepo.GetAddress(ctx, addrInfo.Addr) assert.NoError(t, err) assert.Equal(t, nonce, r.Nonce) // set nonce for a not exist address - err = addressRepo.UpdateNonce(ctx, randAddr, nonce) + err = addressRepo.UpdateNonce(randAddr, nonce) assert.NoError(t, err) _, err = addressRepo.GetAddress(ctx, randAddr) assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error()) diff --git a/service/address_service.go b/service/address_service.go index 911a7d12..be870a72 100644 --- a/service/address_service.go +++ b/service/address_service.go @@ -75,8 +75,8 @@ func (addressService *AddressService) SaveAddress(ctx context.Context, address * return address.ID, err } -func (addressService *AddressService) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return addressService.repo.AddressRepo().UpdateNonce(ctx, addr, nonce) +func (addressService *AddressService) UpdateNonce(_ context.Context, addr address.Address, nonce uint64) error { + return addressService.repo.AddressRepo().UpdateNonce(addr, nonce) } func (addressService *AddressService) GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) { diff --git a/service/message_selector.go b/service/message_selector.go index 09e33d30..57099bf5 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -308,14 +308,19 @@ func (w *work) startSelectMessage( w.log.Errorf("select message failed: %v", err) return } - w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg), - len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start)) - recordMetric(ctx, w.addr, selectResult) + if len(selectResult.SelectMsg) != 0 || len(selectResult.ToPushMsg) != 0 || len(selectResult.ErrMsg) != 0 { + w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg), + len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start)) - if err := w.saveSelectedMessages(ctx, selectResult); err != nil { - w.log.Errorf("failed to save selected messages to db %v", err) - return + recordMetric(ctx, w.addr, selectResult) + + if len(selectResult.SelectMsg) > 0 || len(selectResult.ErrMsg) > 0 { + if err := w.saveSelectedMessages(selectResult); err != nil { + w.log.Errorf("failed to save selected messages to db %v", err) + return + } + } } for _, msg := range selectResult.SelectMsg { @@ -351,7 +356,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs) addrInfo.Nonce = nonceInLatestTs addrInfo.UpdatedAt = time.Now() - err := w.repo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce) + maxMsgNonce, err := w.getMaxMessageNonceFromDB(addrInfo.Addr) + if err == nil { + if maxMsgNonce > addrInfo.Nonce { + addrInfo.Nonce = maxMsgNonce + 1 + w.log.Warnf("max message nonce in db %d", maxMsgNonce) + } + } + err = w.repo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) if err != nil { return nil, fmt.Errorf("update nonce failed: %v", err) } @@ -362,14 +374,13 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, // calc the message needed nonceGap := addrInfo.Nonce - nonceInLatestTs if nonceGap >= maxAllowPendingMessage { - w.log.Errorf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap) + w.log.Warnf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap) return &MsgSelectResult{ ToPushMsg: toPushMessage, Address: addrInfo, }, nil } wantCount := maxAllowPendingMessage - nonceGap - w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount) // get unfill message selectCount := mathutil.MinUint64(wantCount, 100) @@ -379,12 +390,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, } if len(messages) == 0 { - w.log.Infof("have no unfill message") + w.log.Debugf("have no unfill message") return &MsgSelectResult{ ToPushMsg: toPushMessage, Address: addrInfo, }, nil } + w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, + nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount) var errMsg []msgErrInfo count := uint64(0) @@ -477,13 +490,54 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce nonceInLatestTs := actor.Nonce // todo actor nonce maybe the latest ts. not need appliedNonce if nonceInTs, ok := appliedNonce.Get(w.addr); ok { - w.log.Infof("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs) + w.log.Debugf("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs) nonceInLatestTs = nonceInTs } return nonceInLatestTs, actor.Nonce, nil } +func (w *work) getMaxMessageNonceFromDB(addr address.Address) (uint64, error) { + var maxNonce uint64 + msgs, err := w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{ + State: []types.MessageState{ + types.FillMsg, + }, + From: []address.Address{ + addr, + }, + Limit: 100, + }) + if err == nil { + for _, msg := range msgs { + if maxNonce < msg.Nonce { + maxNonce = msg.Nonce + } + } + return maxNonce, nil + } + + msgs, err = w.repo.MessageRepo().ListMessageByParams(&types.MsgQueryParams{ + State: []types.MessageState{ + types.OnChainMsg, + }, + From: []address.Address{ + addr, + }, + Limit: 10, + }) + if err != nil { + return 0, err + } + for _, msg := range msgs { + if maxNonce < msg.Nonce { + maxNonce = msg.Nonce + } + } + + return maxNonce, nil +} + func (w *work) getFilledMessage(nonceInLatestTs uint64) []*venusTypes.SignedMessage { filledMessage, err := w.repo.MessageRepo().ListFilledMessageByAddress(w.addr) if err != nil { @@ -580,7 +634,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s return sigI.(*crypto.Signature), nil } -func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelectResult) error { +func (w *work) saveSelectedMessages(selectResult *MsgSelectResult) error { startSaveDB := time.Now() w.log.Infof("start save messages to database") err := w.repo.Transaction(func(txRepo repo.TxRepo) error { @@ -590,9 +644,10 @@ func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelect } addrInfo := selectResult.Address - if err := txRepo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce); err != nil { + if err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce); err != nil { return err } + w.log.Infof("update nonce to %v", addrInfo.Nonce) } for _, m := range selectResult.ErrMsg { diff --git a/service/message_selector_test.go b/service/message_selector_test.go index e87dc6f9..e5544fa5 100644 --- a/service/message_selector_test.go +++ b/service/message_selector_test.go @@ -718,7 +718,7 @@ func selectMsgWithAddress(ctx context.Context, } allSelectRes.ErrMsg = append(allSelectRes.ErrMsg, selectResult.ErrMsg...) - assert.NoError(t, work.saveSelectedMessages(ctx, selectResult)) + assert.NoError(t, work.saveSelectedMessages(selectResult)) } return allSelectRes