From e7698f4f07dcec9cf42b3de133f9ca171d0677f0 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Mon, 27 Dec 2021 19:07:23 +0000 Subject: [PATCH] Implement weak memory emulation --- src/data_race.rs | 170 +++++++++++-- src/lib.rs | 1 + src/machine.rs | 12 +- src/weak_memory.rs | 297 ++++++++++++++++++++++ tests/run-pass/concurrency/weak_memory.rs | 23 ++ 5 files changed, 476 insertions(+), 27 deletions(-) create mode 100644 src/weak_memory.rs diff --git a/src/data_race.rs b/src/data_race.rs index eb67a487b5a5..82ee32ddee71 100644 --- a/src/data_race.rs +++ b/src/data_race.rs @@ -12,7 +12,7 @@ //! The implementation also models races with memory allocation and deallocation via treating allocation and //! deallocation as a type of write internally for detecting data-races. //! -//! This does not explore weak memory orders and so can still miss data-races +//! Weak memory orders are explored but not all weak behaviours are exhibited, so it can still miss data-races //! but should not report false-positives //! //! Data-race definition from(): @@ -29,22 +29,6 @@ //! This means that the thread-index can be safely re-used, starting on the next timestamp for the newly created //! thread. //! -//! The sequentially consistent ordering corresponds to the ordering that the threads -//! are currently scheduled, this means that the data-race detector has no additional -//! logic for sequentially consistent accesses at the moment since they are indistinguishable -//! from acquire/release operations. If weak memory orderings are explored then this -//! may need to change or be updated accordingly. -//! -//! Per the C++ spec for the memory model a sequentially consistent operation: -//! "A load operation with this memory order performs an acquire operation, -//! a store performs a release operation, and read-modify-write performs -//! both an acquire operation and a release operation, plus a single total -//! order exists in which all threads observe all modifications in the same -//! order (see Sequentially-consistent ordering below) " -//! So in the absence of weak memory effects a seq-cst load & a seq-cst store is identical -//! to an acquire load and a release store given the global sequentially consistent order -//! of the schedule. -//! //! The timestamps used in the data-race detector assign each sequence of non-atomic operations //! followed by a single atomic or concurrent operation a single timestamp. //! Write, Read, Write, ThreadJoin will be represented by a single timestamp value on a thread. @@ -67,6 +51,7 @@ use std::{ mem, }; +use rustc_const_eval::interpret::alloc_range; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; use rustc_index::vec::{Idx, IndexVec}; use rustc_middle::{mir, ty::layout::TyAndLayout}; @@ -115,10 +100,10 @@ pub enum AtomicFenceOp { /// of a thread, contains the happens-before clock and /// additional metadata to model atomic fence operations. #[derive(Clone, Default, Debug)] -struct ThreadClockSet { +pub struct ThreadClockSet { /// The increasing clock representing timestamps /// that happen-before this thread. - clock: VClock, + pub clock: VClock, /// The set of timestamps that will happen-before this /// thread once it performs an acquire fence. @@ -127,6 +112,12 @@ struct ThreadClockSet { /// The last timestamp of happens-before relations that /// have been released by this thread by a fence. 
fence_release: VClock, + + pub fence_seqcst: VClock, + + pub write_seqcst: VClock, + + pub read_seqcst: VClock, } impl ThreadClockSet { @@ -169,7 +160,7 @@ pub struct DataRace; /// common case where no atomic operations /// exists on the memory cell. #[derive(Clone, PartialEq, Eq, Default, Debug)] -struct AtomicMemoryCellClocks { +pub struct AtomicMemoryCellClocks { /// The clock-vector of the timestamp of the last atomic /// read operation performed by each thread. /// This detects potential data-races between atomic read @@ -514,7 +505,32 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { atomic: AtomicReadOp, ) -> InterpResult<'tcx, ScalarMaybeUninit> { let this = self.eval_context_ref(); + // This will read from the last store in the modification order of this location. In case + // weak memory emulation is enabled, this may not be the store we will pick to actually read from and return. + // This is fine with StackedBorrow and race checks because they don't concern metadata on + // the *value* (including the associated provenance if this is an AtomicPtr) at this location. + // Only metadata on the location itself is used. let scalar = this.allow_data_races_ref(move |this| this.read_scalar(&place.into()))?; + + if let Some(global) = &this.machine.data_race { + let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?; + if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() { + if atomic == AtomicReadOp::SeqCst { + global.sc_read(); + } + let mut rng = this.machine.rng.borrow_mut(); + let loaded = alloc_buffers.buffered_read( + alloc_range(base_offset, place.layout.size), + global, + atomic == AtomicReadOp::SeqCst, + &mut *rng, + || this.validate_atomic_load(place, atomic), + )?; + + return Ok(loaded.unwrap_or(scalar)); + } + } + this.validate_atomic_load(place, atomic)?; Ok(scalar) } @@ -528,7 +544,27 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); this.allow_data_races_mut(move |this| this.write_scalar(val, &(*dest).into()))?; - this.validate_atomic_store(dest, atomic) + + this.validate_atomic_store(dest, atomic)?; + let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(dest.ptr)?; + if let ( + crate::AllocExtra { weak_memory: Some(alloc_buffers), .. }, + crate::Evaluator { data_race: Some(global), .. }, + ) = this.get_alloc_extra_mut(alloc_id)? + { + if atomic == AtomicWriteOp::SeqCst { + global.sc_write(); + } + let size = dest.layout.size; + alloc_buffers.buffered_write( + val, + alloc_range(base_offset, size), + global, + atomic == AtomicWriteOp::SeqCst, + )?; + } + + Ok(()) } /// Perform an atomic operation on a memory location. 
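
For illustration (this snippet is not part of the patch): with the buffered reads and writes above, a relaxed load may be served from an older entry in the store buffer instead of the latest store, so both loads in the classic store-buffering test below may return 0 — a weak behaviour Miri could not exhibit before this change.

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread;

// Store-buffering litmus test: under relaxed orderings each load may miss the
// other thread's store, so (0, 0) is an allowed (and now observable) outcome.
static X: AtomicUsize = AtomicUsize::new(0);
static Y: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let t1 = thread::spawn(|| {
        X.store(1, Relaxed);
        Y.load(Relaxed)
    });
    let t2 = thread::spawn(|| {
        Y.store(1, Relaxed);
        X.load(Relaxed)
    });
    let (a, b) = (t1.join().unwrap(), t2.join().unwrap());
    println!("({}, {})", a, b); // (1, 1), (0, 1), (1, 0) and (0, 0) are all possible here.
}
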
@@ -550,6 +586,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { this.allow_data_races_mut(|this| this.write_immediate(*val, &(*place).into()))?; this.validate_atomic_rmw(place, atomic)?; + + this.buffered_atomic_rmw(val.to_scalar_or_uninit(), place, atomic)?; Ok(old) } @@ -565,7 +603,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { let old = this.allow_data_races_mut(|this| this.read_scalar(&place.into()))?; this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?; + this.validate_atomic_rmw(place, atomic)?; + + this.buffered_atomic_rmw(new, place, atomic)?; Ok(old) } @@ -584,15 +625,25 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { let lt = this.binary_op(mir::BinOp::Lt, &old, &rhs)?.to_scalar()?.to_bool()?; let new_val = if min { - if lt { &old } else { &rhs } + if lt { + &old + } else { + &rhs + } } else { - if lt { &rhs } else { &old } + if lt { + &rhs + } else { + &old + } }; this.allow_data_races_mut(|this| this.write_immediate(**new_val, &(*place).into()))?; this.validate_atomic_rmw(place, atomic)?; + this.buffered_atomic_rmw(new_val.to_scalar_or_uninit(), place, atomic)?; + // Return the old value. Ok(old) } @@ -642,14 +693,56 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { if cmpxchg_success { this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?; this.validate_atomic_rmw(place, success)?; + this.buffered_atomic_rmw(new, place, success)?; } else { this.validate_atomic_load(place, fail)?; + // A failed compare exchange is equivalent to a load, reading from the latest store + // in the modification order. + // Since `old` is only a value and not the store element, we need to separately + // find it in our store buffer and perform load_impl on it. + if let Some(global) = &this.machine.data_race { + if fail == AtomicReadOp::SeqCst { + global.sc_read(); + } + let size = place.layout.size; + let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?; + if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() { + if global.multi_threaded.get() { + alloc_buffers.read_from_last_store(alloc_range(base_offset, size), global); + } + } + } } // Return the old value. Ok(res) } + fn buffered_atomic_rmw( + &mut self, + new_val: ScalarMaybeUninit, + place: &MPlaceTy<'tcx, Tag>, + atomic: AtomicRwOp, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?; + if let ( + crate::AllocExtra { weak_memory: Some(alloc_buffers), .. }, + crate::Evaluator { data_race: Some(global), .. }, + ) = this.get_alloc_extra_mut(alloc_id)? + { + if atomic == AtomicRwOp::SeqCst { + global.sc_read(); + global.sc_write(); + } + let size = place.layout.size; + let range = alloc_range(base_offset, size); + alloc_buffers.read_from_last_store(range, global); + alloc_buffers.buffered_write(new_val, range, global, atomic == AtomicRwOp::SeqCst)?; + } + Ok(()) + } + /// Update the data-race detector for an atomic read occurring at the /// associated memory-place and on the current thread. 
fn validate_atomic_load( @@ -723,7 +816,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { fn validate_atomic_fence(&mut self, atomic: AtomicFenceOp) -> InterpResult<'tcx> { let this = self.eval_context_mut(); if let Some(data_race) = &mut this.machine.data_race { - data_race.maybe_perform_sync_operation(move |index, mut clocks| { + data_race.maybe_perform_sync_operation(|index, mut clocks| { log::trace!("Atomic fence on {:?} with ordering {:?}", index, atomic); // Apply data-race detection for the current fences @@ -737,6 +830,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { // Either Release | AcqRel | SeqCst clocks.apply_release_fence(); } + if atomic == AtomicFenceOp::SeqCst { + data_race.last_sc_fence.borrow_mut().set_at_index(&clocks.clock, index); + clocks.fence_seqcst.join(&data_race.last_sc_fence.borrow()); + clocks.write_seqcst.join(&data_race.last_sc_write.borrow()); + } // Increment timestamp in case of release semantics. Ok(atomic != AtomicFenceOp::Acquire) @@ -1116,6 +1214,12 @@ pub struct GlobalState { /// The associated vector index will be moved into re-use candidates /// after the join operation occurs. terminated_threads: RefCell>, + + /// The timestamp of last SC fence performed by each thread + last_sc_fence: RefCell, + + /// The timestamp of last SC write performed by each thread + last_sc_write: RefCell, } impl GlobalState { @@ -1131,6 +1235,8 @@ impl GlobalState { active_thread_count: Cell::new(1), reuse_candidates: RefCell::new(FxHashSet::default()), terminated_threads: RefCell::new(FxHashMap::default()), + last_sc_fence: RefCell::new(VClock::default()), + last_sc_write: RefCell::new(VClock::default()), }; // Setup the main-thread since it is not explicitly created: @@ -1445,7 +1551,7 @@ impl GlobalState { /// Load the current vector clock in use and the current set of thread clocks /// in use for the vector. #[inline] - fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) { + pub fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) { let index = self.current_index(); let ref_vector = self.vector_clocks.borrow(); let clocks = Ref::map(ref_vector, |vec| &vec[index]); @@ -1455,7 +1561,7 @@ impl GlobalState { /// Load the current vector clock in use and the current set of thread clocks /// in use for the vector mutably for modification. #[inline] - fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) { + pub fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) { let index = self.current_index(); let ref_vector = self.vector_clocks.borrow_mut(); let clocks = RefMut::map(ref_vector, |vec| &mut vec[index]); @@ -1468,4 +1574,16 @@ impl GlobalState { fn current_index(&self) -> VectorIdx { self.current_index.get() } + + // SC ATOMIC STORE rule in the paper. + fn sc_write(&self) { + let (index, clocks) = self.current_thread_state(); + self.last_sc_write.borrow_mut().set_at_index(&clocks.clock, index); + } + + // SC ATOMIC READ rule in the paper. + fn sc_read(&self) { + let (.., mut clocks) = self.current_thread_state_mut(); + clocks.read_seqcst.join(&self.last_sc_fence.borrow()); + } } diff --git a/src/lib.rs b/src/lib.rs index f7c256656a76..06ab2fabab04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,7 @@ mod stacked_borrows; mod sync; mod thread; mod vector_clock; +mod weak_memory; // Establish a "crate-wide prelude": we often import `crate::*`. 
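
For illustration (again not part of the patch): the last_sc_fence / last_sc_write clocks and the sc_read / sc_write rules added above are what let the store-buffer search respect SC fences. A sketch of the guarantee they are meant to enforce: inserting SeqCst fences between the relaxed accesses of the store-buffering test forbids the (0, 0) outcome, because the fence-to-fence ordering in the single total order of SC operations forces at least one load to observe the other thread's store.

use std::sync::atomic::{fence, AtomicUsize, Ordering::{Relaxed, SeqCst}};
use std::thread;

static X: AtomicUsize = AtomicUsize::new(0);
static Y: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let t1 = thread::spawn(|| {
        X.store(1, Relaxed);
        fence(SeqCst); // sequenced after the store and before the load
        Y.load(Relaxed)
    });
    let t2 = thread::spawn(|| {
        Y.store(1, Relaxed);
        fence(SeqCst);
        X.load(Relaxed)
    });
    let (a, b) = (t1.join().unwrap(), t2.join().unwrap());
    assert_ne!((a, b), (0, 0)); // guaranteed by the C++11/Rust memory model
}
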
diff --git a/src/machine.rs b/src/machine.rs
index 2060bba0b853..aa2a930ccd25 100644
--- a/src/machine.rs
+++ b/src/machine.rs
@@ -190,6 +190,9 @@ pub struct AllocExtra {
     /// Data race detection via the use of a vector-clock,
     /// this is only added if it is enabled.
     pub data_race: Option,
+    /// Weak memory emulation via the use of store buffers,
+    /// this is only added if it is enabled.
+    pub weak_memory: Option,
 }

 /// Precomputed layouts of primitive types
@@ -630,9 +633,16 @@ impl<'mir, 'tcx> Machine<'mir, 'tcx> for Evaluator<'mir, 'tcx> {
         } else {
             None
         };
+        let buffer_alloc = if ecx.machine.weak_memory {
+            // FIXME: if this is an atomic object, we want to supply its initial value
+            // while allocating the store buffer here.
+            Some(weak_memory::AllocExtra::new_allocation(alloc.size()))
+        } else {
+            None
+        };
         let alloc: Allocation = alloc.convert_tag_add_extra(
             &ecx.tcx,
-            AllocExtra { stacked_borrows: stacks, data_race: race_alloc },
+            AllocExtra { stacked_borrows: stacks, data_race: race_alloc, weak_memory: buffer_alloc },
             |ptr| Evaluator::tag_alloc_base_pointer(ecx, ptr),
         );
         Cow::Owned(alloc)
diff --git a/src/weak_memory.rs b/src/weak_memory.rs
new file mode 100644
index 000000000000..c82a31d0a89c
--- /dev/null
+++ b/src/weak_memory.rs
@@ -0,0 +1,297 @@
+//! Implementation of C++11-consistent weak memory emulation using store buffers
+//! based on Dynamic Race Detection for C++ ("the paper"):
+//! https://www.doc.ic.ac.uk/~afd/homepages/papers/pdfs/2017/POPL.pdf
+
+// Our implementation and the authors' own implementation (tsan11) both deviate from the operational semantics provided in §5.3 of the paper:
+// 1. In the operational semantics, store elements keep a copy of the atomic object's vector clock (AtomicCellClocks::sync_vector in miri),
+// but this is not used anywhere so it's omitted here.
+//
+// 2. In the operational semantics, each store element keeps the timestamp of a thread when it loads from the store.
+// If the same thread loads from the same store element multiple times, then the timestamps at all loads are saved in a list of load elements.
+// This is not necessary as later loads by the same thread will always have greater or equal timestamp values, so we only need to record the timestamp of the first
+// load by each thread. This optimisation is done in tsan11
+// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.h#L35-L37)
+// and here.
+//
+// 3. §4.5 of the paper wants an SC store to mark all existing stores in the buffer that happen before it
+// as SC. This is not done in the operational semantics but implemented correctly in tsan11
+// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L160-L167)
+// and here.
+//
+// 4. The W_SC ; R_SC case requires the SC load to ignore all but the last store marked SC (stores not marked SC are not
+// affected). But this rule is applied to all loads in ReadsFromSet from the paper (last two lines of code), not just SC loads.
+// This is implemented correctly in tsan11
+// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L295)
+// and here.
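+//
+// (Illustrative note, not from the paper: for deviation 2, suppose thread 2 first loads from a
+// store element at timestamp 4 and later at timestamp 9. The load at timestamp 9 can only
+// happen-before some other thread's load if the earlier load at timestamp 4 also does,
+// so recording only the first timestamp per thread is enough for the coherence checks in fetch_store.)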
+ +use std::{ + cell::{Ref, RefCell, RefMut}, + collections::VecDeque, +}; + +use rustc_const_eval::interpret::{AllocRange, InterpResult, ScalarMaybeUninit}; +use rustc_data_structures::fx::FxHashMap; +use rustc_target::abi::Size; + +use crate::{ + data_race::{GlobalState, ThreadClockSet}, + RangeMap, Tag, VClock, VTimestamp, VectorIdx, +}; + +pub type AllocExtra = StoreBufferAlloc; +#[derive(Debug, Clone)] +pub struct StoreBufferAlloc { + /// Store buffer of each atomic object in this allocation + // Load may modify a StoreBuffer to record the loading thread's + // timestamp so we need interior mutability here. + store_buffer: RefCell>, +} + +impl StoreBufferAlloc { + pub fn new_allocation(len: Size) -> Self { + Self { store_buffer: RefCell::new(RangeMap::new(len, StoreBuffer::default())) } + } + + /// Gets a store buffer associated with an atomic object in this allocation + fn get_store_buffer(&self, range: AllocRange) -> Ref<'_, StoreBuffer> { + Ref::map(self.store_buffer.borrow(), |range_map| { + let (.., store_buffer) = range_map.iter(range.start, range.size).next().unwrap(); + store_buffer + }) + } + + fn get_store_buffer_mut(&self, range: AllocRange) -> RefMut<'_, StoreBuffer> { + RefMut::map(self.store_buffer.borrow_mut(), |range_map| { + let (.., store_buffer) = range_map.iter_mut(range.start, range.size).next().unwrap(); + store_buffer + }) + } + + /// Reads from the last store in modification order + pub fn read_from_last_store<'tcx>(&self, range: AllocRange, global: &GlobalState) { + let store_buffer = self.get_store_buffer(range); + let store_elem = store_buffer.buffer.back(); + if let Some(store_elem) = store_elem { + let (index, clocks) = global.current_thread_state(); + store_elem.load_impl(index, &clocks); + } + } + + pub fn buffered_read<'tcx>( + &self, + range: AllocRange, + global: &GlobalState, + is_seqcst: bool, + rng: &mut (impl rand::Rng + ?Sized), + validate: impl FnOnce() -> InterpResult<'tcx>, + ) -> InterpResult<'tcx, Option>> { + // Having a live borrow to store_buffer while calling validate_atomic_load is fine + // because the race detector doesn't touch store_buffer + let store_buffer = self.get_store_buffer(range); + + let store_elem = { + // The `clocks` we got here must be dropped before calling validate_atomic_load + // as the race detector will update it + let (.., clocks) = global.current_thread_state(); + // Load from a valid entry in the store buffer + store_buffer.fetch_store(is_seqcst, &clocks, &mut *rng) + }; + + // Unlike in write_scalar_atomic, thread clock updates have to be done + // after we've picked a store element from the store buffer, as presented + // in ATOMIC LOAD rule of the paper. 
This is because fetch_store
+        // requires access to ThreadClockSet.clock, which is updated by the race detector.
+        validate()?;
+
+        let loaded = store_elem.map(|store_elem| {
+            let (index, clocks) = global.current_thread_state();
+            store_elem.load_impl(index, &clocks)
+        });
+        Ok(loaded)
+    }
+
+    pub fn buffered_write<'tcx>(
+        &mut self,
+        val: ScalarMaybeUninit,
+        range: AllocRange,
+        global: &GlobalState,
+        is_seqcst: bool,
+    ) -> InterpResult<'tcx> {
+        let (index, clocks) = global.current_thread_state();
+
+        let mut store_buffer = self.get_store_buffer_mut(range);
+        store_buffer.store_impl(val, index, &clocks.clock, is_seqcst);
+        Ok(())
+    }
+}
+
+const STORE_BUFFER_LIMIT: usize = 128;
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StoreBuffer {
+    // Stores to this location in modification order
+    buffer: VecDeque,
+}
+
+impl Default for StoreBuffer {
+    fn default() -> Self {
+        let mut buffer = VecDeque::new();
+        buffer.reserve(STORE_BUFFER_LIMIT);
+        Self { buffer }
+    }
+}
+
+impl<'mir, 'tcx: 'mir> StoreBuffer {
+    /// Selects a valid store element in the buffer.
+    /// The buffer does not contain the value used to initialise the atomic object,
+    /// so a fresh atomic object has an empty store buffer until an explicit store.
+    fn fetch_store(
+        &self,
+        is_seqcst: bool,
+        clocks: &ThreadClockSet,
+        rng: &mut R,
+    ) -> Option<&StoreElement> {
+        use rand::seq::IteratorRandom;
+        let mut found_sc = false;
+        // FIXME: this should be an inclusive take_while (stops after a false predicate, but
+        // includes the element for which the predicate was false), but such a function doesn't yet
+        // exist in the standard library https://github.com/rust-lang/rust/issues/62208
+        let mut keep_searching = true;
+        let candidates = self
+            .buffer
+            .iter()
+            .rev()
+            .take_while(move |&store_elem| {
+                if !keep_searching {
+                    return false;
+                }
+                // CoWR: if a store happens-before the current load,
+                // then we can't read-from anything earlier in modification order.
+                if store_elem.timestamp <= clocks.clock[store_elem.store_index] {
+                    log::info!("Stopped due to coherent write-read");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // CoRR: if there was a load from this store which happened-before the current load,
+                // then we cannot read-from anything earlier in modification order.
+                if store_elem.loads.borrow().iter().any(|(&load_index, &load_timestamp)| {
+                    load_timestamp <= clocks.clock[load_index]
+                }) {
+                    log::info!("Stopped due to coherent read-read");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // The current load, which may be sequenced-after an SC fence, can only read-from
+                // the last store sequenced-before an SC fence in another thread (or any stores
+                // later than that SC fence)
+                if store_elem.timestamp <= clocks.fence_seqcst[store_elem.store_index] {
+                    log::info!("Stopped due to coherent load sequenced after sc fence");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // The current non-SC load can only read-from the latest SC store (or any stores later than that
+                // SC store)
+                if store_elem.timestamp <= clocks.write_seqcst[store_elem.store_index]
+                    && store_elem.is_seqcst
+                {
+                    log::info!("Stopped due to needing to load from the last SC store");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // The current SC load can only read-from the last store sequenced-before
+                // the last SC fence (or any stores later than the SC fence)
+                if is_seqcst && store_elem.timestamp <= clocks.read_seqcst[store_elem.store_index] {
+                    log::info!("Stopped due to sc load needing to load from the last SC store before an SC fence");
+                    keep_searching = false;
+                    return true;
+                }
+
+                true
+            })
+            .filter(|&store_elem| {
+                if is_seqcst {
+                    // An SC load needs to ignore all but the last store marked SC (stores not marked SC are not
+                    // affected)
+                    let include = !(store_elem.is_seqcst && found_sc);
+                    found_sc |= store_elem.is_seqcst;
+                    include
+                } else {
+                    true
+                }
+            });
+
+        candidates.choose(rng)
+    }
+
+    /// ATOMIC STORE IMPL in the paper (except we don't need the location's vector clock)
+    fn store_impl(
+        &mut self,
+        val: ScalarMaybeUninit,
+        index: VectorIdx,
+        thread_clock: &VClock,
+        is_seqcst: bool,
+    ) {
+        let store_elem = StoreElement {
+            store_index: index,
+            timestamp: thread_clock[index],
+            // In the language provided in the paper, an atomic store takes the value from a
+            // non-atomic memory location.
+            // But we already have the immediate value here so we don't need to do the memory
+            // access.
+            val,
+            is_seqcst,
+            loads: RefCell::new(FxHashMap::default()),
+        };
+        self.buffer.push_back(store_elem);
+        if self.buffer.len() > STORE_BUFFER_LIMIT {
+            self.buffer.pop_front();
+        }
+        if is_seqcst {
+            // Every store that happens before this needs to be marked as SC
+            // so that in a later SC load, only the last SC store (i.e. this one) or stores that
+            // aren't ordered by hb with the last SC store can be picked.
+            self.buffer.iter_mut().rev().for_each(|elem| {
+                if elem.timestamp <= thread_clock[elem.store_index] {
+                    elem.is_seqcst = true;
+                }
+            })
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StoreElement {
+    /// The identifier of the vector index, corresponding to a thread
+    /// that performed the store.
+    store_index: VectorIdx,
+
+    /// Whether this store is SC.
+    is_seqcst: bool,
+
+    /// The timestamp of the storing thread when it performed the store
+    timestamp: VTimestamp,
+    /// The value of this store
+    val: ScalarMaybeUninit,
+
+    /// Timestamps of the first load from this store element by each thread.
+    /// Behind a RefCell so that the load operation can take &self.
+    loads: RefCell>,
+}
+
+impl StoreElement {
+    /// ATOMIC LOAD IMPL in the paper
+    /// Unlike the operational semantics in the paper, we don't need to keep track
+    /// of the thread timestamp for every single load.
+    /// Keeping track of the first (smallest)
+    /// timestamp of each thread that has loaded from a store is sufficient: if the earliest
+    /// load of another thread happens before the current one, then we must stop searching the store
+    /// buffer regardless of subsequent loads by the same thread; if the earliest load of another
+    /// thread doesn't happen before the current one, then no subsequent load by the other thread
+    /// can happen before the current one.
+    fn load_impl(&self, index: VectorIdx, clocks: &ThreadClockSet) -> ScalarMaybeUninit {
+        let _ = self.loads.borrow_mut().try_insert(index, clocks.clock[index]);
+        self.val
+    }
+}
diff --git a/tests/run-pass/concurrency/weak_memory.rs b/tests/run-pass/concurrency/weak_memory.rs
index bd3d1de7c23e..b8e780ade1a0 100644
--- a/tests/run-pass/concurrency/weak_memory.rs
+++ b/tests/run-pass/concurrency/weak_memory.rs
@@ -63,6 +63,28 @@ fn reads_value(loc: &AtomicUsize, val: usize) -> usize {
     val
 }

+// https://plv.mpi-sws.org/scfix/paper.pdf
+// Test case SB
+fn test_sc_store_buffering() {
+    let x = static_atomic(0);
+    let y = static_atomic(0);
+
+    let j1 = spawn(move || {
+        x.store(1, SeqCst);
+        y.load(SeqCst)
+    });
+
+    let j2 = spawn(move || {
+        y.store(1, SeqCst);
+        x.load(SeqCst)
+    });
+
+    let a = j1.join().unwrap();
+    let b = j2.join().unwrap();
+
+    assert_ne!((a, b), (0, 0));
+}
+
 // https://plv.mpi-sws.org/scfix/paper.pdf
 // 2.2 Second Problem: SC Fences are Too Weak
 fn test_rwc_syncs() {
@@ -247,6 +269,7 @@ pub fn main() {
     // prehaps each function should be its own test case so they
     // can be run in parallel
     for _ in 0..500 {
+        test_sc_store_buffering();
         test_mixed_access();
         test_load_buffering_acq_rel();
         test_message_passing();