From 08994856c3ef04d972e1e71025d20d838975d844 Mon Sep 17 00:00:00 2001 From: Greg Shuflin Date: Wed, 5 Feb 2025 21:36:42 -0800 Subject: [PATCH] write demo feeds to db --- src/demo.rs | 11 +++++++---- src/poll.rs | 4 ++-- src/poll_utils.rs | 13 ++++++++----- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/demo.rs b/src/demo.rs index 8143c37..c08b504 100644 --- a/src/demo.rs +++ b/src/demo.rs @@ -1,5 +1,5 @@ use crate::feeds::Feed; -use crate::poll_utils::fetch_new_entries; +use crate::poll_utils::{fetch_new_entries, update_entry_db}; use crate::user::User; use futures::future::join_all; use tracing::{info, warn}; @@ -89,15 +89,16 @@ pub async fn setup_demo_data(pool: &sqlx::SqlitePool) { .iter() .map(|feed| { let url = feed.url.clone(); + let feed_id = feed.feed_id; async move { let result = fetch_new_entries(&url).await; - (url, result) + (feed_id, url, result) } }) .collect(); let results = join_all(fetch_futures).await; - for (url, result) in results { + for (feed_id, url, result) in results { match result { Ok(entries) => { info!( @@ -105,7 +106,9 @@ pub async fn setup_demo_data(pool: &sqlx::SqlitePool) { "Successfully fetched {} entries", entries.len() ); - // TODO: Store entries in database + if let Err(e) = update_entry_db(&entries, &feed_id, &*pool).await { + warn!(error=%e, feed_url=url.as_str(), "Failed to store entries in database"); + } } Err(e) => { warn!(error=%e, feed_url=url.as_str(), "Error populating feed"); diff --git a/src/poll.rs b/src/poll.rs index 97aaf9d..9dfa416 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -107,11 +107,11 @@ pub async fn poll_feed( info!("Feed {} last checked: {}", feed_id, last_checked); let entries = if last_checked < POLLING_INTERVAL { info!("Reading entries from database for feed {}", feed_id); - read_entries(&feed_id, &mut db).await? + read_entries(&feed_id, &mut **db).await? } else { info!("Fetching new entries for feed {}", feed_id); let entries = crate::poll_utils::fetch_new_entries(&url).await?; - update_entry_db(&entries, &feed_id, &mut db).await?; + update_entry_db(&entries, &feed_id, &mut **db).await?; entries }; diff --git a/src/poll_utils.rs b/src/poll_utils.rs index fb5ed3f..1763dd2 100644 --- a/src/poll_utils.rs +++ b/src/poll_utils.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use feed_rs::model::Text; use rocket::http::Status; use rocket::serde::{json, uuid::Uuid, Serialize}; -use sqlx::{Acquire, SqliteConnection}; +use sqlx::{Acquire, Executor}; use url::Url; #[derive(Debug, Serialize)] @@ -50,13 +50,16 @@ pub async fn fetch_new_entries(url: &Url) -> Result, Status> { Ok(entries) } -pub async fn update_entry_db( +pub async fn update_entry_db<'a, E>( entries: &Vec, feed_id: &Uuid, - db: &mut SqliteConnection, -) -> Result<(), Status> { + executor: E, +) -> Result<(), Status> +where + E: Executor<'a, Database = sqlx::Sqlite> + Acquire<'a, Database = sqlx::Sqlite>, +{ // Start a transaction for batch update - let mut tx = db.begin().await.map_err(|e| { + let mut tx = executor.begin().await.map_err(|e| { error!("Failed to start transaction: {}", e); Status::InternalServerError })?;