Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: auto migration badger/badger数据自动迁移 #210

Merged
merged 4 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dagstore/market_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMarket(t *testing.T) {
assert.Nil(t, err)
pmgr.AddMemPieceStorage(memPieceStorage)

r := models.NewInMemoryRepo()
r := models.NewInMemoryRepo(t)
err = r.StorageDealRepo().SaveDeal(ctx, &markettypes.MinerDeal{
ClientDealProposal: market.ClientDealProposal{
Proposal: market.DealProposal{
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-data-transfer v1.15.2
github.com/filecoin-project/go-ds-versioning v0.1.1
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.23.1
Expand Down Expand Up @@ -93,6 +94,7 @@ require (
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f
gorm.io/driver/mysql v1.1.1
gorm.io/driver/sqlite v1.1.4
gorm.io/gorm v1.21.12
Expand Down Expand Up @@ -143,7 +145,6 @@ require (
github.com/filecoin-project/go-amt-ipld/v4 v4.0.0 // indirect
github.com/filecoin-project/go-bitfield v0.2.4 // indirect
github.com/filecoin-project/go-crypto v0.0.1 // indirect
github.com/filecoin-project/go-ds-versioning v0.1.1 // indirect
github.com/filecoin-project/go-hamt-ipld v0.1.5 // indirect
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 // indirect
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 // indirect
Expand Down Expand Up @@ -320,7 +321,6 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/api v0.56.0 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.45.0 // indirect
Expand Down
33 changes: 33 additions & 0 deletions models/badger/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"path"

"github.com/filecoin-project/venus-market/v2/models/badger/migrate"

"github.com/filecoin-project/venus-market/v2/blockstore"
"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/models/repo"
Expand Down Expand Up @@ -200,6 +202,11 @@ func NewBadgerRepo(params BadgerDSParams) repo.Repo {
}
}

func NewMigratedBadgerRepo(params BadgerDSParams) (repo.Repo, error) {
repo := NewBadgerRepo(params)
return repo, repo.Migrate()
}

func (r *BadgerRepo) FundRepo() repo.FundRepo {
return NewFundRepo(r.dsParams.FundDS)
}
Expand Down Expand Up @@ -238,6 +245,32 @@ func (r *BadgerRepo) Close() error {
}

func (r *BadgerRepo) Migrate() error {
ctx := context.TODO()

var migrateDss = map[string]datastore.Batching{
migrate.DsNameFundedAddrState: r.dsParams.FundDS,
migrate.DsNameStorageDeal: r.dsParams.StorageDealsDS,
migrate.DsNamePaychInfoDs: r.dsParams.PaychInfoDS,
migrate.DsNamePaychMsgDs: r.dsParams.PaychMsgDS,
migrate.DsNameStorageAskDs: r.dsParams.AskDS,
migrate.DsNameRetrievalAskDs: r.dsParams.RetrAskDs,
migrate.DsNameCidInfoDs: r.dsParams.CidInfoDs,
migrate.DsNameRetrievalDealsDs: r.dsParams.RetrievalDealsDs,
}
// the returned 'newDss' would be wrapped with current version namespace.
// so, must set all 'ds' back later.
newDss, err := migrate.Migrate(ctx, migrateDss)
if err != nil {
return err
}
r.dsParams.FundDS = newDss[migrate.DsNameFundedAddrState]
r.dsParams.StorageDealsDS = newDss[migrate.DsNameStorageDeal]
r.dsParams.PaychMsgDS = newDss[migrate.DsNamePaychMsgDs]
r.dsParams.PaychInfoDS = newDss[migrate.DsNamePaychInfoDs]
r.dsParams.AskDS = newDss[migrate.DsNameStorageAskDs]
r.dsParams.RetrAskDs = newDss[migrate.DsNameRetrievalAskDs]
r.dsParams.CidInfoDs = newDss[migrate.DsNameCidInfoDs]
r.dsParams.RetrievalDealsDs = newDss[migrate.DsNameRetrievalDealsDs]
zl03jsj marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
274 changes: 274 additions & 0 deletions models/badger/migrate/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package migrate

import (
"context"
"errors"
"fmt"
"time"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
"github.com/filecoin-project/go-ds-versioning/pkg/statestore"
"github.com/filecoin-project/go-ds-versioning/pkg/versioned"
"github.com/filecoin-project/go-fil-markets/piecestore"
v220 "github.com/filecoin-project/venus-market/v2/models/badger/migrate/v2.2.0"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
)

const (
DsNameFundedAddrState = "FundedAddrStateDs"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

下面6个命名是***Ds,开始两个没有

DsNameStorageDeal = "StorageDealDs"
DsNamePaychInfoDs = "PayChanInfoDs"
DsNamePaychMsgDs = "PayChanMsgDs"
DsNameStorageAskDs = "StorageAskDs"
DsNameRetrievalAskDs = "RetrievalAskDs"
DsNameCidInfoDs = "CidInfoDs"
DsNameRetrievalDealsDs = "RetrievalDealsDS"
)

var versioningKey = datastore.NewKey("/versions/current")

var log = logging.Logger("badger-migration")

type migrateFunc struct {
version versioning.VersionKey
mf versioning.MigrationFunc
}

type migrateFuncSchedule []migrateFunc

func (mfs migrateFuncSchedule) targetVersion() versioning.VersionKey {
return mfs[len(mfs)-1].version
}

func (mfs migrateFuncSchedule) subScheduleFrom(fromVersion string) migrateFuncSchedule {
if len(fromVersion) == 0 {
return mfs
}
var startPos int
for idx, mf := range mfs {
if string(mf.version) == fromVersion {
startPos = idx
break
}
}
if startPos == 0 {
return nil
}
return mfs[startPos:]
}

func timeStampNow() market.TimeStamp {
ts := time.Now().Unix()
return market.TimeStamp{CreatedAt: uint64(ts), UpdatedAt: uint64(ts)}
}

var migrateDs = map[string][]migrateFunc{
DsNameFundedAddrState: {
{version: "1", mf: func(old *v220.FundedAddressState) (*market.FundedAddressState, error) {
return &market.FundedAddressState{
Addr: old.Addr,
AmtReserved: old.AmtReserved,
MsgCid: old.MsgCid,
}, nil
}},
},
DsNameStorageDeal: {
{version: "1", mf: func(old *v220.MinerDeal) (*market.MinerDeal, error) {
return &market.MinerDeal{
ClientDealProposal: old.ClientDealProposal,
ProposalCid: old.ProposalCid,
AddFundsCid: old.AddFundsCid,
PublishCid: old.PublishCid,
Miner: old.Miner,
Client: old.Client,
State: old.State,
PiecePath: old.PiecePath,
PayloadSize: old.PayloadSize,
MetadataPath: old.MetadataPath,
SlashEpoch: old.SlashEpoch,
FastRetrieval: old.FastRetrieval,
Message: old.Message,
FundsReserved: old.FundsReserved,
Ref: old.Ref,
AvailableForRetrieval: old.AvailableForRetrieval,
DealID: old.DealID,
CreationTime: old.CreationTime,
TransferChannelID: old.TransferChannelID,
SectorNumber: old.SectorNumber,
Offset: old.Offset,
PieceStatus: market.PieceStatus(old.PieceStatus),
InboundCAR: old.InboundCAR,
TimeStamp: timeStampNow(),
}, nil
},
},
},
DsNamePaychInfoDs: {
{version: "1", mf: func(old *v220.ChannelInfo) (*market.ChannelInfo, error) {
info := &market.ChannelInfo{
ChannelID: old.ChannelID,
Channel: old.Channel,
Control: old.Control,
Target: old.Target,
Direction: old.Direction,
//Vouchers: old.Vouchers,
NextLane: old.NextLane,
Amount: old.Amount,
PendingAmount: old.PendingAmount,
CreateMsg: old.CreateMsg,
AddFundsMsg: old.AddFundsMsg,
Settling: old.Settling,
TimeStamp: timeStampNow(),
}
if len(old.Vouchers) == 0 {
return info, nil
}

info.Vouchers = make([]*market.VoucherInfo, len(old.Vouchers))
for idx, vch := range old.Vouchers {
info.Vouchers[idx] = &market.VoucherInfo{
Voucher: vch.Voucher,
Proof: vch.Proof,
Submitted: vch.Submitted,
}
}
return info, nil
},
},
},
DsNamePaychMsgDs: {
{version: "1", mf: func(old *v220.MsgInfo) (*market.MsgInfo, error) {
return &market.MsgInfo{
ChannelID: old.ChannelID,
MsgCid: old.MsgCid,
Received: old.Received,
Err: old.Err,
TimeStamp: timeStampNow(),
}, nil
}},
},
DsNameStorageAskDs: {
{version: "1", mf: func(old *v220.SignedStorageAsk) (*market.SignedStorageAsk, error) {
return &market.SignedStorageAsk{
Ask: old.Ask,
Signature: old.Signature,
TimeStamp: timeStampNow(),
}, nil
}},
},
DsNameRetrievalAskDs: {
{version: "1", mf: func(old *v220.RetrievalAsk) (*market.RetrievalAsk, error) {
return &market.RetrievalAsk{
Miner: old.Miner,
PricePerByte: old.PricePerByte,
UnsealPrice: old.UnsealPrice,
PaymentInterval: old.PaymentInterval,
PaymentIntervalIncrease: old.PaymentIntervalIncrease,
TimeStamp: timeStampNow()}, nil
}},
},
DsNameCidInfoDs: {
{version: "1", mf: func(old *v220.CIDInfo) (*piecestore.CIDInfo, error) {
return &piecestore.CIDInfo{
CID: old.CID,
PieceBlockLocations: old.PieceBlockLocations,
}, nil
}},
},
DsNameRetrievalDealsDs: {
{version: "1", mf: func(old *v220.ProviderDealState) (*market.ProviderDealState, error) {
return &market.ProviderDealState{
DealProposal: old.DealProposal,
StoreID: old.StoreID,
SelStorageProposalCid: old.SelStorageProposalCid,
ChannelID: old.ChannelID,
Status: old.Status,
Receiver: old.Receiver,
TotalSent: old.TotalSent,
FundsReceived: old.FundsReceived,
Message: old.Message,
CurrentInterval: old.CurrentInterval,
LegacyProtocol: old.LegacyProtocol,
TimeStamp: timeStampNow()}, nil
}},
},
}

func migrateOne(ctx context.Context, name string, mfs migrateFuncSchedule, ds datastore.Batching) (datastore.Batching, error) {
var oldVersion string
if v, err := ds.Get(ctx, versioningKey); err != nil {
if !errors.Is(err, datastore.ErrNotFound) {
return nil, err
}
} else {
oldVersion = string(v)
}
var targetVersion = mfs.targetVersion()
var dsWithOldVersion datastore.Batching
if len(oldVersion) == 0 {
dsWithOldVersion = ds
} else {
dsWithOldVersion = namespace.Wrap(ds, datastore.NewKey(oldVersion))
if oldVersion == string(targetVersion) {
log.Infof("doesn't need migration for %s, current version is:%s", name, oldVersion)
return dsWithOldVersion, nil
}
}
log.Infof("migrate: %s from %s to %s", name, oldVersion, string(targetVersion))
mfs = mfs.subScheduleFrom(oldVersion)
if len(mfs) == 0 {
return nil, fmt.Errorf("migrate:%s failed, can't find schedule from:%s", name, oldVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有可执行的migrate要返回错误吗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是开发者需要保证对应的迁移必须存在, 所以, 这里正常情况是肯定存在的, 如果不存在,说明是代码写错了.

}
var migrationBuilders versioned.BuilderList = make([]versioned.Builder, len(mfs))
for idx, mf := range mfs {
migrationBuilders[idx] = versioned.NewVersionedBuilder(mf.mf, mf.version)
}
migrations, err := migrationBuilders.Build()
if err != nil {
return nil, err
}
_, doMigrate := statestore.NewVersionedStateStore(dsWithOldVersion, migrations, targetVersion)
if err := doMigrate(ctx); err != nil {
var rollbackErr error
// if error happens, just rollback the version number
if len(oldVersion) == 0 {
rollbackErr = ds.Delete(ctx, versioningKey)
} else {
rollbackErr = ds.Put(ctx, versioningKey, []byte(oldVersion))
}
// there are nothing we can do to get back the data.
if rollbackErr != nil {
log.Errorf("migrate: %s failed, rollback version failed:%v\n", name, rollbackErr)
}
return nil, err
}

return namespace.Wrap(ds, datastore.NewKey(string(targetVersion))), nil
}

func Migrate(ctx context.Context, dss map[string]datastore.Batching) (map[string]datastore.Batching, error) {
var err error
for name, ds := range dss {
mfs, exist := migrateDs[name]

zl03jsj marked this conversation as resolved.
Show resolved Hide resolved
if !exist {
dss[name] = ds
log.Warnf("no migration sechedules for : %s", name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sechedules 拼写错误

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是写错了, 应该是'schedule'

continue
}
var versionedDs datastore.Batching
versionedDs, err = migrateOne(ctx, name, mfs, ds)
// todo: version为空同时, 有同时存在两个版本的类型的可能性, 为了兼容, 这里暂时不返回错误.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“version为空同时” 读不通顺

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doge]原因是, 中间有几个提交, 类型已经升级了, 但是还没有引入自动迁移的功能.为了兼容, 所以这里暂时不返回错误, 等到下个版本之后, 这里就可以直接返回错误了.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是说这句话不通顺,什么是“为空同时”

Copy link
Contributor Author

@zl03jsj zl03jsj Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是说这句话不通顺,什么是“为空同时”

为空时, hand slipped..

// 后续的版本升级中如果出错, 应该直接返回.
if err != nil {
dss[name] = ds
log.Warnf("migrate:%s failed:%s", name, err.Error())
continue
}
dss[name] = versionedDs
}
return dss, nil
}
7 changes: 7 additions & 0 deletions models/badger/migrate/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package migrate

import "github.com/ipfs/go-datastore"

type DsKeyAble interface {
KeyWithNamespace() datastore.Key
}
Loading