More moving code around
This commit is contained in:
parent
cf89d17035
commit
241bb3fba4
88
src/poll.rs
88
src/poll.rs
@ -1,4 +1,4 @@
|
|||||||
use crate::poll_utils::Entry;
|
use crate::poll_utils::{update_entry_db, Entry};
|
||||||
use crate::user::AuthenticatedUser;
|
use crate::user::AuthenticatedUser;
|
||||||
use crate::Db;
|
use crate::Db;
|
||||||
use chrono::{Duration, Utc};
|
use chrono::{Duration, Utc};
|
||||||
@ -6,7 +6,7 @@ use rocket::http::Status;
|
|||||||
use rocket::serde::uuid::Uuid;
|
use rocket::serde::uuid::Uuid;
|
||||||
use rocket::serde::{self, json::Json, Deserialize, Serialize};
|
use rocket::serde::{self, json::Json, Deserialize, Serialize};
|
||||||
use rocket_db_pools::Connection;
|
use rocket_db_pools::Connection;
|
||||||
use sqlx::{Acquire, SqliteConnection};
|
use sqlx::SqliteConnection;
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
const POLLING_INTERVAL: Duration = Duration::minutes(20);
|
const POLLING_INTERVAL: Duration = Duration::minutes(20);
|
||||||
@ -26,90 +26,6 @@ pub struct EntryStateUpdate {
|
|||||||
read: Option<bool>,
|
read: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_entry_db(
|
|
||||||
entries: &Vec<Entry>,
|
|
||||||
feed_id: &str,
|
|
||||||
db: &mut SqliteConnection,
|
|
||||||
) -> Result<(), Status> {
|
|
||||||
// Start a transaction for batch update
|
|
||||||
let mut tx = db.begin().await.map_err(|e| {
|
|
||||||
error!("Failed to start transaction: {}", e);
|
|
||||||
Status::InternalServerError
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let now = Utc::now().to_rfc3339();
|
|
||||||
|
|
||||||
// Update the feed's last_checked_time
|
|
||||||
sqlx::query!(
|
|
||||||
"UPDATE feeds SET last_checked_time = ? WHERE feed_id = ?",
|
|
||||||
now,
|
|
||||||
feed_id
|
|
||||||
)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("Failed to update feed last_checked_time: {}", e);
|
|
||||||
Status::InternalServerError
|
|
||||||
})?;
|
|
||||||
|
|
||||||
for entry in entries {
|
|
||||||
let content_json = if let Some(content) = &entry.content {
|
|
||||||
serde::json::to_string(content).ok()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let published = entry.published.map(|dt| dt.to_rfc3339());
|
|
||||||
let updated = entry.updated.map(|dt| dt.to_rfc3339());
|
|
||||||
|
|
||||||
let local_id = entry.local_id.to_string();
|
|
||||||
|
|
||||||
let result = sqlx::query!(
|
|
||||||
r#"
|
|
||||||
INSERT INTO feed_entries (
|
|
||||||
id, feed_id, local_id, title, published, updated, summary, content, link, created_at
|
|
||||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT (feed_id, id) DO UPDATE SET
|
|
||||||
title = excluded.title,
|
|
||||||
published = excluded.published,
|
|
||||||
updated = excluded.updated,
|
|
||||||
summary = excluded.summary,
|
|
||||||
content = excluded.content,
|
|
||||||
link = excluded.link
|
|
||||||
"#,
|
|
||||||
entry.id,
|
|
||||||
feed_id,
|
|
||||||
local_id,
|
|
||||||
entry.title,
|
|
||||||
published,
|
|
||||||
updated,
|
|
||||||
entry.summary,
|
|
||||||
content_json,
|
|
||||||
entry.link,
|
|
||||||
now
|
|
||||||
)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
|
||||||
error!("Failed to save feed entry: {}", e);
|
|
||||||
tx.rollback().await.map_err(|e| {
|
|
||||||
error!("Failed to rollback transaction: {}", e);
|
|
||||||
Status::InternalServerError
|
|
||||||
})?;
|
|
||||||
return Err(Status::InternalServerError);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Commit the transaction
|
|
||||||
tx.commit().await.map_err(|e| {
|
|
||||||
error!("Failed to commit transaction: {}", e);
|
|
||||||
Status::InternalServerError
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_entries(feed_id: &str, db: &mut SqliteConnection) -> Result<Vec<Entry>, Status> {
|
async fn read_entries(feed_id: &str, db: &mut SqliteConnection) -> Result<Vec<Entry>, Status> {
|
||||||
let rows = sqlx::query!(
|
let rows = sqlx::query!(
|
||||||
r#"
|
r#"
|
||||||
|
@ -2,8 +2,8 @@ use crate::feed_utils::fetch_feed;
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use feed_rs::model::Text;
|
use feed_rs::model::Text;
|
||||||
use rocket::http::Status;
|
use rocket::http::Status;
|
||||||
use rocket::serde::uuid::Uuid;
|
use rocket::serde::{json, uuid::Uuid, Serialize};
|
||||||
use rocket::serde::Serialize;
|
use sqlx::{Acquire, SqliteConnection};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
@ -50,3 +50,87 @@ pub async fn fetch_new_entries(url: &Url) -> Result<Vec<Entry>, Status> {
|
|||||||
.collect();
|
.collect();
|
||||||
Ok(entries)
|
Ok(entries)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn update_entry_db(
|
||||||
|
entries: &Vec<Entry>,
|
||||||
|
feed_id: &str,
|
||||||
|
db: &mut SqliteConnection,
|
||||||
|
) -> Result<(), Status> {
|
||||||
|
// Start a transaction for batch update
|
||||||
|
let mut tx = db.begin().await.map_err(|e| {
|
||||||
|
error!("Failed to start transaction: {}", e);
|
||||||
|
Status::InternalServerError
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let now = Utc::now().to_rfc3339();
|
||||||
|
|
||||||
|
// Update the feed's last_checked_time
|
||||||
|
sqlx::query!(
|
||||||
|
"UPDATE feeds SET last_checked_time = ? WHERE feed_id = ?",
|
||||||
|
now,
|
||||||
|
feed_id
|
||||||
|
)
|
||||||
|
.execute(&mut *tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Failed to update feed last_checked_time: {}", e);
|
||||||
|
Status::InternalServerError
|
||||||
|
})?;
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
let content_json = if let Some(content) = &entry.content {
|
||||||
|
json::to_string(content).ok()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let published = entry.published.map(|dt| dt.to_rfc3339());
|
||||||
|
let updated = entry.updated.map(|dt| dt.to_rfc3339());
|
||||||
|
|
||||||
|
let local_id = entry.local_id.to_string();
|
||||||
|
|
||||||
|
let result = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO feed_entries (
|
||||||
|
id, feed_id, local_id, title, published, updated, summary, content, link, created_at
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (feed_id, id) DO UPDATE SET
|
||||||
|
title = excluded.title,
|
||||||
|
published = excluded.published,
|
||||||
|
updated = excluded.updated,
|
||||||
|
summary = excluded.summary,
|
||||||
|
content = excluded.content,
|
||||||
|
link = excluded.link
|
||||||
|
"#,
|
||||||
|
entry.id,
|
||||||
|
feed_id,
|
||||||
|
local_id,
|
||||||
|
entry.title,
|
||||||
|
published,
|
||||||
|
updated,
|
||||||
|
entry.summary,
|
||||||
|
content_json,
|
||||||
|
entry.link,
|
||||||
|
now
|
||||||
|
)
|
||||||
|
.execute(&mut *tx)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if let Err(e) = result {
|
||||||
|
error!("Failed to save feed entry: {}", e);
|
||||||
|
tx.rollback().await.map_err(|e| {
|
||||||
|
error!("Failed to rollback transaction: {}", e);
|
||||||
|
Status::InternalServerError
|
||||||
|
})?;
|
||||||
|
return Err(Status::InternalServerError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the transaction
|
||||||
|
tx.commit().await.map_err(|e| {
|
||||||
|
error!("Failed to commit transaction: {}", e);
|
||||||
|
Status::InternalServerError
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user