Skip to content

Commit

Permalink
feat: check the signature on updating public address of the node (#37)
Browse files Browse the repository at this point in the history
* feat: check the signature on updating public address of the node

* add comment

* fixes

* address comments
  • Loading branch information
Cifko authored Dec 9, 2024
1 parent ad347ac commit b0fd59f
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 71 deletions.
27 changes: 1 addition & 26 deletions atoma-proxy/docs/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ paths:
}
```
operationId: node_public_address_registration
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/NodePublicAddressAssignment'
required: true
responses:
'200':
description: Node public address registered successfully
Expand Down Expand Up @@ -282,26 +276,7 @@ paths:
schema: {}
'500':
description: Failed to retrieve list of available models
components:
schemas:
NodePublicAddressAssignment:
type: object
description: |-
Represents the payload for the node public address registration request.
This struct represents the payload for the node public address registration request.
required:
- node_small_id
- public_address
properties:
node_small_id:
type: integer
format: int64
description: Unique small integer identifier for the node
minimum: 0
public_address:
type: string
description: The public address of the node
components: {}
tags:
- name: health
description: Health check
Expand Down
105 changes: 102 additions & 3 deletions atoma-proxy/src/server/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use std::str::FromStr;
use std::sync::Arc;

use anyhow::Result;
use atoma_state::types::AtomaAtomaStateManagerEvent;
use axum::http::StatusCode;
use atoma_utils::constants::SIGNATURE;
use atoma_utils::verify_signature;
use axum::body::Body;
use axum::extract::Request;
use axum::http::{HeaderMap, StatusCode};
use axum::middleware::from_fn_with_state;
use axum::{
extract::State,
routing::{get, post},
Json, Router,
};
use blake2::digest::consts::U32;
use blake2::digest::generic_array::GenericArray;
use blake2::{Blake2b, Digest};
use flume::Sender;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sui_sdk::types::base_types::SuiAddress;
use sui_sdk::types::crypto::{PublicKey as SuiPublicKey, Signature, SuiSignature};
use tokenizers::Tokenizer;
use tokio::sync::watch;
use tokio::sync::{oneshot, watch};
use tokio::{net::TcpListener, sync::RwLock};
use tower::ServiceBuilder;
use tracing::{error, instrument};
Expand Down Expand Up @@ -54,6 +64,12 @@ pub const MODELS_PATH: &str = "/v1/models";
/// in the system, ensuring that the system has the correct address for routing requests.
pub const NODE_PUBLIC_ADDRESS_REGISTRATION_PATH: &str = "/node/registration";

/// Body size limit for signature verification (contains the body size of the request)
const MAX_BODY_SIZE: usize = 1024 * 1024; // 1MB

/// Size of the blake2b hash in bytes
const BODY_HASH_SIZE: usize = 32;

/// Represents the shared state of the application.
///
/// This struct holds various components and configurations that are shared
Expand Down Expand Up @@ -297,8 +313,91 @@ pub(crate) struct NodePublicAddressRegistrationOpenApi;
#[instrument(level = "info", skip_all)]
pub async fn node_public_address_registration(
State(state): State<ProxyState>,
Json(payload): Json<NodePublicAddressAssignment>,
headers: HeaderMap,
request: Request<Body>,
) -> Result<Json<Value>, StatusCode> {
let base64_signature = headers
.get(SIGNATURE)
.ok_or_else(|| {
error!("Signature header not found");
StatusCode::BAD_REQUEST
})?
.to_str()
.map_err(|e| {
error!("Failed to extract base64 signature encoding, with error: {e}");
StatusCode::BAD_REQUEST
})?;

let body_bytes = axum::body::to_bytes(request.into_body(), MAX_BODY_SIZE)
.await
.map_err(|_| {
error!("Failed to convert body to bytes");
StatusCode::BAD_REQUEST
})?;

let signature = Signature::from_str(base64_signature).map_err(|_| {
error!("Failed to parse signature");
StatusCode::BAD_REQUEST
})?;

let public_key_bytes = signature.public_key_bytes();
let public_key =
SuiPublicKey::try_from_bytes(signature.scheme(), public_key_bytes).map_err(|e| {
error!("Failed to extract public key from bytes, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let sui_address = SuiAddress::from(&public_key);

let mut blake2b_hash = Blake2b::new();
blake2b_hash.update(&body_bytes);
let body_blake2b_hash: GenericArray<u8, U32> = blake2b_hash.finalize();
let body_blake2b_hash_bytes: [u8; BODY_HASH_SIZE] =
body_blake2b_hash.as_slice().try_into().map_err(|_| {
error!("Failed to convert blake2b hash to bytes");
StatusCode::BAD_REQUEST
})?;
verify_signature(base64_signature, &body_blake2b_hash_bytes)?;

let payload =
serde_json::from_slice::<NodePublicAddressAssignment>(&body_bytes).map_err(|e| {
error!("Failed to parse request body: {:?}", e);
StatusCode::BAD_REQUEST
})?;

let (result_sender, result_receiver) = oneshot::channel();

state
.state_manager_sender
.send(AtomaAtomaStateManagerEvent::GetNodeSuiAddress {
node_small_id: payload.node_small_id as i64,
result_sender,
})
.map_err(|err| {
error!("Failed to send GetNodeSuiAddress event: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;

let node_sui_address = result_receiver
.await
.map_err(|err| {
error!("Failed to receive GetNodeSuiAddress result: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?
.map_err(|err| {
error!("Failed to get node Sui address: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or_else(|| {
error!("Node Sui address not found");
StatusCode::NOT_FOUND
})?;

// Check if the address associated with the small ID in the request matches the Sui address in the signature.
if node_sui_address != sui_address.to_string() {
error!("The sui address associated with the node small ID does not match the signature sui address");
return Err(StatusCode::BAD_REQUEST);
}

state
.state_manager_sender
.send(AtomaAtomaStateManagerEvent::UpsertNodePublicAddress {
Expand Down
69 changes: 63 additions & 6 deletions atoma-state/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use atoma_sui::events::{
AtomaEvent, NewStackSettlementAttestationEvent, NodePublicKeyCommittmentEvent,
NodeSubscribedToTaskEvent, NodeSubscriptionUpdatedEvent, NodeUnsubscribedFromTaskEvent,
StackAttestationDisputeEvent, StackCreatedEvent, StackSettlementTicketClaimedEvent,
StackSettlementTicketEvent, StackTrySettleEvent, TaskDeprecationEvent, TaskRegisteredEvent,
NodeRegisteredEvent, NodeSubscribedToTaskEvent, NodeSubscriptionUpdatedEvent,
NodeUnsubscribedFromTaskEvent, StackAttestationDisputeEvent, StackCreatedEvent,
StackSettlementTicketClaimedEvent, StackSettlementTicketEvent, StackTrySettleEvent,
TaskDeprecationEvent, TaskRegisteredEvent,
};
use tracing::{info, instrument, trace};

Expand Down Expand Up @@ -67,9 +68,8 @@ pub async fn handle_atoma_event(
info!("Published event: {:?}", event);
Ok(())
}
AtomaEvent::NodeRegisteredEvent(event) => {
info!("Node registered event: {:?}", event);
Ok(())
AtomaEvent::NodeRegisteredEvent((event, address)) => {
handle_node_registration_event(state_manager, event, address.to_string()).await
}
AtomaEvent::NodeSubscribedToModelEvent(event) => {
info!("Node subscribed to model event: {:?}", event);
Expand Down Expand Up @@ -607,6 +607,45 @@ pub(crate) async fn handle_stack_attestation_dispute_event(
Ok(())
}

/// Handles node registration event.
///
/// This function processes a node registration event by parsing the event data
/// and inserting the node into the database.
///
/// # Arguments
///
/// * `state_manager` - A reference to the `AtomaStateManager` for database operations.
/// * `event` - A `NodeRegisteredEvent` containing the details of the node registration event.
/// * `address` - The public address of the node.
///
/// # Returns
///
/// * `Result<()>` - Ok(()) if the event was processed successfully, or an error if something went wrong.
///
/// # Errors
///
/// This function will return an error if:
/// * The event data cannot be deserialized into a `NodeRegisteredEvent`.
/// * The database operation to insert the node fails.
///
/// # Behavior
///
/// The function performs the following steps:
/// 1. Extracts the `node_small_id` from the event.
/// 2. Calls the `insert_new_node` method on the `AtomaStateManager` to insert the node into the database.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn handle_node_registration_event(
state_manager: &AtomaStateManager,
event: NodeRegisteredEvent,
address: String,
) -> Result<()> {
state_manager
.state
.insert_new_node(event.node_small_id.inner as i64, address)
.await?;
Ok(())
}

/// Handles events related to the state manager.
///
/// This function processes various events that are sent to the state manager,
Expand Down Expand Up @@ -782,6 +821,24 @@ pub(crate) async fn handle_state_manager_event(
.send(public_address)
.map_err(|_| AtomaStateManagerError::ChannelSendError)?;
}
AtomaAtomaStateManagerEvent::GetNodeSuiAddress {
node_small_id,
result_sender,
} => {
trace!(
target = "atoma-state-handlers",
event = "handle-state-manager-event",
"Getting sui address for node with id: {}",
node_small_id
);
let sui_address = state_manager
.state
.get_node_sui_address(node_small_id)
.await;
result_sender
.send(sui_address)
.map_err(|_| AtomaStateManagerError::ChannelSendError)?;
}
AtomaAtomaStateManagerEvent::NewStackAcquired {
event,
already_computed_units,
Expand Down
Loading

0 comments on commit b0fd59f

Please sign in to comment.