Skip to content

Commit

Permalink
Обновил API T-Invest до версии 1.24. Добавил поток статусов ордеров д…
Browse files Browse the repository at this point in the history
…ля более точного определения статусов
  • Loading branch information
nikitabuida committed Nov 18, 2024
1 parent 4ef5710 commit fc9e201
Show file tree
Hide file tree
Showing 13 changed files with 1,203 additions and 935 deletions.
185 changes: 130 additions & 55 deletions project/OsEngine/Market/Servers/TinkoffInvestments/TinkoffServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,32 @@ public TinkoffInvestmentsServerRealization()
ServerTime = DateTime.UtcNow;

Thread worker = new Thread(ConnectionCheckThread);
worker.Name = "CheckAliveTinkoff";
worker.Name = "CheckAliveTInvest";
worker.Start();

Thread worker2 = new Thread(DataMessageReader);
worker2.Name = "DataMessageReaderTinkoff";
worker2.Name = "DataMessageReaderTInvest";
worker2.Start();

Thread worker3 = new Thread(PortfolioMessageReader);
worker3.Name = "PortfolioMessageReaderTinkoff";
worker3.Name = "PortfolioMessageReaderTInvest";
worker3.Start();

Thread worker4 = new Thread(PositionsMessageReader);
worker4.Name = "PositionsMessageReaderTinkoff";
worker4.Name = "PositionsMessageReaderTInvest";
worker4.Start();

Thread worker5 = new Thread(MyTradesMessageReader);
worker5.Name = "MyTradesAndOrdersMessageReaderTinkoff";
worker5.Name = "MyTradesMessageReaderTInvest";
worker5.Start();

Thread worker6 = new Thread(LastPricesPoller);
worker6.Name = "LastPricesPollingTinkoff";
worker6.Name = "LastPricesPollingTInvest";
worker6.Start();

Thread worker7 = new Thread(OrderStateMessageReader);
worker5.Name = "OrderStateMessageReaderTInvest";
worker5.Start();
}

public void Connect()
Expand All @@ -81,7 +85,7 @@ public void Connect()
_myPortfolios.Clear();
_subscribedSecurities.Clear();

SendLogMessage("Start TinkoffInvestments Connection", LogMessageType.System);
SendLogMessage("Start T-Invest Connection", LogMessageType.System);

_accessToken = ((ServerParameterString)ServerParameters[0]).Value;
_customTerminalId = ((ServerParameterString)ServerParameters[5]).Value;
Expand All @@ -90,7 +94,7 @@ public void Connect()

if (string.IsNullOrEmpty(_accessToken))
{
SendLogMessage("Connection terminated. You must specify the api token. You can get it on the TinkoffInvestments website",
SendLogMessage("Connection terminated. You must specify the api token. You can get it on the T-Invest website",
LogMessageType.Error);
return;
}
Expand Down Expand Up @@ -227,12 +231,25 @@ public void Dispose()
}
}

if (_myOrderStateDataStream != null)
{
try
{
_myOrderStateDataStream.Dispose();
}
catch (Exception ex)
{
SendLogMessage("Error disposing stream", LogMessageType.Error);
}
}

_marketDataStream = null;
_portfolioDataStream = null;
_positionsDataStream = null;
_myTradesDataStream = null;
_myOrderStateDataStream = null;

SendLogMessage("Connection Closed by TinkoffInvestments. Data streams Closed Event", LogMessageType.System);
SendLogMessage("Connection Closed by T-Invest. Data streams Closed Event", LogMessageType.System);

_subscribedSecurities.Clear();
_myPortfolios.Clear();
Expand Down Expand Up @@ -1448,6 +1465,10 @@ private void ReconnectGRPCStreams()
Accounts = { accountsList }
}, _gRpcMetadata, cancellationToken: _cancellationTokenSource.Token);

_myOrderStateDataStream = _ordersStreamClient.OrderStateStream(new OrderStateStreamRequest
{
Accounts = { accountsList }
}, _gRpcMetadata, cancellationToken: _cancellationTokenSource.Token);

