Skip to content

Commit

Permalink
Merge pull request #1439 from get10101/fix/busy-loop-app-orderbook-ws…
Browse files Browse the repository at this point in the history
…-client

Use app's orderbook WS reconnect timeout in all cases
  • Loading branch information
bonomat authored Oct 6, 2023
2 parents f892332 + 7257c0b commit 04c626c
Showing 1 changed file with 98 additions and 101 deletions.
199 changes: 98 additions & 101 deletions mobile/native/src/orderbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use tokio::runtime::Runtime;
use tokio::sync::watch;
use uuid::Uuid;

/// The reconnect timeout should be high enough for the coordinator to get ready. If its too early
/// we may not be ready process messages which require dlc actions.
/// The reconnect timeout should be high enough for the coordinator to get ready after a restart. If
/// we reconnect too early we may not be ready process messages which require DLC actions.
const WS_RECONNECT_TIMEOUT: Duration = Duration::from_secs(5);

const EXPIRED_ORDER_PRUNING_INTERVAL: Duration = Duration::from_secs(30);

pub fn subscribe(
Expand Down Expand Up @@ -89,110 +90,106 @@ pub fn subscribe(
let url = url.clone();
let authenticate = authenticate;
let fcm_token = fcm_token.clone();
let (_, mut stream) =
match orderbook_client::subscribe_with_authentication(url, authenticate, fcm_token).await {
Ok(split_stream) => split_stream,
Err(e) => {
tracing::error!("Could not start up orderbook client: {e:#}");
continue;
},
};

if let Err(e) =
orderbook_status.send(ServiceStatus::Online) {
tracing::warn!("Cannot update orderbook status: {e:#}");
};

let mut cached_best_price : Prices = HashMap::new();
loop {
match stream.try_next().await {
Ok(Some(msg)) => {


let msg = match serde_json::from_str::<Message>(&msg) {
Ok(msg) => {
tracing::debug!(%msg, "New message from orderbook");
msg
},
Err(e) => {
tracing::error!(
"Could not deserialize message from orderbook. Error: {e:#}"
);
continue;
}
match orderbook_client::subscribe_with_authentication(url, authenticate, fcm_token).await {
Ok((_, mut stream)) => {
if let Err(e) =
orderbook_status.send(ServiceStatus::Online) {
tracing::warn!("Cannot update orderbook status: {e:#}");
};

match msg {
Message::Rollover => {
tracing::info!("Received a rollover request from orderbook.");
event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Pending)));

if let Err(e) = position::handler::rollover().await {
tracing::error!("Failed to rollover dlc. Error: {e:#}");
event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Failed)));
}
},
Message::AsyncMatch { order, filled_with } => {
let order_reason = order.clone().order_reason.into();
tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {order_reason:?}");
event::publish(&EventInternal::BackgroundNotification(BackgroundTask::AsyncTrade(order_reason)));

if let Err(e) = position::handler::async_trade(order.clone(), filled_with).await {
tracing::error!(order_id = %order.id, "Failed to process async trade. Error: {e:#}");
}
},
Message::Match(filled) => {
tracing::info!(order_id = %filled.order_id, "Received match from orderbook");

if let Err(e) = position::handler::trade(filled.clone()).await {
tracing::error!(order_id = %filled.order_id, "Trade request sent to coordinator failed. Error: {e:#}");
}
},
Message::AllOrders(initial_orders) => {
let mut orders = orders.lock();
if !orders.is_empty() {
tracing::debug!("Received new set of initial orders from orderbook, replacing the previously stored orders");
let mut cached_best_price : Prices = HashMap::new();
loop {
match stream.try_next().await {
Ok(Some(msg)) => {
let msg = match serde_json::from_str::<Message>(&msg) {
Ok(msg) => {
tracing::debug!(%msg, "New message from orderbook");
msg
},
Err(e) => {
tracing::error!(
"Could not deserialize message from orderbook. Error: {e:#}"
);
continue;
}
};

match msg {
Message::Rollover => {
tracing::info!("Received a rollover request from orderbook.");
event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Pending)));

if let Err(e) = position::handler::rollover().await {
tracing::error!("Failed to rollover dlc. Error: {e:#}");
event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Failed)));
}
},
Message::AsyncMatch { order, filled_with } => {
let order_reason = order.clone().order_reason.into();
tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {order_reason:?}");
event::publish(&EventInternal::BackgroundNotification(BackgroundTask::AsyncTrade(order_reason)));

if let Err(e) = position::handler::async_trade(order.clone(), filled_with).await {
tracing::error!(order_id = %order.id, "Failed to process async trade. Error: {e:#}");
}
},
Message::Match(filled) => {
tracing::info!(order_id = %filled.order_id, "Received match from orderbook");

if let Err(e) = position::handler::trade(filled.clone()).await {
tracing::error!(order_id = %filled.order_id, "Trade request sent to coordinator failed. Error: {e:#}");
}
},
Message::AllOrders(initial_orders) => {
let mut orders = orders.lock();
if !orders.is_empty() {
tracing::debug!("Received new set of initial orders from orderbook, replacing the previously stored orders");
}
else {
tracing::debug!(?orders, "Received all orders from orderbook");
}
*orders = initial_orders;
update_prices_if_needed(&mut cached_best_price, &orders);
},
Message::NewOrder(order) => {
let mut orders = orders.lock();
orders.push(order);
update_prices_if_needed(&mut cached_best_price, &orders);
}
Message::DeleteOrder(order_id) => {
let mut orders = orders.lock();
let found = remove_order(&mut orders, order_id);
if !found {
tracing::warn!(%order_id, "Could not remove non-existing order");
}
update_prices_if_needed(&mut cached_best_price, &orders);
},
Message::Update(updated_order) => {
let mut orders = orders.lock();
let found = remove_order(&mut orders, updated_order.id);
if !found {
tracing::warn!(?updated_order, "Update without prior knowledge of order");
}
orders.push(updated_order);
update_prices_if_needed(&mut cached_best_price, &orders);
},
_ => tracing::debug!(?msg, "Skipping message from orderbook"),
}
else {
tracing::debug!(?orders, "Received all orders from orderbook");
}
*orders = initial_orders;
update_prices_if_needed(&mut cached_best_price, &orders);
},
Message::NewOrder(order) => {
let mut orders = orders.lock();
orders.push(order);
update_prices_if_needed(&mut cached_best_price, &orders);
}
Message::DeleteOrder(order_id) => {
let mut orders = orders.lock();
let found = remove_order(&mut orders, order_id);
if !found {
tracing::warn!(%order_id, "Could not remove non-existing order");
}
update_prices_if_needed(&mut cached_best_price, &orders);
},
Message::Update(updated_order) => {
let mut orders = orders.lock();
let found = remove_order(&mut orders, updated_order.id);
if !found {
tracing::warn!(?updated_order, "Update without prior knowledge of order");
}
orders.push(updated_order);
update_prices_if_needed(&mut cached_best_price, &orders);
},
_ => tracing::debug!(?msg, "Skipping message from orderbook"),
Ok(None) => {
tracing::warn!("Orderbook WS stream closed");
break;
}
Err(error) => {
tracing::warn!(%error, "Orderbook WS stream closed with error");
break;
}
}
}
Ok(None) => {
tracing::warn!("Orderbook WS stream closed");
break;
}
Err(error) => {
tracing::warn!(%error, "Orderbook WS stream closed with error");
break;
}
}
};
},
Err(e) => {
tracing::error!("Could not start up orderbook client: {e:#}");
},
};

if let Err(e) =
Expand Down

0 comments on commit 04c626c

Please sign in to comment.