Skip to content

Commit

Permalink
feat: introduce thiserror for library errors
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Dec 26, 2024
1 parent 819b179 commit 8c29d78
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 70 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ tokio-stream = "0.1"
# metrics
tracing = "0.1"

# extra
thiserror = "2.0.9"

[dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
96 changes: 53 additions & 43 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,33 @@ use tokio::{
};
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
use tonic::{codec::CompressionEncoding, transport::Channel, Request};
use tracing::{debug, error, info, trace};

use crate::generated::api::{
api_client::ApiClient, BlockSubmissionMsg, BlockSubmissionResponse, TxFilter,
};
use crate::utils::{append_metadata, parse_execution_payload_to_block};
use crate::{Dispatcher, SendType};

type FiberResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;

const BELLATRIX_DATA_VERSION: u32 = 3;
const CAPELLA_DATA_VERSION: u32 = 4;
const DENEB_DATA_VERSION: u32 = 5;

/// Error type for the Fiber client.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum FiberError {
#[error("Tonic transport error: {0}")]
Tonic(#[from] tonic::transport::Error),
#[error("Error while parsing hex: {0}")]
Hex(#[from] FromHexError),
#[error("Error when receiving on a channel: {0}")]
Recv(#[from] tokio::sync::oneshot::error::RecvError),
}

/// Result type for the Fiber client.
pub(crate) type FiberResult<T> = std::result::Result<T, FiberError>;

/// Options for the API client
#[derive(Debug, Default, Clone, Copy)]
pub struct ClientOptions {
Expand Down Expand Up @@ -191,8 +205,7 @@ impl Client {
.sequence_response
.into_iter()
.map(|resp| B256::from_str(&resp.hash))
.collect::<Result<Vec<B256>, FromHexError>>()
.map_err(|e| e.to_string())?;
.collect::<Result<Vec<B256>, FromHexError>>()?;

Ok((hashes, timestamp))
}
Expand All @@ -217,8 +230,7 @@ impl Client {
.sequence_response
.into_iter()
.map(|resp| B256::from_str(&resp.hash))
.collect::<Result<Vec<B256>, FromHexError>>()
.map_err(|e| e.to_string())?;
.collect::<Result<Vec<B256>, FromHexError>>()?;

Ok((hashes, timestamp))
}
Expand Down Expand Up @@ -269,21 +281,21 @@ impl Client {
let mut stream = match client.subscribe_new_txs_v2(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in transaction stream, retrying...");
error!(error = ?e, "Error in transaction stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Transaction stream established");
info!("Transaction stream established");

while let Some(item) = stream.next().await {
match item {
Ok(transaction) => {
let signer = match Address::try_from(transaction.sender.as_slice()) {
Ok(sender) => sender,
Err(e) => {
tracing::error!(error = ?e, "Error deserializing sender");
error!(error = ?e, "Error deserializing sender");
continue;
}
};
Expand All @@ -296,29 +308,29 @@ impl Client {
// Note: In case we receive a transaction in its network protocol
// encoding, we strip the blob out and try to decode it again.
if e.to_string() == "unexpected list" {
tracing::debug!("Received blob transaction in network protocol encoding");
debug!("Received blob transaction in network protocol encoding");

match PooledTransaction::decode_2718(
&mut transaction.rlp_transaction.as_ref(),
) {
Ok(pooled) => pooled.into_envelope(),
Err(e) => {
tracing::error!(error = ?e, "Error deserializing blob transaction");
error!(error = ?e, "Error deserializing blob transaction");
continue;
}
}
} else {
tracing::error!(error = ?e, "Error deserializing transaction");
error!(error = ?e, "Error deserializing transaction");
continue;
}
}
};

tracing::trace!(hash = ?signed_transaction.tx_hash(), "Received transaction");
trace!(hash = ?signed_transaction.tx_hash(), "Received transaction");
let _ = tx.send(Recovered::new_unchecked(signed_transaction, signer));
}
Err(e) => {
tracing::error!(error = ?e, "Error in transaction stream, retrying...");
error!(error = ?e, "Error in transaction stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down Expand Up @@ -360,28 +372,28 @@ impl Client {
let mut stream = match client.subscribe_new_txs_v2(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in transaction stream, retrying...");
error!(error = ?e, "Error in transaction stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Raw transaction stream established");
info!("Raw transaction stream established");

while let Some(item) = stream.next().await {
match item {
Ok(transaction) => {
let sender = match Address::try_from(transaction.sender.as_slice()) {
Ok(sender) => sender,
Err(e) => {
tracing::error!(error = ?e, "Error deserializing sender");
error!(error = ?e, "Error deserializing sender");
continue;
}
};
let _ = tx.send((sender, transaction.rlp_transaction.into()));
}
Err(e) => {
tracing::error!(error = ?e, "Error in transaction stream, retrying...");
error!(error = ?e, "Error in transaction stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down Expand Up @@ -415,21 +427,21 @@ impl Client {
let mut stream = match client.subscribe_new_blob_txs(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in transaction stream, retrying...");
error!(error = ?e, "Error in transaction stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Blob transaction stream established");
info!("Blob transaction stream established");

while let Some(item) = stream.next().await {
match item {
Ok(transaction) => {
let signer = match Address::try_from(transaction.sender.as_slice()) {
Ok(sender) => sender,
Err(e) => {
tracing::error!(error = ?e, "Error deserializing sender");
error!(error = ?e, "Error deserializing sender");
continue;
}
};
Expand All @@ -439,25 +451,23 @@ impl Client {
) {
Ok(pooled) => pooled,
Err(e) => {
tracing::error!(error = ?e, "Error deserializing blob transaction");
error!(error = ?e, "Error deserializing blob transaction");
continue;
}
};
let blob_tx = match pooled {
PooledTransaction::Eip4844(blob_tx) => blob_tx,
_ => {
tracing::error!(
"Wrong transaction type for blob transaction stream"
);
error!("Wrong transaction type for blob transaction stream");
continue;
}
};

tracing::trace!("Received blob transaction");
trace!("Received blob transaction");
let _ = tx.send(Recovered::new_unchecked(blob_tx, signer));
}
Err(e) => {
tracing::error!(error = ?e, "Error in blob transaction stream, retrying...");
error!(error = ?e, "Error in blob transaction stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down Expand Up @@ -494,29 +504,29 @@ impl Client {
let mut stream = match client.subscribe_new_blob_txs(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in transaction stream, retrying...");
error!(error = ?e, "Error in transaction stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Raw blob transaction stream established");
info!("Raw blob transaction stream established");

while let Some(item) = stream.next().await {
match item {
Ok(transaction) => {
let signer = match Address::try_from(transaction.sender.as_slice()) {
Ok(sender) => sender,
Err(e) => {
tracing::error!(error = ?e, "Error deserializing sender");
error!(error = ?e, "Error deserializing sender");
continue;
}
};

let _ = tx.send((signer, transaction.rlp_transaction.into()));
}
Err(e) => {
tracing::error!(error = ?e, "Error in blob transaction stream, retrying...");
error!(error = ?e, "Error in blob transaction stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down Expand Up @@ -554,13 +564,13 @@ impl Client {
let mut stream = match client.subscribe_execution_payloads_v2(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in execution payload stream, retrying...");
error!(error = ?e, "Error in execution payload stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Execution payload stream established");
info!("Execution payload stream established");

while let Some(item) = stream.next().await {
match item {
Expand All @@ -579,7 +589,7 @@ impl Client {
.map(ExecutionPayload::V3)
}
_ => {
tracing::error!(
error!(
data_version = payload.data_version,
"Error deserializing execution payload: invalid data version"
);
Expand All @@ -590,7 +600,7 @@ impl Client {
let execution_payload = match payload_deserialized {
Ok(payload) => payload,
Err(e) => {
tracing::error!(error = ?e, "Error deserializing execution payload");
error!(error = ?e, "Error deserializing execution payload");
continue;
}
};
Expand All @@ -601,7 +611,7 @@ impl Client {
let _ = tx.send(block);
}
Err(e) => {
tracing::error!(error = ?e, "Error in execution payload stream, retrying...");
error!(error = ?e, "Error in execution payload stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down Expand Up @@ -633,13 +643,13 @@ impl Client {
let mut stream = match client.subscribe_beacon_blocks_v2(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in beacon block stream, retrying...");
error!(error = ?e, "Error in beacon block stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Beacon block stream established");
info!("Beacon block stream established");

while let Some(item) = stream.next().await {
match item {
Expand All @@ -648,12 +658,12 @@ impl Client {
let _ = tx.send(payload_deserialized);
}
Err(e) => {
tracing::error!(error = ?e, "Error deserializing beacon block");
error!(error = ?e, "Error deserializing beacon block");
continue;
}
},
Err(e) => {
tracing::error!(error = ?e, "Error in beacon block stream, retrying...");
error!(error = ?e, "Error in beacon block stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down Expand Up @@ -685,21 +695,21 @@ impl Client {
let mut stream = match client.subscribe_beacon_blocks_v2(req).await {
Ok(stream) => stream.into_inner(),
Err(e) => {
tracing::error!(error = ?e, "Error in beacon block stream, retrying...");
error!(error = ?e, "Error in beacon block stream, retrying...");
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};

tracing::info!("Beacon block stream established");
info!("Beacon block stream established");

while let Some(item) = stream.next().await {
match item {
Ok(item) => {
let _ = tx.send(item.ssz_block.into());
}
Err(e) => {
tracing::error!(error = ?e, "Error in beacon block stream, retrying...");
error!(error = ?e, "Error in beacon block stream, retrying...");
// If we get an error, we set the inner stream to None and break out of the loop.
// Next iteration will retry the stream.
break;
Expand Down
Loading

0 comments on commit 8c29d78

Please sign in to comment.