Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Ocean API database implementation to all modules #2748

Merged
merged 13 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 72 additions & 72 deletions lib/Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion lib/ain-evm/src/storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl BlockStorage for BlockStore {

fn put_block(&self, block: &BlockAny) -> Result<()> {
self.extend_transactions_from_block(block)?;

let block_number = block.header.number;
let hash = block.header.hash();
let blocks_cf = self.column::<columns::Blocks>();
Expand Down
2 changes: 2 additions & 0 deletions lib/ain-ocean/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[profile.test]
debug = true

[dependencies]
ain-cpp-imports.workspace = true
Expand Down
164 changes: 146 additions & 18 deletions lib/ain-ocean/src/data_acces/block.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,171 @@
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};

use crate::{
database::db_manager::{ColumnFamilyOperations, RocksDB},
database::db_manager::{ColumnFamilyOperations, RocksDB, SortOrder},
model::block::Block,
};

use anyhow::{anyhow, Result};
use rocksdb::IteratorMode;
use std::convert::TryInto;

#[derive(Debug)]
pub struct BlockDb {
pub db: RocksDB,
}

impl BlockDb {
pub async fn get_by_hash(&self) -> Result<Block> {
todo!()
pub async fn get_by_hash(&self, hash: String) -> Result<Option<Block>> {
let number = match self.db.get("block_map", hash.as_bytes()) {
Ok(Some(value)) => {
// Convert the stored bytes to a block number
let block_number_bytes: [u8; 4] = match value.try_into() {
Ok(bytes) => bytes,
Err(e) => {
return Err(anyhow!("Error converting bytes to block number: {:?}", e))
}
};
let block_number = i32::from_be_bytes(block_number_bytes);
Some(block_number)
}
Ok(None) => None,
Err(e) => return Err(anyhow!("Error retrieving block number: {:?}", e)),
};

if let Some(block_number) = number {
let block_key = block_number.to_be_bytes();
match self.db.get("block", &block_key) {
Ok(Some(value)) => {
let block: Block = serde_json::from_slice(&value).map_err(|e| anyhow!(e))?;
Ok(Some(block))
}
Ok(None) => Ok(None),
Err(e) => Err(anyhow!(e)),
}
} else {
Ok(None)
}
}
pub async fn get_by_height(&self) -> Result<Block> {
todo!()

pub async fn get_by_height(&self, height: i32) -> Result<Option<Block>> {
match self.db.get("block", &height.to_be_bytes()) {
Ok(Some(value)) => {
let block: Block = serde_json::from_slice(&value).map_err(|e| anyhow!(e))?;
Ok(Some(block))
}
Ok(None) => Ok(None),
Err(e) => Err(anyhow!(e)),
}
}
pub async fn get_heighest(&self) -> Result<Block> {
todo!()

pub async fn get_highest(&self) -> Result<Option<Block>> {
// Retrieve the latest block height
let latest_height_bytes = match self.db.get("latest_block_height", b"latest_block_height") {
Ok(Some(value)) => value,
Ok(None) => return Ok(None), // No latest block height set
Err(e) => return Err(anyhow!(e)),
};

// Convert the latest height bytes back to an integer
let latest_height = i32::from_be_bytes(
latest_height_bytes
.as_slice()
.try_into()
.map_err(|_| anyhow!("Byte length mismatch for latest height"))?,
);

// Retrieve the block with the latest height
match self.db.get("block", &latest_height.to_be_bytes()) {
Ok(Some(value)) => {
let block: Block = serde_json::from_slice(&value).map_err(|e| anyhow!(e))?;
Ok(Some(block))
}
Ok(None) => Ok(None), // No block found for the latest height
Err(e) => Err(anyhow!(e)),
}
}
pub async fn query_by_height(&self, limit: i32, lt: i32) -> Result<Vec<Block>> {
todo!()

pub async fn query_by_height(
&self,
limit: i32,
lt: i32,
sort_order: SortOrder,
) -> Result<Vec<Block>> {
let mut blocks: Vec<Block> = Vec::new();

let iterator = self.db.iterator("block", IteratorMode::End)?;
let collected_blocks: Vec<_> = iterator.collect();

for result in collected_blocks.into_iter().rev() {
let (key, value) = match result {
Ok((key, value)) => (key, value),
Err(err) => return Err(anyhow!("Error during iteration: {}", err)),
};

let block: Block = serde_json::from_slice(&value)?;

if block.height < lt {
blocks.push(block);

if blocks.len() == limit as usize {
break;
}
}
}

// Sort blocks based on the specified sort order
match sort_order {
SortOrder::Ascending => blocks.sort_by(|a, b| a.height.cmp(&b.height)),
SortOrder::Descending => blocks.sort_by(|a, b| b.height.cmp(&a.height)),
}

Ok(blocks)
}
pub async fn store_block(&self, block: Block) -> Result<()> {

pub async fn put_block(&self, block: Block) -> Result<()> {
match serde_json::to_string(&block) {
Ok(value) => {
let key = block.id.clone();
self.db.put("raw_block", key.as_bytes(), value.as_bytes())?;
let block_number = block.height;
self.db
.put("block", &block_number.to_be_bytes(), value.as_bytes())?;
let block_map_key = block.hash.as_bytes();
self.db
.put("block_map", block_map_key, &block_number.to_be_bytes())?;
self.db
.delete("latest_block_height", b"latest_block_height")?;
self.db.put(
"latest_block_height",
b"latest_block_height",
&block_number.to_be_bytes(),
)?;
Ok(())
}
Err(e) => Err(anyhow!(e)),
}
}
pub async fn delete_block(&self, hash: String) -> Result<()> {
match self.db.delete("raw_block", hash.as_bytes()) {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!(e)),
let number = match self.db.get("block_map", hash.as_bytes()) {
Ok(Some(value)) => {
// Convert the stored bytes to a block number
let block_number_bytes: [u8; 4] = match value.try_into() {
Ok(bytes) => bytes,
Err(e) => {
return Err(anyhow!("Error converting bytes to block number: {:?}", e))
}
};
let block_number = i32::from_be_bytes(block_number_bytes);
Some(block_number)
}
Ok(None) => None,
Err(e) => return Err(anyhow!("Error retrieving block number: {:?}", e)),
};

if let Some(block_number) = number {
let block_key = block_number.to_be_bytes();
match self.db.delete("block", &block_key) {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!(e)),
}
} else {
Ok(())
}
}
}
44 changes: 37 additions & 7 deletions lib/ain-ocean/src/data_acces/masternode.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use serde_json;

use crate::{
database::db_manager::{ColumnFamilyOperations, RocksDB},
database::db_manager::{ColumnFamilyOperations, RocksDB, SortOrder},
model::masternode::Masternode,
};
use anyhow::{anyhow, Result};
use rocksdb::IteratorMode;
use serde_json;

pub struct MasterNodeDB {
pub db: RocksDB,
}

impl MasterNodeDB {
pub async fn query(&self, limit: i32, lt: i32) -> Result<Vec<Masternode>> {
todo!()
pub async fn query(
&self,
limit: i32,
lt: i32,
sort_order: SortOrder,
) -> Result<Vec<Masternode>> {
let iterator = self.db.iterator("masternode", IteratorMode::End)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to handle secondary sort index (block height + masternode.id). This CF indexes by masternode.id for now, we need to be able to sort by masternode creation height.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to dig through Ocean spec and check all the models that need a secondary sort index (I guess all the one that are currently indexed by TX id but needs to be iterable via block height). I'll add this requirement to the main tracking PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take care of the rest and implement these secondary indexes in another dedicated PR.

let mut master_node: Vec<Masternode> = Vec::new();
let collected_blocks: Vec<_> = iterator.collect();

for result in collected_blocks.into_iter().rev() {
let (key, value) = match result {
Ok((key, value)) => (key, value),
Err(err) => return Err(anyhow!("Error during iteration: {}", err)),
};

let master_stats: Masternode = serde_json::from_slice(&value)?;
master_node.push(master_stats);

if master_node.len() == limit as usize {
break;
}
nagarajm22 marked this conversation as resolved.
Show resolved Hide resolved
}

// Sort blocks based on the specified sort order
match sort_order {
SortOrder::Ascending => master_node.sort_by(|a, b| a.block.height.cmp(&b.block.height)),
SortOrder::Descending => {
master_node.sort_by(|a, b| b.block.height.cmp(&a.block.height))
}
}
nagarajm22 marked this conversation as resolved.
Show resolved Hide resolved

Ok(master_node)
}
pub async fn get(&self, id: String) -> Result<Option<Masternode>> {
match self.db.get("masternode", id.as_bytes()) {
Expand Down
93 changes: 73 additions & 20 deletions lib/ain-ocean/src/data_acces/masternode_states.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use anyhow::{anyhow, Result};
nagarajm22 marked this conversation as resolved.
Show resolved Hide resolved
use bitcoin::absolute::Height;
use rocksdb::{ColumnFamilyDescriptor, IteratorMode, DB};
use rocksdb::{IteratorMode, DB};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;

use crate::{
database::db_manager::{ColumnFamilyOperations, RocksDB},
database::db_manager::{ColumnFamilyOperations, RocksDB, SortOrder},
model::masternode_stats::MasternodeStats,
};

Expand All @@ -13,25 +14,71 @@ pub struct MasterStatsDb {
pub db: RocksDB,
}
impl MasterStatsDb {
pub async fn get_latest(&self) -> Result<MasternodeStats> {
// let mut latest_stats: Option<MasternodeStats> = None;
// let mut highest_height = -1;
pub async fn get_latest(&self) -> Result<Option<MasternodeStats>> {
let latest_height_bytes = match self
.db
.get("masternode_block_height", b"master_block_height")
{
Ok(Some(value)) => value,
Ok(None) => return Ok(None), // No latest block height set
Err(e) => return Err(anyhow!(e)),
};

// let iter = self.db.iterator("masternode_stats", IteratorMode::End); // Start from the end of the DB
// Convert the latest height bytes back to an integer
let latest_height = i32::from_be_bytes(
latest_height_bytes
.as_slice()
.try_into()
.map_err(|_| anyhow!("Byte length mismatch for latest height"))?,
);

// for (key, value) in iter {
// let stats: MasternodeStats = serde_json::from_slice(&value)?;
// if stats.block.height > highest_height {
// highest_height = stats.block.height;
// latest_stats = Some(stats);
// }
// }

// Ok(latest_stats);
todo!()
// Retrieve the block with the latest height
match self.db.get("block", &latest_height.to_be_bytes()) {
Ok(Some(value)) => {
let block: MasternodeStats =
serde_json::from_slice(&value).map_err(|e| anyhow!(e))?;
Ok(Some(block))
}
Ok(None) => Ok(None), // No block found for the latest height
Err(e) => Err(anyhow!(e)),
}
}
nagarajm22 marked this conversation as resolved.
Show resolved Hide resolved
pub async fn query(&self, limit: i32, lt: i32) -> Result<Vec<MasternodeStats>> {
todo!()
pub async fn query(
&self,
limit: i32,
lt: i32,
sort_order: SortOrder,
) -> Result<Vec<MasternodeStats>> {
let iterator = self.db.iterator("masternode_stats", IteratorMode::End)?;
let mut master_node: Vec<MasternodeStats> = Vec::new();
let collected_blocks: Vec<_> = iterator.collect();

for result in collected_blocks.into_iter().rev() {
let (key, value) = match result {
Ok((key, value)) => (key, value),
Err(err) => return Err(anyhow!("Error during iteration: {}", err)),
};

let master_stats: MasternodeStats = serde_json::from_slice(&value)?;

if master_stats.block.height < lt {
master_node.push(master_stats);

if master_node.len() == limit as usize {
break;
}
}
}

// Sort blocks based on the specified sort order
match sort_order {
SortOrder::Ascending => master_node.sort_by(|a, b| a.block.height.cmp(&b.block.height)),
SortOrder::Descending => {
master_node.sort_by(|a, b| b.block.height.cmp(&a.block.height))
}
}
nagarajm22 marked this conversation as resolved.
Show resolved Hide resolved

Ok(master_node)
}
pub async fn get(&self, height: i32) -> Result<Option<MasternodeStats>> {
let bytes: &[u8] = &height.to_be_bytes();
Expand All @@ -49,8 +96,14 @@ impl MasterStatsDb {
match serde_json::to_string(&stats) {
Ok(value) => {
let key = stats.block.height.clone();
let bytes: &[u8] = &key.to_be_bytes();
self.db.put("masternode_stats", bytes, value.as_bytes())?;
let height: &[u8] = &key.to_be_bytes();
self.db.put("masternode_stats", height, value.as_bytes())?;
self.db
.put("masternode_map", stats.block.hash.as_bytes(), height)?;
self.db
.delete("masternode_block_height", b"master_block_height")?;
self.db
.put("masternode_block_height", b"master_block_height", height)?;
Ok(())
}
Err(e) => Err(anyhow!(e)),
Expand Down
2 changes: 2 additions & 0 deletions lib/ain-ocean/src/data_acces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ pub mod block;
pub mod masternode;
pub mod masternode_states;
pub mod oracle;
pub mod oracle_price_active;
pub mod oracle_price_aggregated;
pub mod oracle_price_aggregated_interval;
pub mod oracle_price_feed;
pub mod oracle_token_currency;
pub mod order_history;
pub mod pool_swap;
Expand Down
Loading