Skip to content

Commit

Permalink
ensure scraper is atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Jan 28, 2025
1 parent b99184f commit e4d6d4d
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 89 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ slippy-map-tiles = "0.16.0"
geo-buffer = "0.2.0"
actix-cors = "0.7.0"
finite = "0.0.1"
scraper = "0.22.0"
scraper = { version = "0.22.0", features = ["atomic"] }
html-escape = "0.2.13"
[[bin]]
name = "maple"
Expand Down
39 changes: 19 additions & 20 deletions src/aspen/import_alpenrose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,36 +717,35 @@ pub async fn new_rt_data(
if let Some(alert) = metrolink_alert_entity.alert {
let alert: AspenisedAlert = alert.into();

for informed_entity in alert.informed_entity.iter() {
if let Some(route_id) = &informed_entity.route_id {
impacted_route_id_to_alert_ids
.entry(route_id.clone())
.and_modify(|x| x.push(alert_id.clone()))
.or_insert(vec![alert_id.clone()]);
}

if let Some(trip) = &informed_entity.trip {
if let Some(trip_id) = &trip.trip_id {
impact_trip_id_to_alert_ids
.entry(trip_id.clone())
.and_modify(|x| x.push(alert_id.clone()))
.or_insert(vec![alert_id.clone()]);
}

if let Some(route_id) = &trip.route_id {
for informed_entity in alert.informed_entity.iter() {
if let Some(route_id) = &informed_entity.route_id {
impacted_route_id_to_alert_ids
.entry(route_id.clone())
.and_modify(|x| x.push(alert_id.clone()))
.or_insert(vec![alert_id.clone()]);
}

if let Some(trip) = &informed_entity.trip {
if let Some(trip_id) = &trip.trip_id {
impact_trip_id_to_alert_ids
.entry(trip_id.clone())
.and_modify(|x| x.push(alert_id.clone()))
.or_insert(vec![alert_id.clone()]);
}

if let Some(route_id) = &trip.route_id {
impacted_route_id_to_alert_ids
.entry(route_id.clone())
.and_modify(|x| x.push(alert_id.clone()))
.or_insert(vec![alert_id.clone()]);
}
}
}
}

alerts.insert(alert_id.clone(), alert);
alerts.insert(alert_id.clone(), alert);
}
}
}

}
}
}
Expand Down
188 changes: 121 additions & 67 deletions src/custom_alerts/metrolink_alerts.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use chrono::{TimeZone, Utc};
use chrono_tz::US::Pacific;
use itertools::Itertools;
use scraper::Html;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use itertools::Itertools;
use scraper::Html;
use chrono::{TimeZone, Utc};
use chrono_tz::US::Pacific;
use std::error::Error;
use std::io::ErrorKind;

pub type RawMetrolinkAlerts = Vec<RawMetrolinkEachRoute>;

Expand Down Expand Up @@ -66,7 +68,7 @@ pub struct RawAlertDetailsPage {
end_date: Option<chrono::NaiveDate>,
route_ids: Vec<String>,
url: String,
id: String
id: String,
}

