Skip to content

Commit

Permalink
refactor dex keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
Erheng Lu committed May 8, 2020
1 parent d92a2ca commit da397b1
Show file tree
Hide file tree
Showing 33 changed files with 1,371 additions and 1,193 deletions.
71 changes: 23 additions & 48 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ type BinanceChain struct {
// keepers
CoinKeeper bank.Keeper
DexKeeper *dex.DexKeeper
DexMiniTokenKeeper *dex.DexMiniTokenKeeper
DexGlobalKeeper *dex.DexGlobalKeeper
AccountKeeper auth.AccountKeeper
TokenMapper tkstore.Mapper
MiniTokenMapper miniTkstore.MiniTokenMapper
Expand Down Expand Up @@ -139,8 +137,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
app.MiniTokenMapper = miniTkstore.NewMiniTokenMapper(cdc, common.MiniTokenStoreKey)
app.CoinKeeper = bank.NewBaseKeeper(app.AccountKeeper)
app.ParamHub = paramhub.NewKeeper(cdc, common.ParamsStoreKey, common.TParamsStoreKey)
tradingPairMapper := dex.NewTradingPairMapper(app.Codec, common.PairStoreKey, false)
miniTokenTradingPairMapper := dex.NewTradingPairMapper(app.Codec, common.MiniTokenPairStoreKey, true)
tradingPairMapper := dex.NewTradingPairMapper(app.Codec, common.PairStoreKey)

app.stakeKeeper = stake.NewKeeper(
cdc,
Expand Down Expand Up @@ -225,9 +222,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
common.GovStoreKey,
common.TimeLockStoreKey,
common.AtomicSwapStoreKey,
common.DexMiniStoreKey,
common.MiniTokenStoreKey,
common.MiniTokenPairStoreKey,
)
app.SetAnteHandler(tx.NewAnteHandler(app.AccountKeeper))
app.SetPreChecker(tx.NewTxPreChecker())
Expand All @@ -251,7 +246,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
}

