diff --git a/src/import.rs b/src/import.rs index 1a8881b..df069be 100644 --- a/src/import.rs +++ b/src/import.rs @@ -11,13 +11,16 @@ use rocket::http::Status; use rocket::serde::json::Json; use rocket::serde::{Deserialize, Serialize}; use rocket::tokio::io::AsyncReadExt; -use rocket::{post, Data}; +use rocket::{post, Data, State}; use rocket_db_pools::Connection; +use tokio::spawn; use tracing::{error, info}; use url::Url; +use uuid::Uuid; use crate::feed_utils::fetch_feed; use crate::feeds::Feed; +use crate::jobs::{JobStatus, SharedJobStore}; use crate::user::AuthenticatedUser; use crate::Db; @@ -26,7 +29,8 @@ use crate::Db; pub struct ImportResponse { success: bool, message: String, - feeds_imported: Option, + #[serde(skip_serializing_if = "Option::is_none")] + job_id: Option, } #[derive(Debug, Deserialize)] @@ -77,34 +81,51 @@ pub struct UploadForm<'f> { } /// Import feeds from an OPML file -#[post("/import/opml", data = "
")] +#[post("/import/opml", data = "")] pub async fn import_opml( - mut db: Connection, + db: Connection, user: AuthenticatedUser, - mut form: Form>, + file: Data<'_>, + job_store: &State, ) -> Result, Status> { - // Read the file contents - let mut file = form.file.open().await.map_err(|e| { - error!("Failed to open OPML file: {}", e); + // Limit file size to 1MB and read the raw bytes + let file_data = file.open(1.mebibytes()).into_bytes().await.map_err(|e| { + error!("Failed to read OPML file: {e}"); Status::BadRequest })?; - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes).await.map_err(|e| { - error!("Failed to read OPML file: {}", e); - Status::BadRequest - })?; + if !file_data.is_complete() { + error!("OPML file too large"); + return Err(Status::PayloadTooLarge); + } - let cursor = Cursor::new(bytes.clone()); + let bytes = file_data.value; + + // Find the start of the actual XML content by looking for double newline + let content_start = bytes + .windows(4) + .position(|window| window == b"\r\n\r\n") + .map(|pos| pos + 4) + .unwrap_or(0); - // Parse OPML + // Find the end of the content by looking for the boundary + let content_end = bytes + .windows(2) + .rposition(|window| window == b"\r\n") + .unwrap_or(bytes.len()); + + // Extract just the XML content + let xml_content = &bytes[content_start..content_end]; + let cursor = Cursor::new(xml_content); + + // Parse OPML to validate format let opml: Opml = from_reader(cursor).map_err(|e| { - let preview = String::from_utf8_lossy(&bytes[..bytes.len().min(100)]); - error!("Failed to parse OPML: {}. File starts with: {}", e, preview); + let preview = String::from_utf8_lossy(&xml_content[..xml_content.len().min(100)]); + error!("Failed to parse OPML: {e}. File starts with: {preview}"); Status::UnprocessableEntity })?; - // Extract and validate feeds + // Extract feeds to import let mut feeds_to_import = Vec::new(); extract_feeds(&opml.body.outlines, String::new(), &mut feeds_to_import); @@ -112,19 +133,69 @@ pub async fn import_opml( return Ok(Json(ImportResponse { success: false, message: "No valid feeds found in OPML file".to_string(), - feeds_imported: Some(0), + job_id: None, })); } - // Import feeds + // Create a background job + let job_id = { + let mut store = job_store.write().await; + store.create_job("opml_import".to_string()) + }; + + // Launch background job + let job_store = (*job_store).clone(); + let user_id = user.user_id; + let feeds_len = feeds_to_import.len(); + spawn(async move { + import_feeds_job(feeds_to_import, db, user_id, job_id, job_store).await; + }); + + Ok(Json(ImportResponse { + success: true, + message: format!("OPML file validated. Importing {feeds_len} feeds in the background."), + job_id: Some(job_id), + })) +} + +async fn import_feeds_job( + feeds: Vec<(String, Option, Option)>, + mut db: Connection, + user_id: Uuid, + job_id: Uuid, + job_store: SharedJobStore, +) { + let total_feeds = feeds.len(); let mut imported_count = 0; - for (url, title, category) in feeds_to_import { - // Validate URL - if let Ok(parsed_url) = Url::parse(&url) { - // Try to fetch feed data + + // Update initial job status + { + let mut store = job_store.write().await; + store.update_job_status( + job_id, + JobStatus::InProgress { + completed: 0, + total: total_feeds, + }, + ); + } + + for (url, title, category) in feeds { + // Update progress + { + let mut store = job_store.write().await; + store.update_job_status( + job_id, + JobStatus::InProgress { + completed: imported_count, + total: total_feeds, + }, + ); + } + + if let Ok(parsed_url) = url::Url::parse(&url) { match fetch_feed(&parsed_url).await { Ok(feed_data) => { - // Use the feed title or the one from OPML let name = feed_data .title .map(|t| t.content) @@ -136,33 +207,32 @@ pub async fn import_opml( .unwrap_or_else(|| "".to_string()) }); - // Create and save the feed - let mut feed = Feed::new(name, parsed_url, user.user_id); + let mut feed = Feed::new(name, parsed_url, user_id); if let Some(cat) = category { feed.categorization = vec![cat]; } - if let Err(e) = feed.write_to_database(&mut **db).await { - error!("Failed to import feed {}: {}", url, e); - continue; + if feed.write_to_database(&mut **db).await.is_ok() { + imported_count += 1; } - - imported_count += 1; - info!("Imported feed: {}", feed.name); } - Err(_e) => { - error!("Failed to fetch feed {url}"); - continue; + Err(e) => { + error!("Failed to fetch or parse feed from {url}"); } } } } - Ok(Json(ImportResponse { - success: imported_count > 0, - message: format!("Successfully imported {} feeds", imported_count), - feeds_imported: Some(imported_count), - })) + // Update final job status + { + let mut store = job_store.write().await; + store.update_job_status( + job_id, + JobStatus::Completed { + success_count: imported_count, + }, + ); + } } fn extract_feeds( diff --git a/src/jobs.rs b/src/jobs.rs new file mode 100644 index 0000000..94e6f12 --- /dev/null +++ b/src/jobs.rs @@ -0,0 +1,59 @@ +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub enum JobStatus { + InProgress { completed: usize, total: usize }, + Completed { success_count: usize }, + Failed(String), +} + +#[derive(Debug, Clone)] +pub struct Job { + pub id: Uuid, + pub job_type: String, + pub status: JobStatus, +} + +#[derive(Debug, Default)] +pub struct JobStore { + jobs: HashMap, +} + +impl JobStore { + pub fn new() -> Self { + Self { + jobs: HashMap::new(), + } + } + + pub fn create_job(&mut self, job_type: String) -> Uuid { + let job_id = Uuid::new_v4(); + self.jobs.insert( + job_id, + Job { + id: job_id, + job_type, + status: JobStatus::InProgress { + completed: 0, + total: 0, + }, + }, + ); + job_id + } + + pub fn update_job_status(&mut self, job_id: Uuid, status: JobStatus) { + if let Some(job) = self.jobs.get_mut(&job_id) { + job.status = status; + } + } + + pub fn get_job_status(&self, job_id: Uuid) -> Option { + self.jobs.get(&job_id).map(|job| job.status.clone()) + } +} + +pub type SharedJobStore = Arc>; diff --git a/src/main.rs b/src/main.rs index 99d8aaf..9292065 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod demo; mod feed_utils; mod feeds; mod import; +mod jobs; mod poll; mod poll_utils; mod session_store; @@ -21,8 +22,12 @@ use rocket::{Build, Rocket, State}; use rocket_db_pools::{sqlx, Connection, Database}; use rocket_dyn_templates::{context, Template}; use session_store::SessionStore; +use std::sync::Arc; +use tokio::sync::RwLock; use user::AuthenticatedUser; +use crate::jobs::{JobStore, SharedJobStore}; + /// RSS Reader application #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -148,6 +153,8 @@ fn rocket() -> _ { std::env::var("SECRET_KEY").expect("SECRET_KEY environment variable must be set"), )); + let job_store: SharedJobStore = Arc::new(RwLock::new(JobStore::new())); + rocket::custom(figment) .mount( "/", @@ -177,6 +184,7 @@ fn rocket() -> _ { .attach(Db::init()) .manage(args.demo) .manage(SessionStore::new()) + .manage(job_store) .attach(AdHoc::try_on_ignite("DB Setup", move |rocket| async move { setup_database(args.demo, rocket).await })) diff --git a/static/js/app.js b/static/js/app.js index f04e896..953f20f 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -78,11 +78,10 @@ document.addEventListener('DOMContentLoaded', function() { if (response.ok) { const result = await response.json(); if (result.success) { - showError('OPML import successful: ' + result.message); - // Refresh the feed list if feeds were imported - if (result.feeds_imported) { - handleFeeds(); - } + showError(result.message); + // TODO: Poll job status endpoint when implemented + // For now, just refresh the feed list after a delay + setTimeout(handleFeeds, 5000); } else { showError('OPML import failed: ' + result.message); }