Implement fetching citing works

This commit is contained in:
Andreas Tsouchlos 2026-01-01 06:08:03 +02:00
parent 143e177b8c
commit 0276bfe515
2 changed files with 177 additions and 65 deletions

View File

@ -16,8 +16,8 @@ use tokio::{
use crate::{ use crate::{
app::run::Action, app::run::Action,
literature::{ literature::{
Publication, SnowballingHistory, get_publication_by_id, Publication, SnowballingHistory, get_citing_works_stream,
get_references_stream, get_publication_by_id, get_references_stream,
}, },
status_error, status_info, status_error, status_info,
}; };
@ -202,6 +202,46 @@ impl App {
action_tx.send(SeedingAction::ClearInput.into()) action_tx.send(SeedingAction::ClearInput.into())
} }
#[action]
fn fetch_citing_works(
&self,
action_tx: &'static UnboundedSender<Action>,
) -> Result<(), SendError<Action>> {
status_info!(action_tx, "Fetching citing works...")?;
let included = self.state.history.get_all_included();
tokio::spawn(async move {
let stream = get_citing_works_stream(
included,
"an.tsouchlos@gmail.com".to_string(),
);
tokio::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok(vals) => {
for val in vals {
let _ = action_tx
.send(GlobalAction::AddPendingPub(val).into());
}
}
Err(err) => {
let _ = status_error!(
action_tx,
"Error loading citing works: {}",
err
);
}
}
}
let _ = status_info!(action_tx, "Done fetching citing works");
});
Ok(())
}
#[action] #[action]
fn fetch_references( fn fetch_references(
&self, &self,
@ -212,11 +252,13 @@ impl App {
let included = self.state.history.get_all_included(); let included = self.state.history.get_all_included();
tokio::spawn(async move { tokio::spawn(async move {
let mut stream = get_references_stream( let stream = get_references_stream(
included, included,
"an.tsouchlos@gmail.com".to_string(), "an.tsouchlos@gmail.com".to_string(),
); );
tokio::pin!(stream);
while let Some(result) = stream.next().await { while let Some(result) = stream.next().await {
match result { match result {
Ok(vals) => { Ok(vals) => {
@ -278,6 +320,9 @@ impl App {
(Tab::Snowballing, KeyCode::Char(' ')) => { (Tab::Snowballing, KeyCode::Char(' ')) => {
action_tx.send(GlobalAction::FetchReferences.into()) action_tx.send(GlobalAction::FetchReferences.into())
} }
(Tab::Snowballing, KeyCode::Char('c')) => {
action_tx.send(GlobalAction::FetchCitingWorks.into())
}
_ => Ok(()), _ => Ok(()),
} }
} }

View File

@ -3,7 +3,6 @@ use futures::{StreamExt, future::BoxFuture, stream};
use html_escape::decode_html_entities; use html_escape::decode_html_entities;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display, sync::Arc}; use std::{collections::HashMap, fmt::Display, sync::Arc};
use tokio::task::JoinSet;
use unicode_general_category::{GeneralCategory, get_general_category}; use unicode_general_category::{GeneralCategory, get_general_category};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
@ -145,9 +144,15 @@ impl Publication {
} }
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Deserialize)]
pub struct OpenAlexResponse { struct OpenAlexResponse {
pub results: Vec<Publication>, results: Vec<Publication>,
meta: Meta,
}
#[derive(Deserialize)]
struct Meta {
next_cursor: Option<String>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -188,7 +193,49 @@ pub async fn get_publication_by_id(
.map_err(|_| Error::ApiError(url)) .map_err(|_| Error::ApiError(url))
} }
// TODO: Get all publications, not just the first 25 // TODO: Rename this
fn get_cited_works(
id: String,
email: Arc<String>,
client: reqwest::Client,
) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
let initial_state = Some("*".to_string());
stream::unfold(initial_state, move |cursor| {
let email = email.clone();
let client = client.clone();
let id = id.clone();
async move {
let current_cursor = cursor?;
let url = format!(
"https://api.openalex.org/works?filter=cited_by:{}&mailto={}&cursor={}",
id, email, current_cursor
);
let response = client
.get(&url)
.header("User-Agent", "Rust-OpenAlex-Client/1.0")
.send()
.await
.map_err(|_| Error::ApiError(url.clone()))
.ok()?
.json::<OpenAlexResponse>()
.await
.ok()?;
let next_cursor = if response.results.is_empty() {
None
} else {
response.meta.next_cursor
};
Some((Ok(response.results), next_cursor))
}
})
}
pub fn get_references_stream( pub fn get_references_stream(
publications: Vec<Publication>, publications: Vec<Publication>,
email: String, email: String,
@ -219,68 +266,88 @@ pub fn get_references_stream(
// Get references using the referenced_works field // Get references using the referenced_works field
let stream1 = stream::iter(referenced_work_urls).map({ let stream1 = stream::iter(referenced_work_urls)
let (email, client) = (email.clone(), client.clone()); .map({
move |url| {
let (email, client) = (email.clone(), client.clone()); let (email, client) = (email.clone(), client.clone());
let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> = move |url| {
Box::pin(async move { let (email, client) = (email.clone(), client.clone());
get_publication_by_id(url.into(), email, Some(client)) let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
.await Box::pin(async move {
.map(|val| vec![val]) get_publication_by_id(url.into(), email, Some(client))
}); .await
fut .map(|val| vec![val])
} });
}); fut
}
})
.buffer_unordered(10);
// Search for references using API calls // Search for references using API calls
let stream2 = stream::iter(publication_ids).map(move |id| { let stream2 = stream::iter(publication_ids)
let url = format!( .map(move |id| get_cited_works(id, email.clone(), client.clone()))
"https://api.openalex.org/works?filter=cites:{}&mailto={}", .flatten();
id, email
);
let client = client.clone();
let fut: BoxFuture<'static, Result<Vec<Publication>, Error>> =
Box::pin(async move {
client
.get(url.clone())
.header("User-Agent", "Rust-OpenAlex-Client/1.0")
.send()
.await
.map_err(|_| Error::ApiError(url.clone()))?
.json::<OpenAlexResponse>()
.await
.map(|response| response.results)
.map_err(|_| Error::ApiError(url.clone()))
});
fut
});
// Combine the two streams // Combine the two streams
stream::select(stream1, stream2)
stream::select(stream1, stream2).buffer_unordered(10)
} }
// // TODO: Get all papers, not just the first page // TODO: Rename this
// pub async fn get_citing_papers( fn get_citing_works(
// target_id: &str, id: String,
// email: &str, email: Arc<String>,
// ) -> Result<Vec<Publication>, Error> { client: reqwest::Client,
// let url = format!( ) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
// "https://api.openalex.org/works?filter=cites:{}&mailto={}", let initial_state = Some("*".to_string());
// target_id, email
// ); stream::unfold(initial_state, move |cursor| {
// let email = email.clone();
// let client = reqwest::Client::new(); let client = client.clone();
// let response = client let id = id.clone();
// .get(url)
// .header("User-Agent", "Rust-OpenAlex-Client/1.0") async move {
// .send() let current_cursor = cursor?;
// .await?
// .json::<OpenAlexResponse>() let url = format!(
// .await?; "https://api.openalex.org/works?filter=cites:{}&mailto={}&cursor={}",
// id, email, current_cursor
// Ok(response.results) );
// }
let response = client
.get(&url)
.header("User-Agent", "Rust-OpenAlex-Client/1.0")
.send()
.await
.map_err(|_| Error::ApiError(url.clone()))
.ok()?
.json::<OpenAlexResponse>()
.await
.ok()?;
let next_cursor = if response.results.is_empty() {
None
} else {
response.meta.next_cursor
};
Some((Ok(response.results), next_cursor))
}
})
}
pub fn get_citing_works_stream(
publications: Vec<Publication>,
email: String,
) -> impl futures::Stream<Item = Result<Vec<Publication>, Error>> {
let email = Arc::new(email);
let client = reqwest::Client::new();
let ids: Vec<String> = publications
.iter()
.map(|p| p.id.trim_start_matches("https://openalex.org/").to_string())
.collect();
stream::iter(ids)
.map(move |id| get_citing_works(id, email.clone(), client.clone()))
.flatten()
}