fn website_name_to_route_id(route_name: &str) -> Option<&str> {
Expand All @@ -85,7 +87,9 @@ fn website_name_to_route_id(route_name: &str) -> Option<&str> {
fn date_string_to_chrono_naive(date_string: &str) -> Option<chrono::NaiveDate> {
//Remove anything matching "Starting Date: " and "Ending Date: "

let date_string = date_string.replace("Starting Date: ", "").replace("Ending Date: ", "");
let date_string = date_string
.replace("Starting Date: ", "")
.replace("Ending Date: ", "");

//parse the date string

Expand All @@ -94,13 +98,10 @@ fn date_string_to_chrono_naive(date_string: &str) -> Option<chrono::NaiveDate> {
date
}

pub async fn fetch_alert_page_data(url: &str) -> anyhow::Result<RawAlertDetailsPage> {


let raw_html = reqwest::get(url)
.await?
.text()
.await?;
pub async fn fetch_alert_page_data(
url: &str,
) -> Result<RawAlertDetailsPage, Box<dyn Error + Send + Sync>> {
let raw_html = reqwest::get(url).await?.text().await?;

let document = Html::parse_document(&raw_html);

Expand All @@ -116,45 +117,81 @@ pub async fn fetch_alert_page_data(url: &str) -> anyhow::Result<RawAlertDetailsP

let header_selector = scraper::Selector::parse(".h-L.alertsDetail__description").unwrap();
//pick first element or return error
let header = document.select(&header_selector).next().ok_or(anyhow::anyhow!("Header not found"))?.text().collect::<String>();
let header = document
.select(&header_selector)
.next()
.ok_or_else(|| std::io::Error::new(ErrorKind::NotFound, "Header not found"))?
.text()
.collect::<String>();

let description_selector = scraper::Selector::parse(".alertsDetail__details").unwrap();
let description = document.select(&description_selector).next().ok_or(anyhow::anyhow!("Description not found"))?.text().collect::<String>().replace(" For real-time updates, follow us on Facebook and Twitter (X).", "");
let description = document
.select(&description_selector)
.next()
.ok_or_else(|| std::io::Error::new(ErrorKind::NotFound, "Description not found"))?
.text()
.collect::<String>()
.replace(
" For real-time updates, follow us on Facebook and Twitter (X).",
"",
);

let start_date_selector = scraper::Selector::parse(".alertsDetail__date--start").unwrap();
//pick first optionally
let start_date = document.select(&start_date_selector).next().map(|x| x.text().collect::<String>());
let start_date = start_date.as_deref().map(date_string_to_chrono_naive).flatten();
let start_date = document
.select(&start_date_selector)
.next()
.map(|x| x.text().collect::<String>());
let start_date = start_date
.as_deref()
.map(date_string_to_chrono_naive)
.flatten();

let end_date_selector = scraper::Selector::parse(".alertsDetail__date--end").unwrap();
//pick first optionally
let end_date = document.select(&end_date_selector).next().map(|x| x.text().collect::<String>());
let end_date = end_date.as_deref().map(date_string_to_chrono_naive).flatten();
let end_date = document
.select(&end_date_selector)
.next()
.map(|x| x.text().collect::<String>());
let end_date = end_date
.as_deref()
.map(date_string_to_chrono_naive)
.flatten();

let route_selector = scraper::Selector::parse(".alertsDetail__line-station").unwrap();
let route_ids = document.select(&route_selector).map(|x| x.text().collect::<String>()).map(|x| website_name_to_route_id(&x).map(|x| x.to_string())).flatten().collect::<Vec<String>>();
let route_ids = document
.select(&route_selector)
.map(|x| x.text().collect::<String>())
.map(|x| website_name_to_route_id(&x).map(|x| x.to_string()))
.flatten()
.collect::<Vec<String>>();

Ok(RawAlertDetailsPage {
header,
description,
start_date,
end_date,
route_ids,
id: url.replace("https://metrolinktrains.com/news/alert-details-page/?alertId=", "").to_string(),
id: url
.replace(
"https://metrolinktrains.com/news/alert-details-page/?alertId=",
"",
)
.to_string(),
url: url.to_string(),
})

}

pub async fn fetch_alerts_from_root_metrolink() -> anyhow::Result<Vec<RawAlertDetailsPage>> {
//query the root page for text
pub async fn fetch_alerts_from_root_metrolink(
) -> Result<Vec<RawAlertDetailsPage>, Box<dyn Error + Send + Sync>> {
//query the root page for text

//https://metrolinktrains.com/

let main_page = reqwest::get("https://metrolinktrains.com/")
.await?
.text()
.await?;
.await?
.text()
.await?;

let document = Html::parse_document(&main_page);

Expand All @@ -170,12 +207,13 @@ pub async fn fetch_alerts_from_root_metrolink() -> anyhow::Result<Vec<RawAlertDe

for alert in alerts {
if let Some(href) = alert.value().attr("href") {

hrefs.push(format!("https://metrolinktrains.com{}", href));
hrefs.push(format!("https://metrolinktrains.com{}", href));
}
}

// println!("alerts list: {:?}", hrefs);
drop(document);

// println!("alerts list: {:?}", hrefs);

// go to each webpage and get the information about the alerts

Expand All @@ -191,7 +229,7 @@ pub async fn fetch_alerts_from_root_metrolink() -> anyhow::Result<Vec<RawAlertDe
Ok(alert_details)
}

pub async fn determine_cause(header:&str, desc: &str) -> i32 {
pub async fn determine_cause(header: &str, desc: &str) -> i32 {
let header_lower = header.to_lowercase();

if header_lower.contains("construction") {
Expand Down Expand Up @@ -221,8 +259,10 @@ pub async fn determine_cause(header:&str, desc: &str) -> i32 {
return 2;
}

pub async fn determine_effect(header:&str, desc: &str) -> i32 {
if header.to_lowercase().contains("no train service") || header.to_lowercase().contains("no service") {
pub async fn determine_effect(header: &str, desc: &str) -> i32 {
if header.to_lowercase().contains("no train service")
|| header.to_lowercase().contains("no service")
{
return 1;
}

Expand All @@ -238,19 +278,22 @@ pub fn make_en_translated_string(text: String) -> gtfs_realtime::TranslatedStrin
translation: vec![gtfs_realtime::translated_string::Translation {
text,
language: Some("en".to_string()),
}]
}],
}
}

pub async fn gtfs_rt_alerts_from_metrolink_website() -> anyhow::Result<Vec<gtfs_realtime::FeedEntity>> {
pub async fn gtfs_rt_alerts_from_metrolink_website(
) -> Result<Vec<gtfs_realtime::FeedEntity>, Box<dyn Error + Send + Sync>> {
let mut entities = vec![];

let raw_from_homepage = fetch_alerts_from_root_metrolink().await?;

//conversion to gtfs rt
for raw_homepage_alert in raw_from_homepage {
let cause = determine_cause(&raw_homepage_alert.header, &raw_homepage_alert.description).await;
let effect = determine_effect(&raw_homepage_alert.header, &raw_homepage_alert.description).await;
let cause =
determine_cause(&raw_homepage_alert.header, &raw_homepage_alert.description).await;
let effect =
determine_effect(&raw_homepage_alert.header, &raw_homepage_alert.description).await;

let mut route_ids = vec![];

Expand All @@ -264,9 +307,17 @@ pub async fn gtfs_rt_alerts_from_metrolink_website() -> anyhow::Result<Vec<gtfs_

//convert start_date to 00:00:00 and end_date to 23:59:59, both in Los Angeles time

let start_dt = start_date.map(|x| x.and_hms_opt(0, 0, 0)).flatten().map(|x| Pacific.from_local_datetime(&x).single()).flatten();
let start_dt = start_date
.map(|x| x.and_hms_opt(0, 0, 0))
.flatten()
.map(|x| Pacific.from_local_datetime(&x).single())
.flatten();

let end_dt = end_date.map(|x| x.and_hms_opt(23, 59, 59)).flatten().map(|x| Pacific.from_local_datetime(&x).single()).flatten();
let end_dt = end_date
.map(|x| x.and_hms_opt(23, 59, 59))
.flatten()
.map(|x| Pacific.from_local_datetime(&x).single())
.flatten();

//make timestamps for both start and end

Expand All @@ -280,43 +331,47 @@ pub async fn gtfs_rt_alerts_from_metrolink_website() -> anyhow::Result<Vec<gtfs_
cause: Some(cause),
effect: Some(effect),
url: Some(make_en_translated_string(raw_homepage_alert.url)),
header_text: Some(make_en_translated_string(raw_homepage_alert.header)),
description_text: Some(make_en_translated_string(raw_homepage_alert.description)),
active_period: vec![gtfs_realtime::TimeRange {
start: start_timestamp,
end: end_timestamp,
}],
informed_entity: route_ids.into_iter().map(|route_id| gtfs_realtime::EntitySelector {
route_id: Some(route_id),
..Default::default()
}).collect(),
..Default::default()
}),
..Default::default()
header_text: Some(make_en_translated_string(raw_homepage_alert.header)),
description_text: Some(make_en_translated_string(raw_homepage_alert.description)),
active_period: vec![gtfs_realtime::TimeRange {
start: start_timestamp,
end: end_timestamp,
}],
informed_entity: route_ids
.into_iter()
.map(|route_id| gtfs_realtime::EntitySelector {
route_id: Some(route_id),
..Default::default()
})
.collect(),
..Default::default()
}),
..Default::default()
});

}

let body_of_alerts = reqwest::get(METROLINK_ALERTS_URL)
.await?
.text()
.await?;

let body_of_alerts = reqwest::get(METROLINK_ALERTS_URL).await?.text().await?;

//individual advisories

let raw_data:RawMetrolinkAlerts = serde_json::from_str(&body_of_alerts)?;
let raw_data: RawMetrolinkAlerts = serde_json::from_str(&body_of_alerts)?;

let raw_data = raw_data.into_iter().map(|each_line_alerts|

{
let raw_data = raw_data
.into_iter()
.map(|each_line_alerts| {
let mut each_line_alerts = each_line_alerts;

let service_advisories = each_line_alerts.service_advisories.into_iter().filter(|x| x.message.as_str() != GREETING).collect_vec();
let service_advisories = each_line_alerts
.service_advisories
.into_iter()
.filter(|x| x.message.as_str() != GREETING)
.collect_vec();

each_line_alerts.service_advisories = service_advisories;

each_line_alerts
}).collect_vec();
})
.collect_vec();

//println!("{:#?}", raw_data);

Expand All @@ -335,12 +390,11 @@ mod tests {
#[tokio::test]
async fn metrolink_alerts_fetch() {
//fetch json from url

let test = gtfs_rt_alerts_from_metrolink_website().await;

assert!(test.is_ok());

println!("{:#?}", test.unwrap());

}
}
}
2 changes: 1 addition & 1 deletion src/custom_alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub mod metrolink_alerts;
pub mod metrolink_alerts;

0 comments on commit e4d6d4d

Please sign in to comment.