Auto merge of #2641 - DrMeepster:init_once_acquire, r=RalfJung
InitOnce: synchronize with completion when already complete The completion of an InitOnce happens-before the threads waiting on it wake up. However, this is not the case for threads that call `InitOnceBeginInitialize` after the completion, leading to data races and outdated weak memory loads as observed in the CI for #2638. This PR fixes this.
This commit is contained in:
commit
4492c029ef
3 changed files with 97 additions and 33 deletions
|
|
@ -3,7 +3,7 @@ use std::num::NonZeroU32;
|
|||
|
||||
use rustc_index::vec::Idx;
|
||||
|
||||
use super::sync::EvalContextExtPriv;
|
||||
use super::sync::EvalContextExtPriv as _;
|
||||
use super::thread::MachineCallback;
|
||||
use super::vector_clock::VClock;
|
||||
use crate::*;
|
||||
|
|
@ -52,6 +52,43 @@ impl<'mir, 'tcx> VisitTags for InitOnce<'mir, 'tcx> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
|
||||
trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
|
||||
/// Synchronize with the previous initialization attempt of an InitOnce.
|
||||
#[inline]
|
||||
fn init_once_observe_attempt(&mut self, id: InitOnceId) {
|
||||
let this = self.eval_context_mut();
|
||||
let current_thread = this.get_active_thread();
|
||||
|
||||
if let Some(data_race) = &this.machine.data_race {
|
||||
data_race.validate_lock_acquire(
|
||||
&this.machine.threads.sync.init_onces[id].data_race,
|
||||
current_thread,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn init_once_wake_waiter(
|
||||
&mut self,
|
||||
id: InitOnceId,
|
||||
waiter: InitOnceWaiter<'mir, 'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
let this = self.eval_context_mut();
|
||||
let current_thread = this.get_active_thread();
|
||||
|
||||
this.unblock_thread(waiter.thread);
|
||||
|
||||
// Call callback, with the woken-up thread as `current`.
|
||||
this.set_active_thread(waiter.thread);
|
||||
this.init_once_observe_attempt(id);
|
||||
waiter.callback.call(this)?;
|
||||
this.set_active_thread(current_thread);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
|
||||
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
|
||||
fn init_once_get_or_create_id(
|
||||
|
|
@ -141,20 +178,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
|
|||
// Wake up everyone.
|
||||
// need to take the queue to avoid having `this` be borrowed multiple times
|
||||
for waiter in std::mem::take(&mut init_once.waiters) {
|
||||
// End of the wait happens-before woken-up thread.
|
||||
if let Some(data_race) = &this.machine.data_race {
|
||||
data_race.validate_lock_acquire(
|
||||
&this.machine.threads.sync.init_onces[id].data_race,
|
||||
waiter.thread,
|
||||
);
|
||||
}
|
||||
|
||||
this.unblock_thread(waiter.thread);
|
||||
|
||||
// Call callback, with the woken-up thread as `current`.
|
||||
this.set_active_thread(waiter.thread);
|
||||
waiter.callback.call(this)?;
|
||||
this.set_active_thread(current_thread);
|
||||
this.init_once_wake_waiter(id, waiter)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -172,28 +196,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
|
|||
);
|
||||
|
||||
// Each complete happens-before the end of the wait
|
||||
// FIXME: should this really induce synchronization? If we think of it as a lock, then yes,
|
||||
// but the docs don't talk about such details.
|
||||
if let Some(data_race) = &this.machine.data_race {
|
||||
data_race.validate_lock_release(&mut init_once.data_race, current_thread);
|
||||
}
|
||||
|
||||
// Wake up one waiting thread, so they can go ahead and try to init this.
|
||||
if let Some(waiter) = init_once.waiters.pop_front() {
|
||||
// End of the wait happens-before woken-up thread.
|
||||
if let Some(data_race) = &this.machine.data_race {
|
||||
data_race.validate_lock_acquire(
|
||||
&this.machine.threads.sync.init_onces[id].data_race,
|
||||
waiter.thread,
|
||||
);
|
||||
}
|
||||
|
||||
this.unblock_thread(waiter.thread);
|
||||
|
||||
// Call callback, with the woken-up thread as `current`.
|
||||
this.set_active_thread(waiter.thread);
|
||||
waiter.callback.call(this)?;
|
||||
this.set_active_thread(current_thread);
|
||||
this.init_once_wake_waiter(id, waiter)?;
|
||||
} else {
|
||||
// Nobody there to take this, so go back to 'uninit'
|
||||
init_once.status = InitOnceStatus::Uninitialized;
|
||||
|
|
@ -201,4 +210,19 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Synchronize with the previous completion of an InitOnce.
|
||||
/// Must only be called after checking that it is complete.
|
||||
#[inline]
|
||||
fn init_once_observe_completed(&mut self, id: InitOnceId) {
|
||||
let this = self.eval_context_mut();
|
||||
|
||||
assert_eq!(
|
||||
this.init_once_status(id),
|
||||
InitOnceStatus::Complete,
|
||||
"observing the completion of incomplete init once"
|
||||
);
|
||||
|
||||
this.init_once_observe_attempt(id);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -177,8 +177,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
|
|||
Box::new(Callback { init_once_id: id, pending_place }),
|
||||
)
|
||||
}
|
||||
InitOnceStatus::Complete =>
|
||||
this.write_scalar(this.eval_windows("c", "FALSE")?, &pending_place)?,
|
||||
InitOnceStatus::Complete => {
|
||||
this.init_once_observe_completed(id);
|
||||
this.write_scalar(this.eval_windows("c", "FALSE")?, &pending_place)?;
|
||||
}
|
||||
}
|
||||
|
||||
// This always succeeds (even if the thread is blocked, we will succeed if we ever unblock).
|
||||
|
|
|
|||
|
|
@ -131,8 +131,46 @@ fn retry_on_fail() {
|
|||
waiter2.join().unwrap();
|
||||
}
|
||||
|
||||
fn no_data_race_after_complete() {
|
||||
let mut init_once = null_mut();
|
||||
let mut pending = 0;
|
||||
|
||||
unsafe {
|
||||
assert_eq!(InitOnceBeginInitialize(&mut init_once, 0, &mut pending, null_mut()), TRUE);
|
||||
assert_eq!(pending, TRUE);
|
||||
}
|
||||
|
||||
let init_once_ptr = SendPtr(&mut init_once);
|
||||
|
||||
let mut place = 0;
|
||||
let place_ptr = SendPtr(&mut place);
|
||||
|
||||
let reader = thread::spawn(move || unsafe {
|
||||
let mut pending = 0;
|
||||
|
||||
// this doesn't block because reader only executes after `InitOnceComplete` is called
|
||||
assert_eq!(InitOnceBeginInitialize(init_once_ptr.0, 0, &mut pending, null_mut()), TRUE);
|
||||
assert_eq!(pending, FALSE);
|
||||
// this should not data race
|
||||
place_ptr.0.read()
|
||||
});
|
||||
|
||||
unsafe {
|
||||
// this should not data race
|
||||
place_ptr.0.write(1);
|
||||
}
|
||||
|
||||
unsafe {
|
||||
assert_eq!(InitOnceComplete(init_once_ptr.0, 0, null_mut()), TRUE);
|
||||
}
|
||||
|
||||
// run reader (without preemption, it has not taken a step yet)
|
||||
assert_eq!(reader.join().unwrap(), 1);
|
||||
}
|
||||
|
||||
fn main() {
|
||||
single_thread();
|
||||
block_until_complete();
|
||||
retry_on_fail();
|
||||
no_data_race_after_complete();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue