Skip to content

Commit 2dfc7cd

Browse files
committed
Use jobserver crate's helper thread
1 parent 141055f commit 2dfc7cd

File tree

10 files changed

+114
-82
lines changed

10 files changed

+114
-82
lines changed

compiler/rustc_data_structures/src/jobserver.rs

+54-58
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::{LazyLock, OnceLock};
1+
use std::sync::{Arc, LazyLock, OnceLock};
22

33
pub use jobserver_crate::{Acquired, Client, HelperThread};
44
use jobserver_crate::{FromEnv, FromEnvErrorKind};
@@ -53,6 +53,8 @@ fn default_client() -> Client {
5353
client
5454
}
5555

56+
static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
57+
5658
pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
5759
let client_checked = match &*GLOBAL_CLIENT {
5860
Ok(client) => client.clone(),
@@ -61,35 +63,15 @@ pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
6163
default_client()
6264
}
6365
};
64-
let proxy = Proxy {
65-
client: client_checked,
66-
data: Mutex::new(ProxyData { total: 1, used: 1, needed: 0 }),
67-
wake_needer: Condvar::new(),
68-
wake_helper: Condvar::new(),
69-
};
70-
GLOBAL_PROXY.set(proxy).ok();
71-
72-
std::thread::spawn(|| {
73-
GLOBAL_PROXY.get().unwrap().helper();
74-
});
66+
GLOBAL_CLIENT_CHECKED.set(client_checked).ok();
7567
}
7668

7769
const ACCESS_ERROR: &str = "jobserver check should have been called earlier";
7870

7971
pub fn client() -> Client {
80-
GLOBAL_PROXY.get().expect(ACCESS_ERROR).client.clone()
81-
}
82-
83-
pub fn acquire_thread() {
84-
GLOBAL_PROXY.get().expect(ACCESS_ERROR).acquire_thread();
85-
}
86-
87-
pub fn release_thread() {
88-
GLOBAL_PROXY.get().expect(ACCESS_ERROR).release_thread();
72+
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
8973
}
9074

91-
static GLOBAL_PROXY: OnceLock<Proxy> = OnceLock::new();
92-
9375
struct ProxyData {
9476
/// The number of tokens assigned to this process.
9577
total: u16,
@@ -98,66 +80,80 @@ struct ProxyData {
9880
used: u16,
9981

10082
/// The number of threads requesting a token
101-
needed: u16,
83+
pending: u16,
10284
}
10385

10486
/// This is a jobserver proxy used to ensure that we hold on to at least one token.
105-
struct Proxy {
87+
pub struct Proxy {
10688
client: Client,
10789
data: Mutex<ProxyData>,
10890

10991
/// Threads which are waiting on a token will wait on this.
110-
wake_needer: Condvar,
92+
wake_pending: Condvar,
11193

112-
/// This is used to wake the helper thread when tokens are needed.
113-
wake_helper: Condvar,
94+
helper: OnceLock<HelperThread>,
11495
}
11596

11697
impl Proxy {
117-
fn helper(&self) {
118-
let mut data = self.data.lock();
119-
loop {
120-
while data.needed > 0 {
121-
drop(data);
122-
self.client.acquire_raw().ok();
123-
data = self.data.lock();
124-
if data.needed > 0 {
125-
data.total += 1;
126-
data.used += 1;
127-
data.needed -= 1;
128-
self.wake_needer.notify_one();
129-
} else {
130-
drop(data);
131-
self.client.release_raw().ok();
132-
data = self.data.lock();
98+
pub fn new() -> Arc<Self> {
99+
let proxy = Arc::new(Proxy {
100+
client: client(),
101+
data: Mutex::new(ProxyData { total: 1, used: 1, pending: 0 }),
102+
wake_pending: Condvar::new(),
103+
helper: OnceLock::new(),
104+
});
105+
let proxy_ = Arc::clone(&proxy);
106+
let helper = proxy
107+
.client
108+
.clone()
109+
.into_helper_thread(move |token| {
110+
if let Ok(token) = token {
111+
let mut data = proxy_.data.lock();
112+
if data.pending > 0 {
113+
// Give the token to a waiting thread
114+
token.drop_without_releasing();
115+
data.total += 1;
116+
data.used += 1;
117+
data.pending -= 1;
118+
proxy_.wake_pending.notify_one();
119+
} else {
120+
// The token is no longer needed, drop it.
121+
drop(data);
122+
drop(token);
123+
}
133124
}
134-
}
135-
self.wake_helper.wait(&mut data);
136-
}
125+
})
126+
.expect("failed to create helper thread");
127+
proxy.helper.set(helper).unwrap();
128+
proxy
137129
}
138130

139-
fn acquire_thread(&self) {
131+
pub fn acquire_thread(&self) {
140132
let mut data = self.data.lock();
141133

142134
if data.total > data.used {
143-
assert_eq!(data.needed, 0);
135+
// There was a free token around. This can
136+
// happen when all threads release their token.
137+
assert_eq!(data.total, 1);
138+
assert_eq!(data.pending, 0);
144139
data.used += 1;
145140
} else {
146-
if data.needed == 0 {
147-
self.wake_helper.notify_one();
148-
}
149-
data.needed += 1;
150-
self.wake_needer.wait(&mut data);
141+
// Request a token from the helper thread. We can't directly use `acquire_raw`
142+
// as we also need to be able to wait for the final token in the process which
143+
// does not get a corresponding `release_raw` call.
144+
self.helper.get().unwrap().request_token();
145+
data.pending += 1;
146+
self.wake_pending.wait(&mut data);
151147
}
152148
}
153149

154-
fn release_thread(&self) {
150+
pub fn release_thread(&self) {
155151
let mut data = self.data.lock();
156152

157-
if data.needed > 0 {
153+
if data.pending > 0 {
158154
// Give the token to a waiting thread
159-
data.needed -= 1;
160-
self.wake_needer.notify_one();
155+
data.pending -= 1;
156+
self.wake_pending.notify_one();
161157
} else {
162158
data.used -= 1;
163159

compiler/rustc_data_structures/src/marker.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ macro_rules! already_send {
5252
// These structures are already `Send`.
5353
already_send!(
5454
[std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
55-
[rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
56-
[crate::owned_slice::OwnedSlice]
55+
[rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
56+
[crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
5757
);
5858

5959
macro_rules! impl_dyn_send {
@@ -120,8 +120,8 @@ macro_rules! already_sync {
120120
already_sync!(
121121
[std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
122122
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
123-
[jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
124-
[crate::owned_slice::OwnedSlice]
123+
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
124+
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
125125
);
126126

127127
// Use portable AtomicU64 for targets without native 64-bit atomics

compiler/rustc_interface/src/interface.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
use rustc_ast::{LitKind, MetaItemKind, token};
66
use rustc_codegen_ssa::traits::CodegenBackend;
77
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
8-
use rustc_data_structures::jobserver;
8+
use rustc_data_structures::jobserver::{self, Proxy};
99
use rustc_data_structures::stable_hasher::StableHasher;
1010
use rustc_errors::registry::Registry;
1111
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
@@ -41,6 +41,7 @@ pub struct Compiler {
4141
pub codegen_backend: Box<dyn CodegenBackend>,
4242
pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
4343
pub(crate) current_gcx: CurrentGcx,
44+
pub(crate) jobserver_proxy: Arc<Proxy>,
4445
}
4546

4647
/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
@@ -415,7 +416,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
415416
config.opts.unstable_opts.threads,
416417
&config.extra_symbols,
417418
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
418-
|current_gcx| {
419+
|current_gcx, jobserver_proxy| {
419420
// The previous `early_dcx` can't be reused here because it doesn't
420421
// impl `Send`. Creating a new one is fine.
421422
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
@@ -511,6 +512,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
511512
codegen_backend,
512513
override_queries: config.override_queries,
513514
current_gcx,
515+
jobserver_proxy,
514516
};
515517

516518
// There are two paths out of `f`.

compiler/rustc_interface/src/passes.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{env, fs, iter};
77

88
use rustc_ast as ast;
99
use rustc_codegen_ssa::traits::CodegenBackend;
10+
use rustc_data_structures::jobserver::Proxy;
1011
use rustc_data_structures::parallel;
1112
use rustc_data_structures::steal::Steal;
1213
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
@@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
841842
dyn for<'tcx> FnOnce(
842843
&'tcx Session,
843844
CurrentGcx,
845+
Arc<Proxy>,
844846
&'tcx OnceLock<GlobalCtxt<'tcx>>,
845847
&'tcx WorkerLocal<Arena<'tcx>>,
846848
&'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
847849
F,
848850
) -> T,
849-
> = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
851+
> = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| {
850852
TyCtxt::create_global_ctxt(
851853
gcx_cell,
852854
sess,
@@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
865867
),
866868
providers.hooks,
867869
current_gcx,
870+
jobserver_proxy,
868871
|tcx| {
869872
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
870873
assert_eq!(feed.key(), LOCAL_CRATE);
@@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
887890
)
888891
});
889892

890-
inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
893+
inner(
894+
&compiler.sess,
895+
compiler.current_gcx.clone(),
896+
Arc::clone(&compiler.jobserver_proxy),
897+
&gcx_cell,
898+
&arena,
899+
&hir_arena,
900+
f,
901+
)
891902
}
892903

893904
/// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.

compiler/rustc_interface/src/util.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
22
use std::path::{Path, PathBuf};
3-
use std::sync::OnceLock;
43
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::{Arc, OnceLock};
55
use std::{env, iter, thread};
66

77
use rustc_ast as ast;
88
use rustc_codegen_ssa::traits::CodegenBackend;
9+
use rustc_data_structures::jobserver::Proxy;
910
use rustc_data_structures::sync;
1011
use rustc_metadata::{DylibError, load_symbol_from_dylib};
1112
use rustc_middle::ty::CurrentGcx;
@@ -113,7 +114,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
113114
})
114115
}
115116

116-
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
117+
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
117118
thread_stack_size: usize,
118119
edition: Edition,
119120
sm_inputs: SourceMapInputs,
@@ -139,7 +140,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
139140
edition,
140141
extra_symbols,
141142
Some(sm_inputs),
142-
|| f(CurrentGcx::new()),
143+
|| f(CurrentGcx::new(), Proxy::new()),
143144
)
144145
})
145146
.unwrap()
@@ -152,7 +153,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
152153
})
153154
}
154155

155-
pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
156+
pub(crate) fn run_in_thread_pool_with_globals<
157+
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
158+
R: Send,
159+
>(
156160
thread_builder_diag: &EarlyDiagCtxt,
157161
edition: Edition,
158162
threads: usize,
@@ -162,8 +166,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
162166
) -> R {
163167
use std::process;
164168

169+
use rustc_data_structures::defer;
165170
use rustc_data_structures::sync::FromDyn;
166-
use rustc_data_structures::{defer, jobserver};
167171
use rustc_middle::ty::tls;
168172
use rustc_query_impl::QueryCtxt;
169173
use rustc_query_system::query::{QueryContext, break_query_cycles};
@@ -178,22 +182,26 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
178182
edition,
179183
sm_inputs,
180184
extra_symbols,
181-
|current_gcx| {
185+
|current_gcx, jobserver_proxy| {
182186
// Register the thread for use with the `WorkerLocal` type.
183187
registry.register();
184188

185-
f(current_gcx)
189+
f(current_gcx, jobserver_proxy)
186190
},
187191
);
188192
}
189193

190194
let current_gcx = FromDyn::from(CurrentGcx::new());
191195
let current_gcx2 = current_gcx.clone();
192196

197+
let proxy = Proxy::new();
198+
199+
let proxy_ = Arc::clone(&proxy);
200+
let proxy__ = Arc::clone(&proxy);
193201
let builder = rayon_core::ThreadPoolBuilder::new()
194202
.thread_name(|_| "rustc".to_string())
195-
.acquire_thread_handler(jobserver::acquire_thread)
196-
.release_thread_handler(jobserver::release_thread)
203+
.acquire_thread_handler(move || proxy_.acquire_thread())
204+
.release_thread_handler(move || proxy__.release_thread())
197205
.num_threads(threads)
198206
.deadlock_handler(move || {
199207
// On deadlock, creates a new thread and forwards information in thread
@@ -257,7 +265,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
257265
},
258266
// Run `f` on the first thread in the thread pool.
259267
move |pool: &rayon_core::ThreadPool| {
260-
pool.install(|| f(current_gcx.into_inner()))
268+
pool.install(|| f(current_gcx.into_inner(), proxy))
261269
},
262270
)
263271
.unwrap()

compiler/rustc_middle/src/ty/context.rs

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rustc_data_structures::defer;
2121
use rustc_data_structures::fingerprint::Fingerprint;
2222
use rustc_data_structures::fx::FxHashMap;
2323
use rustc_data_structures::intern::Interned;
24+
use rustc_data_structures::jobserver::Proxy;
2425
use rustc_data_structures::profiling::SelfProfilerRef;
2526
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
2627
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
@@ -1420,6 +1421,8 @@ pub struct GlobalCtxt<'tcx> {
14201421
pub(crate) alloc_map: interpret::AllocMap<'tcx>,
14211422

14221423
current_gcx: CurrentGcx,
1424+
1425+
pub jobserver_proxy: Arc<Proxy>,
14231426
}
14241427

14251428
impl<'tcx> GlobalCtxt<'tcx> {
@@ -1624,6 +1627,7 @@ impl<'tcx> TyCtxt<'tcx> {
16241627
query_system: QuerySystem<'tcx>,
16251628
hooks: crate::hooks::Providers,
16261629
current_gcx: CurrentGcx,
1630+
jobserver_proxy: Arc<Proxy>,
16271631
f: impl FnOnce(TyCtxt<'tcx>) -> T,
16281632
) -> T {
16291633
let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1658,6 +1662,7 @@ impl<'tcx> TyCtxt<'tcx> {
16581662
data_layout,
16591663
alloc_map: interpret::AllocMap::new(),
16601664
current_gcx,
1665+
jobserver_proxy,
16611666
});
16621667

16631668
// This is a separate function to work around a crash with parallel rustc (#135870)

0 commit comments

Comments
 (0)