From 64602e729cd2e0726f21f105b5f3c63f9b25b603 Mon Sep 17 00:00:00 2001 From: zl Date: Fri, 14 Oct 2022 14:18:43 +0800 Subject: [PATCH 1/4] feat: badger auto migration --- dagstore/market_api_test.go | 2 +- models/badger/db.go | 30 + models/badger/migrate/migrate.go | 273 ++++ models/badger/migrate/v2.2.0/types.go | 1758 +++++++++++++++++++++++ models/badger/testing.go | 21 +- models/testhelpers.go | 7 +- retrievalprovider/piecestore_test.go | 10 +- storageprovider/storageprovider_test.go | 2 +- 8 files changed, 2090 insertions(+), 13 deletions(-) create mode 100644 models/badger/migrate/migrate.go create mode 100644 models/badger/migrate/v2.2.0/types.go diff --git a/dagstore/market_api_test.go b/dagstore/market_api_test.go index 1cd9215d..3e886b56 100644 --- a/dagstore/market_api_test.go +++ b/dagstore/market_api_test.go @@ -31,7 +31,7 @@ func TestMarket(t *testing.T) { assert.Nil(t, err) pmgr.AddMemPieceStorage(memPieceStorage) - r := models.NewInMemoryRepo() + r := models.NewInMemoryRepo(t) err = r.StorageDealRepo().SaveDeal(ctx, &markettypes.MinerDeal{ ClientDealProposal: market.ClientDealProposal{ Proposal: market.DealProposal{ diff --git a/models/badger/db.go b/models/badger/db.go index 8e219cd5..4a67766f 100644 --- a/models/badger/db.go +++ b/models/badger/db.go @@ -4,6 +4,8 @@ import ( "context" "path" + "github.com/filecoin-project/venus-market/v2/models/badger/migrate" + "github.com/filecoin-project/venus-market/v2/blockstore" "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus-market/v2/models/repo" @@ -238,6 +240,34 @@ func (r *BadgerRepo) Close() error { } func (r *BadgerRepo) Migrate() error { + ctx := context.TODO() + + var migrateDss = map[string]datastore.Batching{ + migrate.DsNameFundedAddrState: r.dsParams.FundDS, + migrate.DsNameStorageDeal: r.dsParams.StorageDealsDS, + migrate.DsNamePaychInfoDs: r.dsParams.PaychInfoDS, + migrate.DsNamePaychMsgDs: r.dsParams.PaychMsgDS, + migrate.DsNameStorageAskDs: r.dsParams.AskDS, + migrate.DsNameRetrievalAskDs: r.dsParams.RetrAskDs, + migrate.DsNameCidInfoDs: r.dsParams.CidInfoDs, + migrate.DsNameRetrievalDealsDs: r.dsParams.RetrievalDealsDs, + } + + // the returned 'newDss' would be wrapped with current version namespace. + // so, must set all 'ds' back later. + newDss, err := migrate.Migrate(ctx, migrateDss) + if err != nil { + return err + } + + r.dsParams.FundDS = newDss[migrate.DsNameFundedAddrState] + r.dsParams.StorageDealsDS = newDss[migrate.DsNameStorageDeal] + r.dsParams.PaychMsgDS = newDss[migrate.DsNamePaychMsgDs] + r.dsParams.PaychInfoDS = newDss[migrate.DsNamePaychInfoDs] + r.dsParams.AskDS = newDss[migrate.DsNameStorageAskDs] + r.dsParams.RetrAskDs = newDss[migrate.DsNameRetrievalAskDs] + r.dsParams.CidInfoDs = newDss[migrate.DsNameCidInfoDs] + r.dsParams.RetrievalDealsDs = newDss[migrate.DsNameRetrievalDealsDs] return nil } diff --git a/models/badger/migrate/migrate.go b/models/badger/migrate/migrate.go new file mode 100644 index 00000000..e9150a2c --- /dev/null +++ b/models/badger/migrate/migrate.go @@ -0,0 +1,273 @@ +package migrate + +import ( + "context" + "errors" + "fmt" + "time" + + versioning "github.com/filecoin-project/go-ds-versioning/pkg" + "github.com/filecoin-project/go-ds-versioning/pkg/statestore" + "github.com/filecoin-project/go-ds-versioning/pkg/versioned" + "github.com/filecoin-project/go-fil-markets/piecestore" + v220 "github.com/filecoin-project/venus-market/v2/models/badger/migrate/v2.2.0" + "github.com/filecoin-project/venus/venus-shared/types/market" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + logging "github.com/ipfs/go-log/v2" +) + +const ( + DsNameFundedAddrState = "FundedAddrStateDs" + DsNameStorageDeal = "StorageDealDs" + DsNamePaychInfoDs = "PayChanInfoDs" + DsNamePaychMsgDs = "PayChanMsgDs" + DsNameStorageAskDs = "StorageAskDs" + DsNameRetrievalAskDs = "RetrievalAskDs" + DsNameCidInfoDs = "CidInfoDs" + DsNameRetrievalDealsDs = "RetrievalDealsDS" +) + +var versioningKey = datastore.NewKey("/versions/current") + +var log = logging.Logger("badger-migration") + +type migrateFunc struct { + version versioning.VersionKey + mf versioning.MigrationFunc +} + +type migrateFuncSchedule []migrateFunc + +func (mfs migrateFuncSchedule) targetVersion() versioning.VersionKey { + return mfs[len(mfs)-1].version +} + +func (mfs migrateFuncSchedule) subScheduleFrom(fromVersion string) migrateFuncSchedule { + if len(fromVersion) == 0 { + return mfs + } + var startPos int + for idx, mf := range mfs { + if string(mf.version) == fromVersion { + startPos = idx + break + } + } + if startPos == 0 { + return nil + } + return mfs[startPos:] +} + +func timeStampNow() market.TimeStamp { + ts := time.Now().Unix() + return market.TimeStamp{CreatedAt: uint64(ts), UpdatedAt: uint64(ts)} +} + +var migrateDs = map[string][]migrateFunc{ + DsNameFundedAddrState: { + {version: "1", mf: func(old *v220.FundedAddressState) (*market.FundedAddressState, error) { + return &market.FundedAddressState{ + Addr: old.Addr, + AmtReserved: old.AmtReserved, + MsgCid: old.MsgCid, + }, nil + }}, + }, + DsNameStorageDeal: { + {version: "1", mf: func(old *v220.MinerDeal) (*market.MinerDeal, error) { + return &market.MinerDeal{ + ClientDealProposal: old.ClientDealProposal, + ProposalCid: old.ProposalCid, + AddFundsCid: old.AddFundsCid, + PublishCid: old.PublishCid, + Miner: old.Miner, + Client: old.Client, + State: old.State, + PiecePath: old.PiecePath, + PayloadSize: old.PayloadSize, + MetadataPath: old.MetadataPath, + SlashEpoch: old.SlashEpoch, + FastRetrieval: old.FastRetrieval, + Message: old.Message, + FundsReserved: old.FundsReserved, + Ref: old.Ref, + AvailableForRetrieval: old.AvailableForRetrieval, + DealID: old.DealID, + CreationTime: old.CreationTime, + TransferChannelID: old.TransferChannelID, + SectorNumber: old.SectorNumber, + Offset: old.Offset, + PieceStatus: market.PieceStatus(old.PieceStatus), + InboundCAR: old.InboundCAR, + TimeStamp: timeStampNow(), + }, nil + }, + }, + }, + DsNamePaychInfoDs: { + {version: "1", mf: func(old *v220.ChannelInfo) (*market.ChannelInfo, error) { + info := &market.ChannelInfo{ + ChannelID: old.ChannelID, + Channel: old.Channel, + Control: old.Control, + Target: old.Target, + Direction: old.Direction, + //Vouchers: old.Vouchers, + NextLane: old.NextLane, + Amount: old.Amount, + PendingAmount: old.PendingAmount, + CreateMsg: old.CreateMsg, + AddFundsMsg: old.AddFundsMsg, + Settling: old.Settling, + TimeStamp: timeStampNow(), + } + if len(old.Vouchers) == 0 { + return info, nil + } + + info.Vouchers = make([]*market.VoucherInfo, len(old.Vouchers)) + for idx, vch := range old.Vouchers { + info.Vouchers[idx] = &market.VoucherInfo{ + Voucher: vch.Voucher, + Proof: vch.Proof, + Submitted: vch.Submitted, + } + } + return info, nil + }, + }, + }, + DsNamePaychMsgDs: { + {version: "1", mf: func(old *v220.MsgInfo) (*market.MsgInfo, error) { + return &market.MsgInfo{ + ChannelID: old.ChannelID, + MsgCid: old.MsgCid, + Received: old.Received, + Err: old.Err, + TimeStamp: timeStampNow(), + }, nil + }}, + }, + DsNameStorageAskDs: { + {version: "1", mf: func(old *v220.SignedStorageAsk) (*market.SignedStorageAsk, error) { + return &market.SignedStorageAsk{ + Ask: old.Ask, + Signature: old.Signature, + TimeStamp: timeStampNow(), + }, nil + }}, + }, + DsNameRetrievalAskDs: { + {version: "1", mf: func(old *v220.RetrievalAsk) (*market.RetrievalAsk, error) { + return &market.RetrievalAsk{ + Miner: old.Miner, + PricePerByte: old.PricePerByte, + UnsealPrice: old.UnsealPrice, + PaymentInterval: old.PaymentInterval, + PaymentIntervalIncrease: old.PaymentIntervalIncrease, + TimeStamp: timeStampNow()}, nil + }}, + }, + DsNameCidInfoDs: { + {version: "1", mf: func(old *v220.CIDInfo) (*piecestore.CIDInfo, error) { + return old, nil + }}, + }, + DsNameRetrievalDealsDs: { + {version: "1", mf: func(old *v220.ProviderDealState) (*market.ProviderDealState, error) { + return &market.ProviderDealState{ + DealProposal: old.DealProposal, + StoreID: old.StoreID, + SelStorageProposalCid: old.SelStorageProposalCid, + ChannelID: old.ChannelID, + Status: old.Status, + Receiver: old.Receiver, + TotalSent: old.TotalSent, + FundsReceived: old.FundsReceived, + Message: old.Message, + CurrentInterval: old.CurrentInterval, + LegacyProtocol: old.LegacyProtocol, + TimeStamp: timeStampNow()}, nil + }}, + }, +} + +func migrateOne(ctx context.Context, name string, mfs migrateFuncSchedule, ds datastore.Batching) (datastore.Batching, error) { + var oldVersion string + if v, err := ds.Get(ctx, versioningKey); err != nil { + if !errors.Is(err, datastore.ErrNotFound) { + return nil, err + } + } else { + oldVersion = string(v) + } + + var targetVersion = mfs.targetVersion() + + var dsWithOldVersion datastore.Batching + + if len(oldVersion) == 0 { + dsWithOldVersion = ds + } else { + if oldVersion == string(targetVersion) { + log.Infof("doesn't need migration for %s, current version is:%s", name, oldVersion) + return namespace.Wrap(ds, datastore.NewKey(oldVersion)), nil + } else { + dsWithOldVersion = namespace.Wrap(ds, datastore.NewKey(oldVersion)) + } + } + + log.Infof("migrate: %s from %s to %s", name, oldVersion, string(targetVersion)) + + mfs = mfs.subScheduleFrom(oldVersion) + + if len(mfs) == 0 { + return nil, fmt.Errorf("migrate:%s failed, can't find schedule from:%s", name, oldVersion) + } + + var migrationBuilders versioned.BuilderList = make([]versioned.Builder, len(mfs)) + + for idx, mf := range mfs { + migrationBuilders[idx] = versioned.NewVersionedBuilder(mf.mf, mf.version) + } + + migrations, err := migrationBuilders.Build() + if err != nil { + return nil, err + } + + _, doMigrate := statestore.NewVersionedStateStore(dsWithOldVersion, migrations, targetVersion) + if err := doMigrate(ctx); err != nil { + return nil, err + } + + return namespace.Wrap(ds, datastore.NewKey(string(targetVersion))), nil +} + +func Migrate(ctx context.Context, dss map[string]datastore.Batching) (map[string]datastore.Batching, error) { + var err error + for name, ds := range dss { + mfs, exist := migrateDs[name] + + if !exist { + return nil, fmt.Errorf("migration function for %s not found", name) + } + + var versionedDs datastore.Batching + + versionedDs, err = migrateOne(ctx, name, mfs, ds) + + // todo: version为空同时, 有同时存在两个版本的类型的可能性, 为了兼容, 这里暂时不返回错误. + // 后续的版本升级中如果出错, 应该直接返回. + if err != nil { + dss[name] = ds + + log.Warnf("migrate:%s failed:%w", name, err) + continue + } + dss[name] = versionedDs + } + return dss, nil +} diff --git a/models/badger/migrate/v2.2.0/types.go b/models/badger/migrate/v2.2.0/types.go new file mode 100644 index 00000000..8edc1344 --- /dev/null +++ b/models/badger/migrate/v2.2.0/types.go @@ -0,0 +1,1758 @@ +package v220 + +/* +所有的类型都来源于老版本(venus-shared/v1.6.0)的拷贝. 用于badger持久化的类型的自动化迁移. +*/ + +import ( + "fmt" + "io" + "math" + "sort" + + "github.com/filecoin-project/go-address" + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-fil-markets/filestore" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin/v8/market" + "github.com/filecoin-project/go-state-types/builtin/v8/paych" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" +) + +type SignedStorageAsk = storagemarket.SignedStorageAsk +type BlockLocation = piecestore.BlockLocation +type CIDInfo = piecestore.CIDInfo + +// FundedAddressState keeps track of the state of an address with funds in the +// datastore +type FundedAddressState struct { + Addr address.Address + // AmtReserved is the amount that must be kept in the address (cannot be + // withdrawn) + AmtReserved abi.TokenAmount + // MsgCid is the cid of an in-progress on-chain message + MsgCid *cid.Cid +} + +type PieceStatus string + +type MinerDeal struct { + market.ClientDealProposal + ProposalCid cid.Cid + AddFundsCid *cid.Cid + PublishCid *cid.Cid + Miner peer.ID + Client peer.ID + State storagemarket.StorageDealStatus + PiecePath filestore.Path + PayloadSize uint64 + MetadataPath filestore.Path + SlashEpoch abi.ChainEpoch + FastRetrieval bool + Message string + FundsReserved abi.TokenAmount + Ref *storagemarket.DataRef + AvailableForRetrieval bool + + DealID abi.DealID + CreationTime cbg.CborTime + + TransferChannelID *datatransfer.ChannelID `json:"TransferChannelId"` + SectorNumber abi.SectorNumber + + Offset abi.PaddedPieceSize + PieceStatus PieceStatus + + InboundCAR string +} + +// MsgInfo stores information about a create channel / add funds message +// that has been sent +type MsgInfo struct { + // ChannelID links the message to a channel + ChannelID string + // MsgCid is the CID of the message + MsgCid cid.Cid + // Received indicates whether a response has been received + Received bool + // Err is the error received in the response + Err string +} + +type VoucherInfo struct { + Voucher *paych.SignedVoucher + Proof []byte // ignored + Submitted bool +} + +// ChannelInfo keeps track of information about a channel +type ChannelInfo struct { + // ChannelID is a uuid set at channel creation + ChannelID string + // Channel address - may be nil if the channel hasn't been created yet + Channel *address.Address + // Control is the address of the local node + Control address.Address + // Target is the address of the remote node (on the other end of the channel) + Target address.Address + // Direction indicates if the channel is inbound (Control is the "to" address) + // or outbound (Control is the "from" address) + Direction uint64 + // Vouchers is a list of all vouchers sent on the channel + Vouchers []*VoucherInfo + // NextLane is the number of the next lane that should be used when the + // client requests a new lane (eg to create a voucher for a new deal) + NextLane uint64 + // Amount added to the channel. + // Note: This amount is only used by GetPaych to keep track of how much + // has locally been added to the channel. It should reflect the channel's + // Balance on chain as long as all operations occur on the same datastore. + Amount big.Int + // PendingAmount is the amount that we're awaiting confirmation of + PendingAmount big.Int + // CreateMsg is the CID of a pending create message (while waiting for confirmation) + CreateMsg *cid.Cid + // AddFundsMsg is the CID of a pending add funds message (while waiting for confirmation) + AddFundsMsg *cid.Cid + // Settling indicates whether the channel has entered into the settling state + Settling bool +} + +type RetrievalAsk struct { + Miner address.Address + PricePerByte abi.TokenAmount + UnsealPrice abi.TokenAmount + PaymentInterval uint64 + PaymentIntervalIncrease uint64 +} + +type ProviderDealState struct { + retrievalmarket.DealProposal + StoreID uint64 + SelStorageProposalCid cid.Cid + ChannelID *datatransfer.ChannelID + Status retrievalmarket.DealStatus + Receiver peer.ID + TotalSent uint64 + FundsReceived abi.TokenAmount + Message string + CurrentInterval uint64 + LegacyProtocol bool +} + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = math.E +var _ = sort.Sort + +var lengthBufFundedAddressState = []byte{131} + +func (t *FundedAddressState) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufFundedAddressState); err != nil { + return err + } + + // t.Addr (address.Address) (struct) + if err := t.Addr.MarshalCBOR(cw); err != nil { + return err + } + + // t.AmtReserved (big.Int) (struct) + if err := t.AmtReserved.MarshalCBOR(cw); err != nil { + return err + } + + // t.MsgCid (cid.Cid) (struct) + + if t.MsgCid == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(cw, *t.MsgCid); err != nil { + return xerrors.Errorf("failed to write cid field t.MsgCid: %w", err) + } + } + + return nil +} + +func (t *FundedAddressState) UnmarshalCBOR(r io.Reader) (err error) { + *t = FundedAddressState{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Addr (address.Address) (struct) + + { + + if err := t.Addr.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Addr: %w", err) + } + + } + // t.AmtReserved (big.Int) (struct) + + { + + if err := t.AmtReserved.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.AmtReserved: %w", err) + } + + } + // t.MsgCid (cid.Cid) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.MsgCid: %w", err) + } + + t.MsgCid = &c + } + + } + return nil +} + +var lengthBufMsgInfo = []byte{132} + +func (t *MsgInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufMsgInfo); err != nil { + return err + } + + // t.ChannelID (string) (string) + if len(t.ChannelID) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.ChannelID was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.ChannelID))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.ChannelID)); err != nil { + return err + } + + // t.MsgCid (cid.Cid) (struct) + + if err := cbg.WriteCid(cw, t.MsgCid); err != nil { + return xerrors.Errorf("failed to write cid field t.MsgCid: %w", err) + } + + // t.Received (bool) (bool) + if err := cbg.WriteBool(w, t.Received); err != nil { + return err + } + + // t.Err (string) (string) + if len(t.Err) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Err was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Err))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Err)); err != nil { + return err + } + return nil +} + +func (t *MsgInfo) UnmarshalCBOR(r io.Reader) (err error) { + *t = MsgInfo{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ChannelID (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.ChannelID = string(sval) + } + // t.MsgCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.MsgCid: %w", err) + } + + t.MsgCid = c + + } + // t.Received (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.Received = false + case 21: + t.Received = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + // t.Err (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Err = string(sval) + } + return nil +} + +var lengthBufChannelInfo = []byte{140} + +func (t *ChannelInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufChannelInfo); err != nil { + return err + } + + // t.ChannelID (string) (string) + if len(t.ChannelID) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.ChannelID was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.ChannelID))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.ChannelID)); err != nil { + return err + } + + // t.Channel (address.Address) (struct) + if err := t.Channel.MarshalCBOR(cw); err != nil { + return err + } + + // t.Control (address.Address) (struct) + if err := t.Control.MarshalCBOR(cw); err != nil { + return err + } + + // t.Target (address.Address) (struct) + if err := t.Target.MarshalCBOR(cw); err != nil { + return err + } + + // t.Direction (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Direction)); err != nil { + return err + } + + // t.Vouchers ([]*market.VoucherInfo) (slice) + if len(t.Vouchers) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Vouchers was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Vouchers))); err != nil { + return err + } + for _, v := range t.Vouchers { + if err := v.MarshalCBOR(cw); err != nil { + return err + } + } + + // t.NextLane (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.NextLane)); err != nil { + return err + } + + // t.Amount (big.Int) (struct) + if err := t.Amount.MarshalCBOR(cw); err != nil { + return err + } + + // t.PendingAmount (big.Int) (struct) + if err := t.PendingAmount.MarshalCBOR(cw); err != nil { + return err + } + + // t.CreateMsg (cid.Cid) (struct) + + if t.CreateMsg == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(cw, *t.CreateMsg); err != nil { + return xerrors.Errorf("failed to write cid field t.CreateMsg: %w", err) + } + } + + // t.AddFundsMsg (cid.Cid) (struct) + + if t.AddFundsMsg == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(cw, *t.AddFundsMsg); err != nil { + return xerrors.Errorf("failed to write cid field t.AddFundsMsg: %w", err) + } + } + + // t.Settling (bool) (bool) + if err := cbg.WriteBool(w, t.Settling); err != nil { + return err + } + return nil +} + +func (t *ChannelInfo) UnmarshalCBOR(r io.Reader) (err error) { + *t = ChannelInfo{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 12 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ChannelID (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.ChannelID = string(sval) + } + // t.Channel (address.Address) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + t.Channel = new(address.Address) + if err := t.Channel.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Channel pointer: %w", err) + } + } + + } + // t.Control (address.Address) (struct) + + { + + if err := t.Control.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Control: %w", err) + } + + } + // t.Target (address.Address) (struct) + + { + + if err := t.Target.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Target: %w", err) + } + + } + // t.Direction (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Direction = uint64(extra) + + } + // t.Vouchers ([]*market.VoucherInfo) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Vouchers: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Vouchers = make([]*VoucherInfo, extra) + } + + for i := 0; i < int(extra); i++ { + + var v VoucherInfo + if err := v.UnmarshalCBOR(cr); err != nil { + return err + } + + t.Vouchers[i] = &v + } + + // t.NextLane (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.NextLane = uint64(extra) + + } + // t.Amount (big.Int) (struct) + + { + + if err := t.Amount.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Amount: %w", err) + } + + } + // t.PendingAmount (big.Int) (struct) + + { + + if err := t.PendingAmount.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.PendingAmount: %w", err) + } + + } + // t.CreateMsg (cid.Cid) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.CreateMsg: %w", err) + } + + t.CreateMsg = &c + } + + } + // t.AddFundsMsg (cid.Cid) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.AddFundsMsg: %w", err) + } + + t.AddFundsMsg = &c + } + + } + // t.Settling (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.Settling = false + case 21: + t.Settling = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + return nil +} + +var lengthBufVoucherInfo = []byte{131} + +func (t *VoucherInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufVoucherInfo); err != nil { + return err + } + + // t.Voucher (paych.SignedVoucher) (struct) + if err := t.Voucher.MarshalCBOR(cw); err != nil { + return err + } + + // t.Proof ([]uint8) (slice) + if len(t.Proof) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Proof was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Proof))); err != nil { + return err + } + + if _, err := cw.Write(t.Proof[:]); err != nil { + return err + } + + // t.Submitted (bool) (bool) + if err := cbg.WriteBool(w, t.Submitted); err != nil { + return err + } + return nil +} + +func (t *VoucherInfo) UnmarshalCBOR(r io.Reader) (err error) { + *t = VoucherInfo{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Voucher (paych.SignedVoucher) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + t.Voucher = new(paych.SignedVoucher) + if err := t.Voucher.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Voucher pointer: %w", err) + } + } + + } + // t.Proof ([]uint8) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Proof: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Proof = make([]uint8, extra) + } + + if _, err := io.ReadFull(cr, t.Proof[:]); err != nil { + return err + } + // t.Submitted (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.Submitted = false + case 21: + t.Submitted = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + return nil +} + +var lengthBufMinerDeal = []byte{151} + +func (t *MinerDeal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufMinerDeal); err != nil { + return err + } + + // t.ClientDealProposal (market.ClientDealProposal) (struct) + if err := t.ClientDealProposal.MarshalCBOR(cw); err != nil { + return err + } + + // t.ProposalCid (cid.Cid) (struct) + + if err := cbg.WriteCid(cw, t.ProposalCid); err != nil { + return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) + } + + // t.AddFundsCid (cid.Cid) (struct) + + if t.AddFundsCid == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(cw, *t.AddFundsCid); err != nil { + return xerrors.Errorf("failed to write cid field t.AddFundsCid: %w", err) + } + } + + // t.PublishCid (cid.Cid) (struct) + + if t.PublishCid == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(cw, *t.PublishCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err) + } + } + + // t.Miner (peer.ID) (string) + if len(t.Miner) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Miner was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Miner))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Miner)); err != nil { + return err + } + + // t.Client (peer.ID) (string) + if len(t.Client) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Client was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Client))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Client)); err != nil { + return err + } + + // t.State (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.State)); err != nil { + return err + } + + // t.PiecePath (filestore.Path) (string) + if len(t.PiecePath) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.PiecePath was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.PiecePath))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.PiecePath)); err != nil { + return err + } + + // t.PayloadSize (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.PayloadSize)); err != nil { + return err + } + + // t.MetadataPath (filestore.Path) (string) + if len(t.MetadataPath) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.MetadataPath was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.MetadataPath))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.MetadataPath)); err != nil { + return err + } + + // t.SlashEpoch (abi.ChainEpoch) (int64) + if t.SlashEpoch >= 0 { + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.SlashEpoch)); err != nil { + return err + } + } else { + if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.SlashEpoch-1)); err != nil { + return err + } + } + + // t.FastRetrieval (bool) (bool) + if err := cbg.WriteBool(w, t.FastRetrieval); err != nil { + return err + } + + // t.Message (string) (string) + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Message))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Message)); err != nil { + return err + } + + // t.FundsReserved (big.Int) (struct) + if err := t.FundsReserved.MarshalCBOR(cw); err != nil { + return err + } + + // t.Ref (storagemarket.DataRef) (struct) + if err := t.Ref.MarshalCBOR(cw); err != nil { + return err + } + + // t.AvailableForRetrieval (bool) (bool) + if err := cbg.WriteBool(w, t.AvailableForRetrieval); err != nil { + return err + } + + // t.DealID (abi.DealID) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.DealID)); err != nil { + return err + } + + // t.CreationTime (typegen.CborTime) (struct) + if err := t.CreationTime.MarshalCBOR(cw); err != nil { + return err + } + + // t.TransferChannelID (datatransfer.ChannelID) (struct) + if err := t.TransferChannelID.MarshalCBOR(cw); err != nil { + return err + } + + // t.SectorNumber (abi.SectorNumber) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.SectorNumber)); err != nil { + return err + } + + // t.Offset (abi.PaddedPieceSize) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Offset)); err != nil { + return err + } + + // t.PieceStatus (market.PieceStatus) (string) + if len(t.PieceStatus) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.PieceStatus was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.PieceStatus))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.PieceStatus)); err != nil { + return err + } + + // t.InboundCAR (string) (string) + if len(t.InboundCAR) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.InboundCAR was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.InboundCAR))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.InboundCAR)); err != nil { + return err + } + return nil +} + +func (t *MinerDeal) UnmarshalCBOR(r io.Reader) (err error) { + *t = MinerDeal{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 23 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ClientDealProposal (market.ClientDealProposal) (struct) + + { + + if err := t.ClientDealProposal.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.ClientDealProposal: %w", err) + } + + } + // t.ProposalCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) + } + + t.ProposalCid = c + + } + // t.AddFundsCid (cid.Cid) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.AddFundsCid: %w", err) + } + + t.AddFundsCid = &c + } + + } + // t.PublishCid (cid.Cid) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err) + } + + t.PublishCid = &c + } + + } + // t.Miner (peer.ID) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Miner = peer.ID(sval) + } + // t.Client (peer.ID) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Client = peer.ID(sval) + } + // t.State (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + + } + // t.PiecePath (filestore.Path) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.PiecePath = filestore.Path(sval) + } + // t.PayloadSize (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.PayloadSize = uint64(extra) + + } + // t.MetadataPath (filestore.Path) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.MetadataPath = filestore.Path(sval) + } + // t.SlashEpoch (abi.ChainEpoch) (int64) + { + maj, extra, err := cr.ReadHeader() + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.SlashEpoch = abi.ChainEpoch(extraI) + } + // t.FastRetrieval (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.FastRetrieval = false + case 21: + t.FastRetrieval = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + // t.Message (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.FundsReserved (big.Int) (struct) + + { + + if err := t.FundsReserved.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.FundsReserved: %w", err) + } + + } + // t.Ref (storagemarket.DataRef) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + t.Ref = new(storagemarket.DataRef) + if err := t.Ref.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Ref pointer: %w", err) + } + } + + } + // t.AvailableForRetrieval (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.AvailableForRetrieval = false + case 21: + t.AvailableForRetrieval = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + // t.DealID (abi.DealID) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = abi.DealID(extra) + + } + // t.CreationTime (typegen.CborTime) (struct) + + { + + if err := t.CreationTime.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.CreationTime: %w", err) + } + + } + // t.TransferChannelID (datatransfer.ChannelID) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + t.TransferChannelID = new(datatransfer.ChannelID) + if err := t.TransferChannelID.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.TransferChannelID pointer: %w", err) + } + } + + } + // t.SectorNumber (abi.SectorNumber) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.SectorNumber = abi.SectorNumber(extra) + + } + // t.Offset (abi.PaddedPieceSize) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Offset = abi.PaddedPieceSize(extra) + + } + // t.PieceStatus (market.PieceStatus) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.PieceStatus = PieceStatus(sval) + } + // t.InboundCAR (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.InboundCAR = string(sval) + } + return nil +} + +var lengthBufRetrievalAsk = []byte{133} + +func (t *RetrievalAsk) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufRetrievalAsk); err != nil { + return err + } + + // t.Miner (address.Address) (struct) + if err := t.Miner.MarshalCBOR(cw); err != nil { + return err + } + + // t.PricePerByte (big.Int) (struct) + if err := t.PricePerByte.MarshalCBOR(cw); err != nil { + return err + } + + // t.UnsealPrice (big.Int) (struct) + if err := t.UnsealPrice.MarshalCBOR(cw); err != nil { + return err + } + + // t.PaymentInterval (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.PaymentInterval)); err != nil { + return err + } + + // t.PaymentIntervalIncrease (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.PaymentIntervalIncrease)); err != nil { + return err + } + + return nil +} + +func (t *RetrievalAsk) UnmarshalCBOR(r io.Reader) (err error) { + *t = RetrievalAsk{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 5 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Miner (address.Address) (struct) + + { + + if err := t.Miner.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.Miner: %w", err) + } + + } + // t.PricePerByte (big.Int) (struct) + + { + + if err := t.PricePerByte.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.PricePerByte: %w", err) + } + + } + // t.UnsealPrice (big.Int) (struct) + + { + + if err := t.UnsealPrice.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.UnsealPrice: %w", err) + } + + } + // t.PaymentInterval (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.PaymentInterval = uint64(extra) + + } + // t.PaymentIntervalIncrease (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.PaymentIntervalIncrease = uint64(extra) + + } + return nil +} + +var lengthBufProviderDealState = []byte{139} + +func (t *ProviderDealState) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufProviderDealState); err != nil { + return err + } + + // t.DealProposal (retrievalmarket.DealProposal) (struct) + if err := t.DealProposal.MarshalCBOR(cw); err != nil { + return err + } + + // t.StoreID (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.StoreID)); err != nil { + return err + } + + // t.SelStorageProposalCid (cid.Cid) (struct) + + if err := cbg.WriteCid(cw, t.SelStorageProposalCid); err != nil { + return xerrors.Errorf("failed to write cid field t.SelStorageProposalCid: %w", err) + } + + // t.ChannelID (datatransfer.ChannelID) (struct) + if err := t.ChannelID.MarshalCBOR(cw); err != nil { + return err + } + + // t.Status (retrievalmarket.DealStatus) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Status)); err != nil { + return err + } + + // t.Receiver (peer.ID) (string) + if len(t.Receiver) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Receiver was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Receiver))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Receiver)); err != nil { + return err + } + + // t.TotalSent (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.TotalSent)); err != nil { + return err + } + + // t.FundsReceived (big.Int) (struct) + if err := t.FundsReceived.MarshalCBOR(cw); err != nil { + return err + } + + // t.Message (string) (string) + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Message))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Message)); err != nil { + return err + } + + // t.CurrentInterval (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.CurrentInterval)); err != nil { + return err + } + + // t.LegacyProtocol (bool) (bool) + if err := cbg.WriteBool(w, t.LegacyProtocol); err != nil { + return err + } + return nil +} + +func (t *ProviderDealState) UnmarshalCBOR(r io.Reader) (err error) { + *t = ProviderDealState{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 11 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.DealProposal (retrievalmarket.DealProposal) (struct) + + { + + if err := t.DealProposal.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.DealProposal: %w", err) + } + + } + // t.StoreID (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.StoreID = uint64(extra) + + } + // t.SelStorageProposalCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.SelStorageProposalCid: %w", err) + } + + t.SelStorageProposalCid = c + + } + // t.ChannelID (datatransfer.ChannelID) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + t.ChannelID = new(datatransfer.ChannelID) + if err := t.ChannelID.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.ChannelID pointer: %w", err) + } + } + + } + // t.Status (retrievalmarket.DealStatus) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Status = retrievalmarket.DealStatus(extra) + + } + // t.Receiver (peer.ID) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Receiver = peer.ID(sval) + } + // t.TotalSent (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.TotalSent = uint64(extra) + + } + // t.FundsReceived (big.Int) (struct) + + { + + if err := t.FundsReceived.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.FundsReceived: %w", err) + } + + } + // t.Message (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.CurrentInterval (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.CurrentInterval = uint64(extra) + + } + // t.LegacyProtocol (bool) (bool) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.LegacyProtocol = false + case 21: + t.LegacyProtocol = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + return nil +} diff --git a/models/badger/testing.go b/models/badger/testing.go index 44e7070a..2a9877b2 100644 --- a/models/badger/testing.go +++ b/models/badger/testing.go @@ -1,6 +1,7 @@ package badger import ( + "github.com/ipfs/go-datastore" "testing" "github.com/filecoin-project/venus-market/v2/models/repo" @@ -15,12 +16,14 @@ func setup(t *testing.T) repo.Repo { } func NewMemRepo() (repo.Repo, error) { - opts := &badger.DefaultOptions - opts.InMemory = true - db, err := badger.NewDatastore("", opts) + db, err := NewMemDb() if err != nil { return nil, err } + return WrapDbToRepo(db), nil +} + +func WrapDbToRepo(db datastore.Batching) repo.Repo { payChDs := NewPayChanDS(db) return NewBadgerRepo(BadgerDSParams{ FundDS: NewFundMgrDS(db), @@ -31,5 +34,15 @@ func NewMemRepo() (repo.Repo, error) { RetrAskDs: NewRetrievalAskDS(NewRetrievalProviderDS(db)), CidInfoDs: NewCidInfoDs(NewPieceMetaDs(db)), RetrievalDealsDs: NewRetrievalDealsDS(NewRetrievalProviderDS(db)), - }), nil + }) +} + +func NewMemDb() (datastore.Batching, error) { + opts := &badger.DefaultOptions + opts.InMemory = true + ds, err := badger.NewDatastore("", opts) + if err != nil { + return nil, err + } + return ds, nil } diff --git a/models/testhelpers.go b/models/testhelpers.go index dade5673..a06c1faa 100644 --- a/models/testhelpers.go +++ b/models/testhelpers.go @@ -3,10 +3,13 @@ package models import ( "github.com/filecoin-project/venus-market/v2/models/badger" "github.com/filecoin-project/venus-market/v2/models/repo" + "github.com/stretchr/testify/assert" + "testing" ) // NewInMemoryRepo makes a new instance of MemRepo -func NewInMemoryRepo() repo.Repo { - repo, _ := badger.NewMemRepo() +func NewInMemoryRepo(t *testing.T) repo.Repo { + repo, err := badger.NewMemRepo() + assert.NoError(t, err) return repo } diff --git a/retrievalprovider/piecestore_test.go b/retrievalprovider/piecestore_test.go index db0a734f..9258b7de 100644 --- a/retrievalprovider/piecestore_test.go +++ b/retrievalprovider/piecestore_test.go @@ -30,7 +30,7 @@ import ( func TestPieceInfo_GetPieceInfoByPieceCid(t *testing.T) { ctx := context.Background() - storageDealRepo := models.NewInMemoryRepo().StorageDealRepo() + storageDealRepo := models.NewInMemoryRepo(t).StorageDealRepo() dagStore := dagstore.NewMockDagStoreWrapper() pieceStore := PieceInfo{ dagstore: dagStore, @@ -46,9 +46,9 @@ func TestPieceInfo_GetPieceInfoByPieceCid(t *testing.T) { assert.Len(t, deals, 1) } -func TestPieceInfo_GetPieceInfoWithUnkownPieceCid(t *testing.T) { +func TestPieceInfo_GetPieceInfoWithUnknownPieceCid(t *testing.T) { ctx := context.Background() - storageDealRepo := models.NewInMemoryRepo().StorageDealRepo() + storageDealRepo := models.NewInMemoryRepo(t).StorageDealRepo() dagStore := dagstore.NewMockDagStoreWrapper() pieceStore := PieceInfo{ dagstore: dagStore, @@ -63,7 +63,7 @@ func TestPieceInfo_GetPieceInfoWithUnkownPieceCid(t *testing.T) { func TestPieceInfo_GetPieceInfoWithUnko(t *testing.T) { ctx := context.Background() - storageDealRepo := models.NewInMemoryRepo().StorageDealRepo() + storageDealRepo := models.NewInMemoryRepo(t).StorageDealRepo() dagStore := dagstore.NewMockDagStoreWrapper() pieceStore := PieceInfo{ dagstore: dagStore, @@ -84,7 +84,7 @@ func TestPieceInfo_GetPieceInfoWithUnko(t *testing.T) { func TestPieceInfo_DistinctDeals(t *testing.T) { ctx := context.Background() - storageDealRepo := models.NewInMemoryRepo().StorageDealRepo() + storageDealRepo := models.NewInMemoryRepo(t).StorageDealRepo() dagStore := dagstore.NewMockDagStoreWrapper() pieceStore := PieceInfo{ dagstore: dagStore, diff --git a/storageprovider/storageprovider_test.go b/storageprovider/storageprovider_test.go index 6504183b..2a825915 100644 --- a/storageprovider/storageprovider_test.go +++ b/storageprovider/storageprovider_test.go @@ -123,7 +123,7 @@ func setup(t *testing.T) StorageProvider { spn.addDeal(ctx, did, c) } - r := models.NewInMemoryRepo() + r := models.NewInMemoryRepo(t) ask, _ := NewStorageAsk(ctx, r, spn) h, err := network2.MockHost(ctx) if err != nil { From 07aba11f9f5fbe7869a087235e633274190bc9eb Mon Sep 17 00:00:00 2001 From: zl Date: Fri, 14 Oct 2022 18:01:50 +0800 Subject: [PATCH 2/4] feat: badger auto migration tests --- go.mod | 4 +- models/badger/db.go | 5 + models/badger/migrate/migrate.go | 18 ++-- models/badger/migrate/types.go | 7 ++ .../badger/migrate/v2.2.0/testing/testing.go | 102 ++++++++++++++++++ models/badger/migrate/v2.2.0/types.go | 100 ++++++++++++++--- models/badger/retrieval_ask.go | 2 +- models/badger/retrieval_deal.go | 4 +- models/badger/storage_ask.go | 2 +- models/badger/storage_deal.go | 26 ++--- models/badger/testing.go | 11 +- models/badger/utils.go | 38 ++++--- models/migrate_test.go | 77 +++++++++++++ models/module.go | 9 +- models/mysql/db.go | 38 +------ models/testhelpers.go | 3 +- 16 files changed, 342 insertions(+), 104 deletions(-) create mode 100644 models/badger/migrate/types.go create mode 100644 models/badger/migrate/v2.2.0/testing/testing.go create mode 100644 models/migrate_test.go diff --git a/go.mod b/go.mod index 0228b135..70d74d88 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.1 github.com/filecoin-project/go-commp-utils v0.1.3 github.com/filecoin-project/go-data-transfer v1.15.2 + github.com/filecoin-project/go-ds-versioning v0.1.1 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.23.1 @@ -93,6 +94,7 @@ require ( go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f gorm.io/driver/mysql v1.1.1 gorm.io/driver/sqlite v1.1.4 gorm.io/gorm v1.21.12 @@ -143,7 +145,6 @@ require ( github.com/filecoin-project/go-amt-ipld/v4 v4.0.0 // indirect github.com/filecoin-project/go-bitfield v0.2.4 // indirect github.com/filecoin-project/go-crypto v0.0.1 // indirect - github.com/filecoin-project/go-ds-versioning v0.1.1 // indirect github.com/filecoin-project/go-hamt-ipld v0.1.5 // indirect github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 // indirect github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 // indirect @@ -320,7 +321,6 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/tools v0.1.10 // indirect - golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/api v0.56.0 // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect google.golang.org/grpc v1.45.0 // indirect diff --git a/models/badger/db.go b/models/badger/db.go index 4a67766f..520eb1ec 100644 --- a/models/badger/db.go +++ b/models/badger/db.go @@ -202,6 +202,11 @@ func NewBadgerRepo(params BadgerDSParams) repo.Repo { } } +func NewMigratedBadgerRepo(params BadgerDSParams) (repo.Repo, error) { + repo := NewBadgerRepo(params) + return repo, repo.Migrate() +} + func (r *BadgerRepo) FundRepo() repo.FundRepo { return NewFundRepo(r.dsParams.FundDS) } diff --git a/models/badger/migrate/migrate.go b/models/badger/migrate/migrate.go index e9150a2c..6c3c572f 100644 --- a/models/badger/migrate/migrate.go +++ b/models/badger/migrate/migrate.go @@ -172,7 +172,10 @@ var migrateDs = map[string][]migrateFunc{ }, DsNameCidInfoDs: { {version: "1", mf: func(old *v220.CIDInfo) (*piecestore.CIDInfo, error) { - return old, nil + return &piecestore.CIDInfo{ + CID: old.CID, + PieceBlockLocations: old.PieceBlockLocations, + }, nil }}, }, DsNameRetrievalDealsDs: { @@ -211,11 +214,11 @@ func migrateOne(ctx context.Context, name string, mfs migrateFuncSchedule, ds da if len(oldVersion) == 0 { dsWithOldVersion = ds } else { + dsWithOldVersion = namespace.Wrap(ds, datastore.NewKey(oldVersion)) + if oldVersion == string(targetVersion) { log.Infof("doesn't need migration for %s, current version is:%s", name, oldVersion) - return namespace.Wrap(ds, datastore.NewKey(oldVersion)), nil - } else { - dsWithOldVersion = namespace.Wrap(ds, datastore.NewKey(oldVersion)) + return dsWithOldVersion, nil } } @@ -252,7 +255,9 @@ func Migrate(ctx context.Context, dss map[string]datastore.Batching) (map[string mfs, exist := migrateDs[name] if !exist { - return nil, fmt.Errorf("migration function for %s not found", name) + dss[name] = ds + log.Warnf("no migration sechedules for : %s", name) + continue } var versionedDs datastore.Batching @@ -263,8 +268,7 @@ func Migrate(ctx context.Context, dss map[string]datastore.Batching) (map[string // 后续的版本升级中如果出错, 应该直接返回. if err != nil { dss[name] = ds - - log.Warnf("migrate:%s failed:%w", name, err) + log.Warnf("migrate:%s failed:%s", name, err.Error()) continue } dss[name] = versionedDs diff --git a/models/badger/migrate/types.go b/models/badger/migrate/types.go new file mode 100644 index 00000000..ff610081 --- /dev/null +++ b/models/badger/migrate/types.go @@ -0,0 +1,7 @@ +package migrate + +import "github.com/ipfs/go-datastore" + +type DsKeyAble interface { + KeyWithNamespace() datastore.Key +} diff --git a/models/badger/migrate/v2.2.0/testing/testing.go b/models/badger/migrate/v2.2.0/testing/testing.go new file mode 100644 index 00000000..b9839bf1 --- /dev/null +++ b/models/badger/migrate/v2.2.0/testing/testing.go @@ -0,0 +1,102 @@ +package testing + +import ( + "context" + "github.com/libp2p/go-libp2p-core/peer" + "testing" + + "github.com/filecoin-project/venus-market/v2/models/badger/migrate" + + "github.com/ipfs/go-cid" + + v220 "github.com/filecoin-project/venus-market/v2/models/badger/migrate/v2.2.0" + "github.com/filecoin-project/venus/venus-shared/testutil" + "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/assert" + cbg "github.com/whyrusleeping/cbor-gen" + + cbor "github.com/filecoin-project/go-cbor-util" +) + +func dsPutObj(ctx context.Context, t *testing.T, v migrate.DsKeyAble, ds datastore.Batching) { + data, err := cbor.Dump(v) + assert.NoError(t, err) + assert.NoError(t, ds.Put(ctx, v.KeyWithNamespace(), data)) +} + +func WriteTestcasesToDS(ctx context.Context, t *testing.T, ds datastore.Batching, count int) (payChMsgCIDs []cid.Cid) { + var peerIDProvider = func(t *testing.T) peer.ID { + peerId, err := peer.Decode("12D3KooWMjDC9AtFegcGJPJNvwV5fdiehTmx7awvUTXbktqboKbi") + assert.NoError(t, err) + return peerId + } + + { + for i := 0; i < count; i++ { + var stat v220.FundedAddressState + testutil.Provide(t, &stat) + dsPutObj(ctx, t, &stat, ds) + } + } + + { + for i := 0; i < count; i++ { + var deal v220.MinerDeal + testutil.Provide(t, &deal, peerIDProvider) + dsPutObj(ctx, t, &deal, ds) + } + } + + { + for i := 0; i < count; i++ { + var info v220.MsgInfo + testutil.Provide(t, &info) + dsPutObj(ctx, t, &info, ds) + payChMsgCIDs = append(payChMsgCIDs, info.MsgCid) + } + } + + { + for i := 0; i < count; i++ { + var info v220.ChannelInfo + testutil.Provide(t, &info) + dsPutObj(ctx, t, &info, ds) + } + } + + { + for i := 0; i < count; i++ { + var ask v220.SignedStorageAsk + testutil.Provide(t, &ask.Ask) + testutil.Provide(t, &ask.Signature) + dsPutObj(ctx, t, &ask, ds) + } + } + + { + for i := 0; i < count; i++ { + var ask v220.RetrievalAsk + testutil.Provide(t, &ask) + dsPutObj(ctx, t, &ask, ds) + } + } + + { + for i := 0; i < count; i++ { + var cidInfo v220.CIDInfo + testutil.Provide(t, &cidInfo.CID) + testutil.Provide(t, &cidInfo.PieceBlockLocations, testutil.WithSliceLen(2)) + dsPutObj(ctx, t, &cidInfo, ds) + } + } + + { + for i := 0; i < count; i++ { + var stat v220.ProviderDealState + testutil.Provide(t, &stat, peerIDProvider, + func(t *testing.T) *cbg.Deferred { return nil }) + dsPutObj(ctx, t, &stat, ds) + } + } + return +} diff --git a/models/badger/migrate/v2.2.0/types.go b/models/badger/migrate/v2.2.0/types.go index 8edc1344..d717ec02 100644 --- a/models/badger/migrate/v2.2.0/types.go +++ b/models/badger/migrate/v2.2.0/types.go @@ -1,7 +1,8 @@ +// nolint package v220 /* -所有的类型都来源于老版本(venus-shared/v1.6.0)的拷贝. 用于badger持久化的类型的自动化迁移. + 所有的类型都来源于老版本(venus-shared/v1.6.0)的拷贝. 用于badger持久化的类型的自动化迁移. */ import ( @@ -20,15 +21,35 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin/v8/market" "github.com/filecoin-project/go-state-types/builtin/v8/paych" + "github.com/filecoin-project/venus-market/v2/models/badger/statestore" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" ) -type SignedStorageAsk = storagemarket.SignedStorageAsk -type BlockLocation = piecestore.BlockLocation -type CIDInfo = piecestore.CIDInfo +type SignedStorageAsk struct { + storagemarket.SignedStorageAsk +} + +type BlockLocation piecestore.BlockLocation + +type CIDInfo struct { + piecestore.CIDInfo +} + +func (ask *SignedStorageAsk) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/storage/provider/storage-ask", + ask.Ask.Miner.String()}) +} + +func (cif *CIDInfo) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/storagemarket/cid-infos/", + cif.CID.String()}) +} // FundedAddressState keeps track of the state of an address with funds in the // datastore @@ -41,6 +62,13 @@ type FundedAddressState struct { MsgCid *cid.Cid } +func (t *FundedAddressState) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/fundmgr/Addr", + t.Addr.String(), + }) +} + type PieceStatus string type MinerDeal struct { @@ -73,6 +101,13 @@ type MinerDeal struct { InboundCAR string } +func (t *MinerDeal) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/storage/provider/deals", + statestore.ToKey(t.ProposalCid).String(), + }) +} + // MsgInfo stores information about a create channel / add funds message // that has been sent type MsgInfo struct { @@ -86,6 +121,13 @@ type MsgInfo struct { Err string } +func (t *MsgInfo) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/paych/MsgCid", + t.MsgCid.String(), + }) +} + type VoucherInfo struct { Voucher *paych.SignedVoucher Proof []byte // ignored @@ -125,6 +167,13 @@ type ChannelInfo struct { Settling bool } +func (t *ChannelInfo) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/paych/ChannelInfo", + t.ChannelID, + }) +} + type RetrievalAsk struct { Miner address.Address PricePerByte abi.TokenAmount @@ -133,6 +182,13 @@ type RetrievalAsk struct { PaymentIntervalIncrease uint64 } +func (t *RetrievalAsk) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/retrievals/provider/retrieval-ask", + t.Miner.String(), + }) +} + type ProviderDealState struct { retrievalmarket.DealProposal StoreID uint64 @@ -147,7 +203,19 @@ type ProviderDealState struct { LegacyProtocol bool } -var _ = xerrors.Errorf +// Identifier provides a unique id for this provider deal +func (t *ProviderDealState) Identifier() retrievalmarket.ProviderDealIdentifier { + return retrievalmarket.ProviderDealIdentifier{Receiver: t.Receiver, DealID: t.ID} +} + +func (t *ProviderDealState) KeyWithNamespace() datastore.Key { + return datastore.KeyWithNamespaces([]string{ + "/retrievals/provider/deals", + statestore.ToKey(t.Identifier()).String(), + }) +} + +var _ = xerrors.Errorf // nolint var _ = cid.Undef var _ = math.E var _ = sort.Sort @@ -184,7 +252,7 @@ func (t *FundedAddressState) MarshalCBOR(w io.Writer) error { } } else { if err := cbg.WriteCid(cw, *t.MsgCid); err != nil { - return xerrors.Errorf("failed to write cid field t.MsgCid: %w", err) + return fmt.Errorf("failed to write cid field t.MsgCid: %w", err) } } @@ -339,7 +407,7 @@ func (t *MsgInfo) UnmarshalCBOR(r io.Reader) (err error) { return err } - t.ChannelID = string(sval) + t.ChannelID = sval } // t.MsgCid (cid.Cid) (struct) @@ -426,7 +494,7 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error { // t.Direction (uint64) (uint64) - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Direction)); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, t.Direction); err != nil { return err } @@ -572,7 +640,7 @@ func (t *ChannelInfo) UnmarshalCBOR(r io.Reader) (err error) { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.Direction = uint64(extra) + t.Direction = extra } // t.Vouchers ([]*market.VoucherInfo) (slice) @@ -897,7 +965,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { // t.State (uint64) (uint64) - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.State)); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, t.State); err != nil { return err } @@ -955,7 +1023,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Message))); err != nil { return err } - if _, err := io.WriteString(w, string(t.Message)); err != nil { + if _, err := io.WriteString(w, t.Message); err != nil { return err } @@ -1572,13 +1640,13 @@ func (t *ProviderDealState) MarshalCBOR(w io.Writer) error { if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Message))); err != nil { return err } - if _, err := io.WriteString(w, string(t.Message)); err != nil { + if _, err := io.WriteString(w, t.Message); err != nil { return err } // t.CurrentInterval (uint64) (uint64) - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.CurrentInterval)); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, t.CurrentInterval); err != nil { return err } @@ -1701,7 +1769,7 @@ func (t *ProviderDealState) UnmarshalCBOR(r io.Reader) (err error) { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.TotalSent = uint64(extra) + t.TotalSent = extra } // t.FundsReceived (big.Int) (struct) @@ -1721,7 +1789,7 @@ func (t *ProviderDealState) UnmarshalCBOR(r io.Reader) (err error) { return err } - t.Message = string(sval) + t.Message = sval } // t.CurrentInterval (uint64) (uint64) @@ -1734,7 +1802,7 @@ func (t *ProviderDealState) UnmarshalCBOR(r io.Reader) (err error) { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.CurrentInterval = uint64(extra) + t.CurrentInterval = extra } // t.LegacyProtocol (bool) (bool) diff --git a/models/badger/retrieval_ask.go b/models/badger/retrieval_ask.go index f54a751a..41804c65 100644 --- a/models/badger/retrieval_ask.go +++ b/models/badger/retrieval_ask.go @@ -60,7 +60,7 @@ func (r *retrievalAskRepo) SetAsk(ctx context.Context, ask *types.RetrievalAsk) func (r *retrievalAskRepo) ListAsk(ctx context.Context) ([]*types.RetrievalAsk, error) { var results []*types.RetrievalAsk - err := travelDeals(ctx, r.ds, func(ask *types.RetrievalAsk) (bool, error) { + err := travelCborAbleDS(ctx, r.ds, func(ask *types.RetrievalAsk) (bool, error) { results = append(results, ask) return false, nil }) diff --git a/models/badger/retrieval_deal.go b/models/badger/retrieval_deal.go index 3f0795db..40bbc0b2 100644 --- a/models/badger/retrieval_deal.go +++ b/models/badger/retrieval_deal.go @@ -55,7 +55,7 @@ func (r retrievalDealRepo) GetDeal(ctx context.Context, id peer.ID, id2 retrieva func (r retrievalDealRepo) GetDealByTransferId(ctx context.Context, chid datatransfer.ChannelID) (*types.ProviderDealState, error) { var result *types.ProviderDealState - err := travelDeals(ctx, r.ds, func(deal *types.ProviderDealState) (stop bool, err error) { + err := travelCborAbleDS(ctx, r.ds, func(deal *types.ProviderDealState) (stop bool, err error) { if deal.ChannelID != nil && deal.ChannelID.Initiator == chid.Initiator && deal.ChannelID.Responder == chid.Responder && deal.ChannelID.ID == chid.ID { result = deal return true, nil @@ -114,7 +114,7 @@ func (r retrievalDealRepo) ListDeals(ctx context.Context, pageIndex, pageSize in func (r retrievalDealRepo) GroupRetrievalDealNumberByStatus(ctx context.Context, mAddr address.Address) (map[retrievalmarket.DealStatus]int64, error) { result := map[retrievalmarket.DealStatus]int64{} - return result, travelDeals(ctx, r.ds, func(deal *types.ProviderDealState) (stop bool, err error) { + return result, travelCborAbleDS(ctx, r.ds, func(deal *types.ProviderDealState) (stop bool, err error) { result[deal.Status]++ return false, nil }) diff --git a/models/badger/storage_ask.go b/models/badger/storage_ask.go index 1e36f121..f664475a 100644 --- a/models/badger/storage_ask.go +++ b/models/badger/storage_ask.go @@ -64,7 +64,7 @@ func (ar *storageAskRepo) SetAsk(ctx context.Context, ask *types.SignedStorageAs func (ar *storageAskRepo) ListAsk(ctx context.Context) ([]*types.SignedStorageAsk, error) { var results []*types.SignedStorageAsk - err := travelDeals(ctx, ar.ds, func(ask *types.SignedStorageAsk) (bool, error) { + err := travelCborAbleDS(ctx, ar.ds, func(ask *types.SignedStorageAsk) (bool, error) { results = append(results, ask) return false, nil }) diff --git a/models/badger/storage_deal.go b/models/badger/storage_deal.go index 83fea447..98b3b359 100644 --- a/models/badger/storage_deal.go +++ b/models/badger/storage_deal.go @@ -52,7 +52,7 @@ func (sdr *storageDealRepo) GetDeals(ctx context.Context, miner address.Address, var startIdx, idx = pageIndex * pageSize, 0 var storageDeals []*types.MinerDeal var err error - if err = travelDeals(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { + if err = travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { if deal.ClientDealProposal.Proposal.Provider != miner { return } @@ -84,7 +84,7 @@ func (sdr *storageDealRepo) GetDealsByPieceCidAndStatus(ctx context.Context, pie var storageDeals []*types.MinerDeal var err error - if err = travelDeals(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { + if err = travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { if deal.ClientDealProposal.Proposal.PieceCID != piececid { return } @@ -114,7 +114,7 @@ func (sdr *storageDealRepo) GetDealsByDataCidAndDealStatus(ctx context.Context, var storageDeals []*types.MinerDeal var err error - if err = travelDeals(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { + if err = travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { if mAddr != address.Undef && deal.ClientDealProposal.Proposal.Provider != mAddr { return } @@ -147,7 +147,7 @@ func (sdr *storageDealRepo) GetDealByAddrAndStatus(ctx context.Context, addr add var storageDeals []*types.MinerDeal var err error - if err = travelDeals(ctx, sdr.ds, + if err = travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { if addr == address.Undef || deal.ClientDealProposal.Proposal.Provider == addr { if _, ok := filter[deal.State]; !ok { @@ -189,7 +189,7 @@ func (sdr *storageDealRepo) UpdateDealStatus(ctx context.Context, proposalCid ci func (sdr *storageDealRepo) ListDealByAddr(ctx context.Context, miner address.Address) ([]*types.MinerDeal, error) { storageDeals := make([]*types.MinerDeal, 0) - if err := travelDeals(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { + if err := travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (stop bool, err error) { if deal.ClientDealProposal.Proposal.Provider == miner { storageDeals = append(storageDeals, deal) } @@ -203,7 +203,7 @@ func (sdr *storageDealRepo) ListDealByAddr(ctx context.Context, miner address.Ad func (sdr *storageDealRepo) ListDeal(ctx context.Context) ([]*types.MinerDeal, error) { storageDeals := make([]*types.MinerDeal, 0) - if err := travelDeals(ctx, sdr.ds, func(deal *types.MinerDeal) (bool, error) { + if err := travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (bool, error) { storageDeals = append(storageDeals, deal) return false, nil }); err != nil { @@ -218,7 +218,7 @@ func (sdr *storageDealRepo) GetPieceInfo(ctx context.Context, pieceCID cid.Cid) Deals: nil, } var err error - if err = travelDeals(ctx, sdr.ds, func(deal *types.MinerDeal) (bool, error) { + if err = travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (bool, error) { if deal.ClientDealProposal.Proposal.PieceCID.Equals(pieceCID) { pieceInfo.Deals = append(pieceInfo.Deals, piecestore.DealInfo{ DealID: deal.DealID, @@ -241,7 +241,7 @@ func (sdr *storageDealRepo) GetPieceInfo(ctx context.Context, pieceCID cid.Cid) func (sdr *storageDealRepo) ListPieceInfoKeys(ctx context.Context) ([]cid.Cid, error) { var cidsMap = make(map[cid.Cid]interface{}) - err := travelDeals(ctx, sdr.ds, + err := travelCborAbleDS(ctx, sdr.ds, func(deal *types.MinerDeal) (bool, error) { cidsMap[deal.ClientDealProposal.Proposal.PieceCID] = nil return false, nil @@ -262,7 +262,7 @@ func (sdr *storageDealRepo) ListPieceInfoKeys(ctx context.Context) ([]cid.Cid, e func (sdr *storageDealRepo) GetDealByDealID(ctx context.Context, mAddr address.Address, dealID abi.DealID) (*types.MinerDeal, error) { var deal *types.MinerDeal var err error - if err = travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { + if err = travelCborAbleDS(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { if stop = inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.DealID == dealID; stop { deal = inDeal } @@ -279,7 +279,7 @@ func (sdr *storageDealRepo) GetDealByDealID(ctx context.Context, mAddr address.A func (sdr *storageDealRepo) GetDealsByPieceStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus) ([]*types.MinerDeal, error) { var deals []*types.MinerDeal - return deals, travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { + return deals, travelCborAbleDS(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { if inDeal.PieceStatus != pieceStatus { return } @@ -299,7 +299,7 @@ func (sdr *storageDealRepo) GetDealsByPieceStatusAndDealStatus(ctx context.Conte dict[status] = struct{}{} } - return deals, travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { + return deals, travelCborAbleDS(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { if inDeal.PieceStatus != pieceStatus { return } @@ -318,7 +318,7 @@ func (sdr *storageDealRepo) GetDealsByPieceStatusAndDealStatus(ctx context.Conte func (sdr *storageDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) { var deal *types.MinerDeal - err := travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { + err := travelCborAbleDS(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { if inDeal.ClientDealProposal.Proposal.PieceCID == pieceCID { deal = inDeal return true, nil @@ -336,7 +336,7 @@ func (sdr *storageDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) func (sdr *storageDealRepo) GroupStorageDealNumberByStatus(ctx context.Context, mAddr address.Address) (map[storagemarket.StorageDealStatus]int64, error) { result := map[storagemarket.StorageDealStatus]int64{} - return result, travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { + return result, travelCborAbleDS(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { if mAddr != address.Undef && mAddr != inDeal.Proposal.Provider { return false, nil } diff --git a/models/badger/testing.go b/models/badger/testing.go index 2a9877b2..f3ad1123 100644 --- a/models/badger/testing.go +++ b/models/badger/testing.go @@ -1,9 +1,10 @@ package badger import ( - "github.com/ipfs/go-datastore" "testing" + "github.com/ipfs/go-datastore" + "github.com/filecoin-project/venus-market/v2/models/repo" badger "github.com/ipfs/go-ds-badger2" "github.com/stretchr/testify/assert" @@ -16,7 +17,7 @@ func setup(t *testing.T) repo.Repo { } func NewMemRepo() (repo.Repo, error) { - db, err := NewMemDb() + db, err := NewDatastore("") if err != nil { return nil, err } @@ -37,10 +38,10 @@ func WrapDbToRepo(db datastore.Batching) repo.Repo { }) } -func NewMemDb() (datastore.Batching, error) { +func NewDatastore(dir string) (datastore.Batching, error) { opts := &badger.DefaultOptions - opts.InMemory = true - ds, err := badger.NewDatastore("", opts) + opts.InMemory = len(dir) == 0 + ds, err := badger.NewDatastore(dir, opts) if err != nil { return nil, err } diff --git a/models/badger/utils.go b/models/badger/utils.go index 1079de86..ad0304e5 100644 --- a/models/badger/utils.go +++ b/models/badger/utils.go @@ -38,38 +38,50 @@ func checkCallbackAndGetParamType(i interface{}) (reflect.Type, error) { return in.Elem(), nil } -func travelDeals(ctx context.Context, ds datastore.Batching, callback interface{}) error { +func travelCborAbleDS(ctx context.Context, ds datastore.Batching, callback interface{}) error { instanceType, err := checkCallbackAndGetParamType(callback) if err != nil { return err } + return TravelBatching(ctx, ds, func(k string, v []byte) (bool, error) { + i := reflect.New(instanceType).Interface() + unmarshaler := i.(cbg.CBORUnmarshaler) + if err = cborrpc.ReadCborRPC(bytes.NewReader(v), unmarshaler); err != nil { + return true, err + } + rets := reflect.ValueOf(callback).Call([]reflect.Value{ + reflect.ValueOf(unmarshaler)}) + + if rets[0].Interface().(bool) { + return true, nil + } + + if !rets[1].IsNil() { + return true, rets[0].Interface().(error) + } + return false, nil + }) +} + +func TravelBatching(ctx context.Context, ds datastore.Batching, callback func(k string, v []byte) (bool, error)) error { result, err := ds.Query(ctx, query.Query{}) if err != nil { return err } defer result.Close() //nolint:errcheck - for res := range result.Next() { if res.Error != nil { return err } - i := reflect.New(instanceType).Interface() - unmarshaler := i.(cbg.CBORUnmarshaler) - if err = cborrpc.ReadCborRPC(bytes.NewReader(res.Value), unmarshaler); err != nil { + stop, err := callback(res.Key, res.Value) + if err != nil { return err } - rets := reflect.ValueOf(callback).Call([]reflect.Value{ - reflect.ValueOf(unmarshaler)}) - - if rets[0].Interface().(bool) { + if stop { return nil } - - if !rets[1].IsNil() { - return rets[0].Interface().(error) - } } return nil } diff --git a/models/migrate_test.go b/models/migrate_test.go new file mode 100644 index 00000000..dd9453c5 --- /dev/null +++ b/models/migrate_test.go @@ -0,0 +1,77 @@ +package models + +import ( + "context" + "testing" + + "github.com/filecoin-project/venus-market/v2/models/badger" + t220 "github.com/filecoin-project/venus-market/v2/models/badger/migrate/v2.2.0/testing" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + + "github.com/stretchr/testify/assert" +) + +func TestBadgerMigrate(t *testing.T) { + var ds datastore.Batching + var err error + var count = 3 + + var paychMsgCIDs []cid.Cid + + ctx := context.Background() + + ds, err = badger.NewDatastore("") + assert.NoError(t, err) + + paychMsgCIDs = t220.WriteTestcasesToDS(ctx, t, ds, count) + + repo := badger.WrapDbToRepo(ds) + + assert.NoError(t, repo.Migrate()) + + { + res, err := repo.StorageDealRepo().ListDeal(ctx) + assert.NoError(t, err) + assert.Equal(t, len(res), count) + } + + { + for _, cid := range paychMsgCIDs { + res, err := repo.PaychMsgInfoRepo().GetMessage(ctx, cid) + assert.NoError(t, err) + assert.NotNil(t, res) + + } + } + + { + res, err := repo.PaychChannelInfoRepo().ListChannel(ctx) + assert.NoError(t, err) + assert.Equal(t, len(res), count) + } + + { + res, err := repo.StorageAskRepo().ListAsk(ctx) + assert.NoError(t, err) + assert.Equal(t, len(res), count) + } + { + res, err := repo.RetrievalAskRepo().ListAsk(ctx) + assert.NoError(t, err) + assert.Equal(t, len(res), count) + } + + { + res, err := repo.RetrievalDealRepo().ListDeals(ctx, 1, 10) + assert.NoError(t, err) + assert.Equal(t, len(res), count) + + } + + { + res, err := repo.CidInfoRepo().ListCidInfoKeys(ctx) + assert.NoError(t, err) + assert.Equal(t, len(res), count) + } +} diff --git a/models/module.go b/models/module.go index 3e9fcbbc..b512a2ff 100644 --- a/models/module.go +++ b/models/module.go @@ -9,13 +9,10 @@ import ( "github.com/ipfs-force-community/venus-common-utils/builder" ) -var invokeDataMigrate = builder.NextInvoke() - // TODO: 这里没有考虑client和server的数据表是不一样的 var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option { return builder.Options( builder.Override(new(badger2.MetadataDS), badger2.NewMetadataDS), - builder.Override(invokeDataMigrate, func(r repo.Repo) error { return r.Migrate() }), builder.ApplyIfElse(func(s *builder.Settings) bool { return server }, builder.Options( @@ -45,8 +42,7 @@ var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option { builder.Override(new(badger2.PayChanMsgDs), badger2.NewPayChanMsgDs), builder.Override(new(badger2.FundMgrDS), badger2.NewFundMgrDS), builder.Override(new(badger2.RetrievalDealsDS), badger2.NewRetrievalDealsDS), - - builder.Override(new(repo.Repo), badger2.NewBadgerRepo), + builder.Override(new(repo.Repo), badger2.NewMigratedBadgerRepo), ), ), ), @@ -62,8 +58,7 @@ var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option { builder.Override(new(badger2.RetrievalClientDS), badger2.NewRetrievalClientDS), builder.Override(new(badger2.ImportClientDS), badger2.NewImportClientDS), builder.Override(new(badger2.ClientTransferDS), badger2.NewClientTransferDS), - - builder.Override(new(repo.Repo), badger2.NewBadgerRepo), + builder.Override(new(repo.Repo), badger2.NewMigratedBadgerRepo), ), ), ) diff --git a/models/mysql/db.go b/models/mysql/db.go index 00d005b1..57064af2 100644 --- a/models/mysql/db.go +++ b/models/mysql/db.go @@ -73,41 +73,7 @@ func (r MysqlRepo) Close() error { } func (r MysqlRepo) Migrate() error { - err := r.GetDb().AutoMigrate(cidInfo{}) - if err != nil { - return err - } - - err = r.GetDb().AutoMigrate(fundedAddressState{}) - if err != nil { - return err - } - - err = r.GetDb().AutoMigrate(channelInfo{}) - if err != nil { - return err - } - - err = r.GetDb().AutoMigrate(retrievalAsk{}) - if err != nil { - return err - } - - err = r.GetDb().AutoMigrate(retrievalDeal{}) - if err != nil { - return err - } - - err = r.GetDb().AutoMigrate(storageDeal{}) - if err != nil { - return err - } - - err = r.GetDb().AutoMigrate(storageAsk{}) - if err != nil { - return err - } - return nil + return r.AutoMigrate(retrievalAsk{}, cidInfo{}, storageAsk{}, fundedAddressState{}, storageDeal{}, channelInfo{}, msgInfo{}, retrievalDeal{}) } func (r MysqlRepo) Transaction(cb func(txRepo repo.TxRepo) error) error { @@ -150,7 +116,7 @@ func InitMysql(cfg *config.Mysql) (repo.Repo, error) { r := &MysqlRepo{DB: db} - return r, r.AutoMigrate(retrievalAsk{}, cidInfo{}, storageAsk{}, fundedAddressState{}, storageDeal{}, channelInfo{}, msgInfo{}, retrievalDeal{}) + return r, r.Migrate() } type DBCid cid.Cid diff --git a/models/testhelpers.go b/models/testhelpers.go index a06c1faa..024a9b91 100644 --- a/models/testhelpers.go +++ b/models/testhelpers.go @@ -1,10 +1,11 @@ package models import ( + "testing" + "github.com/filecoin-project/venus-market/v2/models/badger" "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/stretchr/testify/assert" - "testing" ) // NewInMemoryRepo makes a new instance of MemRepo From be3fc20fa5b057b1ca7f9fe2ba1a22820a8b8e2e Mon Sep 17 00:00:00 2001 From: zl Date: Mon, 17 Oct 2022 13:03:37 +0800 Subject: [PATCH 3/4] fix: rollback version if migration failed. --- models/badger/migrate/migrate.go | 13 +++++++++++++ models/badger/migrate/v2.2.0/testing/testing.go | 3 ++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/models/badger/migrate/migrate.go b/models/badger/migrate/migrate.go index 6c3c572f..494e1286 100644 --- a/models/badger/migrate/migrate.go +++ b/models/badger/migrate/migrate.go @@ -243,6 +243,19 @@ func migrateOne(ctx context.Context, name string, mfs migrateFuncSchedule, ds da _, doMigrate := statestore.NewVersionedStateStore(dsWithOldVersion, migrations, targetVersion) if err := doMigrate(ctx); err != nil { + var rollbackErr error + + // if error happens, just rollback the version number + if len(oldVersion) == 0 { + rollbackErr = ds.Delete(ctx, versioningKey) + } else { + rollbackErr = ds.Put(ctx, versioningKey, []byte(oldVersion)) + } + + // there are nothing we can do to get back the data. + if rollbackErr != nil { + log.Errorf("migrate: %s failed, rollback version failed:%v\n", name, rollbackErr) + } return nil, err } diff --git a/models/badger/migrate/v2.2.0/testing/testing.go b/models/badger/migrate/v2.2.0/testing/testing.go index b9839bf1..4078cfe5 100644 --- a/models/badger/migrate/v2.2.0/testing/testing.go +++ b/models/badger/migrate/v2.2.0/testing/testing.go @@ -2,9 +2,10 @@ package testing import ( "context" - "github.com/libp2p/go-libp2p-core/peer" "testing" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/filecoin-project/venus-market/v2/models/badger/migrate" "github.com/ipfs/go-cid" From dc1b684576454f6ff9be14f0ed3a4fd6a7dc005d Mon Sep 17 00:00:00 2001 From: zl Date: Mon, 17 Oct 2022 15:09:24 +0800 Subject: [PATCH 4/4] chore: 1. remove empty lines 2. minor refacror on travelCborAbleDs function. --- models/badger/db.go | 2 -- models/badger/migrate/migrate.go | 16 ---------------- models/badger/utils.go | 8 +------- 3 files changed, 1 insertion(+), 25 deletions(-) diff --git a/models/badger/db.go b/models/badger/db.go index 520eb1ec..b7d20957 100644 --- a/models/badger/db.go +++ b/models/badger/db.go @@ -257,14 +257,12 @@ func (r *BadgerRepo) Migrate() error { migrate.DsNameCidInfoDs: r.dsParams.CidInfoDs, migrate.DsNameRetrievalDealsDs: r.dsParams.RetrievalDealsDs, } - // the returned 'newDss' would be wrapped with current version namespace. // so, must set all 'ds' back later. newDss, err := migrate.Migrate(ctx, migrateDss) if err != nil { return err } - r.dsParams.FundDS = newDss[migrate.DsNameFundedAddrState] r.dsParams.StorageDealsDS = newDss[migrate.DsNameStorageDeal] r.dsParams.PaychMsgDS = newDss[migrate.DsNamePaychMsgDs] diff --git a/models/badger/migrate/migrate.go b/models/badger/migrate/migrate.go index 494e1286..71293b98 100644 --- a/models/badger/migrate/migrate.go +++ b/models/badger/migrate/migrate.go @@ -206,52 +206,39 @@ func migrateOne(ctx context.Context, name string, mfs migrateFuncSchedule, ds da } else { oldVersion = string(v) } - var targetVersion = mfs.targetVersion() - var dsWithOldVersion datastore.Batching - if len(oldVersion) == 0 { dsWithOldVersion = ds } else { dsWithOldVersion = namespace.Wrap(ds, datastore.NewKey(oldVersion)) - if oldVersion == string(targetVersion) { log.Infof("doesn't need migration for %s, current version is:%s", name, oldVersion) return dsWithOldVersion, nil } } - log.Infof("migrate: %s from %s to %s", name, oldVersion, string(targetVersion)) - mfs = mfs.subScheduleFrom(oldVersion) - if len(mfs) == 0 { return nil, fmt.Errorf("migrate:%s failed, can't find schedule from:%s", name, oldVersion) } - var migrationBuilders versioned.BuilderList = make([]versioned.Builder, len(mfs)) - for idx, mf := range mfs { migrationBuilders[idx] = versioned.NewVersionedBuilder(mf.mf, mf.version) } - migrations, err := migrationBuilders.Build() if err != nil { return nil, err } - _, doMigrate := statestore.NewVersionedStateStore(dsWithOldVersion, migrations, targetVersion) if err := doMigrate(ctx); err != nil { var rollbackErr error - // if error happens, just rollback the version number if len(oldVersion) == 0 { rollbackErr = ds.Delete(ctx, versioningKey) } else { rollbackErr = ds.Put(ctx, versioningKey, []byte(oldVersion)) } - // there are nothing we can do to get back the data. if rollbackErr != nil { log.Errorf("migrate: %s failed, rollback version failed:%v\n", name, rollbackErr) @@ -272,11 +259,8 @@ func Migrate(ctx context.Context, dss map[string]datastore.Batching) (map[string log.Warnf("no migration sechedules for : %s", name) continue } - var versionedDs datastore.Batching - versionedDs, err = migrateOne(ctx, name, mfs, ds) - // todo: version为空同时, 有同时存在两个版本的类型的可能性, 为了兼容, 这里暂时不返回错误. // 后续的版本升级中如果出错, 应该直接返回. if err != nil { diff --git a/models/badger/utils.go b/models/badger/utils.go index ad0304e5..991dc7fe 100644 --- a/models/badger/utils.go +++ b/models/badger/utils.go @@ -51,16 +51,10 @@ func travelCborAbleDS(ctx context.Context, ds datastore.Batching, callback inter } rets := reflect.ValueOf(callback).Call([]reflect.Value{ reflect.ValueOf(unmarshaler)}) - - if rets[0].Interface().(bool) { - return true, nil - } - if !rets[1].IsNil() { return true, rets[0].Interface().(error) } - - return false, nil + return rets[0].Interface().(bool), nil }) }