Skip to content

Commit

Permalink
refactor manifest info to check for content type
Browse files Browse the repository at this point in the history
  • Loading branch information
xyz committed Jan 29, 2025
1 parent e82dcc2 commit 7ab35db
Show file tree
Hide file tree
Showing 19 changed files with 142 additions and 220 deletions.
1 change: 0 additions & 1 deletion src/analytics/enrich_rms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,5 +245,4 @@ fn test_rms_pipeline() {
assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0));

process_shill(&mut swaps);
// dbg!(&swaps);
}
77 changes: 1 addition & 76 deletions src/analytics/offline_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,77 +89,10 @@ pub async fn get_date_range_deposits_alt(
deposited,
};
top_deposits.push(d);
// dbg!(&d);
}
Ok(top_deposits)
}

// pub async fn get_date_range_deposits(
// pool: &Graph,
// top_n: u64,
// start: DateTime<Utc>,
// end: DateTime<Utc>,
// ) -> Result<Vec<Deposit>> {
// let mut top_deposits = vec![];

// let q = format!(
// // r#"
// // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit

// // // Step 1: Get the list of all depositors
// // MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx

// // // Step 2: Match depositors and amounts within the date range

// // UNWIND all_depositors AS depositor

// // OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WHERE tx2.block_datetime >= datetime('{}') AND tx2.block_datetime <= datetime('{}')

// // WITH
// // depositor.address AS account,
// // COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount
// // RETURN account, toFloat(deposit_amount) as deposited
// // ORDER BY deposited DESC

// // "#,
// r#"
// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
// MATCH
// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
// WHERE
// tx.`block_datetime` > datetime("{}")
// AND tx.`block_datetime` < datetime("{}")
// WITH
// DISTINCT(u),
// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
// ORDER BY totalTxAmount DESCENDING
// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited

// "#,
// start.to_rfc3339(),
// end.to_rfc3339(),
// // top_n,
// );
// let cypher_query = neo4rs::query(&q);

// // Execute the query
// let mut result = pool.execute(cypher_query).await?;

// // Fetch the first row only
// while let Some(r) = result.next().await? {
// let account_str = r.get::<String>("account").unwrap_or("unknown".to_string());
// let deposited = r.get::<f64>("deposited").unwrap_or(0.0);
// let d = Deposit {
// account: account_str.parse().unwrap_or(AccountAddress::ZERO),
// deposited,
// };
// top_deposits.push(d);
// // dbg!(&d);
// }
// Ok(top_deposits)
// }

pub async fn get_exchange_users(
pool: &Graph,
Expand Down Expand Up @@ -194,7 +127,6 @@ pub async fn get_exchange_users(
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
Expand Down Expand Up @@ -226,7 +158,6 @@ pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result<Vec<MinFun
funded: funded as f64,
};
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
Expand Down Expand Up @@ -263,7 +194,6 @@ pub async fn get_one_exchange_user(
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
Expand Down Expand Up @@ -311,11 +241,6 @@ impl Matching {
.map(|el| el.user_id)
.collect();

// dbg!(&ids);
// let user_ledger = funded.iter().find(|el| {
// // check if we have already identified it
// self.definite.0.get(el.user_id).none()
// });
Ok((*ids.first().unwrap(), *ids.get(1).unwrap()))
}

Expand Down Expand Up @@ -460,7 +385,7 @@ impl Matching {

let mut eval: Vec<AccountAddress> = vec![];
deposits.iter().for_each(|el| {
// dbg!(&el);

if el.deposited >= user.funded &&
// must not already have been tagged impossible
!pending.impossible.contains(&el.account) &&
Expand Down
1 change: 0 additions & 1 deletion src/cypher_templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ use serde_json::Value;
pub fn to_cypher_object<T: Serialize>(object: &T) -> Result<String> {
// Serialize the struct to a JSON value
let serialized_value = serde_json::to_value(object).expect("Failed to serialize");
// dbg!(&serialized_value);

let flattener = smooth_json::Flattener {
separator: "_",
Expand Down
1 change: 0 additions & 1 deletion src/enrich_exchange_onboarding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOnRamp]) ->
// cypher queries makes it annoying to do a single insert of users and
// txs
let cypher_string = ExchangeOnRamp::cypher_batch_link_owner(&list_str);
// dbg!(&cypher_string);

// Execute the query
let cypher_query = neo4rs::query(&cypher_string);
Expand Down
28 changes: 14 additions & 14 deletions src/extract_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ pub async fn extract_current_transactions(

// first increment the block metadata. This assumes the vector is sequential.
if let Some(block) = tx.try_as_block_metadata() {
// check the epochs are incrementing or not
if epoch > block.epoch()
&& round > block.round()
&& timestamp > block.timestamp_usecs()
{
dbg!(
epoch,
block.epoch(),
round,
block.round(),
timestamp,
block.timestamp_usecs()
);
}
// // check the epochs are incrementing or not
// if epoch > block.epoch()
// && round > block.round()
// && timestamp > block.timestamp_usecs()
// {
// dbg!(
// epoch,
// block.epoch(),
// round,
// block.round(),
// timestamp,
// block.timestamp_usecs()
// );
// }

epoch = block.epoch();
round = block.round();
Expand Down
19 changes: 11 additions & 8 deletions src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ pub async fn ingest_all(
let pending = queue::get_queued(pool).await?;
info!("pending archives: {}", pending.len());

// This manifest may be for a .gz file, we should handle here as well
for (_p, m) in archive_map.0.iter() {
info!("checking if we need to decompress");
let (new_unzip_path, temp) = unzip_temp::maybe_handle_gz(&m.archive_dir)?;
let mut better_man = ManifestInfo::new(&new_unzip_path);
better_man.set_info()?;

println!(
"\nProcessing: {:?} with archive: {}",
m.contents,
Expand All @@ -60,6 +66,7 @@ pub async fn ingest_all(
m.archive_dir.display()
);
}
drop(temp);
}

Ok(())
Expand All @@ -70,9 +77,6 @@ pub async fn try_load_one_archive(
pool: &Graph,
batch_size: usize,
) -> Result<BatchTxReturn> {
info!("checking if we need to decompress");
let (archive_path, temp) = unzip_temp::maybe_handle_gz(&man.archive_dir)?;

let mut all_results = BatchTxReturn::new();
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
Expand All @@ -82,24 +86,23 @@ pub async fn try_load_one_archive(
error!("no framework version detected");
bail!("could not load archive from manifest");
}
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&archive_path).await?,
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
crate::scan::FrameworkVersion::V6 => {
extract_current_snapshot(&archive_path).await?
extract_current_snapshot(&man.archive_dir).await?
}
crate::scan::FrameworkVersion::V7 => {
extract_current_snapshot(&archive_path).await?
extract_current_snapshot(&man.archive_dir).await?
}
};
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&archive_path, &man.version).await?;
let (txs, _) = extract_current_transactions(&man.archive_dir, &man.version).await?;
let batch_res =
load_tx_cypher::tx_batch(&txs, pool, batch_size, &man.archive_id).await?;
all_results.increment(&batch_res);
}
crate::scan::BundleContent::EpochEnding => todo!(),
}
drop(temp);
Ok(all_results)
}
2 changes: 0 additions & 2 deletions src/load_account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ pub async fn snapshot_batch(

match impl_batch_snapshot_insert(pool, c).await {
Ok(batch) => {
// dbg!(&batch);
all_results.increment(&batch);
// dbg!(&all_results);
queue::update_task(pool, archive_id, true, i).await?;
info!("...success");
}
Expand Down
Loading

0 comments on commit 7ab35db

Please sign in to comment.