Skip to content

Commit

Permalink
price-reporter: coinbase: Use new asymmetric key auth (#900)
Browse files Browse the repository at this point in the history
* price-reporter: coinbase: Use new asymmetric advanced trading auth

* price-reporter: coinbase: Optimize order book parsing
  • Loading branch information
joeykraut authored Feb 8, 2025
1 parent 428b64d commit 39e6c29
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 86 deletions.
44 changes: 33 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ indexmap = "2.0.2"
itertools = "0.10"
serde = { version = "1.0" }
serde_json = "1.0.64"
thiserror = "2.0"
tracing = "0.1"
tracing-opentelemetry = "0.22"
metrics = "=0.22.3"
Expand Down
20 changes: 11 additions & 9 deletions config/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,14 @@ pub struct Cli {
// -----------
// | Secrets |
// -----------
/// The Coinbase API key to use for price streaming
#[clap(long = "coinbase-key", value_parser)]
pub coinbase_api_key: Option<String>,
/// The Coinbase API key name to use for price streaming
#[clap(long = "coinbase-key-name", value_parser)]
pub coinbase_key_name: Option<String>,
/// The Coinbase API secret to use for price streaming
#[clap(long = "coinbase-secret", value_parser)]
pub coinbase_api_secret: Option<String>,
///
/// Expected as a PKCS#8 encoded ES256 private key
#[clap(long = "coinbase-key-secret", value_parser)]
pub coinbase_key_secret: Option<String>,
/// The Ethereum RPC node websocket address to dial for on-chain data
#[clap(long = "eth-websocket", value_parser)]
pub eth_websocket_addr: Option<String>,
Expand Down Expand Up @@ -382,9 +384,9 @@ pub struct RelayerConfig {
/// The cluster ID, a parsed version of the cluster's pubkey
pub cluster_id: ClusterId,
/// The Coinbase API key to use for price streaming
pub coinbase_api_key: Option<String>,
pub coinbase_key_name: Option<String>,
/// The Coinbase API secret to use for price streaming
pub coinbase_api_secret: Option<String>,
pub coinbase_key_secret: Option<String>,
/// The HTTP addressable Arbitrum JSON-RPC node
pub rpc_url: Option<String>,
/// The Arbitrum private keys used to send transactions
Expand Down Expand Up @@ -498,8 +500,8 @@ impl Clone for RelayerConfig {
cluster_symmetric_key: self.cluster_symmetric_key,
admin_api_key: self.admin_api_key,
cluster_id: self.cluster_id.clone(),
coinbase_api_key: self.coinbase_api_key.clone(),
coinbase_api_secret: self.coinbase_api_secret.clone(),
coinbase_key_name: self.coinbase_key_name.clone(),
coinbase_key_secret: self.coinbase_key_secret.clone(),
rpc_url: self.rpc_url.clone(),
arbitrum_private_keys: self.arbitrum_private_keys.clone(),
fee_key: self.fee_key,
Expand Down
4 changes: 2 additions & 2 deletions config/src/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ pub(crate) fn parse_config_from_args(cli_args: Cli) -> Result<RelayerConfig, Str
cluster_symmetric_key,
admin_api_key,
cluster_id,
coinbase_api_key: cli_args.coinbase_api_key,
coinbase_api_secret: cli_args.coinbase_api_secret,
coinbase_key_name: cli_args.coinbase_key_name,
coinbase_key_secret: cli_args.coinbase_key_secret,
rpc_url: cli_args.rpc_url,
arbitrum_private_keys,
fee_key,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ async fn main() -> Result<(), CoordinatorError> {
job_receiver: Some(price_reporter_worker_receiver).into(),
cancel_channel: price_reporter_cancel_receiver,
exchange_conn_config: ExchangeConnectionsConfig {
coinbase_api_key: args.coinbase_api_key,
coinbase_api_secret: args.coinbase_api_secret,
coinbase_key_name: args.coinbase_key_name,
coinbase_key_secret: args.coinbase_key_secret,
eth_websocket_addr: args.eth_websocket_addr,
},
price_reporter_url: args.price_reporter_url,
Expand Down
4 changes: 2 additions & 2 deletions mock-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ impl MockNodeController {

let conf = PriceReporterConfig {
exchange_conn_config: ExchangeConnectionsConfig {
coinbase_api_key: config.coinbase_api_key.clone(),
coinbase_api_secret: config.coinbase_api_secret.clone(),
coinbase_key_name: config.coinbase_key_name.clone(),
coinbase_key_secret: config.coinbase_key_secret.clone(),
eth_websocket_addr: config.eth_websocket_addr.clone(),
},
price_reporter_url: config.price_reporter_url.clone(),
Expand Down
3 changes: 2 additions & 1 deletion workers/price-reporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mocks = ["bimap"]

[dependencies]
# === Cryptography === #
hmac-sha256 = "1.1"
jsonwebtoken = "9.3"

# === Async + Runtime === #
async-trait = { workspace = true }
Expand Down Expand Up @@ -41,5 +41,6 @@ lazy_static = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
statrs = "0.16"
thiserror = { workspace = true }
tracing = { workspace = true }
url = "2.4"
23 changes: 15 additions & 8 deletions workers/price-reporter/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,45 @@ use std::error::Error;
use std::fmt::{self, Display};

use common::types::{exchange::Exchange, token::Token};
use thiserror::Error;

/// The core error type used by the ExchangeConnection. All thrown errors are
/// handled by the PriceReporter, either for restarts or panics upon too many
/// consecutive errors.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Error)]
pub enum ExchangeConnectionError {
/// A websocket remote connection hangup.
#[error("remote connection hangup: {0}")]
ConnectionHangup(String),
/// A cryptographic error occurred
#[error("cryptographic error: {0}")]
Crypto(String),
/// An initial websocket subscription to a remote server failed.
#[error("initial websocket subscription failed: {0}")]
HandshakeFailure(String),
/// Could not parse a remote server message.
#[error("could not parse remote server message: {0}")]
InvalidMessage(String),
/// The maximum retry count was exceeded while trying to re-establish
/// an exchange connection
#[error(
"maximum retry count exceeded while trying to re-establish an exchange connection to {0}"
)]
MaxRetries(Exchange),
/// The given pair is not supported by the exchange
#[error("the given pair ({0}, {1}) is not supported by the exchange ({2})")]
UnsupportedPair(Token, Token, Exchange),
/// Error sending on the `write` end of the websocket
#[error("error sending on the `write` end of the websocket: {0}")]
SendError(String),
/// Error saving the state of a price stream
#[error("error saving the state of a price stream: {0}")]
SaveState(String),
/// Tried to initialize an ExchangeConnection that was already initialized
#[error("tried to initialize an ExchangeConnection that was already initialized: {0}")]
AlreadyInitialized(Exchange, Token, Token),
}

impl Error for ExchangeConnectionError {}
impl Display for ExchangeConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
}
}

/// The core error type thrown by the PriceReporter worker.
#[derive(Clone, Debug)]
pub enum PriceReporterError {
Expand Down
Loading

0 comments on commit 39e6c29

Please sign in to comment.