Skip to content

Commit

Permalink
Content is now transparently compressed/decompressed in a storage
Browse files Browse the repository at this point in the history
  • Loading branch information
bazhenov committed Feb 28, 2023
1 parent 21bb88b commit fd87815
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 23 deletions.
5 changes: 5 additions & 0 deletions migrations/V6__compression.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE pages ADD compressed INT;
ALTER TABLE pages ADD _content BLOB;
UPDATE pages SET _content = CAST(content AS BLOB);
ALTER TABLE pages DROP content;
ALTER TABLE pages RENAME _content TO content;
4 changes: 1 addition & 3 deletions src/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ pub async fn run_crawler(
let mut futures = FuturesUnordered::new();
let mut pages = vec![];
let mut proxies = match &opts.proxies {
Some(path) => {
Proxies::from_file(&path).context(AppError::LoadingProxyList(path.clone()))?
}
Some(path) => Proxies::from_file(path).context(AppError::LoadingProxyList(path.clone()))?,
None => Proxies::default(),
};

Expand Down
59 changes: 39 additions & 20 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::{prelude::*, PageTypeId};
use futures::{stream::BoxStream, StreamExt};
use int_enum::IntEnum;
use refinery::{
config::{Config, ConfigDbType},
embed_migrations,
};
use sqlx::{
sqlite::{SqlitePoolOptions, SqliteRow},
Row, SqlitePool,
};
use std::{fmt, path::Path};
use std::{fmt, io::Cursor, path::Path};
use url::Url;

use refinery::{
config::{Config, ConfigDbType},
embed_migrations,
};
use zstd::bulk::compress;
embed_migrations!("./migrations");

pub struct Storage {
Expand Down Expand Up @@ -90,13 +90,15 @@ impl Storage {
where
U::Error: Sync + Send + std::error::Error + 'static,
{
let new_id = sqlx::query("INSERT OR IGNORE INTO pages (url, type, depth) VALUES (?, ?, ?)")
.bind(url.try_into()?.to_string())
.bind(type_id)
.bind(depth)
.execute(&self.connection)
.await?
.last_insert_rowid();
let new_id = sqlx::query(
"INSERT OR IGNORE INTO pages (url, type, depth, compressed) VALUES (?, ?, ?, 0)",
)
.bind(url.try_into()?.to_string())
.bind(type_id)
.bind(depth)
.execute(&self.connection)
.await?
.last_insert_rowid();
if new_id > 0 && new_id != self.last_insert_id {
self.last_insert_id = new_id;
Ok(Some(new_id))
Expand Down Expand Up @@ -131,8 +133,9 @@ impl Storage {

/// Writes page content in storage and marks page as [`PageStatus::Downloaded`]
pub async fn write_page_content(&self, page_id: i64, content: &str) -> Result<()> {
sqlx::query("UPDATE pages SET content = ?, status = ? WHERE id = ?")
.bind(content)
let compressed = compress(content.as_bytes(), 3)?;
sqlx::query("UPDATE pages SET content = ?, compressed = 1, status = ? WHERE id = ?")
.bind(compressed)
.bind(PageStatus::Downloaded.int_value())
.bind(page_id)
.execute(&self.connection)
Expand All @@ -150,17 +153,22 @@ impl Storage {
}

pub async fn read_page_content(&self, id: i64) -> Result<Option<(String, PageTypeId)>> {
let content: Option<(String, PageTypeId)> =
sqlx::query_as("SELECT content, type FROM pages WHERE id = ?")
let content: Option<(Vec<u8>, PageTypeId, u8)> =
sqlx::query_as("SELECT content, type, compressed FROM pages WHERE id = ?")
.bind(id)
.fetch_optional(&self.connection)
.await?;
Ok(content)
if let Some((content, type_id, compressed)) = content {
let content = decompress_zstd(content, compressed > 0)?;
Ok(Some((content, type_id)))
} else {
Ok(None)
}
}

/// Lists downloaded pages and its content
pub fn read_downloaded_pages(&self) -> BoxStream<Result<(Page, String)>> {
let sql = "SELECT id, url, type, depth, status, content FROM pages WHERE content IS NOT NULL AND status = ?";
let sql = "SELECT id, url, type, depth, status, content, compressed FROM pages WHERE content IS NOT NULL AND status = ?";
let r = sqlx::query(sql)
.bind(PageStatus::Downloaded.int_value())
.fetch(&self.connection)
Expand All @@ -169,6 +177,16 @@ impl Storage {
}
}

fn decompress_zstd(data: Vec<u8>, compressed: bool) -> Result<String> {
if compressed {
let mut out = Cursor::new(vec![]);
zstd::stream::copy_decode(data.as_slice(), &mut out)?;
Ok(String::from_utf8(out.into_inner())?)
} else {
Ok(String::from_utf8(data)?)
}
}

fn page_from_row(row: StdResult<SqliteRow, sqlx::Error>) -> Result<(Page, String)> {
let row = row?;

Expand All @@ -179,7 +197,8 @@ fn page_from_row(row: StdResult<SqliteRow, sqlx::Error>) -> Result<(Page, String
let status: u8 = row.try_get("status")?;
let page = page_from_tuple((page_id, url, type_id, depth, status))?;

let content: String = row.try_get("content")?;
let compressed: u8 = row.try_get("compressed")?;
let content = decompress_zstd(row.try_get("content")?, compressed > 0)?;

Ok((page, content))
}
Expand Down

0 comments on commit fd87815

Please sign in to comment.