From 0cf6ef51f59ea2130c13fb97f2fff5ba2142e5c6 Mon Sep 17 00:00:00 2001 From: Karl Ranna Date: Tue, 5 Jan 2021 20:43:24 +0000 Subject: [PATCH] feat: dynamic minimum quantity --- src/arby.ts | 1 + ...minimum-order-quantity-filter.spec.ts.snap | 3 - src/centralized/ccxt/create-order.spec.ts | 32 ++--- src/centralized/ccxt/create-order.ts | 21 ++- src/centralized/execute-order.spec.ts | 14 +- src/centralized/execute-order.ts | 39 ++---- .../minimum-order-quantity-filter.spec.ts | 48 ------- .../minimum-order-quantity-filter.ts | 58 +++++---- src/centralized/order-builder.spec.ts | 60 ++------- src/centralized/order-builder.ts | 120 +++++++----------- src/centralized/order.ts | 70 ++++++---- src/store.spec.ts | 19 +++ src/store.ts | 14 +- src/trade/accumulate-fills.ts | 17 +-- 14 files changed, 218 insertions(+), 298 deletions(-) delete mode 100644 src/centralized/__snapshots__/minimum-order-quantity-filter.spec.ts.snap delete mode 100644 src/centralized/minimum-order-quantity-filter.spec.ts diff --git a/src/arby.ts b/src/arby.ts index 674cf84..8e06997 100644 --- a/src/arby.ts +++ b/src/arby.ts @@ -102,6 +102,7 @@ export const startArby = ({ loggers.global.info('Starting. Hello, Arby.'); logConfig(config, loggers.global); verifyMarkets(config, CEXmarkets); + store.setMarkets(CEXmarkets); const tradeComplete$ = trade$({ config, loggers, diff --git a/src/centralized/__snapshots__/minimum-order-quantity-filter.spec.ts.snap b/src/centralized/__snapshots__/minimum-order-quantity-filter.spec.ts.snap deleted file mode 100644 index 4ba3c8b..0000000 --- a/src/centralized/__snapshots__/minimum-order-quantity-filter.spec.ts.snap +++ /dev/null @@ -1,3 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`quantityAboveMinimum throws for unknown asset KILRAU 1`] = `"Could not retrieve minimum order quantity for KILRAU"`; diff --git a/src/centralized/ccxt/create-order.spec.ts b/src/centralized/ccxt/create-order.spec.ts index 0b9cc62..dbbc9b0 100644 --- a/src/centralized/ccxt/create-order.spec.ts +++ b/src/centralized/ccxt/create-order.spec.ts @@ -27,12 +27,12 @@ describe('CCXT', () => { }, }; const expectedSymbol = `${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET}`; - const sellOrder$ = createOrder$({ + const sellOrder$ = createOrder$( config, exchange, - side: OrderSide.SELL, - quantity: orderQuantity, - }); + OrderSide.SELL, + orderQuantity + ); sellOrder$.subscribe({ next: actualOrderResponse => { expect(actualOrderResponse).toEqual(orderResponse); @@ -68,12 +68,12 @@ describe('CCXT', () => { }; orderQuantity = new BigNumber('0.12345678'); const expectedSymbol = `${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET}`; - const buyOrder$ = createOrder$({ + const buyOrder$ = createOrder$( config, exchange, - side: OrderSide.BUY, - quantity: orderQuantity, - }); + OrderSide.BUY, + orderQuantity + ); buyOrder$.subscribe({ next: actualOrderResponse => { expect(actualOrderResponse).toEqual(orderResponse); @@ -108,12 +108,12 @@ describe('CCXT', () => { }, }; const expectedSymbol = `${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET}`; - const buyOrder$ = createOrder$({ + const buyOrder$ = createOrder$( config, exchange, - side: OrderSide.BUY, - quantity: orderQuantity, - }); + OrderSide.BUY, + orderQuantity + ); buyOrder$.subscribe({ next: actualOrderResponse => { expect(actualOrderResponse).toEqual(orderResponse); @@ -150,12 +150,12 @@ describe('CCXT', () => { }, }; const expectedSymbol = `${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET}`; - const sellOrder$ = createOrder$({ + const sellOrder$ = createOrder$( config, exchange, - side: OrderSide.SELL, - quantity: orderQuantity, - }); + OrderSide.SELL, + orderQuantity + ); sellOrder$.subscribe({ next: actualOrderResponse => { expect(actualOrderResponse).toEqual(orderResponse); diff --git a/src/centralized/ccxt/create-order.ts b/src/centralized/ccxt/create-order.ts index 4d103e9..a68c1ad 100644 --- a/src/centralized/ccxt/create-order.ts +++ b/src/centralized/ccxt/create-order.ts @@ -4,19 +4,12 @@ import BigNumber from 'bignumber.js'; import { Observable, from, defer } from 'rxjs'; import { Config } from '../../config'; -type CreateOrderParams = { - config: Config; - exchange: Exchange; - side: OrderSide; - quantity: BigNumber; -}; - -const createOrder$ = ({ - config, - exchange, - side, - quantity, -}: CreateOrderParams): Observable => { +const createOrder$ = ( + config: Config, + exchange: Exchange, + side: OrderSide, + quantity: BigNumber +): Observable => { return defer(() => { const price = undefined; const params = @@ -33,4 +26,6 @@ const createOrder$ = ({ }); }; +type CreateOrderParams = Parameters; + export { createOrder$, CreateOrderParams }; diff --git a/src/centralized/execute-order.spec.ts b/src/centralized/execute-order.spec.ts index 26ec2e5..d251b46 100644 --- a/src/centralized/execute-order.spec.ts +++ b/src/centralized/execute-order.spec.ts @@ -26,14 +26,14 @@ const assertExecuteCEXorder = ( return (cold(inputEvents.createOrder$) as unknown) as Observable; }; const CEX = (null as unknown) as Exchange; - const CEXorder$ = executeCEXorder$({ + const CEXorder$ = executeCEXorder$( CEX, - config: inputEvents.config, - logger: getLoggers().centralized, - price: inputEvents.price, - order: inputEvents.order, - createOrder$, - }); + inputEvents.config, + getLoggers().centralized, + inputEvents.price, + inputEvents.order, + createOrder$ + ); expectObservable(CEXorder$, inputEvents.unsubscribe).toBe(expected, { a: null, }); diff --git a/src/centralized/execute-order.ts b/src/centralized/execute-order.ts index 2e3a91c..b6e9dda 100644 --- a/src/centralized/execute-order.ts +++ b/src/centralized/execute-order.ts @@ -14,38 +14,19 @@ import { Logger } from '../logger'; import { CreateOrderParams } from './ccxt/create-order'; import { CEXorder } from './order-builder'; -type ExecuteCEXorderParams = { - CEX: Exchange; - config: Config; - logger: Logger; - price: BigNumber; - order: CEXorder; - createOrder$: ({ - config, - exchange, - side, - quantity, - }: CreateOrderParams) => Observable; -}; - -const executeCEXorder$ = ({ - CEX, - config, - logger, - price, - order, - createOrder$, -}: ExecuteCEXorderParams): Observable => { +const executeCEXorder$ = ( + CEX: Exchange, + config: Config, + logger: Logger, + price: BigNumber, + order: CEXorder, + createOrder$: (...args: CreateOrderParams) => Observable +): Observable => { if (!config.TEST_MODE) { logger.info( `Starting centralized exchange ${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET} market ${order.side} order (quantity: ${order.quantity})` ); - return createOrder$({ - exchange: CEX, - config, - side: order.side, - quantity: order.quantity, - }).pipe( + return createOrder$(config, CEX, order.side, order.quantity).pipe( tap(order => logger.info( `Centralized exchange order finished: ${JSON.stringify(order)}` @@ -77,4 +58,6 @@ const executeCEXorder$ = ({ } }; +type ExecuteCEXorderParams = Parameters; + export { executeCEXorder$, ExecuteCEXorderParams }; diff --git a/src/centralized/minimum-order-quantity-filter.spec.ts b/src/centralized/minimum-order-quantity-filter.spec.ts deleted file mode 100644 index 909383b..0000000 --- a/src/centralized/minimum-order-quantity-filter.spec.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { - quantityAboveMinimum, - MINIMUM_ORDER_SIZE, -} from './minimum-order-quantity-filter'; -import BigNumber from 'bignumber.js'; - -describe('quantityAboveMinimum', () => { - it('returns true for minimum ETH quantity', () => { - expect(quantityAboveMinimum('ETH')(MINIMUM_ORDER_SIZE.ETH)).toEqual(true); - }); - - it('returns false less than minimum ETH quantity', () => { - const quantity = MINIMUM_ORDER_SIZE.ETH.minus( - new BigNumber('0.000000000000000001') - ); - expect(quantityAboveMinimum('ETH')(quantity)).toEqual(false); - }); - - it('returns true for minimum DAI quantity', () => { - expect(quantityAboveMinimum('DAI')(MINIMUM_ORDER_SIZE.DAI)).toEqual(true); - }); - - it('returns false less than minimum DAI quantity', () => { - const quantity = MINIMUM_ORDER_SIZE.DAI.minus( - new BigNumber('0.000000000000000001') - ); - expect(quantityAboveMinimum('DAI')(quantity)).toEqual(false); - }); - - it('returns true for minimum USDT quantity', () => { - expect(quantityAboveMinimum('USDT')(MINIMUM_ORDER_SIZE.USDT)).toEqual(true); - }); - - it('returns false less than minimum USDT quantity', () => { - const quantity = MINIMUM_ORDER_SIZE.USDT.minus( - new BigNumber('0.000000000000000001') - ); - expect(quantityAboveMinimum('USDT')(quantity)).toEqual(false); - }); - - it('throws for unknown asset KILRAU', () => { - expect.assertions(1); - const quantity = new BigNumber('1'); - expect(() => { - quantityAboveMinimum('KILRAU')(quantity); - }).toThrowErrorMatchingSnapshot(); - }); -}); diff --git a/src/centralized/minimum-order-quantity-filter.ts b/src/centralized/minimum-order-quantity-filter.ts index 1912de1..dd52fd6 100644 --- a/src/centralized/minimum-order-quantity-filter.ts +++ b/src/centralized/minimum-order-quantity-filter.ts @@ -1,29 +1,35 @@ -import BigNumber from 'bignumber.js'; -import { errors } from '../opendex/errors'; +import { BigNumber } from 'bignumber.js'; +import { curry } from 'ramda'; +import { EMPTY, Observable, of } from 'rxjs'; +import { mergeMap, take } from 'rxjs/operators'; +import { Logger } from '../logger'; +import { ArbyStore } from '../store'; -type MinimumCEXquantities = { - [key: string]: BigNumber; -}; - -const MINIMUM_ORDER_SIZE: MinimumCEXquantities = { - BTC: new BigNumber('0.001'), - ETH: new BigNumber('0.05'), - DAI: new BigNumber('15'), - USDT: new BigNumber('15'), -}; - -const getMinimumOrderSize = (asset: string): BigNumber => { - const minimumOrderSize = MINIMUM_ORDER_SIZE[asset]; - if (!minimumOrderSize) { - throw errors.CEX_INVALID_MINIMUM_ORDER_QUANTITY(asset); +const quantityAboveMinimum = curry( + ( + store: ArbyStore, + logger: Logger, + assetToTradeOnCEX: string, + minimumQuantity$: Observable, + quantity: BigNumber + ) => { + logger.info( + `Swap success. Accumulated ${assetToTradeOnCEX} quantity: ${quantity.toFixed()}` + ); + store.resetLastOrderUpdatePrice(); + return minimumQuantity$.pipe( + take(1), + mergeMap(minimumQuantity => { + if (quantity.isGreaterThanOrEqualTo(minimumQuantity)) { + return of(quantity); + } + logger.info( + `Will not execute CEX order because ${quantity.toFixed()} is below the minimum allowed CEX quantity ${minimumQuantity.toFixed()}` + ); + return EMPTY; + }) + ); } - return minimumOrderSize; -}; - -const quantityAboveMinimum = (asset: string) => { - return (quantity: BigNumber): boolean => { - return quantity.isGreaterThanOrEqualTo(getMinimumOrderSize(asset)); - }; -}; +); -export { quantityAboveMinimum, MINIMUM_ORDER_SIZE }; +export { quantityAboveMinimum }; diff --git a/src/centralized/order-builder.spec.ts b/src/centralized/order-builder.spec.ts index 8910bf1..4671981 100644 --- a/src/centralized/order-builder.spec.ts +++ b/src/centralized/order-builder.spec.ts @@ -1,11 +1,10 @@ import BigNumber from 'bignumber.js'; -import { Observable } from 'rxjs'; +import { Observable, of } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { Config } from '../config'; import { OrderSide } from '../constants'; import { SwapSuccess } from '../proto/xudrpc_pb'; -import { ArbyStore, getArbyStore } from '../store'; -import { getLoggers, testConfig } from '../test-utils'; +import { testConfig } from '../test-utils'; import { CEXorder, getOrderBuilder$ } from './order-builder'; let testScheduler: TestScheduler; @@ -28,9 +27,7 @@ const assertOrderBuilder = ( expectedValues: { a: CEXorder; }, - config: Config, - expectedAssetToTradeOnCEX: string, - store?: ArbyStore + config: Config ) => { testScheduler.run(helpers => { const { cold, expectObservable } = helpers; @@ -48,33 +45,20 @@ const assertOrderBuilder = ( }; const accumulateOrderFillsForAssetReceived = jest .fn() - .mockImplementation(() => { - return (v: any) => v; - }); - const quantityAboveMinimum = jest.fn().mockImplementation(() => { - return () => true; - }); - const orderBuilder$ = getOrderBuilder$({ + .mockImplementation((v: any) => v); + const filterMinimumQuantity = (qty: BigNumber) => of(qty); + const orderBuilder$ = getOrderBuilder$( config, - logger: getLoggers().centralized, getOpenDEXswapSuccess$, - accumulateOrderFillsForBaseAssetReceived: accumulateOrderFillsForAssetReceived, - accumulateOrderFillsForQuoteAssetReceived: accumulateOrderFillsForAssetReceived, - quantityAboveMinimum, - store: store ? store : getArbyStore(), - }); + accumulateOrderFillsForAssetReceived, + accumulateOrderFillsForAssetReceived, + filterMinimumQuantity + ); expectObservable(orderBuilder$, inputEvents.unsubscribe).toBe( expected, expectedValues ); expect(accumulateOrderFillsForAssetReceived).toHaveBeenCalledTimes(2); - expect(accumulateOrderFillsForAssetReceived).toHaveBeenCalledWith( - expect.objectContaining(config) - ); - expect(quantityAboveMinimum).toHaveBeenCalledTimes(2); - expect(quantityAboveMinimum).toHaveBeenCalledWith( - expectedAssetToTradeOnCEX - ); }); }; @@ -86,7 +70,7 @@ describe('getCentralizedExchangeOrder$', () => { }); it('accumulates buy and sell orders for ETHBTC', () => { - expect.assertions(6); + expect.assertions(2); const inputEvents = { receivedBaseAssetSwapSuccess$: '1s a', receivedQuoteAssetSwapSuccess$: '1400ms b', @@ -122,25 +106,17 @@ describe('getCentralizedExchangeOrder$', () => { BASEASSET: BASEASSET, QUOTEASSET: QUOTEASSET, }; - const expectedAssetToTradeOnCEX = BASEASSET; - const store = { - ...getArbyStore(), - ...{ resetLastOrderUpdatePrice: jest.fn() }, - }; assertOrderBuilder( inputEvents, inputValues, expected, expectedValues, - config, - expectedAssetToTradeOnCEX, - store + config ); - expect(store.resetLastOrderUpdatePrice).toHaveBeenCalledTimes(4); }); it('accumulates buy and sell orders for BTCUSDT', () => { - expect.assertions(6); + expect.assertions(2); const inputEvents = { receivedBaseAssetSwapSuccess$: '1s a', receivedQuoteAssetSwapSuccess$: '1400ms b', @@ -176,20 +152,12 @@ describe('getCentralizedExchangeOrder$', () => { BASEASSET: BASEASSET, QUOTEASSET: QUOTEASSET, }; - const expectedAssetToTradeOnCEX = QUOTEASSET; - const store = { - ...getArbyStore(), - ...{ resetLastOrderUpdatePrice: jest.fn() }, - }; assertOrderBuilder( inputEvents, inputValues, expected, expectedValues, - config, - expectedAssetToTradeOnCEX, - store + config ); - expect(store.resetLastOrderUpdatePrice).toHaveBeenCalledTimes(4); }); }); diff --git a/src/centralized/order-builder.ts b/src/centralized/order-builder.ts index 51a1da3..4d20bd8 100644 --- a/src/centralized/order-builder.ts +++ b/src/centralized/order-builder.ts @@ -1,9 +1,8 @@ import BigNumber from 'bignumber.js'; -import { merge, Observable, of } from 'rxjs'; -import { filter, map, mergeMap, repeat, take } from 'rxjs/operators'; +import { merge, Observable } from 'rxjs'; +import { map, mergeMap, repeat, take } from 'rxjs/operators'; import { Config } from '../config'; import { OrderSide } from '../constants'; -import { Logger } from '../logger'; import { GetOpenDEXswapSuccessParams, OpenDEXswapSuccess, @@ -11,42 +10,27 @@ import { import { getXudClient$ } from '../opendex/xud/client'; import { subscribeXudSwaps$ } from '../opendex/xud/subscribe-swaps'; import { SwapSuccess } from '../proto/xudrpc_pb'; -import { ArbyStore } from '../store'; -type GetOrderBuilderParams = { - config: Config; - logger: Logger; +type CEXorder = { + quantity: BigNumber; + side: OrderSide; +}; + +const getOrderBuilder$ = ( + config: Config, getOpenDEXswapSuccess$: ({ config, getXudClient$, subscribeXudSwaps$, - }: GetOpenDEXswapSuccessParams) => OpenDEXswapSuccess; + }: GetOpenDEXswapSuccessParams) => OpenDEXswapSuccess, accumulateOrderFillsForBaseAssetReceived: ( - config: Config - ) => (source: Observable) => Observable; + source: Observable + ) => Observable, accumulateOrderFillsForQuoteAssetReceived: ( - config: Config - ) => (source: Observable) => Observable; - quantityAboveMinimum: ( - asset: string - ) => (filledQuantity: BigNumber) => boolean; - store: ArbyStore; -}; - -type CEXorder = { - quantity: BigNumber; - side: OrderSide; -}; - -const getOrderBuilder$ = ({ - config, - logger, - getOpenDEXswapSuccess$, - accumulateOrderFillsForBaseAssetReceived, - accumulateOrderFillsForQuoteAssetReceived, - quantityAboveMinimum, - store, -}: GetOrderBuilderParams): Observable => { + source: Observable + ) => Observable, + filterMinimumQuantity: (quantity: BigNumber) => Observable +): Observable => { const { receivedBaseAssetSwapSuccess$, receivedQuoteAssetSwapSuccess$, @@ -55,53 +39,37 @@ const getOrderBuilder$ = ({ getXudClient$, subscribeXudSwaps$, }); - const assetToTradeOnCEX: string = - config.CEX_QUOTEASSET === 'BTC' - ? config.CEX_BASEASSET - : config.CEX_QUOTEASSET; - const receivedQuoteAssetOrder$ = receivedQuoteAssetSwapSuccess$.pipe( - // accumulate OpenDEX order fills when receiving - // quote asset - accumulateOrderFillsForQuoteAssetReceived(config), - mergeMap((quantity: BigNumber) => { - logger.info( - `Swap success. Accumulated ${assetToTradeOnCEX} quantity: ${quantity.toFixed()}` - ); - store.resetLastOrderUpdatePrice(); - return of(quantity); - }), - // filter based on minimum CEX order quantity - filter(quantityAboveMinimum(assetToTradeOnCEX)), - map(quantity => { - return { quantity, side: OrderSide.BUY }; - }), - // reset the filled quantity and start from - // the beginning - take(1), - repeat() + const assetReceived = ( + swapSuccess: Observable, + side: OrderSide, + accumulator: (source: Observable) => Observable + ) => { + return swapSuccess.pipe( + // accumulate OpenDEX order fills + accumulator, + mergeMap(filterMinimumQuantity), + map(quantity => { + return { quantity, side }; + }), + // reset the filled quantity and start from + // the beginning + take(1), + repeat() + ); + }; + const receivedQuoteAssetOrder$ = assetReceived( + receivedQuoteAssetSwapSuccess$, + OrderSide.BUY, + accumulateOrderFillsForQuoteAssetReceived ); - const receivedBaseAssetOrder$ = receivedBaseAssetSwapSuccess$.pipe( - // accumulate OpenDEX order fills when receiving - // quote asset - accumulateOrderFillsForBaseAssetReceived(config), - mergeMap((quantity: BigNumber) => { - logger.info( - `Swap success. Accumulated ${assetToTradeOnCEX} quantity: ${quantity.toFixed()}` - ); - store.resetLastOrderUpdatePrice(); - return of(quantity); - }), - // filter based on minimum CEX order quantity - filter(quantityAboveMinimum(assetToTradeOnCEX)), - map(quantity => { - return { quantity, side: OrderSide.SELL }; - }), - // reset the filled quantity and start from - // the beginning - take(1), - repeat() + const receivedBaseAssetOrder$ = assetReceived( + receivedBaseAssetSwapSuccess$, + OrderSide.SELL, + accumulateOrderFillsForBaseAssetReceived ); return merge(receivedQuoteAssetOrder$, receivedBaseAssetOrder$); }; +type GetOrderBuilderParams = Parameters; + export { getOrderBuilder$, GetOrderBuilderParams, CEXorder }; diff --git a/src/centralized/order.ts b/src/centralized/order.ts index 5c11a66..83cfe5e 100644 --- a/src/centralized/order.ts +++ b/src/centralized/order.ts @@ -1,12 +1,14 @@ import BigNumber from 'bignumber.js'; -import { Exchange } from 'ccxt'; -import { Observable, timer } from 'rxjs'; +import { Dictionary, Exchange, Market } from 'ccxt'; +import { combineLatest, Observable, timer } from 'rxjs'; import { catchError, mergeMap, mergeMapTo, withLatestFrom, + map, } from 'rxjs/operators'; +import { ArbyStore } from 'src/store'; import { Config } from '../config'; import { Logger } from '../logger'; import { getOpenDEXswapSuccess$ } from '../opendex/swap-success'; @@ -18,24 +20,13 @@ import { createOrder$ } from './ccxt/create-order'; import { ExecuteCEXorderParams } from './execute-order'; import { quantityAboveMinimum } from './minimum-order-quantity-filter'; import { CEXorder, GetOrderBuilderParams } from './order-builder'; -import { ArbyStore } from 'src/store'; type GetCentralizedExchangeOrderParams = { CEX: Exchange; logger: Logger; config: Config; - executeCEXorder$: ({ - logger, - price, - order, - }: ExecuteCEXorderParams) => Observable; - getOrderBuilder$: ({ - config, - getOpenDEXswapSuccess$, - accumulateOrderFillsForBaseAssetReceived, - accumulateOrderFillsForQuoteAssetReceived, - quantityAboveMinimum, - }: GetOrderBuilderParams) => Observable; + executeCEXorder$: (...args: ExecuteCEXorderParams) => Observable; + getOrderBuilder$: (...args: GetOrderBuilderParams) => Observable; centralizedExchangePrice$: Observable; deriveCEXorderQuantity: ( order: CEXorder, @@ -55,15 +46,42 @@ const getCentralizedExchangeOrder$ = ({ deriveCEXorderQuantity, store, }: GetCentralizedExchangeOrderParams): Observable => { - return getOrderBuilder$({ - config, + const minimumBaseAssetQuantity$ = (store.selectState('markets') as Observable< + Dictionary + >).pipe( + map(markets => { + const tradingPair = `${config.CEX_BASEASSET}/${config.CEX_QUOTEASSET}`; + const market = markets[tradingPair]; + return new BigNumber(market.limits.amount.min); + }) + ); + const minimumQuantity$ = combineLatest([ + minimumBaseAssetQuantity$, + centralizedExchangePrice$, + ]).pipe( + map(([minimumQuantity, price]) => + config.BASEASSET === 'BTC' + ? minimumQuantity.multipliedBy(price) + : minimumQuantity + ) + ); + const assetToTradeOnCEX: string = + config.CEX_QUOTEASSET === 'BTC' + ? config.CEX_BASEASSET + : config.CEX_QUOTEASSET; + const filterMinimumQuantity = quantityAboveMinimum( + store, logger, + assetToTradeOnCEX, + minimumQuantity$ + ); + return getOrderBuilder$( + config, getOpenDEXswapSuccess$, - accumulateOrderFillsForBaseAssetReceived, - accumulateOrderFillsForQuoteAssetReceived, - quantityAboveMinimum, - store, - }).pipe( + accumulateOrderFillsForBaseAssetReceived(config), + accumulateOrderFillsForQuoteAssetReceived(config), + filterMinimumQuantity + ).pipe( withLatestFrom( centralizedExchangePrice$.pipe( catchError((_e, caught) => { @@ -72,14 +90,14 @@ const getCentralizedExchangeOrder$ = ({ ) ), mergeMap(([order, price]) => { - return executeCEXorder$({ + return executeCEXorder$( CEX, - createOrder$, config, logger, price, - order: deriveCEXorderQuantity(order, price, config), - }); + deriveCEXorderQuantity(order, price, config), + createOrder$ + ); }) ); }; diff --git a/src/store.spec.ts b/src/store.spec.ts index de8b678..e13773c 100644 --- a/src/store.spec.ts +++ b/src/store.spec.ts @@ -1,4 +1,5 @@ import BigNumber from 'bignumber.js'; +import { Dictionary, Market } from 'ccxt'; import { getArbyStore } from './store'; describe('ArbyStore', () => { @@ -18,6 +19,24 @@ describe('ArbyStore', () => { }); }); + it('selectState returns empty initial markets', done => { + const { selectState } = getArbyStore(); + selectState('markets').subscribe(markets => { + expect(markets).toEqual({}); + done(); + }); + }); + + it('selectState returns updated markets', done => { + const { selectState, setMarkets } = getArbyStore(); + const testMarkets = ({ 'BTC/USDT': true } as unknown) as Dictionary; + setMarkets(testMarkets); + selectState('markets').subscribe(markets => { + expect(markets).toEqual(testMarkets); + done(); + }); + }); + it('selectState returns updated last sell order price', done => { const { selectState, updateLastSellOrderUpdatePrice } = getArbyStore(); const updatedPrice = new BigNumber('123'); diff --git a/src/store.ts b/src/store.ts index b2e3513..72843a2 100644 --- a/src/store.ts +++ b/src/store.ts @@ -1,18 +1,23 @@ import BigNumber from 'bignumber.js'; import { BehaviorSubject, Subject, Observable } from 'rxjs'; import { scan, pluck, distinctUntilKeyChanged } from 'rxjs/operators'; +import { Dictionary, Market } from 'ccxt'; type ArbyStore = { updateLastSellOrderUpdatePrice: (price: BigNumber) => void; updateLastBuyOrderUpdatePrice: (price: BigNumber) => void; resetLastOrderUpdatePrice: () => void; - selectState: (stateKey: ArbyStoreDataKeys) => Observable; + setMarkets: (markets: Dictionary) => void; + selectState: ( + stateKey: ArbyStoreDataKeys + ) => Observable>; stateChanges: () => Observable; }; type ArbyStoreData = { lastSellOrderUpdatePrice: BigNumber; lastBuyOrderUpdatePrice: BigNumber; + markets: Dictionary; }; type ArbyStoreDataKeys = keyof ArbyStoreData; @@ -21,6 +26,7 @@ const getArbyStore = (): ArbyStore => { const initialState: ArbyStoreData = { lastSellOrderUpdatePrice: new BigNumber('0'), lastBuyOrderUpdatePrice: new BigNumber('0'), + markets: {}, }; const store = new BehaviorSubject(initialState); const stateUpdates = new Subject() as Subject>; @@ -47,6 +53,11 @@ const getArbyStore = (): ArbyStore => { lastBuyOrderUpdatePrice: new BigNumber('0'), }); }; + const setMarkets = (markets: Dictionary) => { + stateUpdates.next({ + markets, + }); + }; const selectState = (stateKey: ArbyStoreDataKeys) => { return store.pipe(distinctUntilKeyChanged(stateKey), pluck(stateKey)); }; @@ -59,6 +70,7 @@ const getArbyStore = (): ArbyStore => { selectState, resetLastOrderUpdatePrice, stateChanges, + setMarkets, }; }; diff --git a/src/trade/accumulate-fills.ts b/src/trade/accumulate-fills.ts index 0a32f66..bed245d 100644 --- a/src/trade/accumulate-fills.ts +++ b/src/trade/accumulate-fills.ts @@ -1,12 +1,13 @@ import BigNumber from 'bignumber.js'; +import { curry } from 'ramda'; import { Observable } from 'rxjs'; import { scan } from 'rxjs/operators'; import { Config } from '../config'; import { SwapSuccess } from '../proto/xudrpc_pb'; import { satsToCoinsStr } from '../utils'; -const accumulateOrderFillsForBaseAssetReceived = (config: Config) => { - return (source: Observable) => { +const accumulateOrderFillsForBaseAssetReceived = curry( + (config: Config, source: Observable) => { const SEED_VALUE = new BigNumber('0'); return source.pipe( scan((acc: BigNumber, curr: SwapSuccess) => { @@ -26,11 +27,11 @@ const accumulateOrderFillsForBaseAssetReceived = (config: Config) => { } }, SEED_VALUE) ); - }; -}; + } +); -const accumulateOrderFillsForQuoteAssetReceived = (config: Config) => { - return (source: Observable) => { +const accumulateOrderFillsForQuoteAssetReceived = curry( + (config: Config, source: Observable) => { const SEED_VALUE = new BigNumber('0'); return source.pipe( scan((acc: BigNumber, curr: SwapSuccess) => { @@ -50,8 +51,8 @@ const accumulateOrderFillsForQuoteAssetReceived = (config: Config) => { } }, SEED_VALUE) ); - }; -}; + } +); export { accumulateOrderFillsForBaseAssetReceived,