Compare commits

...

3 Commits

Author SHA1 Message Date
Greg Shuflin
2fd1ade3ee Some tweaks 2025-02-04 15:07:46 -08:00
Greg Shuflin
6d58a7d70e entry db 2025-02-04 14:29:17 -08:00
Greg Shuflin
c756e206e4 Feeds + polling work 2025-02-04 14:05:35 -08:00
9 changed files with 126 additions and 33 deletions

View File

@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "SELECT url, last_checked_time as \"last_checked_time: chrono::DateTime<chrono::Utc>\" FROM feeds WHERE feed_id = ? AND user_id = ?",
"describe": {
"columns": [
{
"name": "url",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "last_checked_time: chrono::DateTime<chrono::Utc>",
"ordinal": 1,
"type_info": "Datetime"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
false
]
},
"hash": "55e409553723d53a32bd3fa0ae3e83976eb5d3ce1ab7ec2963d5e1a5525dfe6f"
}

View File

@ -1,20 +0,0 @@
{
"db_name": "SQLite",
"query": "SELECT url FROM feeds WHERE feed_id = ? AND user_id = ?",
"describe": {
"columns": [
{
"name": "url",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false
]
},
"hash": "5ca2526f1ec4a055bc36898ef13a7d830df848285f4f6d66bb104935844b7f2d"
}

View File

@ -0,0 +1,13 @@
-- Entries fetched from a remote feed; one row per (feed, entry id).
-- Rows are upserted by the polling code via ON CONFLICT(feed_id, id).
CREATE TABLE feed_entries (
id TEXT PRIMARY KEY, -- entry id as provided by the feed source
feed_id TEXT NOT NULL, -- owning feed
title TEXT NOT NULL,
published TIMESTAMP, -- nullable: not every feed supplies these timestamps
updated TIMESTAMP,
summary TEXT NOT NULL,
content TEXT, -- optional full content, stored as serialized JSON
link TEXT,
created_at TIMESTAMP NOT NULL, -- when this row was first stored locally
FOREIGN KEY (feed_id) REFERENCES feeds(feed_id) ON DELETE CASCADE,
UNIQUE(feed_id, id) -- unique index backing the upsert's ON CONFLICT target
);

View File

@ -32,7 +32,7 @@ const DEMO_FEEDS: [DemoFeed; 5] = [
name: "Astronomy Picture of the Day (APOD)", name: "Astronomy Picture of the Day (APOD)",
url: "https://apod.nasa.gov/apod.rss", url: "https://apod.nasa.gov/apod.rss",
category: None, category: None,
} },
]; ];
pub async fn setup_demo_data(pool: &sqlx::SqlitePool) { pub async fn setup_demo_data(pool: &sqlx::SqlitePool) {

View File

@ -5,6 +5,8 @@ use url::Url;
pub struct FeedError; pub struct FeedError;
pub async fn fetch_feed(url: &Url) -> Result<feed_rs::model::Feed, FeedError> { pub async fn fetch_feed(url: &Url) -> Result<feed_rs::model::Feed, FeedError> {
println!("Making a request to fetch feed `{url}`");
// Fetch the feed content // Fetch the feed content
let response = reqwest::get(url.as_ref()).await.map_err(|e| { let response = reqwest::get(url.as_ref()).await.map_err(|e| {
eprintln!("Failed to fetch feed: {}", e); eprintln!("Failed to fetch feed: {}", e);

View File

@ -31,7 +31,7 @@ impl Feed {
url, url,
user_id, user_id,
added_time: now, added_time: now,
last_checked_time: now, last_checked_time: chrono::DateTime::UNIX_EPOCH,
categorization: Vec::new(), categorization: Vec::new(),
} }
} }

View File

@ -1,11 +1,14 @@
use crate::user::AuthenticatedUser; use crate::user::AuthenticatedUser;
use crate::{feed_utils::fetch_feed, Db}; use crate::{feed_utils::fetch_feed, Db};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Duration, Utc};
use feed_rs::model::Text; use feed_rs::model::Text;
use rocket::http::Status; use rocket::http::Status;
use rocket::serde::uuid::Uuid; use rocket::serde::uuid::Uuid;
use rocket::serde::{json::Json, Serialize}; use rocket::serde::{self, json::Json, Serialize};
use rocket_db_pools::Connection; use rocket_db_pools::Connection;
use sqlx::{Acquire, SqliteConnection};
const POLLING_INTERVAL: Duration = Duration::minutes(20);
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[serde(crate = "rocket::serde")] #[serde(crate = "rocket::serde")]
@ -26,6 +29,66 @@ struct Entry {
link: Option<String>, link: Option<String>,
} }
/// Insert or update a batch of feed entries for `feed_id` inside a single
/// transaction.
///
/// On conflict (same `feed_id` + entry `id`) the mutable columns are
/// refreshed from the incoming entry while `created_at` keeps its original
/// value. Any per-entry failure rolls the whole batch back.
///
/// # Errors
/// Returns `Status::InternalServerError` if the transaction cannot be
/// started, committed, rolled back, or if any insert fails.
async fn update_entry_db(entries: &[Entry], feed_id: &str, db: &mut SqliteConnection) -> Result<(), Status> {
    // Start a transaction so a partial failure never leaves the feed
    // half-updated.
    let mut tx = db.begin().await.map_err(|e| {
        eprintln!("Failed to start transaction: {}", e);
        Status::InternalServerError
    })?;

    // One timestamp for the whole batch keeps created_at consistent.
    let now = Utc::now().to_rfc3339();

    for entry in entries {
        // Content is persisted as JSON text; a serialization failure
        // degrades to NULL rather than aborting the batch.
        let content_json = entry
            .content
            .as_ref()
            .and_then(|content| serde::json::to_string(content).ok());

        let result = sqlx::query(
            r#"
            INSERT INTO feed_entries (
                id, feed_id, title, published, updated, summary, content, link, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (feed_id, id) DO UPDATE SET
                title = excluded.title,
                published = excluded.published,
                updated = excluded.updated,
                summary = excluded.summary,
                content = excluded.content,
                link = excluded.link
            "#,
        )
        .bind(&entry.id)
        .bind(feed_id)
        .bind(&entry.title)
        .bind(entry.published.map(|dt| dt.to_rfc3339()))
        .bind(entry.updated.map(|dt| dt.to_rfc3339()))
        .bind(&entry.summary)
        .bind(content_json)
        .bind(&entry.link)
        .bind(&now)
        .execute(&mut *tx)
        .await;

        if let Err(e) = result {
            eprintln!("Failed to save feed entry: {}", e);
            // Explicit rollback so rollback errors are surfaced/logged
            // (dropping the transaction would also roll back, but silently).
            tx.rollback().await.map_err(|e| {
                eprintln!("Failed to rollback transaction: {}", e);
                Status::InternalServerError
            })?;
            return Err(Status::InternalServerError);
        }
    }

    // Commit the transaction; entries become visible atomically.
    tx.commit().await.map_err(|e| {
        eprintln!("Failed to commit transaction: {}", e);
        Status::InternalServerError
    })?;

    Ok(())
}
#[post("/poll/<feed_id>")] #[post("/poll/<feed_id>")]
pub async fn poll_feed( pub async fn poll_feed(
mut db: Connection<Db>, mut db: Connection<Db>,
@ -35,19 +98,26 @@ pub async fn poll_feed(
let feed_id = feed_id.to_string(); let feed_id = feed_id.to_string();
let user_id = user.user_id.to_string(); let user_id = user.user_id.to_string();
// Get the feed URL from the database, ensuring it belongs to the authenticated user // Get the feed URL from the database, ensuring it belongs to the authenticated user
let feed_url = sqlx::query!( let feed = sqlx::query!(
"SELECT url FROM feeds WHERE feed_id = ? AND user_id = ?", r#"SELECT url, last_checked_time as "last_checked_time: chrono::DateTime<chrono::Utc>" FROM feeds WHERE feed_id = ? AND user_id = ?"#,
feed_id, feed_id,
user_id user_id
) )
.fetch_optional(&mut **db) .fetch_optional(&mut **db)
.await .await
.map_err(|_| Status::InternalServerError)? .map_err(|_| Status::InternalServerError)?
.ok_or(Status::NotFound)? .ok_or(Status::NotFound)?;
.url;
let now = Utc::now();
if now - feed.last_checked_time < POLLING_INTERVAL {
println!(
"Feed {} was checked recently at {}",
feed_id, feed.last_checked_time
);
}
// Parse the URL // Parse the URL
let url = url::Url::parse(&feed_url).map_err(|_| Status::InternalServerError)?; let url = url::Url::parse(&feed.url).map_err(|_| Status::InternalServerError)?;
let feed_data = fetch_feed(&url).await.map_err(|_| Status::BadGateway)?; let feed_data = fetch_feed(&url).await.map_err(|_| Status::BadGateway)?;
let count = feed_data.entries.len(); let count = feed_data.entries.len();
@ -57,7 +127,7 @@ pub async fn poll_feed(
.unwrap_or(format!("<no {name}>")) .unwrap_or(format!("<no {name}>"))
} }
let entries = feed_data let entries: Vec<Entry> = feed_data
.entries .entries
.into_iter() .into_iter()
.map(|feed_entry| Entry { .map(|feed_entry| Entry {
@ -71,5 +141,7 @@ pub async fn poll_feed(
}) })
.collect(); .collect();
update_entry_db(&entries, &feed_id, &mut db).await?;
Ok(Json(FeedPollResponse { count, entries })) Ok(Json(FeedPollResponse { count, entries }))
} }

View File

@ -1,7 +1,7 @@
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::RwLock; use std::sync::RwLock;
use uuid::Uuid; use uuid::Uuid;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
pub struct SessionStore(RwLock<HashMap<Uuid, HashSet<String>>>); pub struct SessionStore(RwLock<HashMap<Uuid, HashSet<String>>>);
@ -41,4 +41,4 @@ impl SessionStore {
} }
} }
} }
} }

View File

@ -7,8 +7,8 @@ use rocket_db_pools::Connection;
use rocket_dyn_templates::{context, Template}; use rocket_dyn_templates::{context, Template};
use uuid::Uuid; use uuid::Uuid;
use crate::Db;
use crate::session_store::SessionStore; use crate::session_store::SessionStore;
use crate::Db;
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[serde(crate = "rocket::serde")] #[serde(crate = "rocket::serde")]