Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ocean: Reduce disk usage #3045

Merged
merged 7 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/ain-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ bincode.workspace = true
rocksdb.workspace = true
anyhow.workspace = true
num_cpus.workspace = true

log.workspace = true
79 changes: 74 additions & 5 deletions lib/ain-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{
collections::BTreeMap,
fmt::Debug,
iter::Iterator,
marker::PhantomData,
path::{Path, PathBuf},
sync::Arc,
};
pub mod version;

use anyhow::format_err;
use log::debug;
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor, DBIterator, Direction,
IteratorMode, Options, DB,
Expand Down Expand Up @@ -49,10 +50,14 @@ fn get_db_default_options() -> Options {
pub struct Rocks(DB);

impl Rocks {
pub fn open(path: &PathBuf, cf_names: &[&'static str], opts: Option<Options>) -> Result<Self> {
let cf_descriptors = cf_names
.iter()
.map(|cf_name| ColumnFamilyDescriptor::new(*cf_name, Options::default()));
pub fn open(
path: &PathBuf,
cf_names: Vec<(&'static str, Option<Options>)>,
opts: Option<Options>,
) -> Result<Self> {
let cf_descriptors = cf_names.into_iter().map(|(cf_name, opts)| {
ColumnFamilyDescriptor::new(cf_name, opts.unwrap_or_else(Options::default))
});

let db_opts = opts.unwrap_or_else(get_db_default_options);
let db = DB::open_cf_descriptors(&db_opts, path, cf_descriptors)?;
Expand All @@ -67,6 +72,10 @@ impl Rocks {
Ok(())
}

pub fn compact(&self) {
self.0.compact_range(None::<&[u8]>, None::<&[u8]>);
}

pub fn cf_handle(&self, cf: &str) -> Result<&ColumnFamily> {
self.0
.cf_handle(cf)
Expand Down Expand Up @@ -99,6 +108,66 @@ impl Rocks {
self.0.flush()?;
Ok(())
}

pub fn dump_table_sizes(&self, cf_names: &[&'static str]) -> Result<()> {
let mut stats: BTreeMap<String, (u64, u64, f64)> = BTreeMap::new(); // (size, entries, avg_size)
let mut total_size: u64 = 0;
let mut total_entries: u64 = 0;

for cf_name in cf_names.iter() {
if let Some(cf) = self.0.cf_handle(cf_name) {
let size = self
.0
.property_int_value_cf(cf, "rocksdb.estimate-live-data-size")?
.unwrap_or(0);
let entries = self
.0
.property_int_value_cf(cf, "rocksdb.estimate-num-keys")?
.unwrap_or(0);
let avg_size = if entries > 0 {
size as f64 / entries as f64
} else {
0.0
};

stats.insert(cf_name.to_string(), (size, entries, avg_size));
total_size += size;
total_entries += entries;
}
}

debug!("RocksDB Table Statistics:");
debug!("{:-<80}", "");
debug!(
"{:<30} {:>10} {:>15} {:>15} {:>10}",
"Table Name", "Size (MB)", "Entries", "Avg Size (B)", "%% of Total"
);
debug!("{:-<80}", "");

for (name, (size, entries, avg_size)) in stats.iter() {
let size_mb = *size as f64 / (1024.0 * 1024.0);
let percentage = (*size as f64 / total_size as f64) * 100.0;
debug!(
"{:<30} {:>10.2} {:>15} {:>15.2} {:>9.2}%%",
name, size_mb, entries, avg_size, percentage
);
}

debug!("{:-<80}", "");
let total_avg_size = if total_entries > 0 {
total_size as f64 / total_entries as f64
} else {
0.0
};
debug!(
"Total size: {:.2} MB",
total_size as f64 / (1024.0 * 1024.0)
);
debug!("Total entries: {}", total_entries);
debug!("Overall average entry size: {:.2} bytes", total_avg_size);

Ok(())
}
}

//
Expand Down
8 changes: 7 additions & 1 deletion lib/ain-evm/src/storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ impl BlockStore {
pub fn new(path: &Path) -> Result<Self> {
let path = path.join("indexes");
fs::create_dir_all(&path)?;
let backend = Arc::new(Rocks::open(&path, &COLUMN_NAMES, None)?);

let cf_with_opts = COLUMN_NAMES
.into_iter()
.zip(std::iter::repeat(None))
.collect::<Vec<_>>();

let backend = Arc::new(Rocks::open(&path, cf_with_opts, None)?);
let store = Self(backend);
store.startup()?;
Ok(store)
Expand Down
57 changes: 43 additions & 14 deletions lib/ain-ocean/src/api/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ pub struct ScriptAggregationResponse {
impl From<ScriptAggregation> for ScriptAggregationResponse {
fn from(v: ScriptAggregation) -> Self {
Self {
id: format!("{}{}", hex::encode(v.id.1.to_be_bytes()), v.id.0),
hid: v.hid,
id: format!(
"{}{}",
hex::encode(v.id.1.to_be_bytes()),
hex::encode(v.id.0)
),
hid: hex::encode(v.hid),
block: v.block,
script: ScriptAggregationScriptResponse {
r#type: v.script.r#type,
Expand Down Expand Up @@ -162,13 +166,13 @@ pub struct ScriptAggregationAmountResponse {

fn get_latest_aggregation(
ctx: &Arc<AppContext>,
hid: String,
hid: [u8; 32],
) -> Result<Option<ScriptAggregationResponse>> {
let latest = ctx
.services
.script_aggregation
.by_id
.list(Some((hid.clone(), u32::MAX)), SortOrder::Descending)?
.list(Some((hid, u32::MAX)), SortOrder::Descending)?
.find(|item| matches!(item, Ok(((v, _), _)) if v == &hid))
.transpose()?
.map(|(_, v)| v.into());
Expand Down Expand Up @@ -242,9 +246,32 @@ pub struct ScriptActivityResponse {

impl From<ScriptActivity> for ScriptActivityResponse {
fn from(v: ScriptActivity) -> Self {
let id = match v.type_hex {
ScriptActivityTypeHex::Vin => {
// TODO put vin instead ScriptActivityType
let vin = v.vin.as_ref().unwrap();
format!(
"{}{}{}{}",
hex::encode(v.block.height.to_be_bytes()),
ScriptActivityTypeHex::Vin,
vin.txid,
hex::encode(vin.n.to_be_bytes())
)
}
ScriptActivityTypeHex::Vout => {
let vout = v.vout.as_ref().unwrap();
format!(
"{}{}{}{}",
hex::encode(v.block.height.to_be_bytes()),
ScriptActivityTypeHex::Vout,
v.txid,
hex::encode(vout.n.to_be_bytes())
)
}
};
Self {
id: v.id,
hid: v.hid,
id,
hid: hex::encode(v.hid),
r#type: v.r#type.to_string(),
type_hex: v.type_hex.to_string(),
txid: v.txid,
Expand Down Expand Up @@ -319,7 +346,7 @@ async fn list_transactions(
.script_activity
.by_id
.list(
Some((hid.clone(), next.0, next.1, next.2, next.3)),
Some((hid, next.0, next.1, next.2, next.3)),
SortOrder::Descending,
)?
.skip(usize::from(query.next.is_some()))
Expand Down Expand Up @@ -350,9 +377,14 @@ pub struct ScriptUnspentResponse {
impl From<ScriptUnspent> for ScriptUnspentResponse {
fn from(v: ScriptUnspent) -> Self {
Self {
id: v.id,
hid: v.hid,
sort: v.sort,
id: format!("{}{}", v.id.0, hex::encode(v.id.1)),
hid: hex::encode(v.hid),
sort: format!(
"{}{}{}",
hex::encode(v.block.height.to_be_bytes()),
v.txid,
hex::encode(v.vout.n.to_be_bytes())
),
block: v.block,
script: ScriptUnspentScriptResponse {
r#type: v.script.r#type,
Expand Down Expand Up @@ -414,10 +446,7 @@ async fn list_transaction_unspent(
.services
.script_unspent
.by_id
.list(
Some((hid.clone(), next.0, next.1, next.2)),
SortOrder::Ascending,
)?
.list(Some((hid, next.0, next.1, next.2)), SortOrder::Ascending)?
.skip(usize::from(query.next.is_some()))
.filter_map(|item| match item {
Ok((k, v)) if k.0 == hid => Some(Ok(v.into())),
Expand Down
2 changes: 1 addition & 1 deletion lib/ain-ocean/src/api/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub fn to_script(address: &str, network: Network) -> Result<ScriptBuf> {
Ok(ScriptBuf::from(addr))
}

pub fn address_to_hid(address: &str, network: Network) -> Result<String> {
pub fn address_to_hid(address: &str, network: Network) -> Result<[u8; 32]> {
let script = to_script(address, network)?;
let bytes = script.to_bytes();
Ok(as_sha256(bytes))
Expand Down
70 changes: 64 additions & 6 deletions lib/ain-ocean/src/api/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ use std::sync::Arc;
use ain_macros::ocean_endpoint;
use axum::{extract::Query, routing::get, Extension, Router};
use bitcoin::Txid;
use serde::Deserialize;
use serde::{Deserialize, Serialize};

use super::{path::Path, query::PaginationQuery, response::ApiPagedResponse, AppContext};
use crate::{
api::{common::Paginate, response::Response},
error::ApiError,
model::{Transaction, TransactionVin, TransactionVout},
model::{
Transaction, TransactionVin, TransactionVinType, TransactionVinVout, TransactionVout,
TransactionVoutScript,
},
storage::{
InitialKeyProvider, RepositoryOps, SortOrder, TransactionVin as TransactionVinStorage,
},
Expand All @@ -30,12 +33,43 @@ async fn get_transaction(
Ok(Response::new(transactions))
}

#[derive(Debug, Serialize)]
struct TransactionVinResponse {
pub id: String,
pub txid: Txid,
pub coinbase: Option<String>,
pub vout: Option<TransactionVinVout>,
pub script: Option<String>,
pub tx_in_witness: Option<Vec<String>>,
pub sequence: i64,
}

impl From<TransactionVin> for TransactionVinResponse {
fn from(v: TransactionVin) -> Self {
let (id, coinbase) = match v.r#type {
TransactionVinType::Coinbase(coinbase) => (format!("{}00", v.txid), Some(coinbase)),
TransactionVinType::Standard((txid, vout)) => {
(format!("{}{}{:x}", v.txid, txid, vout), None)
}
};
Self {
id,
txid: v.txid,
coinbase,
vout: v.vout,
script: v.script,
tx_in_witness: v.tx_in_witness,
sequence: v.sequence,
}
}
}

#[ocean_endpoint]
async fn get_vins(
Path(TransactionId { id }): Path<TransactionId>,
Query(query): Query<PaginationQuery>,
Extension(ctx): Extension<Arc<AppContext>>,
) -> Result<ApiPagedResponse<TransactionVin>> {
) -> Result<ApiPagedResponse<TransactionVinResponse>> {
let next = query
.next
.clone()
Expand All @@ -48,7 +82,7 @@ async fn get_vins(
.list(Some(next), SortOrder::Descending)?
.paginate(&query)
.filter_map(|item| match item {
Ok((_, vout)) if vout.txid == id => Some(Ok(vout)),
Ok((_, vin)) if vin.txid == id => Some(Ok(TransactionVinResponse::from(vin))),
Ok(_) => None,
Err(e) => Some(Err(e.into())),
})
Expand All @@ -59,13 +93,37 @@ async fn get_vins(
}))
}

#[derive(Debug, Serialize)]
struct TransactionVoutResponse {
pub id: String,
// pub vout: usize,
pub txid: Txid,
pub n: usize,
pub value: String,
pub token_id: Option<u32>,
pub script: TransactionVoutScript,
}

impl From<TransactionVout> for TransactionVoutResponse {
fn from(v: TransactionVout) -> Self {
Self {
id: format!("{}{:x}", v.txid, v.vout),
txid: v.txid,
n: v.n,
value: v.value,
token_id: v.token_id,
script: v.script,
}
}
}

//get list of vout transaction, by passing id which contains txhash + vout_idx
#[ocean_endpoint]
async fn get_vouts(
Path(TransactionId { id }): Path<TransactionId>,
Query(query): Query<PaginationQuery>,
Extension(ctx): Extension<Arc<AppContext>>,
) -> Result<ApiPagedResponse<TransactionVout>> {
) -> Result<ApiPagedResponse<TransactionVoutResponse>> {
let next = query.next.as_deref().unwrap_or("0").parse::<usize>()?;

let list = ctx
Expand All @@ -75,7 +133,7 @@ async fn get_vouts(
.list(Some((id, next)), SortOrder::Ascending)?
.paginate(&query)
.filter_map(|item| match item {
Ok((_, vin)) if vin.txid == id => Some(Ok(vin)),
Ok((_, vout)) if vout.txid == id => Some(Ok(TransactionVoutResponse::from(vout))),
Ok(_) => None,
Err(e) => Some(Err(e.into())),
})
Expand Down
4 changes: 2 additions & 2 deletions lib/ain-ocean/src/hex_encoder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use sha2::{Digest, Sha256};

#[must_use]
pub fn as_sha256(bytes: Vec<u8>) -> String {
pub fn as_sha256(bytes: Vec<u8>) -> [u8; 32] {
let mut hasher = Sha256::new();
hasher.update(bytes);
format!("{:x}", hasher.finalize())
hasher.finalize().into()
}
Loading
Loading