Implement fetching references with `futures` streams

This commit is contained in:
Andreas Tsouchlos 2026-01-01 05:33:32 +02:00
parent 0ba4e21ae3
commit 143e177b8c
6 changed files with 265 additions and 53 deletions

51
Cargo.lock generated
View File

@ -158,6 +158,7 @@ dependencies = [
"clap", "clap",
"crossterm", "crossterm",
"env_logger", "env_logger",
"futures",
"html-escape", "html-escape",
"log", "log",
"open", "open",
@ -663,6 +664,21 @@ dependencies = [
"new_debug_unreachable", "new_debug_unreachable",
] ]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -670,6 +686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink",
] ]
[[package]] [[package]]
@ -678,6 +695,34 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -696,10 +741,16 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task", "futures-task",
"memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]

View File

@ -23,3 +23,4 @@ textwrap = "0.16.2"
tokio = { version = "1.48.0", features = ["full"] } tokio = { version = "1.48.0", features = ["full"] }
unicode-general-category = "1.1.0" unicode-general-category = "1.1.0"
brittling_macros = { path = "macros" } brittling_macros = { path = "macros" }
futures = "0.3.31"

View File

@ -3,6 +3,7 @@ pub mod run;
pub mod seeding; pub mod seeding;
pub mod snowballing; pub mod snowballing;
use futures::StreamExt;
use log::{error, info, warn}; use log::{error, info, warn};
use ratatui::crossterm::event::KeyCode; use ratatui::crossterm::event::KeyCode;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -14,7 +15,10 @@ use tokio::{
use crate::{ use crate::{
app::run::Action, app::run::Action,
literature::{Publication, SnowballingHistory, get_publication_by_id}, literature::{
Publication, SnowballingHistory, get_publication_by_id,
get_references_stream,
},
status_error, status_info, status_error, status_info,
}; };
use brittling_macros::component; use brittling_macros::component;
@ -134,9 +138,16 @@ impl App {
.push(publ.clone()); .push(publ.clone());
} }
// TODO: Is deduplication necessary here?
#[action] #[action]
fn fetch_pub( fn add_pending_pub(&mut self, publ: Publication) {
self.state.history.pending_publications.push(publ.clone());
}
#[action]
fn fetch_and_include_seed(
&self, &self,
link: String,
action_tx: &'static UnboundedSender<Action>, action_tx: &'static UnboundedSender<Action>,
) -> Result<(), SendError<Action>> { ) -> Result<(), SendError<Action>> {
if !self if !self
@ -160,15 +171,15 @@ impl App {
let api_link = format!( let api_link = format!(
"https://api.openalex.org/{}", "https://api.openalex.org/{}",
self.state link.trim_start_matches("https://openalex.org/")
.seeding
.input
.trim_start_matches("https://openalex.org/")
); );
tokio::spawn(async move { tokio::spawn(async move {
let publ = let publ = get_publication_by_id(
get_publication_by_id(&api_link, "an.tsouchlos@gmail.com"); api_link.into(),
"an.tsouchlos@gmail.com".to_string().into(),
None,
);
match publ.await { match publ.await {
Ok(publ) => { Ok(publ) => {
@ -191,13 +202,42 @@ impl App {
action_tx.send(SeedingAction::ClearInput.into()) action_tx.send(SeedingAction::ClearInput.into())
} }
// TODO: Implement
#[action] #[action]
fn fetch( fn fetch_references(
&self, &self,
action_tx: &'static UnboundedSender<Action>, action_tx: &'static UnboundedSender<Action>,
) -> Result<(), SendError<Action>> { ) -> Result<(), SendError<Action>> {
status_info!(action_tx, "Fetch action triggered") status_info!(action_tx, "Fetching references...")?;
let included = self.state.history.get_all_included();
tokio::spawn(async move {
let mut stream = get_references_stream(
included,
"an.tsouchlos@gmail.com".to_string(),
);
while let Some(result) = stream.next().await {
match result {
Ok(vals) => {
for val in vals {
let _ = action_tx
.send(GlobalAction::AddPendingPub(val).into());
}
}
Err(err) => {
let _ = status_error!(
action_tx,
"Error loading reference: {}",
err
);
}
}
}
let _ = status_info!(action_tx, "Done fetching references");
});
Ok(())
} }
pub fn handle_key( pub fn handle_key(
@ -214,9 +254,12 @@ impl App {
(Tab::Seeding, KeyCode::Backspace) => { (Tab::Seeding, KeyCode::Backspace) => {
action_tx.send(SeedingAction::EnterBackspace.into()) action_tx.send(SeedingAction::EnterBackspace.into())
} }
(Tab::Seeding, KeyCode::Enter) => { (Tab::Seeding, KeyCode::Enter) => action_tx.send(
action_tx.send(GlobalAction::FetchPub.into()) GlobalAction::FetchAndIncludeSeed(
} self.state.seeding.input.clone(),
)
.into(),
),
(Tab::Snowballing, KeyCode::Enter) => { (Tab::Snowballing, KeyCode::Enter) => {
action_tx.send(SnowballingAction::Search.into()) action_tx.send(SnowballingAction::Search.into())
} }
@ -233,7 +276,7 @@ impl App {
action_tx.send(SnowballingAction::PrevItem.into()) action_tx.send(SnowballingAction::PrevItem.into())
} }
(Tab::Snowballing, KeyCode::Char(' ')) => { (Tab::Snowballing, KeyCode::Char(' ')) => {
action_tx.send(GlobalAction::Fetch.into()) action_tx.send(GlobalAction::FetchReferences.into())
} }
_ => Ok(()), _ => Ok(()),
} }

View File

@ -28,7 +28,7 @@ macro_rules! status_warn {
($action_tx:expr, $text:expr $(, $args:expr)*) => { ($action_tx:expr, $text:expr $(, $args:expr)*) => {
$action_tx.send( $action_tx.send(
crate::app::GlobalAction::ShowStatMsg( crate::app::GlobalAction::ShowStatMsg(
crate::app::StatusMessage::Info(format!($text, $($args)*))) crate::app::StatusMessage::Warn(format!($text, $($args)*)))
.into(), .into(),
) )
}; };
@ -40,7 +40,7 @@ macro_rules! status_error {
($action_tx:expr, $text:expr $(, $args:expr)*) => { ($action_tx:expr, $text:expr $(, $args:expr)*) => {
$action_tx.send( $action_tx.send(
crate::app::GlobalAction::ShowStatMsg( crate::app::GlobalAction::ShowStatMsg(
crate::app::StatusMessage::Info(format!($text, $($args)*))) crate::app::StatusMessage::Error(format!($text $(, $args)*)))
.into(), .into(),
) )
}; };

View File

@ -1,6 +1,10 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{UnboundedSender, error::SendError};
use crate::literature::Publication; use crate::{
app::{GlobalAction, run::Action},
literature::Publication,
};
use brittling_macros::component; use brittling_macros::component;
#[derive(Serialize, Deserialize, Default)] #[derive(Serialize, Deserialize, Default)]
@ -12,9 +16,18 @@ pub struct SeedingComponent {
#[component(SeedingAction)] #[component(SeedingAction)]
impl SeedingComponent { impl SeedingComponent {
#[action]
pub fn submit(
&mut self,
action_tx: &UnboundedSender<Action>,
) -> Result<(), SendError<Action>> {
action_tx
.send(GlobalAction::FetchAndIncludeSeed(self.input.clone()).into())
}
#[action] #[action]
pub fn clear_input(&mut self) { pub fn clear_input(&mut self) {
self.input.clear() self.input.clear();
} }
#[action] #[action]

View File

@ -1,8 +1,9 @@
use ammonia::Builder; use ammonia::Builder;
use futures::{StreamExt, future::BoxFuture, stream};
use html_escape::decode_html_entities; use html_escape::decode_html_entities;
use reqwest::Error;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::{collections::HashMap, fmt::Display, sync::Arc};
use tokio::task::JoinSet;
use unicode_general_category::{GeneralCategory, get_general_category}; use unicode_general_category::{GeneralCategory, get_general_category};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
@ -72,9 +73,35 @@ impl SnowballingHistory {
} }
} }
fn sanitize_text(raw_text: &str) -> String {
let cleaner = Builder::empty().clean(&raw_text).to_string();
let decoded = decode_html_entities(&cleaner);
let cleaned: String = decoded
.chars()
.filter(|&c| {
let cat = get_general_category(c);
!matches!(
cat,
GeneralCategory::Control
| GeneralCategory::Format
| GeneralCategory::Surrogate
| GeneralCategory::PrivateUse
| GeneralCategory::Unassigned
) || c.is_whitespace()
})
.collect();
cleaned
.split_whitespace()
.collect::<Vec<_>>()
.join(" ")
.replace("\\n", " ")
}
impl Publication { impl Publication {
pub fn get_title(&self) -> Option<String> { pub fn get_title(&self) -> Option<String> {
self.display_name.clone() self.display_name.clone().map(|s| sanitize_text(&s))
} }
pub fn get_year(&self) -> Option<u32> { pub fn get_year(&self) -> Option<u32> {
@ -113,50 +140,127 @@ impl Publication {
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(" "); .join(" ");
let cleaner = Builder::empty().clean(&raw_text).to_string(); sanitize_text(&raw_text)
let decoded = decode_html_entities(&cleaner);
let cleaned: String = decoded
.chars()
.filter(|&c| {
let cat = get_general_category(c);
!matches!(
cat,
GeneralCategory::Control
| GeneralCategory::Format
| GeneralCategory::Surrogate
| GeneralCategory::PrivateUse
| GeneralCategory::Unassigned
) || c.is_whitespace()
})
.collect();
cleaned.split_whitespace().collect::<Vec<_>>().join(" ")
}) })
} }
} }
// #[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
// pub struct OpenAlexResponse { pub struct OpenAlexResponse {
// pub results: Vec<Publication>, pub results: Vec<Publication>,
// } }
#[derive(Debug)]
pub enum Error {
ApiError(String),
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::ApiError(s) => write!(
f,
"An error occurred while attempting to access the OpenAlex \
API: url={}",
s
),
}
}
}
pub async fn get_publication_by_id( pub async fn get_publication_by_id(
api_link: &str, api_link: Arc<String>,
email: &str, email: Arc<String>,
client: Option<reqwest::Client>,
) -> Result<Publication, Error> { ) -> Result<Publication, Error> {
let url = format!("{}&mailto={}", api_link, email); let url = format!("{}&mailto={}", api_link, email);
let client = reqwest::Client::new(); let client = client.or(Some(reqwest::Client::new())).unwrap();
let response = client
.get(url) client
.get(url.clone())
.header("User-Agent", "Rust-OpenAlex-Client/1.0") .header("User-Agent", "Rust-OpenAlex-Client/1.0")
.send() .send()
.await? .await
.map_err(|_| Error::ApiError(url.clone()))?
.json::<Publication>() .json::<Publication>()
.await?; .await
.map_err(|_| Error::ApiError(url))
}
Ok(response) // TODO: Get all publications, not just the first 25
pub fn get_references_stream(
publications: Vec<Publication>,
email: String,
) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
let email = Arc::new(email);
let client = reqwest::Client::new();
let mut publication_ids = Vec::<String>::new();
let mut referenced_work_urls = Vec::<String>::new();
for p in &publications {
publication_ids
.push(p.id.trim_start_matches("https://openalex.org/").to_string());
referenced_work_urls.append(
&mut p
.referenced_works
.iter()
.map(|r| {
format!(
"https://api.openalex.org/{}",
r.trim_start_matches("https://openalex.org/")
)
})
.collect::<Vec<String>>(),
);
}
// Get references using the referenced_works field
let stream1 = stream::iter(referenced_work_urls).map({
let (email, client) = (email.clone(), client.clone());
move |url| {
let (email, client) = (email.clone(), client.clone());
let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
Box::pin(async move {
get_publication_by_id(url.into(), email, Some(client))
.await
.map(|val| vec![val])
});
fut
}
});
// Search for references using API calls
let stream2 = stream::iter(publication_ids).map(move |id| {
let url = format!(
"https://api.openalex.org/works?filter=cites:{}&mailto={}",
id, email
);
let client = client.clone();
let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
Box::pin(async move {
client
.get(url.clone())
.header("User-Agent", "Rust-OpenAlex-Client/1.0")
.send()
.await
.map_err(|_| Error::ApiError(url.clone()))?
.json::<OpenAlexResponse>()
.await
.map(|response| response.results)
.map_err(|_| Error::ApiError(url.clone()))
});
fut
});
// Combine the two streams
stream::select(stream1, stream2).buffer_unordered(10)
} }
// // TODO: Get all papers, not just the first page // // TODO: Get all papers, not just the first page