write demo feeds to db
This commit is contained in:
parent
89982755a0
commit
08994856c3
11
src/demo.rs
11
src/demo.rs
@ -1,5 +1,5 @@
|
|||||||
use crate::feeds::Feed;
|
use crate::feeds::Feed;
|
||||||
use crate::poll_utils::fetch_new_entries;
|
use crate::poll_utils::{fetch_new_entries, update_entry_db};
|
||||||
use crate::user::User;
|
use crate::user::User;
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
@ -89,15 +89,16 @@ pub async fn setup_demo_data(pool: &sqlx::SqlitePool) {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|feed| {
|
.map(|feed| {
|
||||||
let url = feed.url.clone();
|
let url = feed.url.clone();
|
||||||
|
let feed_id = feed.feed_id;
|
||||||
async move {
|
async move {
|
||||||
let result = fetch_new_entries(&url).await;
|
let result = fetch_new_entries(&url).await;
|
||||||
(url, result)
|
(feed_id, url, result)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let results = join_all(fetch_futures).await;
|
let results = join_all(fetch_futures).await;
|
||||||
for (url, result) in results {
|
for (feed_id, url, result) in results {
|
||||||
match result {
|
match result {
|
||||||
Ok(entries) => {
|
Ok(entries) => {
|
||||||
info!(
|
info!(
|
||||||
@ -105,7 +106,9 @@ pub async fn setup_demo_data(pool: &sqlx::SqlitePool) {
|
|||||||
"Successfully fetched {} entries",
|
"Successfully fetched {} entries",
|
||||||
entries.len()
|
entries.len()
|
||||||
);
|
);
|
||||||
// TODO: Store entries in database
|
if let Err(e) = update_entry_db(&entries, &feed_id, &*pool).await {
|
||||||
|
warn!(error=%e, feed_url=url.as_str(), "Failed to store entries in database");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(error=%e, feed_url=url.as_str(), "Error populating feed");
|
warn!(error=%e, feed_url=url.as_str(), "Error populating feed");
|
||||||
|
@ -107,11 +107,11 @@ pub async fn poll_feed(
|
|||||||
info!("Feed {} last checked: {}", feed_id, last_checked);
|
info!("Feed {} last checked: {}", feed_id, last_checked);
|
||||||
let entries = if last_checked < POLLING_INTERVAL {
|
let entries = if last_checked < POLLING_INTERVAL {
|
||||||
info!("Reading entries from database for feed {}", feed_id);
|
info!("Reading entries from database for feed {}", feed_id);
|
||||||
read_entries(&feed_id, &mut db).await?
|
read_entries(&feed_id, &mut **db).await?
|
||||||
} else {
|
} else {
|
||||||
info!("Fetching new entries for feed {}", feed_id);
|
info!("Fetching new entries for feed {}", feed_id);
|
||||||
let entries = crate::poll_utils::fetch_new_entries(&url).await?;
|
let entries = crate::poll_utils::fetch_new_entries(&url).await?;
|
||||||
update_entry_db(&entries, &feed_id, &mut db).await?;
|
update_entry_db(&entries, &feed_id, &mut **db).await?;
|
||||||
entries
|
entries
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
|
|||||||
use feed_rs::model::Text;
|
use feed_rs::model::Text;
|
||||||
use rocket::http::Status;
|
use rocket::http::Status;
|
||||||
use rocket::serde::{json, uuid::Uuid, Serialize};
|
use rocket::serde::{json, uuid::Uuid, Serialize};
|
||||||
use sqlx::{Acquire, SqliteConnection};
|
use sqlx::{Acquire, Executor};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
@ -50,13 +50,16 @@ pub async fn fetch_new_entries(url: &Url) -> Result<Vec<Entry>, Status> {
|
|||||||
Ok(entries)
|
Ok(entries)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_entry_db(
|
pub async fn update_entry_db<'a, E>(
|
||||||
entries: &Vec<Entry>,
|
entries: &Vec<Entry>,
|
||||||
feed_id: &Uuid,
|
feed_id: &Uuid,
|
||||||
db: &mut SqliteConnection,
|
executor: E,
|
||||||
) -> Result<(), Status> {
|
) -> Result<(), Status>
|
||||||
|
where
|
||||||
|
E: Executor<'a, Database = sqlx::Sqlite> + Acquire<'a, Database = sqlx::Sqlite>,
|
||||||
|
{
|
||||||
// Start a transaction for batch update
|
// Start a transaction for batch update
|
||||||
let mut tx = db.begin().await.map_err(|e| {
|
let mut tx = executor.begin().await.map_err(|e| {
|
||||||
error!("Failed to start transaction: {}", e);
|
error!("Failed to start transaction: {}", e);
|
||||||
Status::InternalServerError
|
Status::InternalServerError
|
||||||
})?;
|
})?;
|
||||||
|
Loading…
Reference in New Issue
Block a user