Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[benchmarks] Add concurrent readers #707 #708

Merged
merged 3 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions agdb_benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,37 @@ use std::time::Duration;
mod bench_error;
mod bench_result;
mod database;
mod readers;
mod users;
mod utilities;
mod writers;

pub(crate) const BENCH_DATABASE: &str = "db.agdb";
pub(crate) const LOCALE: Locale = Locale::cs;
pub(crate) const PADDING: usize = 30;
pub(crate) const CELL_PADDING: usize = 8;
pub(crate) const CELL_PADDING: usize = 15;

pub(crate) const USER_COUNT: u32 = 1000;

pub(crate) const POST_WRITER_COUNT: u32 = 100;
pub(crate) const POST_WRITERS: u32 = 10;
pub(crate) const POSTS_PER_WRITER: u32 = 100;
pub(crate) const POST_TITLE: &str = "Title of the testing post";
pub(crate) const POST_BODY: &str = "Body of the testing post should be longer than the title";

pub(crate) const COMMENT_WRITER_COUNT: u32 = 100;
pub(crate) const COMMENT_WRITERS: u32 = 10;
pub(crate) const COMMENTS_PER_WRITER: u32 = 100;
pub(crate) const COMMENT_BODY: &str = "This is a testing comment of a post.";

pub(crate) const WRITE_DELAY: Duration = Duration::from_millis(0);
pub(crate) const WRITE_DELAY: Duration = Duration::from_millis(10);

pub(crate) const POST_READERS: u32 = 100;
pub(crate) const POSTS_PER_READ: u32 = 10;
pub(crate) const READS_PER_POST_READER: u32 = 100;

pub(crate) const COMMENTS_READERS: u32 = 100;
pub(crate) const COMMENTS_PER_READ: u32 = 10;
pub(crate) const READS_PER_COMMENTS_READER: u32 = 100;

pub(crate) const READ_DELAY: Duration = Duration::from_millis(10);
pub(crate) const USER_COUNT: u32 = POST_WRITERS + COMMENT_WRITERS;

