Add a jobserver proxy to ensure at least one token is always held

John Kåre Alsaker 2025-04-22 07:02:27 +02:00
parent 25cdf1f674
commit cff9efde74
10 changed files with 149 additions and 29 deletions
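For context: under the make-style jobserver protocol that the `jobserver` crate implements, every process implicitly owns one token and acquires an extra token from a shared pool for each additional thread of parallelism. The proxy added below guarantees the process never gives up its last token. A minimal sketch of the raw protocol being wrapped (the pool size is illustrative; rustc gets its `Client` from the environment instead):

```rust
// Minimal sketch of the raw jobserver protocol wrapped by this commit.
use jobserver::Client;

fn main() {
    // A fresh pool with two tokens, plus the process's implicit token.
    let client = Client::new(2).expect("failed to create jobserver pool");

    // Take a token before starting extra parallel work...
    let token = client.acquire().expect("failed to acquire token");

    // ...do CPU-bound work while holding `token`...

    // Dropping the token returns it to the shared pool.
    drop(token);
}
```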


@@ -1,7 +1,8 @@
-use std::sync::{LazyLock, OnceLock};
+use std::sync::{Arc, LazyLock, OnceLock};
 
 pub use jobserver_crate::{Acquired, Client, HelperThread};
 use jobserver_crate::{FromEnv, FromEnvErrorKind};
+use parking_lot::{Condvar, Mutex};
 
 // We can only call `from_env_ext` once per process
// We can only call `from_env_ext` once per process
@@ -71,10 +72,93 @@ pub fn client() -> Client {
     GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
 }
 
-pub fn acquire_thread() {
-    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
+struct ProxyData {
+    /// The number of tokens assigned to threads.
+    /// If this is 0, a single token is still assigned to this process, but is unused.
+    used: u16,
+
+    /// The number of threads requesting a token
+    pending: u16,
 }
 
-pub fn release_thread() {
-    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
+/// This is a jobserver proxy used to ensure that we hold on to at least one token.
+pub struct Proxy {
+    client: Client,
+    data: Mutex<ProxyData>,
+
+    /// Threads which are waiting on a token will wait on this.
+    wake_pending: Condvar,
+
+    helper: OnceLock<HelperThread>,
 }
+
+impl Proxy {
+    pub fn new() -> Arc<Self> {
+        let proxy = Arc::new(Proxy {
+            client: client(),
+            data: Mutex::new(ProxyData { used: 1, pending: 0 }),
+            wake_pending: Condvar::new(),
+            helper: OnceLock::new(),
+        });
+        let proxy_ = Arc::clone(&proxy);
+        let helper = proxy
+            .client
+            .clone()
+            .into_helper_thread(move |token| {
+                if let Ok(token) = token {
+                    let mut data = proxy_.data.lock();
+                    if data.pending > 0 {
+                        // Give the token to a waiting thread
+                        token.drop_without_releasing();
+                        assert!(data.used > 0);
+                        data.used += 1;
+                        data.pending -= 1;
+                        proxy_.wake_pending.notify_one();
+                    } else {
+                        // The token is no longer needed, drop it.
+                        drop(data);
+                        drop(token);
+                    }
+                }
+            })
+            .expect("failed to create helper thread");
+        proxy.helper.set(helper).unwrap();
+        proxy
+    }
+
+    pub fn acquire_thread(&self) {
+        let mut data = self.data.lock();
+
+        if data.used == 0 {
+            // There was a free token around. This can
+            // happen when all threads release their token.
+            assert_eq!(data.pending, 0);
+            data.used += 1;
+        } else {
+            // Request a token from the helper thread. We can't directly use `acquire_raw`
+            // as we also need to be able to wait for the final token in the process which
+            // does not get a corresponding `release_raw` call.
+            self.helper.get().unwrap().request_token();
+            data.pending += 1;
+            self.wake_pending.wait(&mut data);
+        }
+    }
+
+    pub fn release_thread(&self) {
+        let mut data = self.data.lock();
+
+        if data.pending > 0 {
+            // Give the token to a waiting thread
+            data.pending -= 1;
+            self.wake_pending.notify_one();
+        } else {
+            data.used -= 1;
+
+            // Release the token unless it's the last one in the process
+            if data.used > 0 {
+                drop(data);
+                self.client.release_raw().ok();
+            }
+        }
+    }
+}
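As an illustration, this is how a worker thread might bracket its work with the new `Proxy` (a hypothetical call site; in rustc the calls are made by the rayon thread-pool handlers wired up in a later hunk):

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical driver for the Proxy defined above.
fn run_worker(proxy: Arc<Proxy>) {
    let handle = thread::spawn(move || {
        // Blocks until a token is assigned to this thread (either one a
        // previous thread released, or a fresh one from the helper thread).
        proxy.acquire_thread();

        // ...do CPU-bound work while holding the token...

        // Hands the token to a pending thread, or releases it back to the
        // pool unless it is the last one held by the process.
        proxy.release_thread();
    });
    handle.join().unwrap();
}
```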


@@ -59,8 +59,8 @@ macro_rules! already_send {
 // These structures are already `Send`.
 already_send!(
     [std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
-    [rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
-    [crate::owned_slice::OwnedSlice]
+    [rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
+    [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
 );
 
 macro_rules! impl_dyn_send {
@@ -134,8 +134,8 @@ macro_rules! already_sync {
 already_sync!(
     [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
     [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
-    [jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
-    [crate::owned_slice::OwnedSlice]
+    [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
+    [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
 );
 
 // Use portable AtomicU64 for targets without native 64-bit atomics
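These hunks register the jobserver types with the `DynSend`/`DynSync` marker machinery, since a `Client` and a `HelperThread` are now stored in the long-lived `Proxy`. Roughly, each bracketed entry expands to an unsafe marker impl along these lines (a sketch inferred from the macro names above, not the exact expansion):

```rust
// Assumed shape of the expansion: the types are already thread-safe, so
// the dynamic marker traits used by parallel rustc can be asserted for them.
unsafe impl DynSend for jobserver_crate::Client {}
unsafe impl DynSend for jobserver_crate::HelperThread {}
unsafe impl DynSync for jobserver_crate::HelperThread {}
```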


@@ -5,7 +5,7 @@ use std::sync::Arc;
 use rustc_ast::{LitKind, MetaItemKind, token};
 use rustc_codegen_ssa::traits::CodegenBackend;
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
-use rustc_data_structures::jobserver;
+use rustc_data_structures::jobserver::{self, Proxy};
 use rustc_data_structures::stable_hasher::StableHasher;
 use rustc_errors::registry::Registry;
 use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
@@ -41,6 +41,7 @@ pub struct Compiler {
     pub codegen_backend: Box<dyn CodegenBackend>,
     pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
     pub(crate) current_gcx: CurrentGcx,
+    pub(crate) jobserver_proxy: Arc<Proxy>,
 }
 
 /// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
@@ -415,7 +416,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
         config.opts.unstable_opts.threads,
         &config.extra_symbols,
         SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
-        |current_gcx| {
+        |current_gcx, jobserver_proxy| {
             // The previous `early_dcx` can't be reused here because it doesn't
             // impl `Send`. Creating a new one is fine.
             let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
@@ -511,6 +512,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
             codegen_backend,
             override_queries: config.override_queries,
             current_gcx,
+            jobserver_proxy,
         };
 
         // There are two paths out of `f`.


@@ -7,6 +7,7 @@ use std::{env, fs, iter};
 
 use rustc_ast as ast;
 use rustc_codegen_ssa::traits::CodegenBackend;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::parallel;
 use rustc_data_structures::steal::Steal;
 use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
@@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
         dyn for<'tcx> FnOnce(
             &'tcx Session,
             CurrentGcx,
+            Arc<Proxy>,
             &'tcx OnceLock<GlobalCtxt<'tcx>>,
             &'tcx WorkerLocal<Arena<'tcx>>,
             &'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
             F,
         ) -> T,
-    > = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
+    > = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| {
         TyCtxt::create_global_ctxt(
             gcx_cell,
             sess,
@@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
             ),
             providers.hooks,
             current_gcx,
+            jobserver_proxy,
             |tcx| {
                 let feed = tcx.create_crate_num(stable_crate_id).unwrap();
                 assert_eq!(feed.key(), LOCAL_CRATE);
@@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
         )
     });
 
-    inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
+    inner(
+        &compiler.sess,
+        compiler.current_gcx.clone(),
+        Arc::clone(&compiler.jobserver_proxy),
+        &gcx_cell,
+        &arena,
+        &hir_arena,
+        f,
+    )
 }
 
 /// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.
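The proxy is threaded through the boxed `dyn FnOnce` above, which exists so the large function body is compiled behind one indirection rather than duplicated at every call site (the rationale is inferred; the structure itself is visible in the hunk). A reduced sketch of that indirection:

```rust
use std::sync::Arc;

struct Proxy;

// Reduced shape of the indirection in `create_and_enter_global_ctxt`:
// the big body lives behind `dyn FnOnce`, and per-call state such as the
// jobserver proxy is passed through it as an argument.
fn with_indirection<T, F: FnOnce(&Proxy) -> T>(proxy: Arc<Proxy>, f: F) -> T {
    let inner: Box<dyn FnOnce(Arc<Proxy>, F) -> T> = Box::new(|proxy, f| {
        // ...the expensive shared body would run here...
        f(&proxy)
    });
    inner(proxy, f)
}
```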


@@ -1,11 +1,12 @@
 use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
 use std::path::{Path, PathBuf};
-use std::sync::OnceLock;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, OnceLock};
 use std::{env, iter, thread};
 
 use rustc_ast as ast;
 use rustc_codegen_ssa::traits::CodegenBackend;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::sync;
 use rustc_metadata::{DylibError, load_symbol_from_dylib};
 use rustc_middle::ty::CurrentGcx;
@@ -113,7 +114,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
     })
 }
 
-fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
+fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
     thread_stack_size: usize,
     edition: Edition,
     sm_inputs: SourceMapInputs,
@@ -139,7 +140,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
                 edition,
                 extra_symbols,
                 Some(sm_inputs),
-                || f(CurrentGcx::new()),
+                || f(CurrentGcx::new(), Proxy::new()),
             )
         })
         .unwrap()
@@ -152,7 +153,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
     })
 }
 
-pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
+pub(crate) fn run_in_thread_pool_with_globals<
+    F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
+    R: Send,
+>(
     thread_builder_diag: &EarlyDiagCtxt,
     edition: Edition,
     threads: usize,
@@ -162,8 +166,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
 ) -> R {
     use std::process;
 
+    use rustc_data_structures::defer;
     use rustc_data_structures::sync::FromDyn;
-    use rustc_data_structures::{defer, jobserver};
     use rustc_middle::ty::tls;
     use rustc_query_impl::QueryCtxt;
     use rustc_query_system::query::{QueryContext, break_query_cycles};
@@ -178,11 +182,11 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
             edition,
             sm_inputs,
             extra_symbols,
-            |current_gcx| {
+            |current_gcx, jobserver_proxy| {
                 // Register the thread for use with the `WorkerLocal` type.
                 registry.register();
 
-                f(current_gcx)
+                f(current_gcx, jobserver_proxy)
             },
         );
     }
@@ -190,10 +194,14 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
     let current_gcx = FromDyn::from(CurrentGcx::new());
     let current_gcx2 = current_gcx.clone();
 
+    let proxy = Proxy::new();
+
+    let proxy_ = Arc::clone(&proxy);
+    let proxy__ = Arc::clone(&proxy);
     let builder = rayon_core::ThreadPoolBuilder::new()
         .thread_name(|_| "rustc".to_string())
-        .acquire_thread_handler(jobserver::acquire_thread)
-        .release_thread_handler(jobserver::release_thread)
+        .acquire_thread_handler(move || proxy_.acquire_thread())
+        .release_thread_handler(move || proxy__.release_thread())
         .num_threads(threads)
         .deadlock_handler(move || {
             // On deadlock, creates a new thread and forwards information in thread
@@ -257,7 +265,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
         },
         // Run `f` on the first thread in the thread pool.
        move |pool: &rayon_core::ThreadPool| {
-            pool.install(|| f(current_gcx.into_inner()))
+            pool.install(|| f(current_gcx.into_inner(), proxy))
         },
     )
     .unwrap()
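Note the two clones `proxy_` and `proxy__`: each rayon handler is a separate `move` closure that must own its `Arc`, while `proxy` itself is still moved into the closure that finally runs `f`. A reduced, self-contained sketch of that ownership pattern:

```rust
use std::sync::Arc;

struct Proxy;
impl Proxy {
    fn acquire_thread(&self) {}
    fn release_thread(&self) {}
}

fn main() {
    let proxy = Arc::new(Proxy);

    // Each handler closure owns its own clone of the Arc...
    let proxy_ = Arc::clone(&proxy);
    let acquire = move || proxy_.acquire_thread();
    let proxy__ = Arc::clone(&proxy);
    let release = move || proxy__.release_thread();

    acquire();
    release();

    // ...leaving `proxy` free to move into a third closure, as
    // `run_in_thread_pool_with_globals` does for the one that runs `f`.
    let run = move || drop(proxy);
    run();
}
```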


@@ -21,6 +21,7 @@ use rustc_data_structures::defer;
 use rustc_data_structures::fingerprint::Fingerprint;
 use rustc_data_structures::fx::FxHashMap;
 use rustc_data_structures::intern::Interned;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::profiling::SelfProfilerRef;
 use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
 use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
@@ -1438,6 +1439,8 @@ pub struct GlobalCtxt<'tcx> {
     pub(crate) alloc_map: interpret::AllocMap<'tcx>,
 
     current_gcx: CurrentGcx,
+
+    pub jobserver_proxy: Arc<Proxy>,
 }
 
 impl<'tcx> GlobalCtxt<'tcx> {
@@ -1642,6 +1645,7 @@ impl<'tcx> TyCtxt<'tcx> {
         query_system: QuerySystem<'tcx>,
         hooks: crate::hooks::Providers,
         current_gcx: CurrentGcx,
+        jobserver_proxy: Arc<Proxy>,
         f: impl FnOnce(TyCtxt<'tcx>) -> T,
     ) -> T {
         let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1676,6 +1680,7 @@ impl<'tcx> TyCtxt<'tcx> {
             data_layout,
             alloc_map: interpret::AllocMap::new(),
             current_gcx,
+            jobserver_proxy,
         });
 
         // This is a separate function to work around a crash with parallel rustc (#135870)


@@ -4,6 +4,7 @@
 
 use std::num::NonZero;
 
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
 use rustc_data_structures::sync::{DynSend, DynSync};
 use rustc_data_structures::unord::UnordMap;
@@ -69,6 +70,11 @@ impl<'tcx> HasDepContext for QueryCtxt<'tcx> {
 impl<'tcx> QueryContext for QueryCtxt<'tcx> {
     type QueryInfo = QueryStackDeferred<'tcx>;
 
+    #[inline]
+    fn jobserver_proxy(&self) -> &Proxy {
+        &*self.jobserver_proxy
+    }
+
     #[inline]
     fn next_job_id(self) -> QueryJobId {
         QueryJobId(


@@ -7,7 +7,6 @@ use std::sync::Arc;
 
 use parking_lot::{Condvar, Mutex};
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
-use rustc_data_structures::jobserver;
 use rustc_errors::{Diag, DiagCtxtHandle};
 use rustc_hir::def::DefKind;
 use rustc_session::Session;
@@ -207,12 +206,13 @@ impl<I> QueryLatch<I> {
     /// Awaits for the query job to complete.
     pub(super) fn wait_on(
         &self,
+        qcx: impl QueryContext,
         query: Option<QueryJobId>,
         span: Span,
     ) -> Result<(), CycleError<I>> {
         let waiter =
             Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() });
-        self.wait_on_inner(&waiter);
+        self.wait_on_inner(qcx, &waiter);
         // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
         // although another thread may still have a Arc reference so we cannot
         // use Arc::get_mut
@@ -224,7 +224,7 @@ impl<I> QueryLatch<I> {
     }
 
     /// Awaits the caller on this latch by blocking the current thread.
-    fn wait_on_inner(&self, waiter: &Arc<QueryWaiter<I>>) {
+    fn wait_on_inner(&self, qcx: impl QueryContext, waiter: &Arc<QueryWaiter<I>>) {
         let mut info = self.info.lock();
         if !info.complete {
             // We push the waiter on to the `waiters` list. It can be accessed inside
@@ -237,11 +237,12 @@ impl<I> QueryLatch<I> {
             // we have to be in the `wait` call. This is ensured by the deadlock handler
             // getting the self.info lock.
             rayon_core::mark_blocked();
-            jobserver::release_thread();
+            let proxy = qcx.jobserver_proxy();
+            proxy.release_thread();
             waiter.condvar.wait(&mut info);
             // Release the lock before we potentially block in `acquire_thread`
             drop(info);
-            jobserver::acquire_thread();
+            proxy.acquire_thread();
         }
     }
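The change above follows a release-before-park pattern: a thread about to block on the latch gives up its jobserver token so a runnable thread can use it, then re-acquires one after being woken. A reduced, self-contained model of that pattern (the `Proxy` stub stands in for the real one):

```rust
use std::sync::{Condvar, Mutex};

// Stand-in for the Proxy from rustc_data_structures::jobserver.
struct Proxy;
impl Proxy {
    fn acquire_thread(&self) { /* block until a token is assigned */ }
    fn release_thread(&self) { /* pass the token on or return it */ }
}

// Reduced model of QueryLatch::wait_on_inner: tokens should track
// *runnable* threads, so a thread parked on a condvar must not hold one.
fn wait_until_done(proxy: &Proxy, state: &Mutex<bool>, cv: &Condvar) {
    let mut done = state.lock().unwrap();
    if !*done {
        proxy.release_thread(); // let another thread run in the meantime
        while !*done {
            done = cv.wait(done).unwrap();
        }
        // Drop the lock before potentially blocking in `acquire_thread`.
        drop(done);
        proxy.acquire_thread();
    }
}
```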


@@ -16,6 +16,7 @@ mod caches;
 pub use self::caches::{DefIdCache, DefaultCache, QueryCache, SingleCache, VecCache};
 
 mod config;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::sync::{DynSend, DynSync};
 use rustc_errors::DiagInner;
 use rustc_hashes::Hash64;
@@ -151,6 +152,8 @@ pub enum QuerySideEffect {
 pub trait QueryContext: HasDepContext {
     type QueryInfo: Clone;
 
+    fn jobserver_proxy(&self) -> &Proxy;
+
     fn next_job_id(self) -> QueryJobId;
 
     /// Get the query information from the TLS context.


@@ -297,7 +297,7 @@
 where
     // With parallel queries we might just have to wait on some other
     // thread.
-    let result = latch.wait_on(current, span);
+    let result = latch.wait_on(qcx, current, span);
 
     match result {