diff --git a/src/poll.rs b/src/poll.rs index def2eb7..7e987cd 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,4 +1,4 @@ -use crate::poll_utils::Entry; +use crate::poll_utils::{update_entry_db, Entry}; use crate::user::AuthenticatedUser; use crate::Db; use chrono::{Duration, Utc}; @@ -6,7 +6,7 @@ use rocket::http::Status; use rocket::serde::uuid::Uuid; use rocket::serde::{self, json::Json, Deserialize, Serialize}; use rocket_db_pools::Connection; -use sqlx::{Acquire, SqliteConnection}; +use sqlx::SqliteConnection; use tracing::{error, info}; const POLLING_INTERVAL: Duration = Duration::minutes(20); @@ -26,90 +26,6 @@ pub struct EntryStateUpdate { read: Option, } -async fn update_entry_db( - entries: &Vec, - feed_id: &str, - db: &mut SqliteConnection, -) -> Result<(), Status> { - // Start a transaction for batch update - let mut tx = db.begin().await.map_err(|e| { - error!("Failed to start transaction: {}", e); - Status::InternalServerError - })?; - - let now = Utc::now().to_rfc3339(); - - // Update the feed's last_checked_time - sqlx::query!( - "UPDATE feeds SET last_checked_time = ? WHERE feed_id = ?", - now, - feed_id - ) - .execute(&mut *tx) - .await - .map_err(|e| { - error!("Failed to update feed last_checked_time: {}", e); - Status::InternalServerError - })?; - - for entry in entries { - let content_json = if let Some(content) = &entry.content { - serde::json::to_string(content).ok() - } else { - None - }; - - let published = entry.published.map(|dt| dt.to_rfc3339()); - let updated = entry.updated.map(|dt| dt.to_rfc3339()); - - let local_id = entry.local_id.to_string(); - - let result = sqlx::query!( - r#" - INSERT INTO feed_entries ( - id, feed_id, local_id, title, published, updated, summary, content, link, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (feed_id, id) DO UPDATE SET - title = excluded.title, - published = excluded.published, - updated = excluded.updated, - summary = excluded.summary, - content = excluded.content, - link = excluded.link - "#, - entry.id, - feed_id, - local_id, - entry.title, - published, - updated, - entry.summary, - content_json, - entry.link, - now - ) - .execute(&mut *tx) - .await; - - if let Err(e) = result { - error!("Failed to save feed entry: {}", e); - tx.rollback().await.map_err(|e| { - error!("Failed to rollback transaction: {}", e); - Status::InternalServerError - })?; - return Err(Status::InternalServerError); - } - } - - // Commit the transaction - tx.commit().await.map_err(|e| { - error!("Failed to commit transaction: {}", e); - Status::InternalServerError - })?; - - Ok(()) -} - async fn read_entries(feed_id: &str, db: &mut SqliteConnection) -> Result, Status> { let rows = sqlx::query!( r#" diff --git a/src/poll_utils.rs b/src/poll_utils.rs index a4b2bfa..2ebc1da 100644 --- a/src/poll_utils.rs +++ b/src/poll_utils.rs @@ -2,8 +2,8 @@ use crate::feed_utils::fetch_feed; use chrono::{DateTime, Utc}; use feed_rs::model::Text; use rocket::http::Status; -use rocket::serde::uuid::Uuid; -use rocket::serde::Serialize; +use rocket::serde::{json, uuid::Uuid, Serialize}; +use sqlx::{Acquire, SqliteConnection}; use url::Url; #[derive(Debug, Serialize)] @@ -50,3 +50,87 @@ pub async fn fetch_new_entries(url: &Url) -> Result, Status> { .collect(); Ok(entries) } + +pub async fn update_entry_db( + entries: &Vec, + feed_id: &str, + db: &mut SqliteConnection, +) -> Result<(), Status> { + // Start a transaction for batch update + let mut tx = db.begin().await.map_err(|e| { + error!("Failed to start transaction: {}", e); + Status::InternalServerError + })?; + + let now = Utc::now().to_rfc3339(); + + // Update the feed's last_checked_time + sqlx::query!( + "UPDATE feeds SET last_checked_time = ? WHERE feed_id = ?", + now, + feed_id + ) + .execute(&mut *tx) + .await + .map_err(|e| { + error!("Failed to update feed last_checked_time: {}", e); + Status::InternalServerError + })?; + + for entry in entries { + let content_json = if let Some(content) = &entry.content { + json::to_string(content).ok() + } else { + None + }; + + let published = entry.published.map(|dt| dt.to_rfc3339()); + let updated = entry.updated.map(|dt| dt.to_rfc3339()); + + let local_id = entry.local_id.to_string(); + + let result = sqlx::query!( + r#" + INSERT INTO feed_entries ( + id, feed_id, local_id, title, published, updated, summary, content, link, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (feed_id, id) DO UPDATE SET + title = excluded.title, + published = excluded.published, + updated = excluded.updated, + summary = excluded.summary, + content = excluded.content, + link = excluded.link + "#, + entry.id, + feed_id, + local_id, + entry.title, + published, + updated, + entry.summary, + content_json, + entry.link, + now + ) + .execute(&mut *tx) + .await; + + if let Err(e) = result { + error!("Failed to save feed entry: {}", e); + tx.rollback().await.map_err(|e| { + error!("Failed to rollback transaction: {}", e); + Status::InternalServerError + })?; + return Err(Status::InternalServerError); + } + } + + // Commit the transaction + tx.commit().await.map_err(|e| { + error!("Failed to commit transaction: {}", e); + Status::InternalServerError + })?; + + Ok(()) +}