#[tokio::main]
async fn main() -> BenchResult<()> {
Expand All @@ -35,19 +45,32 @@ async fn main() -> BenchResult<()> {

let mut db = Database::new()?;
users::setup_users(&mut db)?;

let mut posters = writers::start_post_writers(&mut db)?;
let mut commenters = writers::start_comment_writers(&mut db)?;
let mut post_readers = readers::start_post_readers(&mut db)?;
let mut comment_readers = readers::start_comment_readers(&mut db)?;

posters
.join_and_report(&format!(
"{POST_WRITER_COUNT} posters * {POSTS_PER_WRITER} posts"
"{POST_WRITERS} posters * {POSTS_PER_WRITER} posts"
))
.await?;
commenters
.join_and_report(&format!(
"{POST_WRITER_COUNT} commenters * {COMMENTS_PER_WRITER} comments"
"{COMMENT_WRITERS} commenters * {COMMENTS_PER_WRITER} comments"
))
.await?;
post_readers
.join_and_report(&format!(
"{POST_READERS} post readers * {POSTS_PER_READ} * {READS_PER_POST_READER}"
))
.await?;
comment_readers
.join_and_report(&format!(
"{COMMENTS_READERS} comment readers * {COMMENTS_PER_READ} * {READS_PER_COMMENTS_READER}"
))
.await?;

println!("---");
db.stat()
Expand Down
178 changes: 178 additions & 0 deletions agdb_benchmarks/src/readers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use crate::bench_result::BenchResult;
use crate::database::Database;
use crate::utilities;
use crate::utilities::measured;
use crate::COMMENTS_PER_READ;
use crate::COMMENTS_READERS;
use crate::POSTS_PER_READ;
use crate::POST_READERS;
use crate::READS_PER_COMMENTS_READER;
use crate::READS_PER_POST_READER;
use crate::READ_DELAY;
use agdb::DbId;
use agdb::QueryBuilder;
use std::time::Duration;
use tokio::task::JoinHandle;

struct Reader {
db: Database,
pub(crate) times: Vec<Duration>,
}

pub(crate) struct Readers(Vec<JoinHandle<Reader>>);

impl Reader {
pub(crate) fn new(db: Database) -> Self {
Self { db, times: vec![] }
}

fn read_comments(&mut self) -> BenchResult<bool> {
if let Some(post_id) = self.last_post()? {
let duration = measured(|| {
let _comments = self.db.0.read()?.exec(
&QueryBuilder::select()
.ids(
QueryBuilder::search()
.from(post_id)
.limit(COMMENTS_PER_READ.into())
.where_()
.distance(agdb::CountComparison::Equal(2))
.query(),
)
.query(),
)?;
Ok(())
})?;

self.times.push(duration);

Ok(true)
} else {
Ok(false)
}
}

fn read_posts(&mut self) -> BenchResult<bool> {
let mut result = false;

let duration = measured(|| {
let posts = self.db.0.read()?.exec(
&QueryBuilder::select()
.ids(
QueryBuilder::search()
.from("posts")
.limit(POSTS_PER_READ.into())
.where_()
.distance(agdb::CountComparison::Equal(2))
.query(),
)
.query(),
)?;

result = posts.result != 0;

Ok(())
})?;

if result {
self.times.push(duration);
}

Ok(result)
}

fn last_post(&mut self) -> BenchResult<Option<DbId>> {
if let Some(post) = self
.db
.0
.read()?
.exec(
&QueryBuilder::search()
.depth_first()
.from("posts")
.limit(1)
.where_()
.distance(agdb::CountComparison::Equal(2))
.query(),
)?
.elements
.get(0)
{
Ok(Some(post.id))
} else {
Ok(None)
}
}
}

impl Readers {
pub(crate) async fn join_and_report(&mut self, description: &str) -> BenchResult<()> {
let mut readers = vec![];

for task in self.0.iter_mut() {
readers.push(task.await?);
}

let times: Vec<Duration> = readers.into_iter().flat_map(|w| w.times).collect();

utilities::report(description, times);

Ok(())
}
}

pub(crate) fn start_post_readers(db: &mut Database) -> BenchResult<Readers> {
let mut tasks = vec![];

for i in 0..POST_READERS {
let db = db.clone();

let handle = tokio::spawn(async move {
let mut reader = Reader::new(db);
let mut read = 0;
let read_delay = Duration::from_millis(READ_DELAY.as_millis() as u64 % (i + 1) as u64);

while read != READS_PER_POST_READER {
tokio::time::sleep(read_delay).await;

if reader.read_posts().unwrap_or(false) {
read += 1;
}
}

reader
});

tasks.push(handle);
}

Ok(Readers(tasks))
}

pub(crate) fn start_comment_readers(db: &mut Database) -> BenchResult<Readers> {
let mut tasks = vec![];

for i in 0..COMMENTS_READERS {
let db = db.clone();

let handle = tokio::spawn(async move {
let mut reader = Reader::new(db);
let mut read = 0;
let read_delay = Duration::from_millis(READ_DELAY.as_millis() as u64 % (i + 1) as u64);

while read != READS_PER_COMMENTS_READER {
tokio::time::sleep(read_delay).await;

if reader.read_comments().unwrap_or(false) {
read += 1;
}
}

reader
});

tasks.push(handle);
}

Ok(Readers(tasks))
}
4 changes: 4 additions & 0 deletions agdb_benchmarks/src/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ pub(crate) fn print_header() {
"{:PADDING$} | {:CELL_PADDING$} | {:CELL_PADDING$} | {:CELL_PADDING$} | {:CELL_PADDING$} | {:CELL_PADDING$}",
"Description", "Count", "Min", "Avg", "Max", "Total"
);
println!(
"{:-<PADDING$} | {:-<CELL_PADDING$} | {:-<CELL_PADDING$} | {:-<CELL_PADDING$} | {:-<CELL_PADDING$} | {:-<CELL_PADDING$}",
"", "", "", "", "", ""
);
}

pub(crate) fn report(description: &str, mut times: Vec<Duration>) {
Expand Down
72 changes: 38 additions & 34 deletions agdb_benchmarks/src/writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use crate::utilities;
use crate::utilities::measured;
use crate::COMMENTS_PER_WRITER;
use crate::COMMENT_BODY;
use crate::COMMENT_WRITER_COUNT;
use crate::COMMENT_WRITERS;
use crate::POSTS_PER_WRITER;
use crate::POST_BODY;
use crate::POST_TITLE;
use crate::POST_WRITER_COUNT;
use crate::POST_WRITERS;
use crate::WRITE_DELAY;
use agdb::DbId;
use agdb::QueryBuilder;
Expand Down Expand Up @@ -45,7 +45,7 @@ impl Writer {
}
}

pub(crate) fn write_post(&mut self) -> BenchResult<()> {
fn write_post(&mut self) -> BenchResult<()> {
let duration = measured(|| {
self.db.0.write()?.transaction_mut(|t| -> BenchResult<()> {
let id = t
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Writer {
Ok(())
}

pub(crate) fn write_comment(&mut self) -> BenchResult<bool> {
fn write_comment(&mut self) -> BenchResult<bool> {
if let Some(post_id) = self.last_post()? {
let duration = measured(|| {
self.db.0.write()?.transaction_mut(|t| -> BenchResult<()> {
Expand Down Expand Up @@ -160,34 +160,36 @@ impl Writers {
}

pub(crate) fn start_post_writers(db: &mut Database) -> BenchResult<Writers> {
let tasks =
db.0.read()?
.exec(
&QueryBuilder::search()
.from("users")
.limit(POST_WRITER_COUNT.into())
.where_()
.distance(agdb::CountComparison::Equal(2))
.query(),
)?
.elements
.into_iter()
.map(|e| {
let id = e.id;
let db = db.clone();

tokio::task::spawn(async move {
let mut writer = Writer::new(id, db);

for _ in 0..POSTS_PER_WRITER {
let _ = writer.write_post();
tokio::time::sleep(WRITE_DELAY).await;
}

writer
})
let tasks = db
.0
.read()?
.exec(
&QueryBuilder::search()
.from("users")
.limit(POST_WRITERS.into())
.where_()
.distance(agdb::CountComparison::Equal(2))
.query(),
)?
.elements
.into_iter()
.map(|e| {
let id = e.id;
let db = db.clone();
let write_delay = Duration::from_millis(WRITE_DELAY.as_millis() as u64 % id.0 as u64);

tokio::task::spawn(async move {
let mut writer = Writer::new(id, db);

for _ in 0..POSTS_PER_WRITER {
let _ = writer.write_post();
tokio::time::sleep(write_delay).await;
}

writer
})
.collect::<Vec<JoinHandle<Writer>>>();
})
.collect::<Vec<JoinHandle<Writer>>>();

Ok(Writers(tasks))
}
Expand All @@ -198,8 +200,8 @@ pub(crate) fn start_comment_writers(db: &mut Database) -> BenchResult<Writers> {
.exec(
&QueryBuilder::search()
.from("users")
.offset(POST_WRITER_COUNT.into())
.limit(COMMENT_WRITER_COUNT.into())
.offset(POST_WRITERS.into())
.limit(COMMENT_WRITERS.into())
.where_()
.distance(agdb::CountComparison::Equal(2))
.query(),
Expand All @@ -213,13 +215,15 @@ pub(crate) fn start_comment_writers(db: &mut Database) -> BenchResult<Writers> {
tokio::task::spawn(async move {
let mut writer = Writer::new(id, db);
let mut written = 0;
let write_delay =
Duration::from_millis(WRITE_DELAY.as_millis() as u64 % id.0 as u64);

while written != COMMENTS_PER_WRITER {
if writer.write_comment().unwrap_or(false) {
written += 1;
}

tokio::time::sleep(WRITE_DELAY).await;
tokio::time::sleep(write_delay).await;
}

writer
Expand Down