Skip to content

Commit fc55b0d

Browse files
committed
add pool support
1 parent d0f7579 commit fc55b0d

File tree

3 files changed

+271
-1
lines changed

3 files changed

+271
-1
lines changed

README.md

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,51 @@ unsafe extern "C" {
236236
```
237237

238238
Note how `main` just calls `tarnish::main()` with the parent logic function.
239-
This handles the check for parent-vs-task context.
239+
This handles the check for parent-vs-task context.
240+
241+
## Process Pools
242+
243+
For concurrent task execution we offer a `ProcessPool` to manage multiple worker processes.
244+
This is useful in situations where your tasks are CPU-bound, which is a common problem with `*-sys` crates.
245+
246+
```rust,no_run
247+
use std::num::NonZeroUsize;
248+
use tarnish::{Task, ProcessPool};
249+
250+
#[derive(Default)]
251+
struct HeavyComputation;
252+
253+
impl Task for HeavyComputation {
254+
type Input = Vec<u8>;
255+
type Output = u64;
256+
type Error = String;
257+
258+
fn run(&mut self, input: Vec<u8>) -> Result<u64, String> {
259+
// Do the expensive computation
260+
Ok(input.iter().map(|&x| x as u64).sum())
261+
}
262+
}
263+
264+
fn main() {
265+
tarnish::main::<HeavyComputation>(|| {
266+
let size = NonZeroUsize::new(4).unwrap();
267+
let mut pool = ProcessPool::<HeavyComputation>::new(size)
268+
.expect("Failed to create pool");
269+
270+
// Process tasks across 4 workers
271+
for i in 0..100 {
272+
let result = pool.call(vec![i; 1000]);
273+
println!("Result: {:?}", result);
274+
}
275+
});
276+
}
277+
```
278+
279+
Each pool provides a set of guarantees:
280+
- Uses round-robin scheduling to distribute work
281+
- Automatically restarts crashed workers, just like `Process` does
282+
- Each worker maintains its own isolated process memory
283+
- Workers persist between calls for efficiency
240284

241285
## Shutdown
242286

examples/pool.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#![allow(
2+
clippy::print_stderr,
3+
clippy::use_debug,
4+
clippy::missing_docs_in_private_items,
5+
clippy::unwrap_used
6+
)]
7+
8+
use std::num::NonZeroUsize;
9+
use std::thread;
10+
use std::time::Duration;
11+
use tarnish::{ProcessPool, Task};
12+
13+
#[derive(Default)]
14+
struct HeavyComputation;
15+
16+
impl Task for HeavyComputation {
17+
type Input = usize;
18+
type Output = u64;
19+
type Error = String;
20+
21+
fn run(&mut self, input: usize) -> Result<u64, String> {
22+
let pid = std::process::id();
23+
eprintln!("[Worker PID {pid}] Starting computation for input {input}");
24+
25+
// Simulate heavy computation with sleep
26+
thread::sleep(Duration::from_millis(500));
27+
28+
let result: u64 = (0..input).map(|x| x as u64).sum();
29+
eprintln!("[Worker PID {pid}] Completed: {result}");
30+
Ok(result)
31+
}
32+
}
33+
34+
fn main() {
35+
tarnish::main::<HeavyComputation>(|| {
36+
eprintln!("Creating pool with 4 workers...");
37+
38+
let size = NonZeroUsize::new(4).unwrap();
39+
let mut pool = match ProcessPool::<HeavyComputation>::new(size) {
40+
Ok(p) => p,
41+
Err(e) => {
42+
eprintln!("Failed to create pool: {e}");
43+
return;
44+
}
45+
};
46+
47+
let pool_size = pool.size();
48+
eprintln!("Pool size: {pool_size}\n");
49+
50+
// Process 12 tasks across 4 workers
51+
// Each worker should get ~3 tasks due to round-robin
52+
eprintln!("Processing 12 tasks across {pool_size} workers:\n");
53+
54+
for i in 1_usize..=12 {
55+
eprintln!("[Parent] Submitting task {i}");
56+
57+
// Use saturating_mul to avoid arithmetic overflow
58+
let input = i.saturating_mul(100);
59+
60+
match pool.call(input) {
61+
Ok(result) => eprintln!("[Parent] Task {i} result: {result}\n"),
62+
Err(e) => eprintln!("[Parent] Task {i} failed: {e}\n"),
63+
}
64+
}
65+
66+
eprintln!("All tasks completed!");
67+
});
68+
}

src/lib.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ use std::any;
6666
use std::env;
6767
use std::fmt;
6868
use std::io::{self, Read, Write};
69+
use std::num::NonZeroUsize;
6970
use std::process::{Child, Command, Stdio};
7071
use std::time::{Duration, Instant};
7172

@@ -484,6 +485,163 @@ impl<T: Task> Drop for Process<T> {
484485
}
485486
}
486487

488+
/// A pool of worker processes for concurrent task execution.
489+
///
490+
/// `ProcessPool` manages multiple worker processes, distributing tasks across them
491+
/// using round-robin scheduling. Each worker runs in its own isolated process with
492+
/// automatic crash recovery.
493+
///
494+
/// # Example
495+
///
496+
/// ```no_run
497+
/// use std::num::NonZeroUsize;
498+
/// use tarnish::{Task, ProcessPool};
499+
///
500+
/// #[derive(Default)]
501+
/// struct HeavyComputation;
502+
///
503+
/// impl Task for HeavyComputation {
504+
/// type Input = Vec<u8>;
505+
/// type Output = u64;
506+
/// type Error = String;
507+
///
508+
/// fn run(&mut self, input: Vec<u8>) -> Result<u64, String> {
509+
/// // Expensive computation here
510+
/// Ok(input.iter().map(|&x| x as u64).sum())
511+
/// }
512+
/// }
513+
///
514+
/// tarnish::main::<HeavyComputation>(|| {
515+
/// let size = NonZeroUsize::new(4).unwrap();
516+
/// let mut pool = ProcessPool::<HeavyComputation>::new(size)
517+
/// .expect("Failed to create pool");
518+
///
519+
/// // Process 100 items across 4 workers
520+
/// for i in 0..100 {
521+
/// let result = pool.call(vec![i; 1000]);
522+
/// println!("Result {}: {:?}", i, result);
523+
/// }
524+
/// });
525+
/// ```
526+
pub struct ProcessPool<T: Task> {
527+
workers: Vec<Process<T>>,
528+
next_worker: std::sync::atomic::AtomicUsize,
529+
}
530+
531+
impl<T: Task> ProcessPool<T> {
532+
/// Create a new process pool with the specified number of workers.
533+
///
534+
/// Each worker is a separate process that will be spawned immediately.
535+
/// If any worker fails to spawn, an error is returned and no pool is created.
536+
///
537+
/// # Arguments
538+
///
539+
/// * `size` - The number of worker processes to spawn (must be non-zero).
540+
///
541+
/// # Errors
542+
///
543+
/// Returns an error if any worker process fails to spawn.
544+
///
545+
/// # Example
546+
///
547+
/// ```no_run
548+
/// use std::num::NonZeroUsize;
549+
/// use tarnish::ProcessPool;
550+
/// # use tarnish::Task;
551+
/// # #[derive(Default)]
552+
/// # struct MyTask;
553+
/// # impl Task for MyTask {
554+
/// # type Input = String;
555+
/// # type Output = String;
556+
/// # type Error = String;
557+
/// # fn run(&mut self, input: String) -> Result<String, String> { Ok(input) }
558+
/// # }
559+
///
560+
/// let size = NonZeroUsize::new(4).unwrap();
561+
/// let pool = ProcessPool::<MyTask>::new(size)?;
562+
/// # Ok::<(), tarnish::ProcessError>(())
563+
/// ```
564+
pub fn new(size: NonZeroUsize) -> Result<Self> {
565+
let workers = (0..size.get())
566+
.map(|_| Process::<T>::spawn())
567+
.collect::<Result<Vec<_>>>()?;
568+
569+
Ok(Self {
570+
workers,
571+
next_worker: std::sync::atomic::AtomicUsize::new(0),
572+
})
573+
}
574+
575+
/// Execute a task on the next available worker.
576+
///
577+
/// This method uses round-robin scheduling to distribute work across workers.
578+
/// The call blocks until the worker returns a result. If the worker crashes,
579+
/// it will be automatically restarted.
580+
///
581+
/// # Errors
582+
///
583+
/// Returns an error if:
584+
/// - The worker process crashes and cannot be restarted
585+
/// - Communication with the worker fails
586+
/// - The worker returns a task error
587+
///
588+
/// # Example
589+
///
590+
/// ```no_run
591+
/// # use std::num::NonZeroUsize;
592+
/// # use tarnish::{Task, ProcessPool};
593+
/// # #[derive(Default)]
594+
/// # struct MyTask;
595+
/// # impl Task for MyTask {
596+
/// # type Input = String;
597+
/// # type Output = String;
598+
/// # type Error = String;
599+
/// # fn run(&mut self, input: String) -> Result<String, String> { Ok(input) }
600+
/// # }
601+
/// let size = NonZeroUsize::new(4).unwrap();
602+
/// let mut pool = ProcessPool::<MyTask>::new(size)?;
603+
/// let result = pool.call("hello".to_string())?;
604+
/// # Ok::<(), tarnish::ProcessError>(())
605+
/// ```
606+
#[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)]
607+
pub fn call(&mut self, input: T::Input) -> Result<T::Output> {
608+
let idx = self
609+
.next_worker
610+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
611+
% self.workers.len();
612+
613+
self.workers
614+
.get_mut(idx)
615+
.ok_or_else(|| ProcessError::ProtocolError(format!("Invalid worker index: {idx}")))?
616+
.call(input)
617+
}
618+
619+
/// Returns the number of workers in the pool.
620+
///
621+
/// # Example
622+
///
623+
/// ```no_run
624+
/// # use std::num::NonZeroUsize;
625+
/// # use tarnish::{Task, ProcessPool};
626+
/// # #[derive(Default)]
627+
/// # struct MyTask;
628+
/// # impl Task for MyTask {
629+
/// # type Input = String;
630+
/// # type Output = String;
631+
/// # type Error = String;
632+
/// # fn run(&mut self, input: String) -> Result<String, String> { Ok(input) }
633+
/// # }
634+
/// let size = NonZeroUsize::new(4).unwrap();
635+
/// let pool = ProcessPool::<MyTask>::new(size)?;
636+
/// assert_eq!(pool.size(), 4);
637+
/// # Ok::<(), tarnish::ProcessError>(())
638+
/// ```
639+
#[must_use]
640+
pub const fn size(&self) -> usize {
641+
self.workers.len()
642+
}
643+
}
644+
487645
/// Handle worker process mode in your main function
488646
///
489647
/// Call this at the start of your `main()` function. If it returns `Some(exit_code)`,

0 commit comments

Comments
 (0)