Skip to content

Commit

Permalink
Improve live trading streaming packets (#8584)
Browse files Browse the repository at this point in the history
* Improve live trading streaming packets

* Further improvements

* Add test for default empty holding

- Improve deserialization
  • Loading branch information
Martin-Molinero authored Feb 13, 2025
1 parent 9be02d7 commit fabf119
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 44 deletions.
5 changes: 5 additions & 0 deletions Brokerages/Brokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ protected virtual List<Holding> GetAccountHoldings(Dictionary<string, string> br

if (brokerageData != null && brokerageData.Remove("live-holdings", out var value) && !string.IsNullOrEmpty(value))
{
if (Log.DebuggingEnabled)
{
Log.Debug($"Brokerage.GetAccountHoldings(): raw value: {value}");
}

// remove the key, we really only want to return the cached value on the first request
var result = JsonConvert.DeserializeObject<List<Holding>>(value);
if (result == null)
Expand Down
25 changes: 25 additions & 0 deletions Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,31 @@ public static decimal SmartRounding(this decimal input)
return input.RoundToSignificantDigits(7).Normalize();
}

/// <summary>
/// Provides global smart rounding to a shorter version
/// </summary>
public static decimal SmartRoundingShort(this decimal input)
{
input = Normalize(input);
if (input <= 1)
{
// 0.99 > input
return input;
}
else if (input <= 10)
{
// 1.01 to 9.99
return Math.Round(input, 2);
}
else if (input <= 100)
{
// 99.9 to 10.1
return Math.Round(input, 1);
}
// 100 to inf
return Math.Truncate(input);
}

/// <summary>
/// Casts the specified input value to a decimal while acknowledging the overflow conditions
/// </summary>
Expand Down
136 changes: 121 additions & 15 deletions Common/Global.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.ComponentModel;
using QuantConnect.Securities;
using Newtonsoft.Json.Converters;
using System.Runtime.Serialization;
Expand Down Expand Up @@ -60,48 +62,104 @@ public static class DateFormat
/// <summary>
/// Singular holding of assets from backend live nodes:
/// </summary>
[JsonObject]
[JsonConverter(typeof(HoldingJsonConverter))]
public class Holding
{
private decimal? _conversionRate;
private decimal _marketValue;
private decimal _unrealizedPnl;
private decimal _unrealizedPnLPercent;

/// Symbol of the Holding:
[JsonProperty(PropertyName = "symbol")]
[JsonIgnore]
public Symbol Symbol { get; set; } = Symbol.Empty;

/// Type of the security
[JsonProperty(PropertyName = "type")]
[JsonIgnore]
public SecurityType Type => Symbol.SecurityType;

/// The currency symbol of the holding, such as $
[JsonProperty(PropertyName = "currencySymbol")]
[DefaultValue("$")]
[JsonProperty(PropertyName = "c", DefaultValueHandling = DefaultValueHandling.Ignore)]
public string CurrencySymbol { get; set; }

/// Average Price of our Holding in the currency the symbol is traded in
[JsonProperty(PropertyName = "averagePrice", DefaultValueHandling = DefaultValueHandling.Ignore)]
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "a", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal AveragePrice { get; set; }

/// Quantity of Symbol We Hold.
[JsonProperty(PropertyName = "quantity", DefaultValueHandling = DefaultValueHandling.Ignore)]
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "q", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal Quantity { get; set; }

/// Current Market Price of the Asset in the currency the symbol is traded in
[JsonProperty(PropertyName = "marketPrice", DefaultValueHandling = DefaultValueHandling.Ignore)]
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "p", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal MarketPrice { get; set; }

/// Current market conversion rate into the account currency
[JsonProperty(PropertyName = "conversionRate", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal? ConversionRate { get; set; }
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "r", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal? ConversionRate
{
get
{
return _conversionRate;
}
set
{
if (value != 1)
{
_conversionRate = value;
}
}
}

/// Current market value of the holding
[JsonProperty(PropertyName = "marketValue", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal MarketValue { get; set; }
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "v", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal MarketValue
{
get
{
return _marketValue;
}
set
{
_marketValue = value.SmartRoundingShort();
}
}

/// Current unrealized P/L of the holding
[JsonProperty(PropertyName = "unrealizedPnl", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal UnrealizedPnL { get; set; }
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "u", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal UnrealizedPnL
{
get
{
return _unrealizedPnl;
}
set
{
_unrealizedPnl = value.SmartRoundingShort();
}
}

/// Current unrealized P/L % of the holding
[JsonProperty(DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal UnrealizedPnLPercent { get; set; }
[JsonConverter(typeof(DecimalJsonConverter))]
[JsonProperty(PropertyName = "up", DefaultValueHandling = DefaultValueHandling.Ignore)]
public decimal UnrealizedPnLPercent
{
get
{
return _unrealizedPnLPercent;
}
set
{
_unrealizedPnLPercent = value.SmartRoundingShort();
}
}

/// Create a new default holding:
public Holding()
Expand Down Expand Up @@ -159,6 +217,54 @@ public override string ToString()
{
return Messages.Holding.ToString(this);
}

private class DecimalJsonConverter : JsonConverter
{
public override bool CanRead => false;
public override bool CanConvert(Type objectType) => typeof(decimal) == objectType;
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
writer.WriteRawValue(((decimal)value).NormalizeToStr());
}
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
throw new NotImplementedException();
}
}
private class HoldingJsonConverter : JsonConverter
{
public override bool CanWrite => false;
public override bool CanConvert(Type objectType) => typeof(Holding) == objectType;
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
var jObject = JObject.Load(reader);
var result = new Holding
{
Symbol = jObject["symbol"]?.ToObject<Symbol>() ?? jObject["Symbol"]?.ToObject<Symbol>() ?? Symbol.Empty,
CurrencySymbol = jObject["c"]?.Value<string>() ?? jObject["currencySymbol"]?.Value<string>() ?? jObject["CurrencySymbol"]?.Value<string>() ?? string.Empty,
AveragePrice = jObject["a"]?.Value<decimal>() ?? jObject["averagePrice"]?.Value<decimal>() ?? jObject["AveragePrice"]?.Value<decimal>() ?? 0,
Quantity = jObject["q"]?.Value<decimal>() ?? jObject["quantity"]?.Value<decimal>() ?? jObject["Quantity"]?.Value<decimal>() ?? 0,
MarketPrice = jObject["p"]?.Value<decimal>() ?? jObject["marketPrice"]?.Value<decimal>() ?? jObject["MarketPrice"]?.Value<decimal>() ?? 0,
ConversionRate = jObject["r"]?.Value<decimal>() ?? jObject["conversionRate"]?.Value<decimal>() ?? jObject["ConversionRate"]?.Value<decimal>() ?? null,
MarketValue = jObject["v"]?.Value<decimal>() ?? jObject["marketValue"]?.Value<decimal>() ?? jObject["MarketValue"]?.Value<decimal>() ?? 0,
UnrealizedPnL = jObject["u"]?.Value<decimal>() ?? jObject["unrealizedPnl"]?.Value<decimal>() ?? jObject["UnrealizedPnl"]?.Value<decimal>() ?? 0,
UnrealizedPnLPercent = jObject["up"]?.Value<decimal>() ?? jObject["unrealizedPnLPercent"]?.Value<decimal>() ?? jObject["UnrealizedPnLPercent"]?.Value<decimal>() ?? 0,
};
if (!result.ConversionRate.HasValue)
{
result.ConversionRate = 1;
}
if (string.IsNullOrEmpty(result.CurrencySymbol))
{
result.CurrencySymbol = "$";
}
return result;
}
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
throw new NotImplementedException();
}
}
}

/// <summary>
Expand Down
11 changes: 8 additions & 3 deletions Common/Messages/Messages.QuantConnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,15 @@ public static class Holding
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string ToString(QuantConnect.Holding instance)
{
var value = Invariant($@"{instance.Symbol.Value}: {instance.Quantity} @ {
instance.CurrencySymbol}{instance.AveragePrice} - Market: {instance.CurrencySymbol}{instance.MarketPrice}");
var currencySymbol = instance.CurrencySymbol;
if (string.IsNullOrEmpty(currencySymbol))
{
currencySymbol = "$";
}
var value = Invariant($@"{instance.Symbol?.Value}: {instance.Quantity} @ {
currencySymbol}{instance.AveragePrice} - Market: {currencySymbol}{instance.MarketPrice}");

if (instance.ConversionRate != 1m)
if (instance.ConversionRate.HasValue && instance.ConversionRate != 1m)
{
value += Invariant($" - Conversion: {instance.ConversionRate}");
}
Expand Down
21 changes: 0 additions & 21 deletions Common/Packets/LiveResultPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,16 @@ public class LiveResultPacket : Packet
/// </summary>
public int ProjectId { get; set; }

/// <summary>
/// User session Id who issued the result packet
/// </summary>
public string SessionId { get; set; } = string.Empty;

/// <summary>
/// Live Algorithm Id (DeployId) for this result packet
/// </summary>
public string DeployId { get; set; } = string.Empty;

/// <summary>
/// Compile Id algorithm which generated this result packet
/// </summary>
public string CompileId { get; set; } = string.Empty;

/// <summary>
/// Result data object for this result packet
/// </summary>
public LiveResult Results { get; set; } = new LiveResult();

/// <summary>
/// Processing time / running time for the live algorithm.
/// </summary>
public double ProcessingTime { get; set; }

/// <summary>
/// Default constructor for JSON Serialization
/// </summary>
Expand All @@ -80,15 +65,12 @@ public LiveResultPacket(string json)
try
{
var packet = JsonConvert.DeserializeObject<LiveResultPacket>(json);
CompileId = packet.CompileId;
Channel = packet.Channel;
SessionId = packet.SessionId;
DeployId = packet.DeployId;
Type = packet.Type;
UserId = packet.UserId;
ProjectId = packet.ProjectId;
Results = packet.Results;
ProcessingTime = packet.ProcessingTime;
}
catch (Exception err)
{
Expand All @@ -106,13 +88,10 @@ public LiveResultPacket(LiveNodePacket job, LiveResult results)
{
try
{
SessionId = job.SessionId;
CompileId = job.CompileId;
DeployId = job.DeployId;
Results = results;
UserId = job.UserId;
ProjectId = job.ProjectId;
SessionId = job.SessionId;
Channel = job.Channel;
}
catch (Exception err) {
Expand Down
7 changes: 2 additions & 5 deletions Engine/Results/LiveTradingResultHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ private void Update()
Log.Debug("LiveTradingResultHandler.Update(): End build delta charts");

//Profit loss changes, get the banner statistics, summary information on the performance for the headers.
var deltaStatistics = new Dictionary<string, string>();
var serverStatistics = GetServerStatistics(utcNow);
var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService);

Expand All @@ -232,7 +231,7 @@ private void Update()

// since we're sending multiple packets, let's do it async and forget about it
// chart data can get big so let's break them up into groups
var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, deltaStatistics, runtimeStatistics, serverStatistics, deltaOrderEvents);
var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, runtimeStatistics, serverStatistics, deltaOrderEvents);

foreach (var liveResultPacket in splitPackets)
{
Expand All @@ -256,6 +255,7 @@ private void Update()

var orderEvents = GetOrderEventsToStore();

var deltaStatistics = new Dictionary<string, string>();
var orders = new Dictionary<int, Order>(TransactionHandler.Orders);
var complete = new LiveResultPacket(_job, new LiveResult(new LiveResultParameters(chartComplete, orders, Algorithm.Transactions.TransactionRecord, holdings, Algorithm.Portfolio.CashBook, deltaStatistics, runtimeStatistics, orderEvents, serverStatistics, state: GetAlgorithmState())));
StoreResult(complete);
Expand Down Expand Up @@ -469,7 +469,6 @@ private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> del
Dictionary<int, Order> deltaOrders,
Dictionary<string, Holding> holdings,
CashBook cashbook,
Dictionary<string, string> deltaStatistics,
SortedDictionary<string, string> runtimeStatistics,
Dictionary<string, string> serverStatistics,
List<OrderEvent> deltaOrderEvents)
Expand Down Expand Up @@ -514,7 +513,6 @@ private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> del
new LiveResultPacket(_job, new LiveResult { Holdings = holdings, CashBook = cashbook}),
new LiveResultPacket(_job, new LiveResult
{
Statistics = deltaStatistics,
RuntimeStatistics = runtimeStatistics,
ServerStatistics = serverStatistics
})
Expand Down Expand Up @@ -824,7 +822,6 @@ protected void SendFinalResult()
result = LiveResultPacket.CreateEmpty(_job);
result.Results.State = endState;
}
result.ProcessingTime = (endTime - StartTime).TotalSeconds;

StoreInsights();

Expand Down
Loading

0 comments on commit fabf119

Please sign in to comment.