Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some scheduling cleanup and automatic types planning stream opening #1742

Merged
merged 6 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::handlers::jobs::handle_job;
use crate::{db::jobs::*, handlers::Context};
use crate::{db::jobs::*, handlers::Context, jobs::jobs};
use anyhow::Context as _;
use chrono::Utc;
use native_tls::{Certificate, TlsConnector};
Expand Down Expand Up @@ -188,16 +187,32 @@ pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Res
let mut upcoming = job.schedule.upcoming(Utc).take(1);

if let Some(scheduled_at) = upcoming.next() {
if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await {
// means there's no job already in the db with that name and scheduled_at
insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?;
}
schedule_job(db, job.name, job.metadata, scheduled_at).await?;
}
}

Ok(())
}

/// Schedules a job named `job_name` to run at time `when`.
///
/// Fails if `job_name` is not one of the statically-known jobs returned by
/// `jobs()`. Scheduling is idempotent: if a row with the same name and
/// `scheduled_at` already exists in the database, nothing is inserted.
pub async fn schedule_job(
    db: &DbClient,
    job_name: &str,
    job_metadata: serde_json::Value,
    when: chrono::DateTime<Utc>,
) -> anyhow::Result<()> {
    // Refuse to schedule a job the executor does not know how to run.
    if !jobs().iter().any(|j| j.name() == job_name) {
        anyhow::bail!("Job {} does not exist in the current job list.", job_name);
    }

    // `get_job_by_name_and_scheduled_at` returns Err when no matching row
    // exists, so an Err here means the job is not yet scheduled.
    if get_job_by_name_and_scheduled_at(db, job_name, &when)
        .await
        .is_err()
    {
        insert_job(db, job_name, &when, &job_metadata).await?;
    }

    Ok(())
}

pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<()> {
let jobs = get_jobs_to_execute(&db).await.unwrap();
tracing::trace!("jobs to execute: {:#?}", jobs);
Expand All @@ -220,6 +235,26 @@ pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<
Ok(())
}

/// Tries to run the job registered under `name`.
///
/// Looks the job up in the statically-known job list (`jobs()`) and runs it
/// with `metadata`. An unknown name is logged at trace level and ignored
/// (returns `Ok`), so a stale row in the jobs table cannot crash the
/// scheduler loop.
async fn handle_job(
    ctx: &Context,
    // `&str` rather than `&String`: callers passing `&String` still work via
    // deref coercion, and this matches the signature style used elsewhere.
    name: &str,
    metadata: &serde_json::Value,
) -> anyhow::Result<()> {
    if let Some(job) = jobs().into_iter().find(|j| j.name() == name) {
        return job.run(ctx, metadata).await;
    }
    tracing::trace!(
        "handle_job fell into default case: (name={:?}, metadata={:?})",
        name,
        metadata
    );

    Ok(())
}

