From fd8781512161436f49c08d3e2bfb1176fdd0f528 Mon Sep 17 00:00:00 2001
From: Denis Bazhenov
Date: Tue, 28 Feb 2023 12:57:17 +0700
Subject: [PATCH] Content is now transparently compressed/decompressed in
 storage

---
 migrations/V6__compression.sql |  5 +++
 src/crawler.rs                 |  4 +--
 src/storage.rs                 | 59 ++++++++++++++++++++++------------
 3 files changed, 45 insertions(+), 23 deletions(-)
 create mode 100644 migrations/V6__compression.sql

diff --git a/migrations/V6__compression.sql b/migrations/V6__compression.sql
new file mode 100644
index 0000000..1d60af0
--- /dev/null
+++ b/migrations/V6__compression.sql
@@ -0,0 +1,5 @@
+ALTER TABLE pages ADD compressed INT;
+ALTER TABLE pages ADD _content BLOB;
+UPDATE pages SET _content = CAST(content AS BLOB);
+ALTER TABLE pages DROP content;
+ALTER TABLE pages RENAME _content TO content;
diff --git a/src/crawler.rs b/src/crawler.rs
index b69766a..8b6dd29 100644
--- a/src/crawler.rs
+++ b/src/crawler.rs
@@ -43,9 +43,7 @@ pub async fn run_crawler(
     let mut futures = FuturesUnordered::new();
     let mut pages = vec![];
     let mut proxies = match &opts.proxies {
-        Some(path) => {
-            Proxies::from_file(&path).context(AppError::LoadingProxyList(path.clone()))?
-        }
+        Some(path) => Proxies::from_file(path).context(AppError::LoadingProxyList(path.clone()))?,
         None => Proxies::default(),
     };
 
diff --git a/src/storage.rs b/src/storage.rs
index c9ed909..484be4f 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1,17 +1,17 @@
 use crate::{prelude::*, PageTypeId};
 use futures::{stream::BoxStream, StreamExt};
 use int_enum::IntEnum;
+use refinery::{
+    config::{Config, ConfigDbType},
+    embed_migrations,
+};
 use sqlx::{
     sqlite::{SqlitePoolOptions, SqliteRow},
     Row, SqlitePool,
 };
-use std::{fmt, path::Path};
+use std::{fmt, io::Cursor, path::Path};
 use url::Url;
-
-use refinery::{
-    config::{Config, ConfigDbType},
-    embed_migrations,
-};
+use zstd::bulk::compress;
 
 embed_migrations!("./migrations");
 pub struct Storage {
@@ -90,13 +90,15 @@ impl Storage {
     where
         U::Error: Sync + Send + std::error::Error + 'static,
     {
-        let new_id = sqlx::query("INSERT OR IGNORE INTO pages (url, type, depth) VALUES (?, ?, ?)")
-            .bind(url.try_into()?.to_string())
-            .bind(type_id)
-            .bind(depth)
-            .execute(&self.connection)
-            .await?
-            .last_insert_rowid();
+        let new_id = sqlx::query(
+            "INSERT OR IGNORE INTO pages (url, type, depth, compressed) VALUES (?, ?, ?, 0)",
+        )
+        .bind(url.try_into()?.to_string())
+        .bind(type_id)
+        .bind(depth)
+        .execute(&self.connection)
+        .await?
+        .last_insert_rowid();
         if new_id > 0 && new_id != self.last_insert_id {
             self.last_insert_id = new_id;
             Ok(Some(new_id))
@@ -131,8 +133,9 @@ impl Storage {
 
     /// Writes page content in storage and marks page as [`PageStatus::Downloaded`]
     pub async fn write_page_content(&self, page_id: i64, content: &str) -> Result<()> {
-        sqlx::query("UPDATE pages SET content = ?, status = ? WHERE id = ?")
-            .bind(content)
+        let compressed = compress(content.as_bytes(), 3)?;
+        sqlx::query("UPDATE pages SET content = ?, compressed = 1, status = ? WHERE id = ?")
+            .bind(compressed)
             .bind(PageStatus::Downloaded.int_value())
             .bind(page_id)
             .execute(&self.connection)
@@ -150,17 +153,22 @@ impl Storage {
     }
 
     pub async fn read_page_content(&self, id: i64) -> Result<Option<(String, PageTypeId)>> {
-        let content: Option<(String, PageTypeId)> =
-            sqlx::query_as("SELECT content, type FROM pages WHERE id = ?")
+        let content: Option<(Vec<u8>, PageTypeId, u8)> =
+            sqlx::query_as("SELECT content, type, compressed FROM pages WHERE id = ?")
                 .bind(id)
                 .fetch_optional(&self.connection)
                 .await?;
-        Ok(content)
+        if let Some((content, type_id, compressed)) = content {
+            let content = decompress_zstd(content, compressed > 0)?;
+            Ok(Some((content, type_id)))
+        } else {
+            Ok(None)
+        }
     }
 
     /// Lists downloaded pages and its content
     pub fn read_downloaded_pages(&self) -> BoxStream<Result<(Page, String)>> {
-        let sql = "SELECT id, url, type, depth, status, content FROM pages WHERE content IS NOT NULL AND status = ?";
+        let sql = "SELECT id, url, type, depth, status, content, compressed FROM pages WHERE content IS NOT NULL AND status = ?";
         let r = sqlx::query(sql)
             .bind(PageStatus::Downloaded.int_value())
             .fetch(&self.connection)
@@ -169,6 +177,16 @@ impl Storage {
     }
 }
 
+fn decompress_zstd(data: Vec<u8>, compressed: bool) -> Result<String> {
+    if compressed {
+        let mut out = Cursor::new(vec![]);
+        zstd::stream::copy_decode(data.as_slice(), &mut out)?;
+        Ok(String::from_utf8(out.into_inner())?)
+    } else {
+        Ok(String::from_utf8(data)?)
+    }
+}
+
 fn page_from_row(row: StdResult<SqliteRow, sqlx::Error>) -> Result<(Page, String)> {
     let row = row?;
 
@@ -179,7 +197,8 @@ fn page_from_row(row: StdResult<SqliteRow, sqlx::Error>) -> Result<(Page, String
     let status: u8 = row.try_get("status")?;
 
     let page = page_from_tuple((page_id, url, type_id, depth, status))?;
-    let content: String = row.try_get("content")?;
+    let compressed: u8 = row.try_get("compressed")?;
+    let content = decompress_zstd(row.try_get("content")?, compressed > 0)?;
 
     Ok((page, content))
 }