Skip to content

Commit

Permalink
feat: digest table and multiple user with the same wallet (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko authored Jan 8, 2025
1 parent 9724419 commit 555a92d
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 52 deletions.
24 changes: 15 additions & 9 deletions atoma-auth/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,22 @@ impl Auth {
pub async fn usdc_payment(&self, jwt: &str, transaction_digest: &str) -> Result<()> {
let claims = self.validate_token(jwt, false)?;

let (timestamp, balance_changes) = self
let (result_sender, result_receiver) = oneshot::channel();
self.state_manager_sender.send(
AtomaAtomaStateManagerEvent::InsertNewUsdcPaymentDigest {
digest: transaction_digest.to_string(),
result_sender,
},
)?;
result_receiver.await??;
let balance_changes = self
.sui
.read()
.await
.get_balance_changes(transaction_digest)
.await?;
let balance_changes =
balance_changes.ok_or_else(|| anyhow::anyhow!("No balance changes found"))?;
let timestamp = timestamp.ok_or_else(|| anyhow::anyhow!("No timestamp found"))?;
let mut sender = None;
let mut receiver = None;
let mut money_in = None;
Expand Down Expand Up @@ -517,22 +524,21 @@ impl Auth {
if receiver == own_address {
let (result_sender, result_receiver) = oneshot::channel();
self.state_manager_sender
.send(AtomaAtomaStateManagerEvent::GetUserId {
.send(AtomaAtomaStateManagerEvent::ConfirmUser {
sui_address: sender.to_string(),
user_id: claims.user_id,
result_sender,
})?;
let user_id = result_receiver
.await??
.ok_or_else(|| anyhow::anyhow!("User not found"))?;
if claims.user_id != user_id {
let is_their_wallet = result_receiver.await??;

if !is_their_wallet {
return Err(anyhow::anyhow!("The payment is not for this user"));
}
// We are the receiver and we know the sender
self.state_manager_sender
.send(AtomaAtomaStateManagerEvent::TopUpBalance {
user_id,
user_id: claims.user_id,
amount: money_in.unwrap() as i64,
timestamp: timestamp as i64,
})?;
}
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions atoma-auth/src/sui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl Sui {
///
/// # Returns
///
/// Returns a tuple containing the timestamp of the transaction and the balance changes
/// Returns the balance changes
///
/// # Errors
///
Expand All @@ -437,7 +437,7 @@ impl Sui {
pub async fn get_balance_changes(
&self,
transaction_digest: &str,
) -> Result<(Option<u64>, Option<Vec<BalanceChange>>)> {
) -> Result<Option<Vec<BalanceChange>>> {
let transaction_digest = TransactionDigest::from_str(transaction_digest).unwrap();
let client = self.wallet_ctx.get_client().await?;
let transaction = client
Expand All @@ -450,6 +450,6 @@ impl Sui {
},
)
.await?;
Ok((transaction.timestamp_ms, transaction.balance_changes))
Ok(transaction.balance_changes)
}
}
30 changes: 18 additions & 12 deletions atoma-state/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,24 +1147,18 @@ pub(crate) async fn handle_state_manager_event(
.send(sui_address)
.map_err(|_| AtomaStateManagerError::ChannelSendError)?;
}
AtomaAtomaStateManagerEvent::GetUserId {
AtomaAtomaStateManagerEvent::ConfirmUser {
sui_address,
user_id,
result_sender,
} => {
let user_id = state_manager.state.get_user_id(sui_address).await;
let confirmation = state_manager.state.confirm_user(sui_address, user_id).await;
result_sender
.send(user_id)
.send(confirmation)
.map_err(|_| AtomaStateManagerError::ChannelSendError)?;
}
AtomaAtomaStateManagerEvent::TopUpBalance {
user_id,
amount,
timestamp,
} => {
state_manager
.state
.top_up_balance(user_id, amount, timestamp)
.await?;
AtomaAtomaStateManagerEvent::TopUpBalance { user_id, amount } => {
state_manager.state.top_up_balance(user_id, amount).await?;
}
AtomaAtomaStateManagerEvent::DeductFromUsdc {
user_id,
Expand All @@ -1176,6 +1170,18 @@ pub(crate) async fn handle_state_manager_event(
.send(success)
.map_err(|_| AtomaStateManagerError::ChannelSendError)?;
}
AtomaAtomaStateManagerEvent::InsertNewUsdcPaymentDigest {
digest,
result_sender,
} => {
let success = state_manager
.state
.insert_new_usdc_payment_digest(digest)
.await;
result_sender
.send(success)
.map_err(|_| AtomaStateManagerError::ChannelSendError)?;
}
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS idx_unique_sui_address;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE usdc_payment_digests (
digest TEXT PRIMARY KEY
);

ALTER TABLE balance
DROP COLUMN usdc_last_timestamp;
69 changes: 48 additions & 21 deletions atoma-state/src/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3975,19 +3975,20 @@ impl AtomaState {
Ok(sui_address.flatten())
}

/// Retrieves the user id for the user.
/// Confirms that the user is associated with the wallet.
///
/// This method retrieves the user id the user from the `users` table.
/// This method confirms that users has this wallet associated with them.
///
/// # Arguments
///
/// * `sui_address` - The sui_address of the user.
/// * `user_id` - The unique identifier of the user.
///
/// # Returns
///
/// - `Result<Option<i64>>`: A result containing either:
/// - `Ok(Some(i64))`: The user id of the user.
/// - `Ok(None)`: If the user is not found.
/// - `Result<bool>`: A result containing either:
/// - `Ok(true)`: If the user is associated with the wallet.
/// - `Ok(false)`: If the user is not associated with the wallet.
/// - `Err(AtomaStateManagerError)`: An error if the database query fails.
///
/// # Errors
Expand All @@ -4001,18 +4002,20 @@ impl AtomaState {
/// ```rust,ignore
/// use atoma_node::atoma_state::AtomaStateManager;
///
/// async fn get_sui_address(state_manager: &AtomaStateManager, sui_address: String) -> Result<Option<i64>, AtomaStateManagerError> {
/// state_manager.get_sui_address(sui_address).await
/// async fn confirm_user(state_manager: &AtomaStateManager, sui_address: String, user_id: i64) -> Result<Option<i64>, AtomaStateManagerError> {
/// state_manager.confirm_user(sui_address, user_id).await
/// }
/// ```
#[instrument(level = "trace", skip(self))]
pub async fn get_user_id(&self, sui_address: String) -> Result<Option<i64>> {
let user = sqlx::query("SELECT id FROM users WHERE sui_address = $1")
.bind(sui_address)
.fetch_optional(&self.db)
.await?;

Ok(user.map(|user| user.get("id")))
pub async fn confirm_user(&self, sui_address: String, user_id: i64) -> Result<bool> {
let exists = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM users WHERE sui_address = $1 and id = $2)",
)
.bind(sui_address)
.bind(user_id)
.fetch_one(&self.db)
.await?;
Ok(exists)
}

/// Update the balance for the user.
Expand Down Expand Up @@ -4044,19 +4047,16 @@ impl AtomaState {
/// }
/// ```
#[instrument(level = "trace", skip(self))]
pub async fn top_up_balance(&self, user_id: i64, balance: i64, timestamp: i64) -> Result<()> {
pub async fn top_up_balance(&self, user_id: i64, balance: i64) -> Result<()> {
sqlx::query(
"INSERT INTO balance (user_id, usdc_balance, usdc_last_timestamp)
VALUES ($1, $2, $3)
"INSERT INTO balance (user_id, usdc_balance)
VALUES ($1, $2)
ON CONFLICT (user_id)
DO UPDATE SET
usdc_balance = balance.usdc_balance + EXCLUDED.usdc_balance,
usdc_last_timestamp = EXCLUDED.usdc_last_timestamp
WHERE balance.usdc_last_timestamp < EXCLUDED.usdc_last_timestamp",
usdc_balance = balance.usdc_balance + EXCLUDED.usdc_balance",
)
.bind(user_id)
.bind(balance)
.bind(timestamp)
.execute(&self.db)
.await?;
Ok(())
Expand All @@ -4080,6 +4080,7 @@ impl AtomaState {
/// This function will return an error if:
///
/// - The database query fails to execute (that could mean the balance is not available)
#[instrument(level = "trace", skip(self))]
pub async fn deduct_from_usdc(&self, user_id: i64, balance: i64) -> Result<()> {
sqlx::query("UPDATE balance SET usdc_balance = usdc_balance - $2 WHERE user_id = $1")
.bind(user_id)
Expand All @@ -4088,6 +4089,32 @@ impl AtomaState {
.await?;
Ok(())
}

/// Insert a new usdc payment digest.
///
/// This method inserts a new usdc payment digest into the `usdc_payment_digests` table. It fails if the digest already exists.
///
/// # Arguments
///
/// * `digest` - The digest to insert.
///
/// # Returns
///
/// - `Result<()>`: A result indicating success (Ok(())) or failure (Err(AtomaStateManagerError)).
///
/// # Errors
///
/// This function will return an error if:
///
/// - The database query fails to execute. Including if the digest already exists.
#[instrument(level = "trace", skip(self))]
pub async fn insert_new_usdc_payment_digest(&self, digest: String) -> Result<()> {
sqlx::query("INSERT INTO usdc_payment_digests (digest) VALUES ($1)")
.bind(digest)
.execute(&self.db)
.await?;
Ok(())
}
}

#[derive(Error, Debug)]
Expand Down
16 changes: 9 additions & 7 deletions atoma-state/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,20 +610,22 @@ pub enum AtomaAtomaStateManagerEvent {
result_sender: oneshot::Sender<Result<Option<String>>>,
},
/// Retrieves the user ID by Sui address
GetUserId {
ConfirmUser {
sui_address: String,
result_sender: oneshot::Sender<Result<Option<i64>>>,
},
/// Updates the balance of a user
TopUpBalance {
user_id: i64,
amount: i64,
timestamp: i64,
result_sender: oneshot::Sender<Result<bool>>,
},
/// Updates the balance of a user
TopUpBalance { user_id: i64, amount: i64 },
/// Withdraws the balance of a user
DeductFromUsdc {
user_id: i64,
amount: i64,
result_sender: oneshot::Sender<Result<()>>,
},
/// Acknowledges a USDC payment. Fails if the digest has already been acknowledged.
InsertNewUsdcPaymentDigest {
digest: String,
result_sender: oneshot::Sender<Result<()>>,
},
}

0 comments on commit 555a92d

Please sign in to comment.