static MIGRATIONS: &[&str] = &[
"
CREATE TABLE notifications (
Expand Down
6 changes: 3 additions & 3 deletions src/db/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio_postgres::Client as DbClient;
use uuid::Uuid;

pub struct JobSchedule {
pub name: String,
pub name: &'static str,
pub schedule: Schedule,
pub metadata: serde_json::Value,
}
Expand All @@ -24,7 +24,7 @@ pub struct Job {

pub async fn insert_job(
db: &DbClient,
name: &String,
name: &str,
scheduled_at: &DateTime<Utc>,
metadata: &serde_json::Value,
) -> Result<()> {
Expand Down Expand Up @@ -76,7 +76,7 @@ pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {

pub async fn get_job_by_name_and_scheduled_at(
db: &DbClient,
name: &String,
name: &str,
scheduled_at: &DateTime<Utc>,
) -> Result<Job> {
tracing::trace!(
Expand Down
15 changes: 15 additions & 0 deletions src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,21 @@ impl Issue {
.await?)
}

/// Fetches the first page (up to 100) of comments on this issue.
///
/// Only a single page is requested (`page=1&per_page=100`), so on issues
/// with more than 100 comments the remainder is silently omitted; callers
/// needing all comments must paginate themselves.
pub async fn get_first100_comments(
    &self,
    client: &GithubClient,
) -> anyhow::Result<Vec<Comment>> {
    let comment_url = format!(
        "{}/issues/{}/comments?page=1&per_page=100",
        self.repository().url(),
        self.number,
    );
    Ok(client
        .json::<Vec<Comment>>(client.get(&comment_url))
        .await?)
}

pub async fn edit_body(&self, client: &GithubClient, body: &str) -> anyhow::Result<()> {
let edit_url = format!("{}/issues/{}", self.repository().url(), self.number);
#[derive(serde::Serialize)]
Expand Down
2 changes: 1 addition & 1 deletion src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ mod close;
pub mod docs_update;
mod github_releases;
mod glacier;
pub mod jobs;
mod major_change;
mod mentions;
mod milestone_prs;
Expand All @@ -46,6 +45,7 @@ mod review_submitted;
mod rfc_helper;
pub mod rustc_commits;
mod shortcut;
pub mod types_planning_updates;

pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {
let config = config::get(&ctx.github, event.repo()).await;
Expand Down
67 changes: 35 additions & 32 deletions src/handlers/docs_update.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! A scheduled job to post a PR to update the documentation on rust-lang/rust.

use crate::db::jobs::JobSchedule;
use crate::github::{self, GitTreeEntry, GithubClient, Issue, Repository};
use crate::jobs::Job;
use anyhow::Context;
use anyhow::Result;
use cron::Schedule;
use async_trait::async_trait;
use reqwest::Client;
use std::fmt::Write;
use std::str::FromStr;

/// This is the repository where the commits will be created.
const WORK_REPO: &str = "rustbot/rust";
Expand All @@ -28,38 +27,42 @@ const SUBMODULES: &[&str] = &[

const TITLE: &str = "Update books";

pub fn job() -> JobSchedule {
JobSchedule {
name: "docs_update".to_string(),
// Around 9am Pacific time on every Monday.
schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(),
metadata: serde_json::Value::Null,
}
}
pub struct DocsUpdateJob;

pub async fn handle_job() -> Result<()> {
// Only run every other week. Doing it every week can be a bit noisy, and
// (rarely) a PR can take longer than a week to merge (like if there are
// CI issues). `Schedule` does not allow expressing this, so check it
// manually.
//
// This is set to run the first week after a release, and the week just
// before a release. That allows getting the latest changes in the next
// release, accounting for possibly taking a few days for the PR to land.
let today = chrono::Utc::today().naive_utc();
let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
let duration = today.signed_duration_since(base);
let weeks = duration.num_weeks();
if weeks % 2 != 0 {
tracing::trace!("skipping job, this is an odd week");
return Ok(());
#[async_trait]
impl Job for DocsUpdateJob {
fn name(&self) -> &'static str {
"docs_update"
}

tracing::trace!("starting docs-update");
docs_update()
.await
.context("failed to process docs update")?;
Ok(())
async fn run(
&self,
_ctx: &super::Context,
_metadata: &serde_json::Value,
) -> anyhow::Result<()> {
// Only run every other week. Doing it every week can be a bit noisy, and
// (rarely) a PR can take longer than a week to merge (like if there are
// CI issues). `Schedule` does not allow expressing this, so check it
// manually.
//
// This is set to run the first week after a release, and the week just
// before a release. That allows getting the latest changes in the next
// release, accounting for possibly taking a few days for the PR to land.
let today = chrono::Utc::today().naive_utc();
let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
let duration = today.signed_duration_since(base);
let weeks = duration.num_weeks();
if weeks % 2 != 0 {
tracing::trace!("skipping job, this is an odd week");
return Ok(());
}

tracing::trace!("starting docs-update");
docs_update()
.await
.context("failed to process docs update")?;
Ok(())
}
}

pub async fn docs_update() -> Result<Option<Issue>> {
Expand Down
8 changes: 4 additions & 4 deletions src/handlers/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ use super::Context;

pub async fn handle_job(
ctx: &Context,
name: &String,
name: &str,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
match name.as_str() {
match name {
"docs_update" => super::docs_update::handle_job().await,
"rustc_commits" => {
super::rustc_commits::synchronize_commits_inner(ctx, None).await;
Ok(())
}
_ => default(&name, &metadata),
_ => default(name, &metadata),
}
}

fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
fn default(name: &str, metadata: &serde_json::Value) -> anyhow::Result<()> {
tracing::trace!(
"handle_job fell into default case: (name={:?}, metadata={:?})",
name,
Expand Down
22 changes: 13 additions & 9 deletions src/handlers/rustc_commits.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::db::jobs::JobSchedule;
use crate::db::rustc_commits;
use crate::db::rustc_commits::get_missing_commits;
use crate::jobs::Job;
use crate::{
github::{self, Event},
handlers::Context,
};
use cron::Schedule;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::convert::TryInto;
use std::str::FromStr;
use tracing as log;

const BORS_GH_ID: i64 = 3372342;
Expand Down Expand Up @@ -153,12 +152,17 @@ pub async fn synchronize_commits_inner(ctx: &Context, starter: Option<(String, u
}
}

pub fn job() -> JobSchedule {
JobSchedule {
name: "rustc_commits".to_string(),
// Every 30 minutes...
schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
metadata: serde_json::Value::Null,
/// Scheduled job that synchronizes the locally-stored rustc commit history.
pub struct RustcCommitsJob;

#[async_trait]
impl Job for RustcCommitsJob {
    // Stable identifier used to match this job against rows in the jobs table.
    fn name(&self) -> &'static str {
        "rustc_commits"
    }

    // `_metadata` is ignored: the sync always starts from the latest known
    // state (`None` starter) rather than from per-invocation parameters.
    async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
        synchronize_commits_inner(ctx, None).await;
        Ok(())
    }
}

Expand Down
Loading