Prevent panics from tearing down worker threads

This commit is contained in:
Lukas Wirth 2025-02-04 14:43:01 +01:00
parent 93b72cefca
commit 1ce2d7eaca
2 changed files with 17 additions and 10 deletions

View file

@ -1,6 +1,8 @@
//! A thin wrapper around [`stdx::thread::Pool`] which threads a sender through spawned jobs.
//! It is used in [`crate::global_state::GlobalState`] throughout the main loop.
use std::panic::UnwindSafe;
use crossbeam_channel::Sender;
use stdx::thread::{Pool, ThreadIntent};
@ -18,7 +20,7 @@ impl<T> TaskPool<T> {
pub(crate) fn spawn<F>(&mut self, intent: ThreadIntent, task: F)
where
F: FnOnce() -> T + Send + 'static,
F: FnOnce() -> T + Send + UnwindSafe + 'static,
T: Send + 'static,
{
self.pool.spawn(intent, {
@ -29,7 +31,7 @@ impl<T> TaskPool<T> {
pub(crate) fn spawn_with_sender<F>(&mut self, intent: ThreadIntent, task: F)
where
F: FnOnce(Sender<T>) + Send + 'static,
F: FnOnce(Sender<T>) + Send + UnwindSafe + 'static,
T: Send + 'static,
{
self.pool.spawn(intent, {

View file

@ -7,9 +7,12 @@
//! The thread pool is implemented entirely using
//! the threading utilities in [`crate::thread`].
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
use std::{
panic::{self, UnwindSafe},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use crossbeam_channel::{Receiver, Sender};
@ -25,13 +28,13 @@ pub struct Pool {
// so that the channel is actually closed
// before we join the worker threads!
job_sender: Sender<Job>,
_handles: Vec<JoinHandle>,
_handles: Box<[JoinHandle]>,
extant_tasks: Arc<AtomicUsize>,
}
struct Job {
requested_intent: ThreadIntent,
f: Box<dyn FnOnce() + Send + 'static>,
f: Box<dyn FnOnce() + Send + UnwindSafe + 'static>,
}
impl Pool {
@ -47,6 +50,7 @@ impl Pool {
let handle = Builder::new(INITIAL_INTENT)
.stack_size(STACK_SIZE)
.name("Worker".into())
.allow_leak(true)
.spawn({
let extant_tasks = Arc::clone(&extant_tasks);
let job_receiver: Receiver<Job> = job_receiver.clone();
@ -58,7 +62,8 @@ impl Pool {
current_intent = job.requested_intent;
}
extant_tasks.fetch_add(1, Ordering::SeqCst);
(job.f)();
// discard the panic, we should've logged the backtrace already
_ = panic::catch_unwind(job.f);
extant_tasks.fetch_sub(1, Ordering::SeqCst);
}
}
@ -68,12 +73,12 @@ impl Pool {
handles.push(handle);
}
Pool { _handles: handles, extant_tasks, job_sender }
Pool { _handles: handles.into_boxed_slice(), extant_tasks, job_sender }
}
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: FnOnce() + Send + 'static,
F: FnOnce() + Send + UnwindSafe + 'static,
{
let f = Box::new(move || {
if cfg!(debug_assertions) {