diff --git a/src/db.rs b/src/db.rs index a696be99..272601b0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,5 +1,4 @@ -use crate::handlers::jobs::handle_job; -use crate::{db::jobs::*, handlers::Context}; +use crate::{db::jobs::*, handlers::Context, jobs::jobs}; use anyhow::Context as _; use chrono::Utc; use native_tls::{Certificate, TlsConnector}; @@ -188,16 +187,32 @@ pub async fn schedule_jobs(db: &DbClient, jobs: Vec) -> anyhow::Res let mut upcoming = job.schedule.upcoming(Utc).take(1); if let Some(scheduled_at) = upcoming.next() { - if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await { - // mean there's no job already in the db with that name and scheduled_at - insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?; - } + schedule_job(db, job.name, job.metadata, scheduled_at).await?; } } Ok(()) } +pub async fn schedule_job( + db: &DbClient, + job_name: &str, + job_metadata: serde_json::Value, + when: chrono::DateTime, +) -> anyhow::Result<()> { + let all_jobs = jobs(); + if !all_jobs.iter().any(|j| j.name() == job_name) { + anyhow::bail!("Job {} does not exist in the current job list.", job_name); + } + + if let Err(_) = get_job_by_name_and_scheduled_at(&db, job_name, &when).await { + // mean there's no job already in the db with that name and scheduled_at + insert_job(&db, job_name, &when, &job_metadata).await?; + } + + Ok(()) +} + pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<()> { let jobs = get_jobs_to_execute(&db).await.unwrap(); tracing::trace!("jobs to execute: {:#?}", jobs); @@ -220,6 +235,26 @@ pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result< Ok(()) } +// Try to handle a specific job +async fn handle_job( + ctx: &Context, + name: &String, + metadata: &serde_json::Value, +) -> anyhow::Result<()> { + for job in jobs() { + if &job.name() == &name { + return job.run(ctx, metadata).await; + } + } + tracing::trace!( + "handle_job fell into default case: (name={:?}, metadata={:?})", + name, + metadata + ); + + Ok(()) +} + static MIGRATIONS: &[&str] = &[ " CREATE TABLE notifications ( diff --git a/src/db/jobs.rs b/src/db/jobs.rs index 5db66b0f..cc0f3e65 100644 --- a/src/db/jobs.rs +++ b/src/db/jobs.rs @@ -7,7 +7,7 @@ use tokio_postgres::Client as DbClient; use uuid::Uuid; pub struct JobSchedule { - pub name: String, + pub name: &'static str, pub schedule: Schedule, pub metadata: serde_json::Value, } @@ -24,7 +24,7 @@ pub struct Job { pub async fn insert_job( db: &DbClient, - name: &String, + name: &str, scheduled_at: &DateTime, metadata: &serde_json::Value, ) -> Result<()> { @@ -76,7 +76,7 @@ pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> { pub async fn get_job_by_name_and_scheduled_at( db: &DbClient, - name: &String, + name: &str, scheduled_at: &DateTime, ) -> Result { tracing::trace!( diff --git a/src/github.rs b/src/github.rs index 310d71c3..49f276bc 100644 --- a/src/github.rs +++ b/src/github.rs @@ -492,6 +492,21 @@ impl Issue { .await?) } + // returns an array of one element + pub async fn get_first100_comments( + &self, + client: &GithubClient, + ) -> anyhow::Result> { + let comment_url = format!( + "{}/issues/{}/comments?page=1&per_page=100", + self.repository().url(), + self.number, + ); + Ok(client + .json::>(client.get(&comment_url)) + .await?) 
+ } + pub async fn edit_body(&self, client: &GithubClient, body: &str) -> anyhow::Result<()> { let edit_url = format!("{}/issues/{}", self.repository().url(), self.number); #[derive(serde::Serialize)] diff --git a/src/handlers.rs b/src/handlers.rs index 7e4a04f9..d2bf47d8 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -29,7 +29,6 @@ mod close; pub mod docs_update; mod github_releases; mod glacier; -pub mod jobs; mod major_change; mod mentions; mod milestone_prs; @@ -46,6 +45,7 @@ mod review_submitted; mod rfc_helper; pub mod rustc_commits; mod shortcut; +pub mod types_planning_updates; pub async fn handle(ctx: &Context, event: &Event) -> Vec { let config = config::get(&ctx.github, event.repo()).await; diff --git a/src/handlers/docs_update.rs b/src/handlers/docs_update.rs index 13932d54..80e5886b 100644 --- a/src/handlers/docs_update.rs +++ b/src/handlers/docs_update.rs @@ -1,13 +1,12 @@ //! A scheduled job to post a PR to update the documentation on rust-lang/rust. -use crate::db::jobs::JobSchedule; use crate::github::{self, GitTreeEntry, GithubClient, Issue, Repository}; +use crate::jobs::Job; use anyhow::Context; use anyhow::Result; -use cron::Schedule; +use async_trait::async_trait; use reqwest::Client; use std::fmt::Write; -use std::str::FromStr; /// This is the repository where the commits will be created. const WORK_REPO: &str = "rustbot/rust"; @@ -28,38 +27,42 @@ const SUBMODULES: &[&str] = &[ const TITLE: &str = "Update books"; -pub fn job() -> JobSchedule { - JobSchedule { - name: "docs_update".to_string(), - // Around 9am Pacific time on every Monday. - schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(), - metadata: serde_json::Value::Null, - } -} +pub struct DocsUpdateJob; -pub async fn handle_job() -> Result<()> { - // Only run every other week. Doing it every week can be a bit noisy, and - // (rarely) a PR can take longer than a week to merge (like if there are - // CI issues). `Schedule` does not allow expressing this, so check it - // manually. - // - // This is set to run the first week after a release, and the week just - // before a release. That allows getting the latest changes in the next - // release, accounting for possibly taking a few days for the PR to land. - let today = chrono::Utc::today().naive_utc(); - let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10); - let duration = today.signed_duration_since(base); - let weeks = duration.num_weeks(); - if weeks % 2 != 0 { - tracing::trace!("skipping job, this is an odd week"); - return Ok(()); +#[async_trait] +impl Job for DocsUpdateJob { + fn name(&self) -> &'static str { + "docs_update" } - tracing::trace!("starting docs-update"); - docs_update() - .await - .context("failed to process docs update")?; - Ok(()) + async fn run( + &self, + _ctx: &super::Context, + _metadata: &serde_json::Value, + ) -> anyhow::Result<()> { + // Only run every other week. Doing it every week can be a bit noisy, and + // (rarely) a PR can take longer than a week to merge (like if there are + // CI issues). `Schedule` does not allow expressing this, so check it + // manually. + // + // This is set to run the first week after a release, and the week just + // before a release. That allows getting the latest changes in the next + // release, accounting for possibly taking a few days for the PR to land. 
+ let today = chrono::Utc::today().naive_utc(); + let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10); + let duration = today.signed_duration_since(base); + let weeks = duration.num_weeks(); + if weeks % 2 != 0 { + tracing::trace!("skipping job, this is an odd week"); + return Ok(()); + } + + tracing::trace!("starting docs-update"); + docs_update() + .await + .context("failed to process docs update")?; + Ok(()) + } } pub async fn docs_update() -> Result> { diff --git a/src/handlers/jobs.rs b/src/handlers/jobs.rs index adb05e4a..e667fa42 100644 --- a/src/handlers/jobs.rs +++ b/src/handlers/jobs.rs @@ -8,20 +8,20 @@ use super::Context; pub async fn handle_job( ctx: &Context, - name: &String, + name: &str, metadata: &serde_json::Value, ) -> anyhow::Result<()> { - match name.as_str() { + match name { "docs_update" => super::docs_update::handle_job().await, "rustc_commits" => { super::rustc_commits::synchronize_commits_inner(ctx, None).await; Ok(()) } - _ => default(&name, &metadata), + _ => default(name, &metadata), } } -fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> { +fn default(name: &str, metadata: &serde_json::Value) -> anyhow::Result<()> { tracing::trace!( "handle_job fell into default case: (name={:?}, metadata={:?})", name, diff --git a/src/handlers/rustc_commits.rs b/src/handlers/rustc_commits.rs index 4724bb2b..d5dc30a9 100644 --- a/src/handlers/rustc_commits.rs +++ b/src/handlers/rustc_commits.rs @@ -1,14 +1,13 @@ -use crate::db::jobs::JobSchedule; use crate::db::rustc_commits; use crate::db::rustc_commits::get_missing_commits; +use crate::jobs::Job; use crate::{ github::{self, Event}, handlers::Context, }; -use cron::Schedule; +use async_trait::async_trait; use std::collections::VecDeque; use std::convert::TryInto; -use std::str::FromStr; use tracing as log; const BORS_GH_ID: i64 = 3372342; @@ -153,12 +152,17 @@ pub async fn synchronize_commits_inner(ctx: &Context, starter: Option<(String, u } } -pub fn job() -> JobSchedule { - JobSchedule { - name: "rustc_commits".to_string(), - // Every 30 minutes... 
- schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(), - metadata: serde_json::Value::Null, +pub struct RustcCommitsJob; + +#[async_trait] +impl Job for RustcCommitsJob { + fn name(&self) -> &'static str { + "rustc_commits" + } + + async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> { + synchronize_commits_inner(ctx, None).await; + Ok(()) } } diff --git a/src/handlers/types_planning_updates.rs b/src/handlers/types_planning_updates.rs new file mode 100644 index 00000000..a6c9c0c4 --- /dev/null +++ b/src/handlers/types_planning_updates.rs @@ -0,0 +1,170 @@ +use crate::db::schedule_job; +use crate::github; +use crate::jobs::Job; +use anyhow::Context as _; +use async_trait::async_trait; +use chrono::{Datelike, Duration, NaiveTime, TimeZone, Utc}; +use serde::{Deserialize, Serialize}; + +const TYPES_REPO: &'static str = "rust-lang/types-team"; +// T-types/meetings +const TYPES_MEETINGS_STREAM: u64 = 326132; + +pub struct TypesPlanningMeetingThreadOpenJob; + +#[async_trait] +impl Job for TypesPlanningMeetingThreadOpenJob { + fn name(&self) -> &'static str { + "types_planning_meeting_thread_open" + } + + async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> { + // On the last week of the month, we open a thread on zulip for the next Monday + let today = chrono::Utc::now().date().naive_utc(); + let first_monday = today + chrono::Duration::days(7); + // We actually schedule for every Monday, so first check if this is the last Monday of the month + if first_monday.month() == today.month() { + return Ok(()); + } + let meeting_date_string = first_monday.format("%Y-%m-%d").to_string(); + let message = format!("\ + Hello @*T-types/meetings*. Monthly planning meeting in one week.\n\ + This is a reminder to update the current [roadmap tracking issues](https://github.com/rust-lang/types-team/issues?q=is%3Aissue+is%3Aopen+label%3Aroadmap-tracking-issue).\n\ + Extra reminders will be sent later this week."); + let zulip_req = crate::zulip::MessageApiRequest { + recipient: crate::zulip::Recipient::Stream { + id: TYPES_MEETINGS_STREAM, + topic: &format!("{meeting_date_string} planning meeting"), + }, + content: &message, + }; + zulip_req.send(&ctx.github.raw()).await?; + + // Then, we want to schedule the next Thursday after this + let mut thursday = today; + while thursday.weekday().num_days_from_monday() != 3 { + thursday = thursday.succ(); + } + let thursday_at_noon = + Utc.from_utc_datetime(&thursday.and_time(NaiveTime::from_hms(12, 0, 0))); + let metadata = serde_json::value::to_value(PlanningMeetingUpdatesPingMetadata { + date_string: meeting_date_string, + }) + .unwrap(); + schedule_job( + &*ctx.db.get().await, + TypesPlanningMeetingUpdatesPing.name(), + metadata, + thursday_at_noon, + ) + .await?; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +pub struct PlanningMeetingUpdatesPingMetadata { + pub date_string: String, +} + +pub struct TypesPlanningMeetingUpdatesPing; + +#[async_trait] +impl Job for TypesPlanningMeetingUpdatesPing { + fn name(&self) -> &'static str { + "types_planning_meeting_updates_ping" + } + + async fn run(&self, ctx: &super::Context, metadata: &serde_json::Value) -> anyhow::Result<()> { + let metadata = serde_json::from_value(metadata.clone())?; + // On the thursday before the first monday, we want to ping for updates + request_updates(ctx, metadata).await?; + Ok(()) + } +} + +pub async fn request_updates( + ctx: &super::Context, + metadata: PlanningMeetingUpdatesPingMetadata, +) -> 
anyhow::Result<()> { + let gh = &ctx.github; + let types_repo = gh.repository(TYPES_REPO).await?; + + let tracking_issues_query = github::Query { + filters: vec![("state", "open"), ("is", "issue")], + include_labels: vec!["roadmap-tracking-issue"], + exclude_labels: vec![], + }; + let issues = types_repo + .get_issues(&gh, &tracking_issues_query) + .await + .with_context(|| "Unable to get issues.")?; + + let mut issues_needs_updates = vec![]; + for issue in issues { + // If the issue has been updated in the past 7 days, we consider this "updated". We *could* be more clever, but + // this is fine under the assumption that tracking issues should only contain updates. + let older_than_7_days = issue.updated_at < (Utc::now() - Duration::days(7)); + if !older_than_7_days { + continue; + } + // In the future, we should reach out to specific people in charge of specific issues. For now, because our tracking + // method is crude and will over-estimate the issues that need updates. + /* + let mut dmed_assignee = false; + for assignee in issue.assignees { + let zulip_id_and_email = zulip_id_and_email(ctx, assignee.id.unwrap()).await?; + let (zulip_id, email) = match zulip_id_and_email { + Some(id) => id, + None => continue, + }; + let message = format!( + "Type team tracking issue needs an update. [Issue #{}]({})", + issue.number, issue.html_url + ); + let zulip_req = crate::zulip::MessageApiRequest { + recipient: crate::zulip::Recipient::Private { + id: zulip_id, + email: &email, + }, + content: &message, + }; + zulip_req.send(&ctx.github.raw()).await?; + dmed_assignee = true; + } + if !dmed_assignee { + let message = format!( + "Type team tracking issue needs an update, and was unable to reach an assignee. \ + [Issue #{}]({})", + issue.number, issue.html_url + ); + let zulip_req = crate::zulip::MessageApiRequest { + recipient: crate::zulip::Recipient::Stream { + id: 144729, + topic: "tracking issue updates", + }, + content: &message, + }; + zulip_req.send(&ctx.github.raw()).await?; + } + */ + issues_needs_updates.push(format!("- [Issue #{}]({})", issue.number, issue.html_url)); + } + + let issue_list = issues_needs_updates.join("\n"); + + let message = format!("The following issues still need updates:\n\n{issue_list}"); + + let meeting_date_string = metadata.date_string; + let zulip_req = crate::zulip::MessageApiRequest { + recipient: crate::zulip::Recipient::Stream { + id: TYPES_MEETINGS_STREAM, + topic: &format!("{meeting_date_string} planning meeting"), + }, + content: &message, + }; + zulip_req.send(&ctx.github.raw()).await?; + + Ok(()) +} diff --git a/src/jobs.rs b/src/jobs.rs index 3ba6de49..0acadf27 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -1,4 +1,11 @@ -//! SCHEDULED JOBS +//! # Scheduled Jobs +//! +//! Scheduled jobs essentially come in two flavors: automatically repeating +//! (cron) jobs and one-off jobs. +//! +//! The core trait here is the `Job` trait, which *must* define the name of the +//! job (to be used as an identifier in the database) and the function to run +//! when the job runs. //! //! The metadata is a serde_json::Value //! Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html @@ -7,33 +14,49 @@ //! The schedule is a cron::Schedule //! Please refer to https://docs.rs/cron/latest/cron/struct.Schedule.html for further info //! -//! For example, if we want to sends a Zulip message every Friday at 11:30am ET into #t-release -//! with a @T-release meeting! content, we should create some JobSchedule like: +//! 
## Example: sending a Zulip message once a week
 //!
+//! To give an example, let's imagine we want to send a Zulip message every
+//! Friday at 11:30am ET into #t-release with "@T-release meeting!" as the content.
+//!
+//! To begin, let's create a generic Zulip message Job:
 //!     #[derive(Serialize, Deserialize)]
 //!     struct ZulipMetadata {
 //!         pub message: String,
+//!         pub channel: String,
 //!     }
+//!     struct ZulipMessageJob;
+//!     impl Job for ZulipMessageJob { ... }
 //!
-//!     let metadata = serde_json::value::to_value(ZulipMetadata {
-//!         message: "@T-release meeting!".to_string()
-//!     }).unwrap();
-//!
-//!     let schedule = Schedule::from_str("0 30 11 * * FRI *").unwrap();
-//!
-//!     let new_job = JobSchedule {
-//!         name: "send_zulip_message".to_owned(),
-//!         schedule: schedule,
-//!         metadata: metadata
-//!     }
-//!
-//! and include it in the below vector in jobs():
-//!
-//!     jobs.push(new_job);
+//! (Imagine that this job requires a channel and a message in the metadata.)
 //!
-//! ... fianlly, add the corresponding "send_zulip_message" handler in src/handlers/jobs.rs
+//! If we wanted to have a default scheduled message, we could add the following to
+//! `default_jobs`:
+//!     JobSchedule {
+//!         name: ZulipMessageJob.name(),
+//!         schedule: Schedule::from_str("0 30 11 * * FRI *").unwrap(),
+//!         metadata: serde_json::value::to_value(ZulipMetadata {
+//!             message: "@T-release meeting!".to_string(),
+//!             channel: "T-release".to_string(),
+//!         }).unwrap(),
+//!     }

-use crate::db::jobs::JobSchedule;
+use std::str::FromStr;
+
+use async_trait::async_trait;
+use cron::Schedule;
+
+use crate::{
+    db::jobs::JobSchedule,
+    handlers::{
+        docs_update::DocsUpdateJob,
+        rustc_commits::RustcCommitsJob,
+        types_planning_updates::{
+            TypesPlanningMeetingThreadOpenJob, TypesPlanningMeetingUpdatesPing,
+        },
+        Context,
+    },
+};

 // How often new cron-based jobs will be placed in the queue.
 // This is the minimum period *between* a single cron task's executions.
@@ -43,17 +66,62 @@ pub const JOB_SCHEDULING_CADENCE_IN_SECS: u64 = 1800;
 // This is the granularity at which events will occur.
 pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;

-pub fn jobs() -> Vec<JobSchedule> {
-    // Add to this vector any new cron task you want (as explained above)
-    let mut jobs: Vec<JobSchedule> = Vec::new();
-    jobs.push(crate::handlers::docs_update::job());
-    jobs.push(crate::handlers::rustc_commits::job());
+// All of the jobs the bot knows how to run.
+pub fn jobs() -> Vec<Box<dyn Job + Send + Sync>> {
+    vec![
+        Box::new(DocsUpdateJob),
+        Box::new(RustcCommitsJob),
+        Box::new(TypesPlanningMeetingThreadOpenJob),
+        Box::new(TypesPlanningMeetingUpdatesPing),
+    ]
+}

-    jobs
+// The default jobs to schedule, repeatedly.
+pub fn default_jobs() -> Vec<JobSchedule> {
+    vec![
+        JobSchedule {
+            name: DocsUpdateJob.name(),
+            // Around 9am Pacific time on every Monday.
+            schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(),
+            metadata: serde_json::Value::Null,
+        },
+        JobSchedule {
+            name: RustcCommitsJob.name(),
+            // Every 30 minutes...
+            schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
+            metadata: serde_json::Value::Null,
+        },
+        JobSchedule {
+            name: TypesPlanningMeetingThreadOpenJob.name(),
+            // We want the last Monday of every month, but cron unfortunately doesn't support
+            // that. Instead, run at noon UTC every Monday and let the job itself check whether
+            // this is the last Monday of the month.
+            schedule: Schedule::from_str("0 0 12 ? * Mon *").unwrap(),
+            metadata: serde_json::Value::Null,
+        },
+    ]
+}
+
+#[async_trait]
+pub trait Job {
+    fn name(&self) -> &str;
+
+    async fn run(&self, ctx: &Context, metadata: &serde_json::Value) -> anyhow::Result<()>;
 }

 #[test]
 fn jobs_defined() {
+    // This checks that we don't panic (during schedule parsing) and that all job names are unique.
     // Checks we don't panic here, mostly for the schedule parsing.
-    drop(jobs());
+    let all_jobs = jobs();
+    let mut all_job_names: Vec<_> = all_jobs.into_iter().map(|j| j.name().to_string()).collect();
+    all_job_names.sort();
+    let mut unique_all_job_names = all_job_names.clone();
+    unique_all_job_names.sort();
+    unique_all_job_names.dedup();
+    assert_eq!(all_job_names, unique_all_job_names);
+
+    // Also ensure that every default job refers to a registered job.
+    let default_jobs = default_jobs();
+    default_jobs
+        .iter()
+        .for_each(|j| assert!(all_job_names.contains(&j.name.to_string())));
 }
diff --git a/src/main.rs b/src/main.rs
index 0a041385..83cf2c83 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,7 +11,9 @@ use tokio::{task, time};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
-use triagebot::jobs::{jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS};
+use triagebot::jobs::{
+    default_jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
+};
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};

 async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
@@ -320,7 +322,7 @@ fn spawn_job_scheduler() {
             loop {
                 interval.tick().await;

-                db::schedule_jobs(&*pool.get().await, jobs())
+                db::schedule_jobs(&*pool.get().await, default_jobs())
                     .await
                     .context("database schedule jobs")
                     .unwrap();
diff --git a/src/zulip.rs b/src/zulip.rs
index 2a2ea3e4..a158db50 100644
--- a/src/zulip.rs
+++ b/src/zulip.rs
@@ -299,14 +299,14 @@ async fn execute_for_other_user(
 }

 #[derive(serde::Deserialize)]
-struct MembersApiResponse {
-    members: Vec<Member>,
+pub struct MembersApiResponse {
+    pub members: Vec<Member>,
 }

 #[derive(serde::Deserialize)]
-struct Member {
-    email: String,
-    user_id: u64,
+pub struct Member {
+    pub email: String,
+    pub user_id: u64,
 }

 #[derive(serde::Serialize)]