// remaining plugin init
app.initDex(tradingPairMapper, miniTokenTradingPairMapper)
app.initDex(tradingPairMapper)
app.initGovHooks()
app.initPlugins()
app.initParams()
Expand Down Expand Up @@ -281,9 +276,7 @@ func SetUpgradeConfig(upgradeConfig *config.UpgradeConfig) {
upgrade.Mgr.RegisterStoreKeys(upgrade.BEP9, common.TimeLockStoreKey.Name())
upgrade.Mgr.RegisterStoreKeys(upgrade.BEP3, common.AtomicSwapStoreKey.Name())

upgrade.Mgr.RegisterStoreKeys(upgrade.BEP8, common.DexMiniStoreKey.Name())
upgrade.Mgr.RegisterStoreKeys(upgrade.BEP8, common.MiniTokenStoreKey.Name())
upgrade.Mgr.RegisterStoreKeys(upgrade.BEP8, common.MiniTokenPairStoreKey.Name())

// register msg types of upgrade
upgrade.Mgr.RegisterMsgTypes(upgrade.BEP9,
Expand Down Expand Up @@ -323,18 +316,10 @@ func (app *BinanceChain) initRunningMode() {
}
}

func (app *BinanceChain) initDex(pairMapper dex.TradingPairMapper, miniPairMapper dex.TradingPairMapper) {
func (app *BinanceChain) initDex(pairMapper dex.TradingPairMapper) {

app.DexGlobalKeeper = dex.NewGlobalKeeper(app.Codec, app.AccountKeeper, app.publicationConfig.ShouldPublishAny())
app.DexGlobalKeeper.SubscribeParamChange(app.ParamHub)

app.DexKeeper = dex.NewOrderKeeper(common.DexStoreKey, pairMapper,
app.RegisterCodespace(dex.DefaultCodespace), app.baseConfig.OrderKeeperConcurrency, app.Codec,
app.DexGlobalKeeper)

app.DexMiniTokenKeeper = dex.NewMiniKeeper(common.DexMiniStoreKey, miniPairMapper,
app.RegisterCodespace(dex.DefaultCodespace), app.baseConfig.OrderKeeperConcurrency, app.Codec,
app.DexGlobalKeeper)
app.DexKeeper = dex.NewDexKeeper(common.DexStoreKey, pairMapper, app.RegisterCodespace(dex.DefaultCodespace), app.Codec, app.AccountKeeper, app.publicationConfig.ShouldPublishAny(), app.baseConfig.OrderKeeperConcurrency)
app.DexKeeper.SubscribeParamChange(app.ParamHub)

// do not proceed if we are in a unit test and `CheckState` is unset.
if app.CheckState == nil {
Expand All @@ -349,7 +334,6 @@ func (app *BinanceChain) initDex(pairMapper dex.TradingPairMapper, miniPairMappe

order.Init(
app.DexKeeper,
app.DexMiniTokenKeeper,
app.CheckState.Ctx,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack,
Expand All @@ -363,7 +347,7 @@ func (app *BinanceChain) initDex(pairMapper dex.TradingPairMapper, miniPairMappe
func (app *BinanceChain) initPlugins() {
tokens.InitPlugin(app, app.TokenMapper, app.MiniTokenMapper, app.AccountKeeper, app.CoinKeeper, app.timeLockKeeper, app.swapKeeper)
minitokens.InitPlugin(app, app.MiniTokenMapper, app.AccountKeeper, app.CoinKeeper)
dex.InitPlugin(app, app.DexKeeper, app.DexMiniTokenKeeper, app.DexGlobalKeeper, app.TokenMapper, app.MiniTokenMapper, app.AccountKeeper, app.govKeeper)
dex.InitPlugin(app, app.DexKeeper, app.TokenMapper, app.MiniTokenMapper, app.AccountKeeper, app.govKeeper)
param.InitPlugin(app, app.ParamHub)
account.InitPlugin(app, app.AccountKeeper)
}
Expand Down Expand Up @@ -557,9 +541,9 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
var miniTradesToPublish []*pub.Trade
if sdk.IsUpgrade(upgrade.BEP19) || !isBreatheBlock {
if app.publicationConfig.ShouldPublishAny() && pub.IsLive {
tradesToPublish, miniTradesToPublish = pub.MatchAndAllocateAllForPublish(app.DexKeeper, app.DexMiniTokenKeeper, ctx, isBreatheBlock)
tradesToPublish, miniTradesToPublish = pub.MatchAndAllocateAllForPublish(app.DexKeeper, ctx, isBreatheBlock)
} else {
order.MatchAndAllocateSymbols(app.DexKeeper, app.DexMiniTokenKeeper, ctx, nil, isBreatheBlock, app.Logger)
app.DexKeeper.MatchAndAllocateSymbols(ctx, nil, isBreatheBlock)
}
}

Expand All @@ -570,9 +554,6 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
app.takeSnapshotHeight = height
icoDone := ico.EndBlockAsync(ctx)
dex.EndBreatheBlock(ctx, app.DexKeeper, app.govKeeper, height, blockTime)
if sdk.IsUpgrade(upgrade.BEP8) {
dex.EndBreatheBlock(ctx, app.DexMiniTokenKeeper, app.govKeeper, height, blockTime)
}
param.EndBreatheBlock(ctx, app.ParamHub)
tokens.EndBreatheBlock(ctx, app.swapKeeper)
// other end blockers
Expand All @@ -582,9 +563,7 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
}

app.DexKeeper.StoreTradePrices(ctx)
if sdk.IsUpgrade(upgrade.BEP8) {
app.DexMiniTokenKeeper.StoreTradePrices(ctx)
}

blockFee := distributeFee(ctx, app.AccountKeeper, app.ValAddrCache, app.publicationConfig.PublishBlockFee)

tags, passed, failed := gov.EndBlocker(ctx, app.govKeeper)
Expand Down Expand Up @@ -614,10 +593,7 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a

// clean up intermediate cached data
app.DexKeeper.ClearOrderChanges()
if sdk.IsUpgrade(upgrade.BEP8) {
app.DexMiniTokenKeeper.ClearOrderChanges()
}
app.DexGlobalKeeper.ClearRoundFee()
app.DexKeeper.ClearRoundFee()
}
fees.Pool.Clear()
// just clean it, no matter use it or not.
Expand Down Expand Up @@ -811,8 +787,8 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, miniTradesToPubli
duration := pub.Timer(app.Logger, fmt.Sprintf("collect publish information, height=%d", height), func() {
if app.publicationConfig.PublishAccountBalance {
txRelatedAccounts := app.Pool.TxRelatedAddrs()
tradeRelatedAccounts := pub.GetTradeAndOrdersRelatedAccounts(app.DexKeeper, tradesToPublish)
miniTradeRelatedAccounts := pub.GetTradeAndOrdersRelatedAccounts(app.DexMiniTokenKeeper, miniTradesToPublish)
tradeRelatedAccounts := pub.GetTradeAndOrdersRelatedAccounts(app.DexKeeper, tradesToPublish, dex.PairType.BEP2)
miniTradeRelatedAccounts := pub.GetTradeAndOrdersRelatedAccounts(app.DexKeeper, miniTradesToPublish, dex.PairType.MINI)
tradeRelatedAccounts = append(tradeRelatedAccounts, miniTradeRelatedAccounts...)
accountsToPublish = pub.GetAccountBalances(
app.AccountKeeper,
Expand All @@ -831,8 +807,8 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, miniTradesToPubli
blockToPublish = pub.GetBlockPublished(app.Pool, header, blockHash)
}
if app.publicationConfig.PublishOrderBook {
latestPriceLevels = app.DexKeeper.GetOrderBooks(pub.MaxOrderBookLevel)
miniLatestPriceLevels = app.DexMiniTokenKeeper.GetOrderBooks(pub.MaxOrderBookLevel)
latestPriceLevels = app.DexKeeper.GetOrderBooks(pub.MaxOrderBookLevel, dex.PairType.BEP2)
miniLatestPriceLevels = app.DexKeeper.GetOrderBooks(pub.MaxOrderBookLevel, dex.PairType.MINI)
}
})

Expand All @@ -843,10 +819,10 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, miniTradesToPubli
pub.Logger.Info("start to publish", "height", height,
"blockTime", blockTime, "numOfTrades", len(tradesToPublish),
"numOfOrders", // the order num we collected here doesn't include trade related orders
len(app.DexKeeper.OrderChanges),
len(app.DexKeeper.GetOrderChanges(dex.PairType.BEP2)),
"numOfMiniTrades", len(miniTradesToPublish),
"numOfMiniOrders", // the order num we collected here doesn't include trade related orders
len(app.DexMiniTokenKeeper.OrderChanges),
len(app.DexKeeper.GetOrderChanges(dex.PairType.MINI)),
"numOfProposals",
proposalsToPublish.NumOfMsgs,
"numOfStakeUpdates",
Expand All @@ -863,27 +839,26 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, miniTradesToPubli
miniTradesToPublish,
proposalsToPublish,
stakeUpdates,
app.DexKeeper.OrderChanges, // thread-safety is guarded by the signal from RemoveDoneCh
app.DexMiniTokenKeeper.OrderChanges,
app.DexKeeper.OrderInfosForPub, // thread-safety is guarded by the signal from RemoveDoneCh
app.DexMiniTokenKeeper.OrderInfosForPub,
app.DexKeeper.GetOrderChanges(dex.PairType.BEP2), // thread-safety is guarded by the signal from RemoveDoneCh
app.DexKeeper.GetOrderChanges(dex.PairType.MINI),
app.DexKeeper.GetOrderInfosForPub(dex.PairType.BEP2), // thread-safety is guarded by the signal from RemoveDoneCh
app.DexKeeper.GetOrderInfosForPub(dex.PairType.MINI),
accountsToPublish,
latestPriceLevels,
miniLatestPriceLevels,
blockFee,
app.DexGlobalKeeper.RoundOrderFees, //only use DexKeeper RoundOrderFees
app.DexKeeper.RoundOrderFees, //only use DexKeeper RoundOrderFees
transferToPublish,
blockToPublish)

// remove item from OrderInfoForPublish when we published removed order (cancel, iocnofill, fullyfilled, expired)
for id := range pub.ToRemoveOrderIdCh {
pub.Logger.Debug("delete order from order changes map", "orderId", id)
delete(app.DexKeeper.OrderInfosForPub, id)
delete(app.DexMiniTokenKeeper.OrderInfosForPub, id)
delete(app.DexKeeper.GetOrderInfosForPub(dex.PairType.BEP2), id) //TODO change to removeOrderInfosForPub method
}
for id := range pub.ToRemoveMiniOrderIdCh {
pub.Logger.Debug("delete mini order from order changes map", "orderId", id)
delete(app.DexMiniTokenKeeper.OrderInfosForPub, id)
delete(app.DexKeeper.GetOrderInfosForPub(dex.PairType.MINI), id)
}

pub.Logger.Debug("finish publish", "height", height)
Expand Down
4 changes: 2 additions & 2 deletions app/app_pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestAppPub_MatchOrder(t *testing.T) {

ctx := app.DeliverState.Ctx
msg := orderPkg.NewNewOrderMsg(buyerAcc.GetAddress(), orderPkg.GenerateOrderID(1, buyerAcc.GetAddress()), orderPkg.Side.BUY, "XYZ-000_BNB", 102000, 300000000)
handler := orderPkg.NewHandler(app.GetCodec(), app.DexKeeper, app.AccountKeeper)
handler := orderPkg.NewHandler(app.GetCodec(), app.DexKeeper)
app.DeliverState.Ctx = app.DeliverState.Ctx.WithBlockHeight(41).WithBlockTime(time.Unix(0, 100))
buyerAcc.SetSequence(1)
app.AccountKeeper.SetAccount(ctx, buyerAcc)
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestAppPub_MatchOrder(t *testing.T) {

func TestAppPub_MatchAndCancelFee(t *testing.T) {
assert, require, app, buyerAcc, sellerAcc := setupAppTest(t)
handler := orderPkg.NewHandler(app.GetCodec(), app.DexKeeper, app.AccountKeeper, app.DexGlobalKeeper)
handler := orderPkg.NewHandler(app.GetCodec(), app.DexKeeper)
ctx := app.DeliverState.Ctx

// ==== Place a to-be-matched sell order and a to-be-cancelled buy order (in different symbol)
Expand Down
9 changes: 2 additions & 7 deletions app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,12 @@ func (app *BinanceChain) processErrAbciResponseForPub(txBytes []byte) {
case order.NewOrderMsg:
app.Logger.Info("failed to process NewOrderMsg", "oid", msg.Id)
// The error on deliver should be rare and only impact witness publisher's performance
app.DexKeeper.OrderChangesMtx.Lock()
app.DexKeeper.OrderChanges = append(app.DexKeeper.OrderChanges, order.OrderChange{msg.Id, order.FailedBlocking, "", msg})
app.DexKeeper.OrderChangesMtx.Unlock()
app.DexKeeper.UpdateOrderChangeSync(order.OrderChange{msg.Id, order.FailedBlocking, "", msg}, msg.Symbol)
case order.CancelOrderMsg:
app.Logger.Info("failed to process CancelOrderMsg", "oid", msg.RefId)
// The error on deliver should be rare and only impact witness publisher's performance
app.DexKeeper.OrderChangesMtx.Lock()
// OrderInfo must has been in keeper.OrderInfosForPub
app.DexKeeper.OrderChanges = append(app.DexKeeper.OrderChanges, order.OrderChange{msg.RefId, order.FailedBlocking, "", msg})
app.DexKeeper.OrderChangesMtx.Unlock()
app.DexKeeper.UpdateOrderChangeSync(order.OrderChange{msg.RefId, order.FailedBlocking, "", msg}, msg.Symbol)
default:
// deliberately do nothing for message other than NewOrderMsg
// in future, we may publish fail status of send msg
Expand Down Expand Up @@ -189,7 +185,6 @@ func (app *BinanceChain) getLastBreatheBlockHeight() int64 {
func (app *BinanceChain) reInitChain() error {
order.Init(
app.DexKeeper,
app.DexMiniTokenKeeper,
app.CheckState.Ctx,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack,
Expand Down
Loading

0 comments on commit da397b1

Please sign in to comment.