_portfolioDataStream =
_operationsStreamClient.PortfolioStream(new PortfolioStreamRequest { Accounts = { accountsList } },
Expand Down Expand Up @@ -1478,6 +1499,7 @@ private void ActivateCurrentPortfolioListening()
private bool _useStreamForMarketData = true; // if we are over the limits, then stop using stream and turn to data polling (300+ subscribed secs)
private AsyncDuplexStreamingCall<MarketDataRequest, MarketDataResponse> _marketDataStream;
private AsyncServerStreamingCall<TradesStreamResponse> _myTradesDataStream;
private AsyncServerStreamingCall<OrderStateStreamResponse> _myOrderStateDataStream;
private AsyncServerStreamingCall<PortfolioStreamResponse> _portfolioDataStream;
private AsyncServerStreamingCall<PositionsStreamResponse> _positionsDataStream;

Expand Down Expand Up @@ -1529,7 +1551,7 @@ public void Subscrible(Security security)
{
SubscriptionAction = SubscriptionAction.Subscribe,
Instruments = { tradeInstrument },
TradeType = _filterOutDealerTrades ? TradeSourceType.TradeSourceExchange : TradeSourceType.TradeSourceAll
TradeSource = _filterOutDealerTrades ? TradeSourceType.TradeSourceExchange : TradeSourceType.TradeSourceAll
};
marketDataRequestTrades.SubscribeTradesRequest = subscribeTradesRequest;

Expand Down Expand Up @@ -2227,31 +2249,102 @@ private async void MyTradesMessageReader()
Thread.Sleep(1);
continue;
}

for (int i = 0; i < tradesResponse.OrderTrades.Trades.Count; i++)
{
MyTrade trade = new MyTrade();

// запрашиваем состояние ордера
GetOrderStateRequest getOrderStateRequest = new GetOrderStateRequest();
getOrderStateRequest.OrderId = tradesResponse.OrderTrades.OrderId;
getOrderStateRequest.AccountId = tradesResponse.OrderTrades.AccountId;
trade.SecurityNameCode = security.Name;
trade.Price = GetValue(tradesResponse.OrderTrades.Trades[i].Price);
trade.Volume = tradesResponse.OrderTrades.Trades[i].Quantity/security.Lot;
trade.NumberOrderParent = tradesResponse.OrderTrades.OrderId;
trade.NumberTrade = tradesResponse.OrderTrades.Trades[i].TradeId;
trade.Time = tradesResponse.OrderTrades.Trades[i].DateTime.ToDateTime();
trade.Side = tradesResponse.OrderTrades.Direction == OrderDirection.Buy
? Side.Buy
: Side.Sell;

OrderState state = null;
try
{
_rateGateOrders.WaitToProceed();
state = _ordersClient.GetOrderState(getOrderStateRequest, _gRpcMetadata);
if (MyTradeEvent != null)
{
MyTradeEvent(trade);
}
}
catch (RpcException ex)
{
string message = GetGRPCErrorMessage(ex);
SendLogMessage($"Error getting order state. Info: {message}", LogMessageType.Error);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// Handle the cancellation gracefully
SendLogMessage("My trades data stream was cancelled", LogMessageType.System);
Thread.Sleep(5000);
}
catch (RpcException exception)
{
SendLogMessage("My trades data stream was disconnected", LogMessageType.Error);

Thread.Sleep(1);
continue;
}
catch (Exception ex)
{
SendLogMessage("Error getting order state " + security.Name + " exception: " + ex.ToString(), LogMessageType.Error);
SendLogMessage("Server data was: " + tradesResponse.ToString(), LogMessageType.Error);
// need to reconnect everything
if (ServerStatus != ServerConnectStatus.Disconnect)
{
ServerStatus = ServerConnectStatus.Disconnect;
DisconnectEvent();
}
Thread.Sleep(5000);
}
catch (Exception exception)
{
SendLogMessage(exception.ToString(), LogMessageType.Error);
Thread.Sleep(5000);
}
}
}

