diff --git a/src/notifications/database_ext.rs b/src/notifications/database_ext.rs index 00197a8..155b22d 100644 --- a/src/notifications/database_ext.rs +++ b/src/notifications/database_ext.rs @@ -74,13 +74,14 @@ impl Database { let page_limit = page_size as i64; try_stream! { let mut last_id = 0; + let mut conn = self.pool.acquire().await?; loop { let raw_notification_ids = query!( r#"SELECT id FROM notifications WHERE scheduled_at <= $1 AND id > $2 ORDER BY scheduled_at, id LIMIT $3;"#, scheduled_before_or_at, last_id, page_limit - ).fetch_all(&self.pool).await?; + ).fetch_all(&mut *conn).await?; let is_last_page = raw_notification_ids.len() < page_size; for raw_notification_id in raw_notification_ids { diff --git a/src/scheduler/database_ext.rs b/src/scheduler/database_ext.rs index 30dec7c..87ef266 100644 --- a/src/scheduler/database_ext.rs +++ b/src/scheduler/database_ext.rs @@ -79,12 +79,13 @@ WHERE id = $1 let page_limit = page_size as i64; try_stream! { let mut last_id = Uuid::nil(); + let mut conn = self.pool.acquire().await?; loop { let jobs = query_as!(RawSchedulerJobStoredData, r#"SELECT * FROM scheduler_jobs WHERE id > $1 ORDER BY id LIMIT $2;"#, last_id, page_limit ) - .fetch_all(&self.pool) + .fetch_all(&mut *conn) .await?; let is_last_page = jobs.len() < page_size; diff --git a/src/utils/web_scraping/database_ext.rs b/src/utils/web_scraping/database_ext.rs index ac6091a..a55f0c3 100644 --- a/src/utils/web_scraping/database_ext.rs +++ b/src/utils/web_scraping/database_ext.rs @@ -357,6 +357,7 @@ ORDER BY created_at try_stream! { let mut last_created_at = OffsetDateTime::UNIX_EPOCH; let kind = Vec::try_from(Tag::KIND)?; + let mut conn = self.pool.acquire().await?; loop { let records = query!( r#" @@ -371,7 +372,7 @@ LIMIT $3; "#, kind, last_created_at, page_limit ) - .fetch_all(self.pool) + .fetch_all(&mut *conn) .await?; let is_last_page = records.len() < page_size;