introduce & use JobPool

Signed-off-by: kernelkind <kernelkind@gmail.com>
This commit is contained in:
kernelkind
2025-04-10 16:37:28 -04:00
parent 7bb871d377
commit 5cdf3698d2
4 changed files with 108 additions and 1 deletions

View File

@@ -1,6 +1,7 @@
use crate::persist::{AppSizeHandler, ZoomHandler};
use crate::wallet::GlobalWallet;
use crate::zaps::Zaps;
use crate::JobPool;
use crate::{
frame_history::FrameHistory, AccountStorage, Accounts, AppContext, Args, DataPath,
DataPathType, Directory, Images, NoteAction, NoteCache, RelayDebugView, ThemeHandler,
@@ -43,6 +44,7 @@ pub struct Notedeck {
clipboard: Clipboard,
zaps: Zaps,
frame_history: FrameHistory,
job_pool: JobPool,
}
/// Our chrome, which is basically nothing
@@ -219,6 +221,7 @@ impl Notedeck {
let global_wallet = GlobalWallet::new(&path);
let zaps = Zaps::default();
let job_pool = JobPool::default();
Self {
ndb,
@@ -238,6 +241,7 @@ impl Notedeck {
frame_history: FrameHistory::default(),
clipboard: Clipboard::new(None),
zaps,
job_pool,
}
}
@@ -261,6 +265,7 @@ impl Notedeck {
clipboard: &mut self.clipboard,
zaps: &mut self.zaps,
frame_history: &mut self.frame_history,
job_pool: &mut self.job_pool,
}
}

View File

@@ -1,6 +1,6 @@
use crate::{
frame_history::FrameHistory, wallet::GlobalWallet, zaps::Zaps, Accounts, Args, DataPath,
Images, NoteCache, ThemeHandler, UnknownIds,
Images, JobPool, NoteCache, ThemeHandler, UnknownIds,
};
use egui_winit::clipboard::Clipboard;
@@ -23,4 +23,5 @@ pub struct AppContext<'a> {
pub clipboard: &'a mut Clipboard,
pub zaps: &'a mut Zaps,
pub frame_history: &'a mut FrameHistory,
pub job_pool: &'a mut JobPool,
}

View File

@@ -0,0 +1,99 @@
use std::{
future::Future,
sync::{
mpsc::{self, Sender},
Arc, Mutex,
},
};
use tokio::sync::oneshot;
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct JobPool {
tx: Sender<Job>,
}
impl Default for JobPool {
fn default() -> Self {
JobPool::new(2)
}
}
impl JobPool {
pub fn new(num_threads: usize) -> Self {
let (tx, rx) = mpsc::channel::<Job>();
let arc_rx = Arc::new(Mutex::new(rx));
for _ in 0..num_threads {
let arc_rx_clone = arc_rx.clone();
std::thread::spawn(move || loop {
let job = {
let Ok(unlocked) = arc_rx_clone.lock() else {
continue;
};
let Ok(job) = unlocked.recv() else {
continue;
};
job
};
job();
});
}
Self { tx }
}
pub fn schedule<F, T>(&self, job: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx_result, rx_result) = oneshot::channel::<T>();
let job = Box::new(move || {
let output = job();
let _ = tx_result.send(output);
});
self.tx
.send(job)
.expect("receiver should not be deallocated");
async move {
rx_result.await.unwrap_or_else(|_| {
panic!("Worker thread or channel dropped before returning the result.")
})
}
}
}
#[cfg(test)]
mod tests {
use crate::job_pool::JobPool;
fn test_fn(a: u32, b: u32) -> u32 {
a + b
}
#[tokio::test]
async fn test() {
let pool = JobPool::default();
// Now each job can return different T
let future_str = pool.schedule(|| -> String { "hello from string job".into() });
let a = 5;
let b = 6;
let future_int = pool.schedule(move || -> u32 { test_fn(a, b) });
println!("(Meanwhile we can do more async work) ...");
let s = future_str.await;
let i = future_int.await;
println!("Got string: {:?}", s);
println!("Got integer: {}", i);
}
}

View File

@@ -9,6 +9,7 @@ pub mod filter;
pub mod fonts;
mod frame_history;
mod imgcache;
mod job_pool;
mod muted;
pub mod name;
pub mod note;
@@ -43,6 +44,7 @@ pub use imgcache::{
Animation, GifState, GifStateMap, ImageFrame, Images, MediaCache, MediaCacheType,
MediaCacheValue, TextureFrame, TexturedImage,
};
pub use job_pool::JobPool;
pub use muted::{MuteFun, Muted};
pub use name::NostrName;
pub use note::{