diff --git a/Cargo.lock b/Cargo.lock
index 632fc0d..65896c3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -158,6 +158,7 @@ dependencies = [
  "clap",
  "crossterm",
  "env_logger",
+ "futures",
  "html-escape",
  "log",
  "open",
@@ -663,6 +664,21 @@ dependencies = [
  "new_debug_unreachable",
 ]
 
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
 [[package]]
 name = "futures-channel"
 version = "0.3.31"
@@ -670,6 +686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
 dependencies = [
  "futures-core",
+ "futures-sink",
 ]
 
 [[package]]
@@ -678,6 +695,34 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
 
+[[package]]
+name = "futures-executor"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.112",
+]
+
 [[package]]
 name = "futures-sink"
 version = "0.3.31"
@@ -696,10 +741,16 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
 dependencies = [
+ "futures-channel",
  "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
+ "slab",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 752742f..95c6802 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,3 +23,4 @@ textwrap = "0.16.2"
 tokio = { version = "1.48.0", features = ["full"] }
 unicode-general-category = "1.1.0"
 brittling_macros = { path = "macros" }
+futures = "0.3.31"
diff --git a/src/app.rs b/src/app.rs
index af81d40..6b535ba 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -3,6 +3,7 @@ pub mod run;
 pub mod seeding;
 pub mod snowballing;
 
+use futures::StreamExt;
 use log::{error, info, warn};
 use ratatui::crossterm::event::KeyCode;
 use serde::{Deserialize, Serialize};
@@ -14,7 +15,10 @@ use tokio::{
 
 use crate::{
     app::run::Action,
-    literature::{Publication, SnowballingHistory, get_publication_by_id},
+    literature::{
+        Publication, SnowballingHistory, get_publication_by_id,
+        get_references_stream,
+    },
     status_error, status_info,
 };
 use brittling_macros::component;
@@ -134,9 +138,16 @@ impl App {
             .push(publ.clone());
     }
 
+    // TODO: Is deduplication necessary here?
     #[action]
-    fn fetch_pub(
+    fn add_pending_pub(&mut self, publ: Publication) {
+        self.state.history.pending_publications.push(publ.clone());
+    }
+
+    #[action]
+    fn fetch_and_include_seed(
         &self,
+        link: String,
         action_tx: &'static UnboundedSender<Action>,
     ) -> Result<(), SendError<Action>> {
         if !self
@@ -160,15 +171,15 @@ impl App {
 
         let api_link = format!(
             "https://api.openalex.org/{}",
-            self.state
-                .seeding
-                .input
-                .trim_start_matches("https://openalex.org/")
+            link.trim_start_matches("https://openalex.org/")
         );
 
         tokio::spawn(async move {
-            let publ =
-                get_publication_by_id(&api_link, "an.tsouchlos@gmail.com");
+            let publ = get_publication_by_id(
+                api_link.into(),
+                "an.tsouchlos@gmail.com".to_string().into(),
+                None,
+            );
 
             match publ.await {
                 Ok(publ) => {
@@ -191,13 +202,42 @@ impl App {
         action_tx.send(SeedingAction::ClearInput.into())
     }
 
-    // TODO: Implement
     #[action]
-    fn fetch(
+    fn fetch_references(
         &self,
         action_tx: &'static UnboundedSender<Action>,
     ) -> Result<(), SendError<Action>> {
-        status_info!(action_tx, "Fetch action triggered")
+        status_info!(action_tx, "Fetching references...")?;
+
+        let included = self.state.history.get_all_included();
+
+        tokio::spawn(async move {
+            let mut stream = get_references_stream(
+                included,
+                "an.tsouchlos@gmail.com".to_string(),
+            );
+
+            while let Some(result) = stream.next().await {
+                match result {
+                    Ok(vals) => {
+                        for val in vals {
+                            let _ = action_tx
+                                .send(GlobalAction::AddPendingPub(val).into());
+                        }
+                    }
+                    Err(err) => {
+                        let _ = status_error!(
+                            action_tx,
+                            "Error loading reference: {}",
+                            err
+                        );
+                    }
+                }
+            }
+            let _ = status_info!(action_tx, "Done fetching references");
+        });
+
+        Ok(())
     }
 
     pub fn handle_key(
@@ -214,9 +254,12 @@ impl App {
             (Tab::Seeding, KeyCode::Backspace) => {
                 action_tx.send(SeedingAction::EnterBackspace.into())
             }
-            (Tab::Seeding, KeyCode::Enter) => {
-                action_tx.send(GlobalAction::FetchPub.into())
-            }
+            (Tab::Seeding, KeyCode::Enter) => action_tx.send(
+                GlobalAction::FetchAndIncludeSeed(
+                    self.state.seeding.input.clone(),
+                )
+                .into(),
+            ),
             (Tab::Snowballing, KeyCode::Enter) => {
                 action_tx.send(SnowballingAction::Search.into())
             }
@@ -233,7 +276,7 @@ impl App {
                 action_tx.send(SnowballingAction::PrevItem.into())
             }
             (Tab::Snowballing, KeyCode::Char(' ')) => {
-                action_tx.send(GlobalAction::Fetch.into())
+                action_tx.send(GlobalAction::FetchReferences.into())
             }
             _ => Ok(()),
         }
diff --git a/src/app/common.rs b/src/app/common.rs
index bb199c7..a0dbdf8 100644
--- a/src/app/common.rs
+++ b/src/app/common.rs
@@ -28,7 +28,7 @@ macro_rules! status_warn {
     ($action_tx:expr, $text:expr $(, $args:expr)*) => {
         $action_tx.send(
             crate::app::GlobalAction::ShowStatMsg(
-                crate::app::StatusMessage::Info(format!($text, $($args)*)))
+                crate::app::StatusMessage::Warn(format!($text $(, $args)*)))
             .into(),
         )
     };
@@ -40,7 +40,7 @@ macro_rules! status_error {
     ($action_tx:expr, $text:expr $(, $args:expr)*) => {
         $action_tx.send(
             crate::app::GlobalAction::ShowStatMsg(
-                crate::app::StatusMessage::Info(format!($text, $($args)*)))
+                crate::app::StatusMessage::Error(format!($text $(, $args)*)))
             .into(),
         )
     };
diff --git a/src/app/seeding.rs b/src/app/seeding.rs
index 1163e99..03fba2d 100644
--- a/src/app/seeding.rs
+++ b/src/app/seeding.rs
@@ -1,6 +1,10 @@
 use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc::{UnboundedSender, error::SendError};
 
-use crate::literature::Publication;
+use crate::{
+    app::{GlobalAction, run::Action},
+    literature::Publication,
+};
 use brittling_macros::component;
 
 #[derive(Serialize, Deserialize, Default)]
@@ -12,9 +16,18 @@ pub struct SeedingComponent {
 
 #[component(SeedingAction)]
 impl SeedingComponent {
+    #[action]
+    pub fn submit(
+        &mut self,
+        action_tx: &UnboundedSender<Action>,
+    ) -> Result<(), SendError<Action>> {
+        action_tx
+            .send(GlobalAction::FetchAndIncludeSeed(self.input.clone()).into())
+    }
+
     #[action]
     pub fn clear_input(&mut self) {
-        self.input.clear()
+        self.input.clear();
     }
 
     #[action]
diff --git a/src/literature.rs b/src/literature.rs
index f812417..65b2545 100644
--- a/src/literature.rs
+++ b/src/literature.rs
@@ -1,8 +1,9 @@
 use ammonia::Builder;
+use futures::{StreamExt, future::BoxFuture, stream};
 use html_escape::decode_html_entities;
-use reqwest::Error;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use std::{collections::HashMap, fmt::Display, sync::Arc};
+use tokio::task::JoinSet;
 use unicode_general_category::{GeneralCategory, get_general_category};
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -72,9 +73,35 @@ impl SnowballingHistory {
     }
 }
 
+fn sanitize_text(raw_text: &str) -> String {
+    let cleaner = Builder::empty().clean(&raw_text).to_string();
+    let decoded = decode_html_entities(&cleaner);
+
+    let cleaned: String = decoded
+        .chars()
+        .filter(|&c| {
+            let cat = get_general_category(c);
+            !matches!(
+                cat,
+                GeneralCategory::Control
+                    | GeneralCategory::Format
+                    | GeneralCategory::Surrogate
+                    | GeneralCategory::PrivateUse
+                    | GeneralCategory::Unassigned
+            ) || c.is_whitespace()
+        })
+        .collect();
+
+    cleaned
+        .split_whitespace()
+        .collect::<Vec<&str>>()
+        .join(" ")
+        .replace("\\n", " ")
+}
+
 impl Publication {
     pub fn get_title(&self) -> Option<String> {
-        self.display_name.clone()
+        self.display_name.clone().map(|s| sanitize_text(&s))
     }
 
     pub fn get_year(&self) -> Option {
@@ -113,50 +140,127 @@ impl Publication {
                 .collect::<Vec<_>>()
                 .join(" ");
 
-            let cleaner = Builder::empty().clean(&raw_text).to_string();
-            let decoded = decode_html_entities(&cleaner);
-
-            let cleaned: String = decoded
-                .chars()
-                .filter(|&c| {
-                    let cat = get_general_category(c);
-                    !matches!(
-                        cat,
-                        GeneralCategory::Control
-                            | GeneralCategory::Format
-                            | GeneralCategory::Surrogate
-                            | GeneralCategory::PrivateUse
-                            | GeneralCategory::Unassigned
-                    ) || c.is_whitespace()
-                })
-                .collect();
-
-            cleaned.split_whitespace().collect::<Vec<&str>>().join(" ")
+            sanitize_text(&raw_text)
         })
     }
 }
 
-// #[derive(Serialize, Deserialize, Debug)]
-// pub struct OpenAlexResponse {
-//     pub results: Vec<Publication>,
-// }
+#[derive(Serialize, Deserialize, Debug)]
+pub struct OpenAlexResponse {
+    pub results: Vec<Publication>,
+}
+
+#[derive(Debug)]
+pub enum Error {
+    ApiError(String),
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Error::ApiError(s) => write!(
+                f,
+                "An error occurred while attempting to access the OpenAlex \
+                 API: url={}",
+                s
+            ),
+        }
+    }
+}
 
 pub async fn get_publication_by_id(
-    api_link: &str,
-    email: &str,
+    api_link: Arc<String>,
+    email: Arc<String>,
+    client: Option<reqwest::Client>,
 ) -> Result<Publication, Error> {
     let url = format!("{}&mailto={}", api_link, email);
 
-    let client = reqwest::Client::new();
-    let response = client
-        .get(url)
+    let client = client.or(Some(reqwest::Client::new())).unwrap();
+
+    client
+        .get(url.clone())
         .header("User-Agent", "Rust-OpenAlex-Client/1.0")
        .send()
-        .await?
+        .await
+        .map_err(|_| Error::ApiError(url.clone()))?
         .json::<Publication>()
-        .await?;
+        .await
+        .map_err(|_| Error::ApiError(url))
+}
 
-    Ok(response)
+// TODO: Get all publications, not just the first 25
+pub fn get_references_stream(
+    publications: Vec<Publication>,
+    email: String,
+) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
+    let email = Arc::new(email);
+    let client = reqwest::Client::new();
+
+    let mut publication_ids = Vec::<String>::new();
+    let mut referenced_work_urls = Vec::<String>::new();
+
+    for p in &publications {
+        publication_ids
+            .push(p.id.trim_start_matches("https://openalex.org/").to_string());
+
+        referenced_work_urls.append(
+            &mut p
+                .referenced_works
+                .iter()
+                .map(|r| {
+                    format!(
+                        "https://api.openalex.org/{}",
+                        r.trim_start_matches("https://openalex.org/")
+                    )
+                })
+                .collect::<Vec<String>>(),
+        );
+    }
+
+    // Get references using the referenced_works field
+
+    let stream1 = stream::iter(referenced_work_urls).map({
+        let (email, client) = (email.clone(), client.clone());
+        move |url| {
+            let (email, client) = (email.clone(), client.clone());
+            let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
+                Box::pin(async move {
+                    get_publication_by_id(url.into(), email, Some(client))
+                        .await
+                        .map(|val| vec![val])
+                });
+            fut
+        }
+    });
+
+    // Search for references using API calls
+
+    let stream2 = stream::iter(publication_ids).map(move |id| {
+        let url = format!(
+            "https://api.openalex.org/works?filter=cites:{}&mailto={}",
+            id, email
+        );
+
+        let client = client.clone();
+        let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
+            Box::pin(async move {
+                client
+                    .get(url.clone())
+                    .header("User-Agent", "Rust-OpenAlex-Client/1.0")
+                    .send()
+                    .await
+                    .map_err(|_| Error::ApiError(url.clone()))?
+                    .json::<OpenAlexResponse>()
+                    .await
+                    .map(|response| response.results)
+                    .map_err(|_| Error::ApiError(url.clone()))
            });
+        fut
+    });
+
+    // Combine the two streams
+
+    stream::select(stream1, stream2).buffer_unordered(10)
 }
 
 // // TODO: Get all papers, not just the first page