Implement weak memory emulation
This commit is contained in:
parent
16315b1540
commit
e7698f4f07
5 changed files with 476 additions and 27 deletions
170
src/data_race.rs
170
src/data_race.rs
|
|
@ -12,7 +12,7 @@
|
|||
//! The implementation also models races with memory allocation and deallocation via treating allocation and
|
||||
//! deallocation as a type of write internally for detecting data-races.
|
||||
//!
|
||||
//! This does not explore weak memory orders and so can still miss data-races
|
||||
//! Weak memory orders are explored but not all weak behaviours are exhibited, so it can still miss data-races
|
||||
//! but should not report false-positives
|
||||
//!
|
||||
//! Data-race definition from(<https://en.cppreference.com/w/cpp/language/memory_model#Threads_and_data_races>):
|
||||
|
|
@ -29,22 +29,6 @@
|
|||
//! This means that the thread-index can be safely re-used, starting on the next timestamp for the newly created
|
||||
//! thread.
|
||||
//!
|
||||
//! The sequentially consistent ordering corresponds to the ordering that the threads
|
||||
//! are currently scheduled, this means that the data-race detector has no additional
|
||||
//! logic for sequentially consistent accesses at the moment since they are indistinguishable
|
||||
//! from acquire/release operations. If weak memory orderings are explored then this
|
||||
//! may need to change or be updated accordingly.
|
||||
//!
|
||||
//! Per the C++ spec for the memory model a sequentially consistent operation:
|
||||
//! "A load operation with this memory order performs an acquire operation,
|
||||
//! a store performs a release operation, and read-modify-write performs
|
||||
//! both an acquire operation and a release operation, plus a single total
|
||||
//! order exists in which all threads observe all modifications in the same
|
||||
//! order (see Sequentially-consistent ordering below) "
|
||||
//! So in the absence of weak memory effects a seq-cst load & a seq-cst store is identical
|
||||
//! to an acquire load and a release store given the global sequentially consistent order
|
||||
//! of the schedule.
|
||||
//!
|
||||
//! The timestamps used in the data-race detector assign each sequence of non-atomic operations
|
||||
//! followed by a single atomic or concurrent operation a single timestamp.
|
||||
//! Write, Read, Write, ThreadJoin will be represented by a single timestamp value on a thread.
|
||||
|
|
@ -67,6 +51,7 @@ use std::{
|
|||
mem,
|
||||
};
|
||||
|
||||
use rustc_const_eval::interpret::alloc_range;
|
||||
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
|
||||
use rustc_index::vec::{Idx, IndexVec};
|
||||
use rustc_middle::{mir, ty::layout::TyAndLayout};
|
||||
|
|
@ -115,10 +100,10 @@ pub enum AtomicFenceOp {
|
|||
/// of a thread, contains the happens-before clock and
|
||||
/// additional metadata to model atomic fence operations.
|
||||
#[derive(Clone, Default, Debug)]
|
||||
struct ThreadClockSet {
|
||||
pub struct ThreadClockSet {
|
||||
/// The increasing clock representing timestamps
|
||||
/// that happen-before this thread.
|
||||
clock: VClock,
|
||||
pub clock: VClock,
|
||||
|
||||
/// The set of timestamps that will happen-before this
|
||||
/// thread once it performs an acquire fence.
|
||||
|
|
@ -127,6 +112,12 @@ struct ThreadClockSet {
|
|||
/// The last timestamp of happens-before relations that
|
||||
/// have been released by this thread by a fence.
|
||||
fence_release: VClock,
|
||||
|
||||
pub fence_seqcst: VClock,
|
||||
|
||||
pub write_seqcst: VClock,
|
||||
|
||||
pub read_seqcst: VClock,
|
||||
}
|
||||
|
||||
impl ThreadClockSet {
|
||||
|
|
@ -169,7 +160,7 @@ pub struct DataRace;
|
|||
/// common case where no atomic operations
|
||||
/// exists on the memory cell.
|
||||
#[derive(Clone, PartialEq, Eq, Default, Debug)]
|
||||
struct AtomicMemoryCellClocks {
|
||||
pub struct AtomicMemoryCellClocks {
|
||||
/// The clock-vector of the timestamp of the last atomic
|
||||
/// read operation performed by each thread.
|
||||
/// This detects potential data-races between atomic read
|
||||
|
|
@ -514,7 +505,32 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
atomic: AtomicReadOp,
|
||||
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
|
||||
let this = self.eval_context_ref();
|
||||
// This will read from the last store in the modification order of this location. In case
|
||||
// weak memory emulation is enabled, this may not be the store we will pick to actually read from and return.
|
||||
// This is fine with StackedBorrow and race checks because they don't concern metadata on
|
||||
// the *value* (including the associated provenance if this is an AtomicPtr) at this location.
|
||||
// Only metadata on the location itself is used.
|
||||
let scalar = this.allow_data_races_ref(move |this| this.read_scalar(&place.into()))?;
|
||||
|
||||
if let Some(global) = &this.machine.data_race {
|
||||
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
|
||||
if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() {
|
||||
if atomic == AtomicReadOp::SeqCst {
|
||||
global.sc_read();
|
||||
}
|
||||
let mut rng = this.machine.rng.borrow_mut();
|
||||
let loaded = alloc_buffers.buffered_read(
|
||||
alloc_range(base_offset, place.layout.size),
|
||||
global,
|
||||
atomic == AtomicReadOp::SeqCst,
|
||||
&mut *rng,
|
||||
|| this.validate_atomic_load(place, atomic),
|
||||
)?;
|
||||
|
||||
return Ok(loaded.unwrap_or(scalar));
|
||||
}
|
||||
}
|
||||
|
||||
this.validate_atomic_load(place, atomic)?;
|
||||
Ok(scalar)
|
||||
}
|
||||
|
|
@ -528,7 +544,27 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
) -> InterpResult<'tcx> {
|
||||
let this = self.eval_context_mut();
|
||||
this.allow_data_races_mut(move |this| this.write_scalar(val, &(*dest).into()))?;
|
||||
this.validate_atomic_store(dest, atomic)
|
||||
|
||||
this.validate_atomic_store(dest, atomic)?;
|
||||
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(dest.ptr)?;
|
||||
if let (
|
||||
crate::AllocExtra { weak_memory: Some(alloc_buffers), .. },
|
||||
crate::Evaluator { data_race: Some(global), .. },
|
||||
) = this.get_alloc_extra_mut(alloc_id)?
|
||||
{
|
||||
if atomic == AtomicWriteOp::SeqCst {
|
||||
global.sc_write();
|
||||
}
|
||||
let size = dest.layout.size;
|
||||
alloc_buffers.buffered_write(
|
||||
val,
|
||||
alloc_range(base_offset, size),
|
||||
global,
|
||||
atomic == AtomicWriteOp::SeqCst,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform an atomic operation on a memory location.
|
||||
|
|
@ -550,6 +586,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
this.allow_data_races_mut(|this| this.write_immediate(*val, &(*place).into()))?;
|
||||
|
||||
this.validate_atomic_rmw(place, atomic)?;
|
||||
|
||||
this.buffered_atomic_rmw(val.to_scalar_or_uninit(), place, atomic)?;
|
||||
Ok(old)
|
||||
}
|
||||
|
||||
|
|
@ -565,7 +603,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
|
||||
let old = this.allow_data_races_mut(|this| this.read_scalar(&place.into()))?;
|
||||
this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?;
|
||||
|
||||
this.validate_atomic_rmw(place, atomic)?;
|
||||
|
||||
this.buffered_atomic_rmw(new, place, atomic)?;
|
||||
Ok(old)
|
||||
}
|
||||
|
||||
|
|
@ -584,15 +625,25 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
let lt = this.binary_op(mir::BinOp::Lt, &old, &rhs)?.to_scalar()?.to_bool()?;
|
||||
|
||||
let new_val = if min {
|
||||
if lt { &old } else { &rhs }
|
||||
if lt {
|
||||
&old
|
||||
} else {
|
||||
&rhs
|
||||
}
|
||||
} else {
|
||||
if lt { &rhs } else { &old }
|
||||
if lt {
|
||||
&rhs
|
||||
} else {
|
||||
&old
|
||||
}
|
||||
};
|
||||
|
||||
this.allow_data_races_mut(|this| this.write_immediate(**new_val, &(*place).into()))?;
|
||||
|
||||
this.validate_atomic_rmw(place, atomic)?;
|
||||
|
||||
this.buffered_atomic_rmw(new_val.to_scalar_or_uninit(), place, atomic)?;
|
||||
|
||||
// Return the old value.
|
||||
Ok(old)
|
||||
}
|
||||
|
|
@ -642,14 +693,56 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
if cmpxchg_success {
|
||||
this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?;
|
||||
this.validate_atomic_rmw(place, success)?;
|
||||
this.buffered_atomic_rmw(new, place, success)?;
|
||||
} else {
|
||||
this.validate_atomic_load(place, fail)?;
|
||||
// A failed compare exchange is equivalent to a load, reading from the latest store
|
||||
// in the modification order.
|
||||
// Since `old` is only a value and not the store element, we need to separately
|
||||
// find it in our store buffer and perform load_impl on it.
|
||||
if let Some(global) = &this.machine.data_race {
|
||||
if fail == AtomicReadOp::SeqCst {
|
||||
global.sc_read();
|
||||
}
|
||||
let size = place.layout.size;
|
||||
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
|
||||
if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() {
|
||||
if global.multi_threaded.get() {
|
||||
alloc_buffers.read_from_last_store(alloc_range(base_offset, size), global);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the old value.
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn buffered_atomic_rmw(
|
||||
&mut self,
|
||||
new_val: ScalarMaybeUninit<Tag>,
|
||||
place: &MPlaceTy<'tcx, Tag>,
|
||||
atomic: AtomicRwOp,
|
||||
) -> InterpResult<'tcx> {
|
||||
let this = self.eval_context_mut();
|
||||
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
|
||||
if let (
|
||||
crate::AllocExtra { weak_memory: Some(alloc_buffers), .. },
|
||||
crate::Evaluator { data_race: Some(global), .. },
|
||||
) = this.get_alloc_extra_mut(alloc_id)?
|
||||
{
|
||||
if atomic == AtomicRwOp::SeqCst {
|
||||
global.sc_read();
|
||||
global.sc_write();
|
||||
}
|
||||
let size = place.layout.size;
|
||||
let range = alloc_range(base_offset, size);
|
||||
alloc_buffers.read_from_last_store(range, global);
|
||||
alloc_buffers.buffered_write(new_val, range, global, atomic == AtomicRwOp::SeqCst)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the data-race detector for an atomic read occurring at the
|
||||
/// associated memory-place and on the current thread.
|
||||
fn validate_atomic_load(
|
||||
|
|
@ -723,7 +816,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
fn validate_atomic_fence(&mut self, atomic: AtomicFenceOp) -> InterpResult<'tcx> {
|
||||
let this = self.eval_context_mut();
|
||||
if let Some(data_race) = &mut this.machine.data_race {
|
||||
data_race.maybe_perform_sync_operation(move |index, mut clocks| {
|
||||
data_race.maybe_perform_sync_operation(|index, mut clocks| {
|
||||
log::trace!("Atomic fence on {:?} with ordering {:?}", index, atomic);
|
||||
|
||||
// Apply data-race detection for the current fences
|
||||
|
|
@ -737,6 +830,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> {
|
|||
// Either Release | AcqRel | SeqCst
|
||||
clocks.apply_release_fence();
|
||||
}
|
||||
if atomic == AtomicFenceOp::SeqCst {
|
||||
data_race.last_sc_fence.borrow_mut().set_at_index(&clocks.clock, index);
|
||||
clocks.fence_seqcst.join(&data_race.last_sc_fence.borrow());
|
||||
clocks.write_seqcst.join(&data_race.last_sc_write.borrow());
|
||||
}
|
||||
|
||||
// Increment timestamp in case of release semantics.
|
||||
Ok(atomic != AtomicFenceOp::Acquire)
|
||||
|
|
@ -1116,6 +1214,12 @@ pub struct GlobalState {
|
|||
/// The associated vector index will be moved into re-use candidates
|
||||
/// after the join operation occurs.
|
||||
terminated_threads: RefCell<FxHashMap<ThreadId, VectorIdx>>,
|
||||
|
||||
/// The timestamp of last SC fence performed by each thread
|
||||
last_sc_fence: RefCell<VClock>,
|
||||
|
||||
/// The timestamp of last SC write performed by each thread
|
||||
last_sc_write: RefCell<VClock>,
|
||||
}
|
||||
|
||||
impl GlobalState {
|
||||
|
|
@ -1131,6 +1235,8 @@ impl GlobalState {
|
|||
active_thread_count: Cell::new(1),
|
||||
reuse_candidates: RefCell::new(FxHashSet::default()),
|
||||
terminated_threads: RefCell::new(FxHashMap::default()),
|
||||
last_sc_fence: RefCell::new(VClock::default()),
|
||||
last_sc_write: RefCell::new(VClock::default()),
|
||||
};
|
||||
|
||||
// Setup the main-thread since it is not explicitly created:
|
||||
|
|
@ -1445,7 +1551,7 @@ impl GlobalState {
|
|||
/// Load the current vector clock in use and the current set of thread clocks
|
||||
/// in use for the vector.
|
||||
#[inline]
|
||||
fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
|
||||
pub fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
|
||||
let index = self.current_index();
|
||||
let ref_vector = self.vector_clocks.borrow();
|
||||
let clocks = Ref::map(ref_vector, |vec| &vec[index]);
|
||||
|
|
@ -1455,7 +1561,7 @@ impl GlobalState {
|
|||
/// Load the current vector clock in use and the current set of thread clocks
|
||||
/// in use for the vector mutably for modification.
|
||||
#[inline]
|
||||
fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
|
||||
pub fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
|
||||
let index = self.current_index();
|
||||
let ref_vector = self.vector_clocks.borrow_mut();
|
||||
let clocks = RefMut::map(ref_vector, |vec| &mut vec[index]);
|
||||
|
|
@ -1468,4 +1574,16 @@ impl GlobalState {
|
|||
fn current_index(&self) -> VectorIdx {
|
||||
self.current_index.get()
|
||||
}
|
||||
|
||||
// SC ATOMIC STORE rule in the paper.
|
||||
fn sc_write(&self) {
|
||||
let (index, clocks) = self.current_thread_state();
|
||||
self.last_sc_write.borrow_mut().set_at_index(&clocks.clock, index);
|
||||
}
|
||||
|
||||
// SC ATOMIC READ rule in the paper.
|
||||
fn sc_read(&self) {
|
||||
let (.., mut clocks) = self.current_thread_state_mut();
|
||||
clocks.read_seqcst.join(&self.last_sc_fence.borrow());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ mod stacked_borrows;
|
|||
mod sync;
|
||||
mod thread;
|
||||
mod vector_clock;
|
||||
mod weak_memory;
|
||||
|
||||
// Establish a "crate-wide prelude": we often import `crate::*`.
|
||||
|
||||
|
|
|
|||
|
|
@ -190,6 +190,9 @@ pub struct AllocExtra {
|
|||
/// Data race detection via the use of a vector-clock,
|
||||
/// this is only added if it is enabled.
|
||||
pub data_race: Option<data_race::AllocExtra>,
|
||||
/// Weak memory emulation via the use of store buffers,
|
||||
/// this is only added if it is enabled.
|
||||
pub weak_memory: Option<weak_memory::AllocExtra>,
|
||||
}
|
||||
|
||||
/// Precomputed layouts of primitive types
|
||||
|
|
@ -630,9 +633,16 @@ impl<'mir, 'tcx> Machine<'mir, 'tcx> for Evaluator<'mir, 'tcx> {
|
|||
} else {
|
||||
None
|
||||
};
|
||||
let buffer_alloc = if ecx.machine.weak_memory {
|
||||
// FIXME: if this is an atomic obejct, we want to supply its initial value
|
||||
// while allocating the store buffer here.
|
||||
Some(weak_memory::AllocExtra::new_allocation(alloc.size()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let alloc: Allocation<Tag, Self::AllocExtra> = alloc.convert_tag_add_extra(
|
||||
&ecx.tcx,
|
||||
AllocExtra { stacked_borrows: stacks, data_race: race_alloc },
|
||||
AllocExtra { stacked_borrows: stacks, data_race: race_alloc, weak_memory: buffer_alloc },
|
||||
|ptr| Evaluator::tag_alloc_base_pointer(ecx, ptr),
|
||||
);
|
||||
Cow::Owned(alloc)
|
||||
|
|
|
|||
297
src/weak_memory.rs
Normal file
297
src/weak_memory.rs
Normal file
|
|
@ -0,0 +1,297 @@
|
|||
//! Implementation of C++11-consistent weak memory emulation using store buffers
|
||||
//! based on Dynamic Race Detection for C++ ("the paper"):
|
||||
//! https://www.doc.ic.ac.uk/~afd/homepages/papers/pdfs/2017/POPL.pdf
|
||||
|
||||
// Our and the author's own implementation (tsan11) of the paper have some deviations from the provided operational semantics in §5.3:
|
||||
// 1. In the operational semantics, store elements keep a copy of the atomic object's vector clock (AtomicCellClocks::sync_vector in miri),
|
||||
// but this is not used anywhere so it's omitted here.
|
||||
//
|
||||
// 2. In the operational semantics, each store element keeps the timestamp of a thread when it loads from the store.
|
||||
// If the same thread loads from the same store element multiple times, then the timestamps at all loads are saved in a list of load elements.
|
||||
// This is not necessary as later loads by the same thread will always have greater timetstamp values, so we only need to record the timestamp of the first
|
||||
// load by each thread. This optimisation is done in tsan11
|
||||
// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.h#L35-L37)
|
||||
// and here.
|
||||
//
|
||||
// 3. §4.5 of the paper wants an SC store to mark all existing stores in the buffer that happens before it
|
||||
// as SC. This is not done in the operational semantics but implemented correctly in tsan11
|
||||
// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L160-L167)
|
||||
// and here.
|
||||
//
|
||||
// 4. W_SC ; R_SC case requires the SC load to ignore all but last store maked SC (stores not marked SC are not
|
||||
// affected). But this rule is applied to all loads in ReadsFromSet from the paper (last two lines of code), not just SC load.
|
||||
// This is implemented correctly in tsan11
|
||||
// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L295)
|
||||
// and here.
|
||||
|
||||
use std::{
|
||||
cell::{Ref, RefCell, RefMut},
|
||||
collections::VecDeque,
|
||||
};
|
||||
|
||||
use rustc_const_eval::interpret::{AllocRange, InterpResult, ScalarMaybeUninit};
|
||||
use rustc_data_structures::fx::FxHashMap;
|
||||
use rustc_target::abi::Size;
|
||||
|
||||
use crate::{
|
||||
data_race::{GlobalState, ThreadClockSet},
|
||||
RangeMap, Tag, VClock, VTimestamp, VectorIdx,
|
||||
};
|
||||
|
||||
pub type AllocExtra = StoreBufferAlloc;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StoreBufferAlloc {
|
||||
/// Store buffer of each atomic object in this allocation
|
||||
// Load may modify a StoreBuffer to record the loading thread's
|
||||
// timestamp so we need interior mutability here.
|
||||
store_buffer: RefCell<RangeMap<StoreBuffer>>,
|
||||
}
|
||||
|
||||
impl StoreBufferAlloc {
|
||||
pub fn new_allocation(len: Size) -> Self {
|
||||
Self { store_buffer: RefCell::new(RangeMap::new(len, StoreBuffer::default())) }
|
||||
}
|
||||
|
||||
/// Gets a store buffer associated with an atomic object in this allocation
|
||||
fn get_store_buffer(&self, range: AllocRange) -> Ref<'_, StoreBuffer> {
|
||||
Ref::map(self.store_buffer.borrow(), |range_map| {
|
||||
let (.., store_buffer) = range_map.iter(range.start, range.size).next().unwrap();
|
||||
store_buffer
|
||||
})
|
||||
}
|
||||
|
||||
fn get_store_buffer_mut(&self, range: AllocRange) -> RefMut<'_, StoreBuffer> {
|
||||
RefMut::map(self.store_buffer.borrow_mut(), |range_map| {
|
||||
let (.., store_buffer) = range_map.iter_mut(range.start, range.size).next().unwrap();
|
||||
store_buffer
|
||||
})
|
||||
}
|
||||
|
||||
/// Reads from the last store in modification order
|
||||
pub fn read_from_last_store<'tcx>(&self, range: AllocRange, global: &GlobalState) {
|
||||
let store_buffer = self.get_store_buffer(range);
|
||||
let store_elem = store_buffer.buffer.back();
|
||||
if let Some(store_elem) = store_elem {
|
||||
let (index, clocks) = global.current_thread_state();
|
||||
store_elem.load_impl(index, &clocks);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffered_read<'tcx>(
|
||||
&self,
|
||||
range: AllocRange,
|
||||
global: &GlobalState,
|
||||
is_seqcst: bool,
|
||||
rng: &mut (impl rand::Rng + ?Sized),
|
||||
validate: impl FnOnce() -> InterpResult<'tcx>,
|
||||
) -> InterpResult<'tcx, Option<ScalarMaybeUninit<Tag>>> {
|
||||
// Having a live borrow to store_buffer while calling validate_atomic_load is fine
|
||||
// because the race detector doesn't touch store_buffer
|
||||
let store_buffer = self.get_store_buffer(range);
|
||||
|
||||
let store_elem = {
|
||||
// The `clocks` we got here must be dropped before calling validate_atomic_load
|
||||
// as the race detector will update it
|
||||
let (.., clocks) = global.current_thread_state();
|
||||
// Load from a valid entry in the store buffer
|
||||
store_buffer.fetch_store(is_seqcst, &clocks, &mut *rng)
|
||||
};
|
||||
|
||||
// Unlike in write_scalar_atomic, thread clock updates have to be done
|
||||
// after we've picked a store element from the store buffer, as presented
|
||||
// in ATOMIC LOAD rule of the paper. This is because fetch_store
|
||||
// requires access to ThreadClockSet.clock, which is updated by the race detector
|
||||
validate()?;
|
||||
|
||||
let loaded = store_elem.map(|store_elem| {
|
||||
let (index, clocks) = global.current_thread_state();
|
||||
store_elem.load_impl(index, &clocks)
|
||||
});
|
||||
Ok(loaded)
|
||||
}
|
||||
|
||||
pub fn buffered_write<'tcx>(
|
||||
&mut self,
|
||||
val: ScalarMaybeUninit<Tag>,
|
||||
range: AllocRange,
|
||||
global: &GlobalState,
|
||||
is_seqcst: bool,
|
||||
) -> InterpResult<'tcx> {
|
||||
let (index, clocks) = global.current_thread_state();
|
||||
|
||||
let mut store_buffer = self.get_store_buffer_mut(range);
|
||||
store_buffer.store_impl(val, index, &clocks.clock, is_seqcst);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const STORE_BUFFER_LIMIT: usize = 128;
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct StoreBuffer {
|
||||
// Stores to this location in modification order
|
||||
buffer: VecDeque<StoreElement>,
|
||||
}
|
||||
|
||||
impl Default for StoreBuffer {
|
||||
fn default() -> Self {
|
||||
let mut buffer = VecDeque::new();
|
||||
buffer.reserve(STORE_BUFFER_LIMIT);
|
||||
Self { buffer }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'mir, 'tcx: 'mir> StoreBuffer {
|
||||
/// Selects a valid store element in the buffer.
|
||||
/// The buffer does not contain the value used to initialise the atomic object
|
||||
/// so a fresh atomic object has an empty store buffer until an explicit store.
|
||||
fn fetch_store<R: rand::Rng + ?Sized>(
|
||||
&self,
|
||||
is_seqcst: bool,
|
||||
clocks: &ThreadClockSet,
|
||||
rng: &mut R,
|
||||
) -> Option<&StoreElement> {
|
||||
use rand::seq::IteratorRandom;
|
||||
let mut found_sc = false;
|
||||
// FIXME: this should be an inclusive take_while (stops after a false predicate, but
|
||||
// includes the element that gave the false), but such function doesn't yet
|
||||
// exist in the standard libary https://github.com/rust-lang/rust/issues/62208
|
||||
let mut keep_searching = true;
|
||||
let candidates = self
|
||||
.buffer
|
||||
.iter()
|
||||
.rev()
|
||||
.take_while(move |&store_elem| {
|
||||
if !keep_searching {
|
||||
return false;
|
||||
}
|
||||
// CoWR: if a store happens-before the current load,
|
||||
// then we can't read-from anything earlier in modification order.
|
||||
if store_elem.timestamp <= clocks.clock[store_elem.store_index] {
|
||||
log::info!("Stopped due to coherent write-read");
|
||||
keep_searching = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// CoRR: if there was a load from this store which happened-before the current load,
|
||||
// then we cannot read-from anything earlier in modification order.
|
||||
if store_elem.loads.borrow().iter().any(|(&load_index, &load_timestamp)| {
|
||||
load_timestamp <= clocks.clock[load_index]
|
||||
}) {
|
||||
log::info!("Stopped due to coherent read-read");
|
||||
keep_searching = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// The current load, which may be sequenced-after an SC fence, can only read-from
|
||||
// the last store sequenced-before an SC fence in another thread (or any stores
|
||||
// later than that SC fence)
|
||||
if store_elem.timestamp <= clocks.fence_seqcst[store_elem.store_index] {
|
||||
log::info!("Stopped due to coherent load sequenced after sc fence");
|
||||
keep_searching = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// The current non-SC load can only read-from the latest SC store (or any stores later than that
|
||||
// SC store)
|
||||
if store_elem.timestamp <= clocks.write_seqcst[store_elem.store_index]
|
||||
&& store_elem.is_seqcst
|
||||
{
|
||||
log::info!("Stopped due to needing to load from the last SC store");
|
||||
keep_searching = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// The current SC load can only read-from the last store sequenced-before
|
||||
// the last SC fence (or any stores later than the SC fence)
|
||||
if is_seqcst && store_elem.timestamp <= clocks.read_seqcst[store_elem.store_index] {
|
||||
log::info!("Stopped due to sc load needing to load from the last SC store before an SC fence");
|
||||
keep_searching = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
true
|
||||
})
|
||||
.filter(|&store_elem| {
|
||||
if is_seqcst {
|
||||
// An SC load needs to ignore all but last store maked SC (stores not marked SC are not
|
||||
// affected)
|
||||
let include = !(store_elem.is_seqcst && found_sc);
|
||||
found_sc |= store_elem.is_seqcst;
|
||||
include
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
candidates.choose(rng)
|
||||
}
|
||||
|
||||
/// ATOMIC STORE IMPL in the paper (except we don't need the location's vector clock)
|
||||
fn store_impl(
|
||||
&mut self,
|
||||
val: ScalarMaybeUninit<Tag>,
|
||||
index: VectorIdx,
|
||||
thread_clock: &VClock,
|
||||
is_seqcst: bool,
|
||||
) {
|
||||
let store_elem = StoreElement {
|
||||
store_index: index,
|
||||
timestamp: thread_clock[index],
|
||||
// In the language provided in the paper, an atomic store takes the value from a
|
||||
// non-atomic memory location.
|
||||
// But we already have the immediate value here so we don't need to do the memory
|
||||
// access
|
||||
val,
|
||||
is_seqcst,
|
||||
loads: RefCell::new(FxHashMap::default()),
|
||||
};
|
||||
self.buffer.push_back(store_elem);
|
||||
if self.buffer.len() > STORE_BUFFER_LIMIT {
|
||||
self.buffer.pop_front();
|
||||
}
|
||||
if is_seqcst {
|
||||
// Every store that happens before this needs to be marked as SC
|
||||
// so that in a later SC load, only the last SC store (i.e. this one) or stores that
|
||||
// aren't ordered by hb with the last SC is picked.
|
||||
self.buffer.iter_mut().rev().for_each(|elem| {
|
||||
if elem.timestamp <= thread_clock[elem.store_index] {
|
||||
elem.is_seqcst = true;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct StoreElement {
|
||||
/// The identifier of the vector index, corresponding to a thread
|
||||
/// that performed the store.
|
||||
store_index: VectorIdx,
|
||||
|
||||
/// Whether this store is SC.
|
||||
is_seqcst: bool,
|
||||
|
||||
/// The timestamp of the storing thread when it performed the store
|
||||
timestamp: VTimestamp,
|
||||
/// The value of this store
|
||||
val: ScalarMaybeUninit<Tag>,
|
||||
|
||||
/// Timestamp of first loads from this store element by each thread
|
||||
/// Behind a RefCell to keep load op take &self
|
||||
loads: RefCell<FxHashMap<VectorIdx, VTimestamp>>,
|
||||
}
|
||||
|
||||
impl StoreElement {
|
||||
/// ATOMIC LOAD IMPL in the paper
|
||||
/// Unlike the operational semantics in the paper, we don't need to keep track
|
||||
/// of the thread timestamp for every single load. Keeping track of the first (smallest)
|
||||
/// timestamp of each thread that has loaded from a store is sufficient: if the earliest
|
||||
/// load of another thread happens before the current one, then we must stop searching the store
|
||||
/// buffer regardless of subsequent loads by the same thread; if the earliest load of another
|
||||
/// thread doesn't happen before the current one, then no subsequent load by the other thread
|
||||
/// can happen before the current one.
|
||||
fn load_impl(&self, index: VectorIdx, clocks: &ThreadClockSet) -> ScalarMaybeUninit<Tag> {
|
||||
let _ = self.loads.borrow_mut().try_insert(index, clocks.clock[index]);
|
||||
self.val
|
||||
}
|
||||
}
|
||||
|
|
@ -63,6 +63,28 @@ fn reads_value(loc: &AtomicUsize, val: usize) -> usize {
|
|||
val
|
||||
}
|
||||
|
||||
// https://plv.mpi-sws.org/scfix/paper.pdf
|
||||
// Test case SB
|
||||
fn test_sc_store_buffering() {
|
||||
let x = static_atomic(0);
|
||||
let y = static_atomic(0);
|
||||
|
||||
let j1 = spawn(move || {
|
||||
x.store(1, SeqCst);
|
||||
y.load(SeqCst)
|
||||
});
|
||||
|
||||
let j2 = spawn(move || {
|
||||
y.store(1, SeqCst);
|
||||
x.load(SeqCst)
|
||||
});
|
||||
|
||||
let a = j1.join().unwrap();
|
||||
let b = j2.join().unwrap();
|
||||
|
||||
assert_ne!((a, b), (0, 0));
|
||||
}
|
||||
|
||||
// https://plv.mpi-sws.org/scfix/paper.pdf
|
||||
// 2.2 Second Problem: SC Fences are Too Weak
|
||||
fn test_rwc_syncs() {
|
||||
|
|
@ -247,6 +269,7 @@ pub fn main() {
|
|||
// prehaps each function should be its own test case so they
|
||||
// can be run in parallel
|
||||
for _ in 0..500 {
|
||||
test_sc_store_buffering();
|
||||
test_mixed_access();
|
||||
test_load_buffering_acq_rel();
|
||||
test_message_passing();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue