Auto merge of #2197 - RalfJung:round-robin, r=RalfJung
make Miri's scheduler proper round-robin. When thread N blocks or yields, we activate thread N+1 next, rather than always activating thread 0. This should guarantee that as long as all threads regularly yield, each thread eventually takes a step again. Fixes the "multiple loops that yield playing ping-pong" part of https://github.com/rust-lang/miri/issues/1388. `@cbeuw` I hope this doesn't screw up the scheduler-dependent tests you are adding in your PR.
This commit is contained in:
commit
2eae474673
3 changed files with 102 additions and 8 deletions
|
|
@ -518,16 +518,26 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
|
|||
return Ok(SchedulingAction::ExecuteTimeoutCallback);
|
||||
}
|
||||
// No callbacks scheduled, pick a regular thread to execute.
|
||||
// We need to pick a new thread for execution.
|
||||
for (id, thread) in self.threads.iter_enumerated() {
|
||||
// The active thread blocked or yielded. So we go search for another enabled thread.
|
||||
// Crucially, we start searching at the current active thread ID, rather than at 0, since we
|
||||
// want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
|
||||
//
|
||||
// `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
|
||||
// the active thread. After that we chain `take(N)`, i.e., the threads *before* the
|
||||
// active thread.
|
||||
let threads = self
|
||||
.threads
|
||||
.iter_enumerated()
|
||||
.skip(self.active_thread.index() + 1)
|
||||
.chain(self.threads.iter_enumerated().take(self.active_thread.index()));
|
||||
for (id, thread) in threads {
|
||||
debug_assert_ne!(self.active_thread, id);
|
||||
if thread.state == ThreadState::Enabled {
|
||||
if !self.yield_active_thread || id != self.active_thread {
|
||||
self.active_thread = id;
|
||||
if let Some(data_race) = data_race {
|
||||
data_race.thread_set_active(self.active_thread);
|
||||
}
|
||||
break;
|
||||
self.active_thread = id;
|
||||
if let Some(data_race) = data_race {
|
||||
data_race.thread_set_active(self.active_thread);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.yield_active_thread = false;
|
||||
|
|
|
|||
82
tests/pass/concurrency/spin_loops.rs
Normal file
82
tests/pass/concurrency/spin_loops.rs
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
// ignore-windows: Concurrency on Windows is not supported yet.
|
||||
|
||||
use std::thread;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::mpsc;
|
||||
use std::cell::Cell;
|
||||
|
||||
/// When a thread yields, Miri's scheduler used to pick the thread with the lowest ID
/// that can run. IDs are assigned in thread creation order.
/// This means we could make 2 threads infinitely ping-pong with each other while
/// really there is a 3rd thread that we should schedule to make progress.
fn two_player_ping_pong() {
    // 0 = "keep spinning"; any other value releases both waiters.
    static FLAG: AtomicUsize = AtomicUsize::new(0);

    // Both waiters run the identical spin-and-yield loop, so spawn them
    // through one helper closure. Spawn order (waiter, waiter, setter) is
    // preserved from the original, since IDs follow creation order.
    let spawn_waiter = || {
        thread::spawn(|| {
            while FLAG.load(Ordering::Acquire) == 0 {
                // Busy-wait, yielding so the scheduler can run someone else.
                thread::yield_now();
            }
        })
    };
    let first_waiter = spawn_waiter();
    let second_waiter = spawn_waiter();
    let flag_setter = thread::spawn(|| FLAG.store(1, Ordering::Release));

    // The first `join` blocks the main thread and thus takes it out of the equation.
    first_waiter.join().unwrap();
    second_waiter.join().unwrap();
    flag_setter.join().unwrap();
}
|
||||
|
||||
/// Based on a test by @jethrogb.
///
/// Repeatedly (10x) spawns two threads that coordinate through a rendezvous
/// channel: thread 1 stashes the receiver in a value whose `Drop` impl spins
/// (yielding) and then blocks on `recv`; thread 2 joins thread 1 before
/// probing the channel. All three threads (including main) spin-with-yield at
/// some point, so progress requires the scheduler not to ping-pong between
/// only two of them.
fn launcher() {
    // Reset each iteration; signals that thread 2 (`jh2`) has started running.
    static THREAD2_LAUNCHED: AtomicBool = AtomicBool::new(false);

    for _ in 0..10 {
        // Capacity-0 channel: per std docs, `send` blocks until a matching `recv`.
        let (tx, rx) = mpsc::sync_channel(0);
        THREAD2_LAUNCHED.store(false, Ordering::SeqCst);

        let jh = thread::spawn(move || {
            // Wrapper whose destructor performs the receive. Running the recv
            // from a drop at the end of the thread mimics a TLS destructor
            // (cf. the expect_err message below).
            struct RecvOnDrop(Cell<Option<mpsc::Receiver<()>>>);

            impl Drop for RecvOnDrop {
                fn drop(&mut self) {
                    let rx = self.0.take().unwrap();
                    // Spin (yielding) until thread 2 exists, then block on the
                    // rendezvous receive; the matching send is issued by the
                    // main thread below.
                    while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
                        thread::yield_now();
                    }
                    rx.recv().unwrap();
                }
            }

            let tl_rx: RecvOnDrop = RecvOnDrop(Cell::new(None));
            // `tl_rx` is dropped when this closure ends, i.e. at thread exit,
            // which runs the spin+recv above.
            tl_rx.0.set(Some(rx));
        });

        let tx_clone = tx.clone();
        let jh2 = thread::spawn(move || {
            THREAD2_LAUNCHED.store(true, Ordering::SeqCst);
            // Wait for thread 1 to fully exit, including its drop/recv.
            jh.join().unwrap();
            // After thread 1 exits, its receiver has been consumed and
            // dropped, so the channel is closed and this send must fail.
            tx_clone.send(()).expect_err(
                "Expecting channel to be closed because thread 1 TLS destructors must've run",
            );
        });

        // Main thread: spin (yielding) until thread 2 is up, then yield once
        // more before completing the rendezvous.
        while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
            thread::yield_now();
        }
        thread::yield_now();
        // Pairs with the `recv` in thread 1's drop; thread 2 is blocked in
        // `join` at this point (per the message), so thread 1 receives it.
        tx.send(()).expect("Expecting channel to be live because thread 2 must block on join");
        jh2.join().unwrap();
    }
}
|
||||
|
||||
/// Entry point: runs both scheduler regression scenarios in sequence.
/// (Order is kept as written; each scenario joins all of its threads
/// before returning.)
fn main() {
    two_player_ping_pong();
    launcher();
}
|
||||
2
tests/pass/concurrency/spin_loops.stderr
Normal file
2
tests/pass/concurrency/spin_loops.stderr
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
warning: thread support is experimental and incomplete: weak memory effects are not emulated.
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue