do feed load in background

Greg Shuflin 2025-02-16 01:06:36 -08:00
parent 1946f56b43
commit 14aec7503e
4 changed files with 182 additions and 46 deletions

View File

@@ -11,13 +11,16 @@ use rocket::http::Status;
 use rocket::serde::json::Json;
 use rocket::serde::{Deserialize, Serialize};
 use rocket::tokio::io::AsyncReadExt;
-use rocket::{post, Data};
+use rocket::{post, Data, State};
 use rocket_db_pools::Connection;
+use tokio::spawn;
 use tracing::{error, info};
 use url::Url;
+use uuid::Uuid;

 use crate::feed_utils::fetch_feed;
 use crate::feeds::Feed;
+use crate::jobs::{JobStatus, SharedJobStore};
 use crate::user::AuthenticatedUser;
 use crate::Db;
@@ -26,7 +29,8 @@ use crate::Db;
 pub struct ImportResponse {
     success: bool,
     message: String,
-    feeds_imported: Option<usize>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    job_id: Option<Uuid>,
 }

 #[derive(Debug, Deserialize)]
@@ -77,34 +81,51 @@ pub struct UploadForm<'f> {
 }

 /// Import feeds from an OPML file
-#[post("/import/opml", data = "<form>")]
+#[post("/import/opml", data = "<file>")]
 pub async fn import_opml(
-    mut db: Connection<Db>,
+    db: Connection<Db>,
     user: AuthenticatedUser,
-    mut form: Form<UploadForm<'_>>,
+    file: Data<'_>,
+    job_store: &State<SharedJobStore>,
 ) -> Result<Json<ImportResponse>, Status> {
-    // Read the file contents
-    let mut file = form.file.open().await.map_err(|e| {
-        error!("Failed to open OPML file: {}", e);
+    // Limit file size to 1MB and read the raw bytes
+    let file_data = file.open(1.mebibytes()).into_bytes().await.map_err(|e| {
+        error!("Failed to read OPML file: {e}");
         Status::BadRequest
     })?;
-    let mut bytes = Vec::new();
-    file.read_to_end(&mut bytes).await.map_err(|e| {
-        error!("Failed to read OPML file: {}", e);
-        Status::BadRequest
-    })?;
-    let cursor = Cursor::new(bytes.clone());
+    if !file_data.is_complete() {
+        error!("OPML file too large");
+        return Err(Status::PayloadTooLarge);
+    }
+    let bytes = file_data.value;

-    // Parse OPML
+    // Find the start of the actual XML content by looking for double newline
+    let content_start = bytes
+        .windows(4)
+        .position(|window| window == b"\r\n\r\n")
+        .map(|pos| pos + 4)
+        .unwrap_or(0);
+
+    // Find the end of the content by looking for the boundary
+    let content_end = bytes
+        .windows(2)
+        .rposition(|window| window == b"\r\n")
+        .unwrap_or(bytes.len());
+
+    // Extract just the XML content
+    let xml_content = &bytes[content_start..content_end];
+    let cursor = Cursor::new(xml_content);
+
+    // Parse OPML to validate format
     let opml: Opml = from_reader(cursor).map_err(|e| {
-        let preview = String::from_utf8_lossy(&bytes[..bytes.len().min(100)]);
-        error!("Failed to parse OPML: {}. File starts with: {}", e, preview);
+        let preview = String::from_utf8_lossy(&xml_content[..xml_content.len().min(100)]);
+        error!("Failed to parse OPML: {e}. File starts with: {preview}");
         Status::UnprocessableEntity
     })?;

-    // Extract and validate feeds
+    // Extract feeds to import
     let mut feeds_to_import = Vec::new();
     extract_feeds(&opml.body.outlines, String::new(), &mut feeds_to_import);
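The content_start/content_end logic above is a heuristic for peeling the multipart framing off the raw body: everything after the first blank line (`\r\n\r\n`) and before the last `\r\n` is treated as the XML payload. A self-contained sketch of that heuristic against a fabricated single-part body (the helper name `strip_multipart` is an assumption for illustration, not part of the commit):

```rust
// Sketch of the boundary-stripping heuristic used above; it assumes the body
// contains exactly one part whose headers end with a blank line.
fn strip_multipart(bytes: &[u8]) -> &[u8] {
    // Start of the XML: first byte after the first "\r\n\r\n".
    let content_start = bytes
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|p| p + 4)
        .unwrap_or(0);
    // End of the XML: the last "\r\n" before the closing boundary.
    let content_end = bytes
        .windows(2)
        .rposition(|w| w == b"\r\n")
        .unwrap_or(bytes.len());
    &bytes[content_start..content_end]
}

fn main() {
    // Fabricated multipart body, just for illustration.
    let body: &[u8] = b"--boundary\r\n\
        Content-Disposition: form-data; name=\"file\"\r\n\r\n\
        <opml version=\"2.0\"></opml>\r\n\
        --boundary--";
    let xml = strip_multipart(body);
    assert!(xml.starts_with(b"<opml"));
    assert!(xml.ends_with(b"</opml>"));
}
```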
@@ -112,19 +133,69 @@ pub async fn import_opml(
         return Ok(Json(ImportResponse {
             success: false,
             message: "No valid feeds found in OPML file".to_string(),
-            feeds_imported: Some(0),
+            job_id: None,
         }));
     }

-    // Import feeds
+    // Create a background job
+    let job_id = {
+        let mut store = job_store.write().await;
+        store.create_job("opml_import".to_string())
+    };
+
+    // Launch background job
+    let job_store = (*job_store).clone();
+    let user_id = user.user_id;
+    let feeds_len = feeds_to_import.len();
+    spawn(async move {
+        import_feeds_job(feeds_to_import, db, user_id, job_id, job_store).await;
+    });
+
+    Ok(Json(ImportResponse {
+        success: true,
+        message: format!("OPML file validated. Importing {feeds_len} feeds in the background."),
+        job_id: Some(job_id),
+    }))
+}
+
+async fn import_feeds_job(
+    feeds: Vec<(String, Option<String>, Option<String>)>,
+    mut db: Connection<Db>,
+    user_id: Uuid,
+    job_id: Uuid,
+    job_store: SharedJobStore,
+) {
+    let total_feeds = feeds.len();
     let mut imported_count = 0;

-    for (url, title, category) in feeds_to_import {
-        // Validate URL
-        if let Ok(parsed_url) = Url::parse(&url) {
-            // Try to fetch feed data
+    // Update initial job status
+    {
+        let mut store = job_store.write().await;
+        store.update_job_status(
+            job_id,
+            JobStatus::InProgress {
+                completed: 0,
+                total: total_feeds,
+            },
+        );
+    }
+
+    for (url, title, category) in feeds {
+        // Update progress
+        {
+            let mut store = job_store.write().await;
+            store.update_job_status(
+                job_id,
+                JobStatus::InProgress {
+                    completed: imported_count,
+                    total: total_feeds,
+                },
+            );
+        }
+
+        if let Ok(parsed_url) = url::Url::parse(&url) {
             match fetch_feed(&parsed_url).await {
                 Ok(feed_data) => {
-                    // Use the feed title or the one from OPML
                     let name = feed_data
                         .title
                         .map(|t| t.content)
@@ -136,33 +207,32 @@ pub async fn import_opml(
                             .unwrap_or_else(|| "<Unknown>".to_string())
                     });

-                    // Create and save the feed
-                    let mut feed = Feed::new(name, parsed_url, user.user_id);
+                    let mut feed = Feed::new(name, parsed_url, user_id);
                     if let Some(cat) = category {
                         feed.categorization = vec![cat];
                     }

-                    if let Err(e) = feed.write_to_database(&mut **db).await {
-                        error!("Failed to import feed {}: {}", url, e);
-                        continue;
-                    }
-
-                    imported_count += 1;
-                    info!("Imported feed: {}", feed.name);
+                    if feed.write_to_database(&mut **db).await.is_ok() {
+                        imported_count += 1;
+                    }
                 }
-                Err(_e) => {
-                    error!("Failed to fetch feed {url}");
-                    continue;
+                Err(e) => {
+                    error!("Failed to fetch or parse feed from {url}");
                 }
             }
         }
     }

-    Ok(Json(ImportResponse {
-        success: imported_count > 0,
-        message: format!("Successfully imported {} feeds", imported_count),
-        feeds_imported: Some(imported_count),
-    }))
+    // Update final job status
+    {
+        let mut store = job_store.write().await;
+        store.update_job_status(
+            job_id,
+            JobStatus::Completed {
+                success_count: imported_count,
+            },
+        );
+    }
 }

 fn extract_feeds(
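With this change, import_opml returns as soon as the job is registered; the JoinHandle from spawn is dropped, so the task runs detached and its outcome is only visible through the job store. A minimal sketch of that fire-and-forget pattern with shared state (the names here are placeholders, not from the codebase):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// Placeholder stand-in for the shared job-store type used in the commit.
type SharedState = Arc<RwLock<Vec<String>>>;

#[tokio::main]
async fn main() {
    let state: SharedState = Arc::new(RwLock::new(Vec::new()));
    let state_for_task = state.clone();

    // Dropping the JoinHandle detaches the task, mirroring the spawned
    // import_feeds_job: the caller never awaits or observes its result.
    tokio::spawn(async move {
        // Keep the write-lock scope short, as the job-status updates do.
        state_for_task.write().await.push("job progress".to_string());
    });

    // Later, another task (e.g. a status endpoint) reads the shared state.
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("{:?}", state.read().await);
}
```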

src/jobs.rs Normal file (59 lines)
View File

@@ -0,0 +1,59 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;

#[derive(Debug, Clone)]
pub enum JobStatus {
    InProgress { completed: usize, total: usize },
    Completed { success_count: usize },
    Failed(String),
}

#[derive(Debug, Clone)]
pub struct Job {
    pub id: Uuid,
    pub job_type: String,
    pub status: JobStatus,
}

#[derive(Debug, Default)]
pub struct JobStore {
    jobs: HashMap<Uuid, Job>,
}

impl JobStore {
    pub fn new() -> Self {
        Self {
            jobs: HashMap::new(),
        }
    }

    pub fn create_job(&mut self, job_type: String) -> Uuid {
        let job_id = Uuid::new_v4();
        self.jobs.insert(
            job_id,
            Job {
                id: job_id,
                job_type,
                status: JobStatus::InProgress {
                    completed: 0,
                    total: 0,
                },
            },
        );
        job_id
    }

    pub fn update_job_status(&mut self, job_id: Uuid, status: JobStatus) {
        if let Some(job) = self.jobs.get_mut(&job_id) {
            job.status = status;
        }
    }

    pub fn get_job_status(&self, job_id: Uuid) -> Option<JobStatus> {
        self.jobs.get(&job_id).map(|job| job.status.clone())
    }
}

pub type SharedJobStore = Arc<RwLock<JobStore>>;
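A short usage sketch of the store API above (standalone, outside Rocket, assuming the JobStore, JobStatus, and SharedJobStore definitions from this file are in scope), showing the lifecycle that import_feeds_job drives: create a job, report progress, then mark completion:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let store: SharedJobStore = Arc::new(RwLock::new(JobStore::new()));

    // Register a job; it starts as InProgress { completed: 0, total: 0 }.
    let job_id = store.write().await.create_job("opml_import".to_string());

    // Report progress part-way through the work.
    store.write().await.update_job_status(
        job_id,
        JobStatus::InProgress { completed: 3, total: 10 },
    );

    // Mark the job finished.
    store.write().await.update_job_status(
        job_id,
        JobStatus::Completed { success_count: 9 },
    );

    // A poller (e.g. a future status endpoint) reads the latest status.
    if let Some(status) = store.read().await.get_job_status(job_id) {
        println!("job {job_id}: {status:?}");
    }
}
```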

View File

@@ -9,6 +9,7 @@ mod demo;
 mod feed_utils;
 mod feeds;
 mod import;
+mod jobs;
 mod poll;
 mod poll_utils;
 mod session_store;
@@ -21,8 +22,12 @@ use rocket::{Build, Rocket, State};
 use rocket_db_pools::{sqlx, Connection, Database};
 use rocket_dyn_templates::{context, Template};
 use session_store::SessionStore;
+use std::sync::Arc;
+use tokio::sync::RwLock;
 use user::AuthenticatedUser;

+use crate::jobs::{JobStore, SharedJobStore};
+
 /// RSS Reader application
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
@@ -148,6 +153,8 @@ fn rocket() -> _ {
         std::env::var("SECRET_KEY").expect("SECRET_KEY environment variable must be set"),
     ));

+    let job_store: SharedJobStore = Arc::new(RwLock::new(JobStore::new()));
+
     rocket::custom(figment)
         .mount(
             "/",
@@ -177,6 +184,7 @@ fn rocket() -> _ {
         .attach(Db::init())
         .manage(args.demo)
         .manage(SessionStore::new())
+        .manage(job_store)
         .attach(AdHoc::try_on_ignite("DB Setup", move |rocket| async move {
             setup_database(args.demo, rocket).await
         }))

View File

@@ -78,11 +78,10 @@ document.addEventListener('DOMContentLoaded', function() {
         if (response.ok) {
             const result = await response.json();
             if (result.success) {
-                showError('OPML import successful: ' + result.message);
-                // Refresh the feed list if feeds were imported
-                if (result.feeds_imported) {
-                    handleFeeds();
-                }
+                showError(result.message);
+                // TODO: Poll job status endpoint when implemented
+                // For now, just refresh the feed list after a delay
+                setTimeout(handleFeeds, 5000);
             } else {
                 showError('OPML import failed: ' + result.message);
             }
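The frontend TODO above refers to a job-status endpoint that does not exist yet. Purely as a sketch (the route path, handler name, and response shape are assumptions, not part of this commit), such a Rocket handler could read the managed SharedJobStore roughly like this:

```rust
use rocket::get;
use rocket::serde::json::Json;
use rocket::State;
use uuid::Uuid;

use crate::jobs::SharedJobStore;
use crate::user::AuthenticatedUser;

// Hypothetical status endpoint; returns 404 when the job id is unknown.
#[get("/import/status/<job_id>")]
pub async fn import_status(
    job_id: &str,
    _user: AuthenticatedUser,
    job_store: &State<SharedJobStore>,
) -> Option<Json<String>> {
    // Parse the id by hand to avoid relying on a FromParam impl for Uuid.
    let job_id = Uuid::parse_str(job_id).ok()?;
    let store = job_store.read().await;
    store
        .get_job_status(job_id)
        .map(|status| Json(format!("{status:?}")))
}
```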