private async void OrderStateMessageReader()
{
Thread.Sleep(1000);

while (true)
{
try
{
if (_myOrderStateDataStream == null)
{
Thread.Sleep(1);
continue;
}

if (await _myOrderStateDataStream.ResponseStream.MoveNext() == false)
{
Thread.Sleep(1);
continue;
}

if (_myOrderStateDataStream == null)
{
Thread.Sleep(1);
continue;
}

OrderStateStreamResponse orderStateResponse = _myOrderStateDataStream.ResponseStream.Current;
if (orderStateResponse == null)
{
Thread.Sleep(1);
continue;
}

_lastMyTradesDataTime = DateTime.UtcNow;

if (orderStateResponse.Ping != null)
{
Thread.Sleep(1);
continue;
}

if (orderStateResponse.OrderState != null)
{
Security security = GetSecurity(orderStateResponse.OrderState.InstrumentUid);
OrderStateStreamResponse.Types.OrderState state = orderStateResponse.OrderState;

if (security == null)
{
Thread.Sleep(1);
continue;
}
Expand All @@ -2266,16 +2359,16 @@ private async void MyTradesMessageReader()
order.NumberUser = _orderNumbers[state.OrderRequestId];
order.NumberMarket = state.OrderId;
order.SecurityNameCode = security.Name;
order.PortfolioNumber = tradesResponse.OrderTrades.AccountId;
order.PortfolioNumber = state.AccountId;
order.Side = state.Direction == OrderDirection.Buy ? Side.Buy : Side.Sell;
order.TypeOrder = state.OrderType == OrderType.Limit
? OrderPriceType.Limit
: OrderPriceType.Market;

order.Volume = state.LotsRequested;
order.VolumeExecute = state.LotsExecuted;
order.Price = order.TypeOrder == OrderPriceType.Limit ? GetValue(state.InitialSecurityPrice) : 0;
order.TimeCallBack = state.OrderDate.ToDateTime();
order.Price = order.TypeOrder == OrderPriceType.Limit ? GetValue(state.InitialOrderPrice)/order.Volume : 0;
order.TimeCallBack = state.CreatedAt.ToDateTime();
order.SecurityClassCode = security.NameClass;

if (state.ExecutionReportStatus == OrderExecutionReportStatus.ExecutionReportStatusUnspecified)
Expand Down Expand Up @@ -2311,37 +2404,17 @@ private async void MyTradesMessageReader()
{
MyOrderEvent(order);
}

for (int i = 0; i < tradesResponse.OrderTrades.Trades.Count; i++)
{
MyTrade trade = new MyTrade();

trade.SecurityNameCode = security.Name;
trade.Price = GetValue(tradesResponse.OrderTrades.Trades[i].Price);
trade.Volume = tradesResponse.OrderTrades.Trades[i].Quantity/security.Lot;
trade.NumberOrderParent = tradesResponse.OrderTrades.OrderId;
trade.NumberTrade = tradesResponse.OrderTrades.Trades[i].TradeId;
trade.Time = tradesResponse.OrderTrades.Trades[i].DateTime.ToDateTime();
trade.Side = tradesResponse.OrderTrades.Direction == OrderDirection.Buy
? Side.Buy
: Side.Sell;

if (MyTradeEvent != null)
{
MyTradeEvent(trade);
}
}
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// Handle the cancellation gracefully
SendLogMessage("My trades data stream was cancelled", LogMessageType.System);
SendLogMessage("Order state data stream was cancelled", LogMessageType.System);
Thread.Sleep(5000);
}
catch (RpcException exception)
{
SendLogMessage("My trades data stream was disconnected", LogMessageType.Error);
SendLogMessage("Order state data stream was disconnected", LogMessageType.Error);

// need to reconnect everything
if (ServerStatus != ServerConnectStatus.Disconnect)
Expand All @@ -2358,6 +2431,7 @@ private async void MyTradesMessageReader()
}
}
}


public event Action<Order> MyOrderEvent;

Expand Down Expand Up @@ -2385,6 +2459,7 @@ public void SendOrder(Order order)
request.Price = ConvertToQuotation(order.Price);
request.InstrumentId = security.NameId;
request.AccountId = order.PortfolioNumber;
request.TimeInForce = TimeInForceType.TimeInForceDay; // по-умолчанию сегодняшний день

// генерируем новый номер ордера и добавляем его в словарь
Guid newUid = Guid.NewGuid();
Expand Down
Binary file modified project/OsEngine/bin/Debug/OsEngine.exe
Binary file not shown.
Binary file modified project/OsEngine/bin/Debug/TinkoffInvestApi.dll
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit fc9e201

Please sign in to comment.