From 0276bfe51565af88b27b811e0c90e7d6d97db0a8 Mon Sep 17 00:00:00 2001
From: Andreas Tsouchlos <an.tsouchlos@gmail.com>
Date: Thu, 1 Jan 2026 06:08:03 +0200
Subject: [PATCH] Implement fetching citing works

---
 src/app.rs        |  51 ++++++++++++-
 src/literature.rs | 191 +++++++++++++++++++++++++++++++---------------
 2 files changed, 177 insertions(+), 65 deletions(-)

diff --git a/src/app.rs b/src/app.rs
index 6b535ba..6e3269e 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -16,8 +16,8 @@ use tokio::{
 use crate::{
     app::run::Action,
     literature::{
-        Publication, SnowballingHistory, get_publication_by_id,
-        get_references_stream,
+        Publication, SnowballingHistory, get_citing_works_stream,
+        get_publication_by_id, get_references_stream,
     },
     status_error, status_info,
 };
@@ -202,6 +202,46 @@ impl App {
         action_tx.send(SeedingAction::ClearInput.into())
     }
 
+    #[action]
+    fn fetch_citing_works(
+        &self,
+        action_tx: &'static UnboundedSender<Action>,
+    ) -> Result<(), SendError<Action>> {
+        status_info!(action_tx, "Fetching citing works...")?;
+
+        let included = self.state.history.get_all_included();
+
+        tokio::spawn(async move {
+            let stream = get_citing_works_stream(
+                included,
+                "an.tsouchlos@gmail.com".to_string(),
+            );
+
+            tokio::pin!(stream);
+
+            while let Some(result) = stream.next().await {
+                match result {
+                    Ok(vals) => {
+                        for val in vals {
+                            let _ = action_tx
+                                .send(GlobalAction::AddPendingPub(val).into());
+                        }
+                    }
+                    Err(err) => {
+                        let _ = status_error!(
+                            action_tx,
+                            "Error loading citing works: {}",
+                            err
+                        );
+                    }
+                }
+            }
+            let _ = status_info!(action_tx, "Done fetching citing works");
+        });
+
+        Ok(())
+    }
+
     #[action]
     fn fetch_references(
         &self,
@@ -212,11 +252,13 @@ impl App {
         let included = self.state.history.get_all_included();
 
         tokio::spawn(async move {
-            let mut stream = get_references_stream(
+            let stream = get_references_stream(
                 included,
                 "an.tsouchlos@gmail.com".to_string(),
            );
 
+            tokio::pin!(stream);
+
             while let Some(result) = stream.next().await {
                 match result {
                     Ok(vals) => {
@@ -278,6 +320,9 @@ impl App {
             (Tab::Snowballing, KeyCode::Char(' ')) => {
                 action_tx.send(GlobalAction::FetchReferences.into())
             }
+            (Tab::Snowballing, KeyCode::Char('c')) => {
+                action_tx.send(GlobalAction::FetchCitingWorks.into())
+            }
             _ => Ok(()),
         }
     }
diff --git a/src/literature.rs b/src/literature.rs
index 65b2545..57e9f71 100644
--- a/src/literature.rs
+++ b/src/literature.rs
@@ -3,7 +3,6 @@ use futures::{StreamExt, future::BoxFuture, stream};
 use html_escape::decode_html_entities;
 use serde::{Deserialize, Serialize};
 use std::{collections::HashMap, fmt::Display, sync::Arc};
-use tokio::task::JoinSet;
 use unicode_general_category::{GeneralCategory, get_general_category};
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -145,9 +144,15 @@ impl Publication {
     }
 }
 
-#[derive(Serialize, Deserialize, Debug)]
-pub struct OpenAlexResponse {
-    pub results: Vec<Publication>,
+#[derive(Deserialize)]
+struct OpenAlexResponse {
+    results: Vec<Publication>,
+    meta: Meta,
+}
+
+#[derive(Deserialize)]
+struct Meta {
+    next_cursor: Option<String>,
 }
 
 #[derive(Debug)]
@@ -188,7 +193,49 @@ pub async fn get_publication_by_id(
         .map_err(|_| Error::ApiError(url))
 }
 
-// TODO: Get all publications, not just the first 25
+// TODO: Rename this
+fn get_cited_works(
+    id: String,
+    email: Arc<String>,
+    client: reqwest::Client,
+) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
+    let initial_state = Some("*".to_string());
+
+    stream::unfold(initial_state, move |cursor| {
+        let email = email.clone();
+        let client = client.clone();
+        let id = id.clone();
+
+        async move {
+            let current_cursor = cursor?;
+
+            let url = format!(
+                "https://api.openalex.org/works?filter=cited_by:{}&mailto={}&cursor={}",
+                id, email, current_cursor
+            );
+
+            let response = client
+                .get(&url)
+                .header("User-Agent", "Rust-OpenAlex-Client/1.0")
+                .send()
+                .await
+                .map_err(|_| Error::ApiError(url.clone()))
+                .ok()?
+                .json::<OpenAlexResponse>()
+                .await
+                .ok()?;
+
+            let next_cursor = if response.results.is_empty() {
+                None
+            } else {
+                response.meta.next_cursor
+            };
+
+            Some((Ok(response.results), next_cursor))
+        }
+    })
+}
+
 pub fn get_references_stream(
     publications: Vec<Publication>,
     email: String,
@@ -219,68 +266,88 @@ pub fn get_references_stream(
 
     // Get references using the referenced_works field
 
-    let stream1 = stream::iter(referenced_work_urls).map({
-        let (email, client) = (email.clone(), client.clone());
-        move |url| {
-            let (email, client) = (email.clone(), client.clone());
-            let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
-                Box::pin(async move {
-                    get_publication_by_id(url.into(), email, Some(client))
-                        .await
-                        .map(|val| vec![val])
-                });
-            fut
-        }
-    });
+    let stream1 = stream::iter(referenced_work_urls)
+        .map({
+            let (email, client) = (email.clone(), client.clone());
+            move |url| {
+                let (email, client) = (email.clone(), client.clone());
+                let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
+                    Box::pin(async move {
+                        get_publication_by_id(url.into(), email, Some(client))
+                            .await
+                            .map(|val| vec![val])
+                    });
+                fut
+            }
+        })
+        .buffer_unordered(10);
 
     // Search for references using API calls
 
-    let stream2 = stream::iter(publication_ids).map(move |id| {
-        let url = format!(
-            "https://api.openalex.org/works?filter=cites:{}&mailto={}",
-            id, email
-        );
-
-        let client = client.clone();
-        let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
-            Box::pin(async move {
-                client
-                    .get(url.clone())
-                    .header("User-Agent", "Rust-OpenAlex-Client/1.0")
-                    .send()
-                    .await
-                    .map_err(|_| Error::ApiError(url.clone()))?
-                    .json::<OpenAlexResponse>()
-                    .await
-                    .map(|response| response.results)
-                    .map_err(|_| Error::ApiError(url.clone()))
-            });
-        fut
-    });
+    let stream2 = stream::iter(publication_ids)
+        .map(move |id| get_cited_works(id, email.clone(), client.clone()))
+        .flatten();
 
     // Combine the two streams
-
-    stream::select(stream1, stream2).buffer_unordered(10)
+    stream::select(stream1, stream2)
 }
 
-// // TODO: Get all papers, not just the first page
-// pub async fn get_citing_papers(
-//     target_id: &str,
-//     email: &str,
-// ) -> Result<Vec<Publication>, Error> {
-//     let url = format!(
-//         "https://api.openalex.org/works?filter=cites:{}&mailto={}",
-//         target_id, email
-//     );
-//
-//     let client = reqwest::Client::new();
-//     let response = client
-//         .get(url)
-//         .header("User-Agent", "Rust-OpenAlex-Client/1.0")
-//         .send()
-//         .await?
-//         .json::<OpenAlexResponse>()
-//         .await?;
-//
-//     Ok(response.results)
-// }
+// TODO: Rename this
+fn get_citing_works(
+    id: String,
+    email: Arc<String>,
+    client: reqwest::Client,
+) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
+    let initial_state = Some("*".to_string());
+
+    stream::unfold(initial_state, move |cursor| {
+        let email = email.clone();
+        let client = client.clone();
+        let id = id.clone();
+
+        async move {
+            let current_cursor = cursor?;
+
+            let url = format!(
+                "https://api.openalex.org/works?filter=cites:{}&mailto={}&cursor={}",
+                id, email, current_cursor
+            );
+
+            let response = client
+                .get(&url)
+                .header("User-Agent", "Rust-OpenAlex-Client/1.0")
+                .send()
+                .await
+                .map_err(|_| Error::ApiError(url.clone()))
+                .ok()?
+                .json::<OpenAlexResponse>()
+                .await
+                .ok()?;
+
+            let next_cursor = if response.results.is_empty() {
+                None
+            } else {
+                response.meta.next_cursor
+            };
+
+            Some((Ok(response.results), next_cursor))
+        }
+    })
+}
+
+pub fn get_citing_works_stream(
+    publications: Vec<Publication>,
+    email: String,
+) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
+    let email = Arc::new(email);
+    let client = reqwest::Client::new();
+
+    let ids: Vec<String> = publications
+        .iter()
+        .map(|p| p.id.trim_start_matches("https://openalex.org/").to_string())
+        .collect();
+
+    stream::iter(ids)
+        .map(move |id| get_citing_works(id, email.clone(), client.clone()))
+        .flatten()
+}