Skip to content

Add a jobserver proxy to ensure at least one token is always held #140145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 89 additions & 5 deletions compiler/rustc_data_structures/src/jobserver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::{LazyLock, OnceLock};
use std::sync::{Arc, LazyLock, OnceLock};

pub use jobserver_crate::{Acquired, Client, HelperThread};
use jobserver_crate::{FromEnv, FromEnvErrorKind};
use parking_lot::{Condvar, Mutex};

// We can only call `from_env_ext` once per process

Expand Down Expand Up @@ -71,10 +72,93 @@ pub fn client() -> Client {
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
}

pub fn acquire_thread() {
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
struct ProxyData {
/// The number of tokens assigned to threads.
/// If this is 0, a single token is still assigned to this process, but is unused.
used: u16,

/// The number of threads requesting a token
pending: u16,
}

/// This is a jobserver proxy used to ensure that we hold on to at least one token.
pub struct Proxy {
client: Client,
data: Mutex<ProxyData>,

/// Threads which are waiting on a token will wait on this.
wake_pending: Condvar,

helper: OnceLock<HelperThread>,
}

pub fn release_thread() {
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
impl Proxy {
pub fn new() -> Arc<Self> {
let proxy = Arc::new(Proxy {
client: client(),
data: Mutex::new(ProxyData { used: 1, pending: 0 }),
wake_pending: Condvar::new(),
helper: OnceLock::new(),
});
let proxy_ = Arc::clone(&proxy);
let helper = proxy
.client
.clone()
.into_helper_thread(move |token| {
if let Ok(token) = token {
let mut data = proxy_.data.lock();
if data.pending > 0 {
// Give the token to a waiting thread
token.drop_without_releasing();
assert!(data.used > 0);
data.used += 1;
data.pending -= 1;
proxy_.wake_pending.notify_one();
} else {
// The token is no longer needed, drop it.
drop(data);
drop(token);
}
}
})
.expect("failed to create helper thread");
proxy.helper.set(helper).unwrap();
proxy
}

pub fn acquire_thread(&self) {
let mut data = self.data.lock();

if data.used == 0 {
// There was a free token around. This can
// happen when all threads release their token.
assert_eq!(data.pending, 0);
data.used += 1;
} else {
// Request a token from the helper thread. We can't directly use `acquire_raw`
// as we also need to be able to wait for the final token in the process which
// does not get a corresponding `release_raw` call.
self.helper.get().unwrap().request_token();
data.pending += 1;
self.wake_pending.wait(&mut data);
}
}

pub fn release_thread(&self) {
let mut data = self.data.lock();

if data.pending > 0 {
// Give the token to a waiting thread
data.pending -= 1;
self.wake_pending.notify_one();
} else {
data.used -= 1;

// Release the token unless it's the last one in the process
if data.used > 0 {
drop(data);
self.client.release_raw().ok();
}
}
}
}
8 changes: 4 additions & 4 deletions compiler/rustc_data_structures/src/marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ macro_rules! already_send {
// These structures are already `Send`.
already_send!(
[std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
[rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
[crate::owned_slice::OwnedSlice]
[rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
[crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
);

macro_rules! impl_dyn_send {
Expand Down Expand Up @@ -120,8 +120,8 @@ macro_rules! already_sync {
already_sync!(
[std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
[jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
[crate::owned_slice::OwnedSlice]
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
);

// Use portable AtomicU64 for targets without native 64-bit atomics
Expand Down
6 changes: 4 additions & 2 deletions compiler/rustc_interface/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use rustc_ast::{LitKind, MetaItemKind, token};
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_data_structures::jobserver;
use rustc_data_structures::jobserver::{self, Proxy};
use rustc_data_structures::stable_hasher::StableHasher;
use rustc_errors::registry::Registry;
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
Expand Down Expand Up @@ -41,6 +41,7 @@ pub struct Compiler {
pub codegen_backend: Box<dyn CodegenBackend>,
pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
pub(crate) current_gcx: CurrentGcx,
pub(crate) jobserver_proxy: Arc<Proxy>,
}

/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
Expand Down Expand Up @@ -415,7 +416,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
config.opts.unstable_opts.threads,
&config.extra_symbols,
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
|current_gcx| {
|current_gcx, jobserver_proxy| {
// The previous `early_dcx` can't be reused here because it doesn't
// impl `Send`. Creating a new one is fine.
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
Expand Down Expand Up @@ -511,6 +512,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
codegen_backend,
override_queries: config.override_queries,
current_gcx,
jobserver_proxy,
};

// There are two paths out of `f`.
Expand Down
15 changes: 13 additions & 2 deletions compiler/rustc_interface/src/passes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{env, fs, iter};

use rustc_ast as ast;
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::parallel;
use rustc_data_structures::steal::Steal;
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
Expand Down Expand Up @@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
dyn for<'tcx> FnOnce(
&'tcx Session,
CurrentGcx,
Arc<Proxy>,
&'tcx OnceLock<GlobalCtxt<'tcx>>,
&'tcx WorkerLocal<Arena<'tcx>>,
&'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
F,
) -> T,
> = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
> = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| {
TyCtxt::create_global_ctxt(
gcx_cell,
sess,
Expand All @@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
),
providers.hooks,
current_gcx,
jobserver_proxy,
|tcx| {
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
assert_eq!(feed.key(), LOCAL_CRATE);
Expand All @@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
)
});

inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
inner(
&compiler.sess,
compiler.current_gcx.clone(),
Arc::clone(&compiler.jobserver_proxy),
&gcx_cell,
&arena,
&hir_arena,
f,
)
}

/// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.
Expand Down
28 changes: 18 additions & 10 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::{env, iter, thread};

use rustc_ast as ast;
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::sync;
use rustc_metadata::{DylibError, load_symbol_from_dylib};
use rustc_middle::ty::CurrentGcx;
Expand Down Expand Up @@ -113,7 +114,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
})
}

fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
thread_stack_size: usize,
edition: Edition,
sm_inputs: SourceMapInputs,
Expand All @@ -139,7 +140,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
edition,
extra_symbols,
Some(sm_inputs),
|| f(CurrentGcx::new()),
|| f(CurrentGcx::new(), Proxy::new()),
)
})
.unwrap()
Expand All @@ -152,7 +153,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
})
}

pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
pub(crate) fn run_in_thread_pool_with_globals<
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
R: Send,
>(
thread_builder_diag: &EarlyDiagCtxt,
edition: Edition,
threads: usize,
Expand All @@ -162,8 +166,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
) -> R {
use std::process;

use rustc_data_structures::defer;
use rustc_data_structures::sync::FromDyn;
use rustc_data_structures::{defer, jobserver};
use rustc_middle::ty::tls;
use rustc_query_impl::QueryCtxt;
use rustc_query_system::query::{QueryContext, break_query_cycles};
Expand All @@ -178,22 +182,26 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
edition,
sm_inputs,
extra_symbols,
|current_gcx| {
|current_gcx, jobserver_proxy| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

f(current_gcx)
f(current_gcx, jobserver_proxy)
},
);
}

let current_gcx = FromDyn::from(CurrentGcx::new());
let current_gcx2 = current_gcx.clone();

let proxy = Proxy::new();

let proxy_ = Arc::clone(&proxy);
let proxy__ = Arc::clone(&proxy);
let builder = rayon_core::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.acquire_thread_handler(jobserver::acquire_thread)
.release_thread_handler(jobserver::release_thread)
.acquire_thread_handler(move || proxy_.acquire_thread())
.release_thread_handler(move || proxy__.release_thread())
.num_threads(threads)
.deadlock_handler(move || {
// On deadlock, creates a new thread and forwards information in thread
Expand Down Expand Up @@ -257,7 +265,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
},
// Run `f` on the first thread in the thread pool.
move |pool: &rayon_core::ThreadPool| {
pool.install(|| f(current_gcx.into_inner()))
pool.install(|| f(current_gcx.into_inner(), proxy))
},
)
.unwrap()
Expand Down
5 changes: 5 additions & 0 deletions compiler/rustc_middle/src/ty/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rustc_data_structures::defer;
use rustc_data_structures::fingerprint::Fingerprint;
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::intern::Interned;
use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::profiling::SelfProfilerRef;
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
Expand Down Expand Up @@ -1420,6 +1421,8 @@ pub struct GlobalCtxt<'tcx> {
pub(crate) alloc_map: interpret::AllocMap<'tcx>,

current_gcx: CurrentGcx,

pub jobserver_proxy: Arc<Proxy>,
}

impl<'tcx> GlobalCtxt<'tcx> {
Expand Down Expand Up @@ -1624,6 +1627,7 @@ impl<'tcx> TyCtxt<'tcx> {
query_system: QuerySystem<'tcx>,
hooks: crate::hooks::Providers,
current_gcx: CurrentGcx,
jobserver_proxy: Arc<Proxy>,
f: impl FnOnce(TyCtxt<'tcx>) -> T,
) -> T {
let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
Expand Down Expand Up @@ -1658,6 +1662,7 @@ impl<'tcx> TyCtxt<'tcx> {
data_layout,
alloc_map: interpret::AllocMap::new(),
current_gcx,
jobserver_proxy,
});

// This is a separate function to work around a crash with parallel rustc (#135870)
Expand Down
6 changes: 6 additions & 0 deletions compiler/rustc_query_impl/src/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::num::NonZero;

use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
use rustc_data_structures::sync::{DynSend, DynSync};
use rustc_data_structures::unord::UnordMap;
Expand Down Expand Up @@ -69,6 +70,11 @@ impl<'tcx> HasDepContext for QueryCtxt<'tcx> {
impl<'tcx> QueryContext for QueryCtxt<'tcx> {
type QueryInfo = QueryStackDeferred<'tcx>;

#[inline]
fn jobserver_proxy(&self) -> &Proxy {
&*self.jobserver_proxy
}

#[inline]
fn next_job_id(self) -> QueryJobId {
QueryJobId(
Expand Down
Loading
Loading