Merge pull request #4577 from RalfJung/release-seq

Fix release/acquire synchronization for loads from the store buffer
This commit is contained in:
Ralf Jung 2025-09-11 15:43:36 +00:00 committed by GitHub
commit 166af83c69
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 157 additions and 44 deletions


@ -183,6 +183,9 @@ struct AtomicMemoryCellClocks {
/// contains the vector of timestamps that will
/// happen-before a thread if an acquire-load is
/// performed on the data.
///
/// With weak memory emulation, this is the clock of the most recent write. It is then only used
/// for release sequences, to integrate the most recent clock into the next one for RMWs.
sync_vector: VClock,
/// The size of accesses to this atomic location.
@ -276,7 +279,7 @@ struct MemoryCellClocks {
/// zero on each write operation.
read: VClock,
/// Atomic access, acquire, release sequence tracking clocks.
/// Atomic access tracking clocks.
/// For non-atomic memory this value is set to None.
/// For atomic memory, each byte carries this information.
atomic_ops: Option<Box<AtomicMemoryCellClocks>>,
@ -504,10 +507,11 @@ impl MemoryCellClocks {
thread_clocks: &mut ThreadClockSet,
index: VectorIdx,
access_size: Size,
sync_clock: Option<&VClock>,
) -> Result<(), DataRace> {
self.atomic_read_detect(thread_clocks, index, access_size)?;
if let Some(atomic) = self.atomic() {
thread_clocks.clock.join(&atomic.sync_vector);
if let Some(sync_clock) = sync_clock.or_else(|| self.atomic().map(|a| &a.sync_vector)) {
thread_clocks.clock.join(sync_clock);
}
Ok(())
}
@ -520,10 +524,11 @@ impl MemoryCellClocks {
thread_clocks: &mut ThreadClockSet,
index: VectorIdx,
access_size: Size,
sync_clock: Option<&VClock>,
) -> Result<(), DataRace> {
self.atomic_read_detect(thread_clocks, index, access_size)?;
if let Some(atomic) = self.atomic() {
thread_clocks.fence_acquire.join(&atomic.sync_vector);
if let Some(sync_clock) = sync_clock.or_else(|| self.atomic().map(|a| &a.sync_vector)) {
thread_clocks.fence_acquire.join(sync_clock);
}
Ok(())
}
@ -555,7 +560,8 @@ impl MemoryCellClocks {
// The handling of release sequences was changed in C++20 and so
// the code here is different to the paper since now all relaxed
// stores block release sequences. The exception for same-thread
// relaxed stores has been removed.
// relaxed stores has been removed. We always overwrite the `sync_vector`,
// meaning the previous release sequence is broken.
let atomic = self.atomic_mut_unwrap();
atomic.sync_vector.clone_from(&thread_clocks.fence_release);
Ok(())
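
As a standalone illustration (not part of this diff) of the C++20 rule the comment above describes: a plain relaxed store ends the release sequence, so an acquire load that reads it gets no synchronization. In this sketch, both 0 and 1 are possible for `y` when the acquire load reads 2.

use std::sync::atomic::{AtomicUsize, Ordering::*};
use std::thread;

static X: AtomicUsize = AtomicUsize::new(0);
static Y: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let t1 = thread::spawn(|| {
        Y.store(1, Relaxed);
        X.store(1, Release);
        // Since C++20, any plain store (even a relaxed one by the same thread)
        // ends the release sequence headed by the store above.
        X.store(2, Relaxed);
    });
    let t2 = thread::spawn(|| {
        if X.load(Acquire) == 2 {
            // No synchronizes-with edge was established, so this may see 0 or 1.
            let _y = Y.load(Relaxed);
        }
    });
    t1.join().unwrap();
    t2.join().unwrap();
}
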
@ -571,6 +577,8 @@ impl MemoryCellClocks {
) -> Result<(), DataRace> {
self.atomic_write_detect(thread_clocks, index, access_size)?;
let atomic = self.atomic_mut_unwrap();
// This *joining* of `sync_vector` implements release sequences: future
// reads of this location will acquire our clock *and* what was here before.
atomic.sync_vector.join(&thread_clocks.clock);
Ok(())
}
@ -585,6 +593,8 @@ impl MemoryCellClocks {
) -> Result<(), DataRace> {
self.atomic_write_detect(thread_clocks, index, access_size)?;
let atomic = self.atomic_mut_unwrap();
// This *joining* of `sync_vector` implements release sequences: future
// reads of this location will acquire our fence clock *and* what was here before.
atomic.sync_vector.join(&thread_clocks.fence_release);
Ok(())
}
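
By contrast, a relaxed RMW joins into `sync_vector` rather than overwriting it, so it continues the release sequence. A minimal standalone sketch of the same pattern that the `release_sequence` test added at the bottom of this PR exercises inside Miri's test harness:

use std::sync::atomic::{AtomicUsize, Ordering::*};
use std::thread;

static X: AtomicUsize = AtomicUsize::new(0);
static Y: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let t1 = thread::spawn(|| {
        Y.store(1, Relaxed);
        X.store(1, Release);
        // An RMW, even a relaxed one, stays part of the release sequence
        // headed by the release store above.
        X.swap(3, Relaxed);
    });
    let t2 = thread::spawn(|| {
        if X.load(Acquire) == 3 {
            // Reading the RMW synchronizes with `X.store(1, Release)`,
            // so this must observe Y == 1.
            assert_eq!(Y.load(Relaxed), 1);
        }
    });
    t1.join().unwrap();
    t2.join().unwrap();
}
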
@ -731,8 +741,8 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
}
let scalar = this.allow_data_races_ref(move |this| this.read_scalar(place))?;
let buffered_scalar = this.buffered_atomic_read(place, atomic, scalar, || {
this.validate_atomic_load(place, atomic)
let buffered_scalar = this.buffered_atomic_read(place, atomic, scalar, |sync_clock| {
this.validate_atomic_load(place, atomic, sync_clock)
})?;
interp_ok(buffered_scalar.ok_or_else(|| err_ub!(InvalidUninitBytes(None)))?)
}
@ -930,7 +940,7 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
this.validate_atomic_rmw(place, success)?;
this.buffered_atomic_rmw(new, place, success, old.to_scalar())?;
} else {
this.validate_atomic_load(place, fail)?;
this.validate_atomic_load(place, fail, /* can use latest sync clock */ None)?;
// A failed compare exchange is equivalent to a load, reading from the latest store
// in the modification order.
// Since `old` is only a value and not the store element, we need to separately
@ -1172,6 +1182,18 @@ impl VClockAlloc {
}))?
}
/// Return the release/acquire synchronization clock for the given memory range.
pub(super) fn sync_clock(&self, access_range: AllocRange) -> VClock {
let alloc_ranges = self.alloc_ranges.borrow();
let mut clock = VClock::default();
for (_, mem_clocks) in alloc_ranges.iter(access_range.start, access_range.size) {
if let Some(atomic) = mem_clocks.atomic() {
clock.join(&atomic.sync_vector);
}
}
clock
}
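
The `join` used here is the usual vector-clock join, i.e. the pointwise maximum of timestamps. A simplified stand-in, assuming a clock represented as a plain Vec<u64> rather than Miri's actual `VClock` type:

/// Pointwise maximum of two vector clocks; missing entries count as 0.
fn join(a: &mut Vec<u64>, b: &[u64]) {
    if a.len() < b.len() {
        a.resize(b.len(), 0);
    }
    for (x, &y) in a.iter_mut().zip(b.iter()) {
        *x = (*x).max(y);
    }
}

fn main() {
    let mut a = vec![3, 0];
    join(&mut a, &[1, 5, 2]);
    assert_eq!(a, vec![3, 5, 2]);
}
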
/// Detect data-races for an unsynchronized read operation. It will not perform
/// data-race detection if `race_detecting()` is false, either due to no threads
/// being created or if it is temporarily disabled during a racy read or write
@ -1448,6 +1470,7 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
&self,
place: &MPlaceTy<'tcx>,
atomic: AtomicReadOrd,
sync_clock: Option<&VClock>,
) -> InterpResult<'tcx> {
let this = self.eval_context_ref();
this.validate_atomic_op(
@ -1456,9 +1479,9 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
AccessType::AtomicLoad,
move |memory, clocks, index, atomic| {
if atomic == AtomicReadOrd::Relaxed {
memory.load_relaxed(&mut *clocks, index, place.layout.size)
memory.load_relaxed(&mut *clocks, index, place.layout.size, sync_clock)
} else {
memory.load_acquire(&mut *clocks, index, place.layout.size)
memory.load_acquire(&mut *clocks, index, place.layout.size, sync_clock)
}
},
)
@ -1503,9 +1526,9 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
AccessType::AtomicRmw,
move |memory, clocks, index, _| {
if acquire {
memory.load_acquire(clocks, index, place.layout.size)?;
memory.load_acquire(clocks, index, place.layout.size, None)?;
} else {
memory.load_relaxed(clocks, index, place.layout.size)?;
memory.load_relaxed(clocks, index, place.layout.size, None)?;
}
if release {
memory.rmw_release(clocks, index, place.layout.size)


@ -62,8 +62,11 @@
//! You can refer to test cases in weak_memory/extra_cpp.rs and weak_memory/extra_cpp_unsafe.rs for examples of these operations.
// Our and the author's own implementation (tsan11) of the paper have some deviations from the provided operational semantics in §5.3:
// 1. In the operational semantics, store elements keep a copy of the atomic object's vector clock (AtomicCellClocks::sync_vector in miri),
// but this is not used anywhere so it's omitted here.
// 1. In the operational semantics, loads acquire the vector clock of the atomic location
// irrespective of which store buffer element is loaded. That's incorrect; the synchronization clock
// needs to be tracked per-store-buffer-element. (The paper has a field "clocks" for that purpose,
// but it is not actually used.) tsan11 does this correctly
// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L305).
//
// 2. In the operational semantics, each store element keeps the timestamp of a thread when it loads from the store.
// If the same thread loads from the same store element multiple times, then the timestamps at all loads are saved in a list of load elements.
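
To illustrate why the clock must be tracked per store-buffer element (a standalone sketch, separate from the tests added in this PR): reading an older relaxed store must not acquire the clock belonging to a newer release store on the same location.

use std::sync::atomic::{AtomicUsize, Ordering::*};
use std::thread;

static X: AtomicUsize = AtomicUsize::new(0);
static Y: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let t1 = thread::spawn(|| {
        Y.store(1, Relaxed);
        X.store(1, Relaxed); // older store element: carries no release clock
        X.store(2, Release); // newer store element: releases the `Y.store`
    });
    let t2 = thread::spawn(|| {
        if X.load(Acquire) == 1 {
            // We read the *older* element, whose own sync clock is empty, so we
            // must not acquire the later release store's clock: both 0 and 1
            // are allowed here.
            let _y = Y.load(Relaxed);
        }
    });
    t1.join().unwrap();
    t2.join().unwrap();
}
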
@ -138,16 +141,17 @@ enum LoadRecency {
#[derive(Debug, Clone, PartialEq, Eq)]
struct StoreElement {
/// The identifier of the vector index, corresponding to a thread
/// that performed the store.
store_index: VectorIdx,
/// The thread that performed the store.
store_thread: VectorIdx,
/// The timestamp of the storing thread when it performed the store
store_timestamp: VTimestamp,
/// The vector clock that can be acquired by loading this store.
sync_clock: VClock,
/// Whether this store is SC.
is_seqcst: bool,
/// The timestamp of the storing thread when it performed the store
timestamp: VTimestamp,
/// The value of this store. `None` means uninitialized.
// FIXME: Currently, we cannot represent partial initialization.
val: Option<Scalar>,
@ -159,9 +163,12 @@ struct StoreElement {
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct LoadInfo {
/// Timestamp of first loads from this store element by each thread
/// Timestamp of first loads from this store element by each thread.
timestamps: FxHashMap<VectorIdx, VTimestamp>,
/// Whether this store element has been read by an SC load
/// Whether this store element has been read by an SC load.
/// This is crucial to ensure we respect coherence-ordered-before. Concretely we use
/// this to ensure that if a store element is seen by an SC load, then all later SC loads
/// cannot see `mo`-earlier store elements.
sc_loaded: bool,
}
@ -243,8 +250,10 @@ impl<'tcx> StoreBuffer {
let store_elem = StoreElement {
// The thread index and timestamp of the initialisation write
// are never meaningfully used, so it's fine to leave them as 0
store_index: VectorIdx::from(0),
timestamp: VTimestamp::ZERO,
store_thread: VectorIdx::from(0),
store_timestamp: VTimestamp::ZERO,
// The initialization write is non-atomic so nothing can be acquired.
sync_clock: VClock::default(),
val: init,
is_seqcst: false,
load_info: RefCell::new(LoadInfo::default()),
@ -273,7 +282,7 @@ impl<'tcx> StoreBuffer {
thread_mgr: &ThreadManager<'_>,
is_seqcst: bool,
rng: &mut (impl rand::Rng + ?Sized),
validate: impl FnOnce() -> InterpResult<'tcx>,
validate: impl FnOnce(Option<&VClock>) -> InterpResult<'tcx>,
) -> InterpResult<'tcx, (Option<Scalar>, LoadRecency)> {
// Having a live borrow to store_buffer while calling validate_atomic_load is fine
// because the race detector doesn't touch store_buffer
@ -290,7 +299,7 @@ impl<'tcx> StoreBuffer {
// after we've picked a store element from the store buffer, as presented
// in ATOMIC LOAD rule of the paper. This is because fetch_store
// requires access to ThreadClockSet.clock, which is updated by the race detector
validate()?;
validate(Some(&store_elem.sync_clock))?;
let (index, clocks) = global.active_thread_state(thread_mgr);
let loaded = store_elem.load_impl(index, &clocks, is_seqcst);
@ -303,10 +312,11 @@ impl<'tcx> StoreBuffer {
global: &DataRaceState,
thread_mgr: &ThreadManager<'_>,
is_seqcst: bool,
sync_clock: VClock,
) -> InterpResult<'tcx> {
let (index, clocks) = global.active_thread_state(thread_mgr);
self.store_impl(val, index, &clocks.clock, is_seqcst);
self.store_impl(val, index, &clocks.clock, is_seqcst, sync_clock);
interp_ok(())
}
@ -333,7 +343,9 @@ impl<'tcx> StoreBuffer {
return false;
}
keep_searching = if store_elem.timestamp <= clocks.clock[store_elem.store_index] {
keep_searching = if store_elem.store_timestamp
<= clocks.clock[store_elem.store_thread]
{
// CoWR: if a store happens-before the current load,
// then we can't read-from anything earlier in modification order.
// C++20 §6.9.2.2 [intro.races] paragraph 18
@ -345,7 +357,7 @@ impl<'tcx> StoreBuffer {
// then we cannot read-from anything earlier in modification order.
// C++20 §6.9.2.2 [intro.races] paragraph 16
false
} else if store_elem.timestamp <= clocks.write_seqcst[store_elem.store_index]
} else if store_elem.store_timestamp <= clocks.write_seqcst[store_elem.store_thread]
&& store_elem.is_seqcst
{
// The current non-SC load, which may be sequenced-after an SC fence,
@ -353,7 +365,7 @@ impl<'tcx> StoreBuffer {
// C++17 §32.4 [atomics.order] paragraph 4
false
} else if is_seqcst
&& store_elem.timestamp <= clocks.read_seqcst[store_elem.store_index]
&& store_elem.store_timestamp <= clocks.read_seqcst[store_elem.store_thread]
{
// The current SC load cannot read-before the last store sequenced-before
// the last SC fence.
@ -391,17 +403,19 @@ impl<'tcx> StoreBuffer {
}
}
/// ATOMIC STORE IMPL in the paper (except we don't need the location's vector clock)
/// ATOMIC STORE IMPL in the paper
fn store_impl(
&mut self,
val: Scalar,
index: VectorIdx,
thread_clock: &VClock,
is_seqcst: bool,
sync_clock: VClock,
) {
let store_elem = StoreElement {
store_index: index,
timestamp: thread_clock[index],
store_thread: index,
store_timestamp: thread_clock[index],
sync_clock,
// In the language provided in the paper, an atomic store takes the value from a
// non-atomic memory location.
// But we already have the immediate value here so we don't need to do the memory
@ -419,7 +433,7 @@ impl<'tcx> StoreBuffer {
// so that in a later SC load, only the last SC store (i.e. this one) or stores that
// aren't ordered by hb with the last SC is picked.
self.buffer.iter_mut().rev().for_each(|elem| {
if elem.timestamp <= thread_clock[elem.store_index] {
if elem.store_timestamp <= thread_clock[elem.store_thread] {
elem.is_seqcst = true;
}
})
@ -462,7 +476,7 @@ pub(super) trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr(), 0)?;
if let (
crate::AllocExtra {
data_race: AllocDataRaceHandler::Vclocks(_, Some(alloc_buffers)),
data_race: AllocDataRaceHandler::Vclocks(data_race_clocks, Some(alloc_buffers)),
..
},
crate::MiriMachine {
@ -475,19 +489,29 @@ pub(super) trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
global.sc_write(threads);
}
let range = alloc_range(base_offset, place.layout.size);
let sync_clock = data_race_clocks.sync_clock(range);
let buffer = alloc_buffers.get_or_create_store_buffer_mut(range, Some(init))?;
// The RMW always reads from the most recent store.
buffer.read_from_last_store(global, threads, atomic == AtomicRwOrd::SeqCst);
buffer.buffered_write(new_val, global, threads, atomic == AtomicRwOrd::SeqCst)?;
buffer.buffered_write(
new_val,
global,
threads,
atomic == AtomicRwOrd::SeqCst,
sync_clock,
)?;
}
interp_ok(())
}
/// The argument to `validate` is the synchronization clock of the memory that is being read,
/// if we are reading from a store buffer element.
fn buffered_atomic_read(
&self,
place: &MPlaceTy<'tcx>,
atomic: AtomicReadOrd,
latest_in_mo: Scalar,
validate: impl FnOnce() -> InterpResult<'tcx>,
validate: impl FnOnce(Option<&VClock>) -> InterpResult<'tcx>,
) -> InterpResult<'tcx, Option<Scalar>> {
let this = self.eval_context_ref();
'fallback: {
@ -525,7 +549,7 @@ pub(super) trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
}
// Race detector or weak memory disabled, simply read the latest value
validate()?;
validate(None)?;
interp_ok(Some(latest_in_mo))
}
@ -533,6 +557,8 @@ pub(super) trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
///
/// `init` says with which value to initialize the store buffer in case there wasn't a store
/// buffer for this memory range before.
///
/// Must be called *after* `validate_atomic_store` to ensure that `sync_clock` is up-to-date.
fn buffered_atomic_write(
&mut self,
val: Scalar,
@ -544,7 +570,7 @@ pub(super) trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(dest.ptr(), 0)?;
if let (
crate::AllocExtra {
data_race: AllocDataRaceHandler::Vclocks(_, Some(alloc_buffers)),
data_race: AllocDataRaceHandler::Vclocks(data_race_clocks, Some(alloc_buffers)),
..
},
crate::MiriMachine {
@ -556,9 +582,18 @@ pub(super) trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
global.sc_write(threads);
}
let buffer = alloc_buffers
.get_or_create_store_buffer_mut(alloc_range(base_offset, dest.layout.size), init)?;
buffer.buffered_write(val, global, threads, atomic == AtomicWriteOrd::SeqCst)?;
let range = alloc_range(base_offset, dest.layout.size);
// It's a bit annoying that we have to go back to the data race part to get the clock...
// but it does make things a lot simpler.
let sync_clock = data_race_clocks.sync_clock(range);
let buffer = alloc_buffers.get_or_create_store_buffer_mut(range, init)?;
buffer.buffered_write(
val,
global,
threads,
atomic == AtomicWriteOrd::SeqCst,
sync_clock,
)?;
}
// Caller should've written to dest with the vanilla scalar write, we do nothing here


@ -44,7 +44,7 @@ fn check_all_outcomes<T: Eq + std::hash::Hash + std::fmt::Debug>(
let expected: HashSet<T> = HashSet::from_iter(expected);
let mut seen = HashSet::new();
// Let's give it N times as many tries as we are expecting values.
let tries = expected.len() * 12;
let tries = expected.len() * 16;
for i in 0..tries {
let val = generate();
assert!(expected.contains(&val), "got an unexpected value: {val:?}");
@ -198,11 +198,66 @@ fn weaker_release_sequences() {
});
}
/// Ensuring normal release sequences (with RMWs) still work correctly.
fn release_sequence() {
check_all_outcomes([None, Some(1)], || {
let x = static_atomic(0);
let y = static_atomic(0);
let t1 = spawn(move || {
y.store(1, Relaxed);
x.store(1, Release);
x.swap(3, Relaxed);
});
let t2 = spawn(move || {
if x.load(Acquire) == 3 {
// If we read 3 here, we are seeing the result of the `x.swap` above, which was
// relaxed but forms a release sequence with the `x.store`. This means there is a
// release sequence, so we acquire the `y.store` and cannot see the original value
// `0` any more.
Some(y.load(Relaxed))
} else {
None
}
});
t1.join().unwrap();
t2.join().unwrap()
});
}
/// Ensure that when we read from an outdated release store, we acquire its clock.
fn old_release_store() {
check_all_outcomes([None, Some(1)], || {
let x = static_atomic(0);
let y = static_atomic(0);
let t1 = spawn(move || {
y.store(1, Relaxed);
x.store(1, Release); // this is what we want to read from
x.store(3, Relaxed);
});
let t2 = spawn(move || {
if x.load(Acquire) == 1 {
// We must have acquired the `y.store` so we cannot see the initial value any more.
Some(y.load(Relaxed))
} else {
None
}
});
t1.join().unwrap();
t2.join().unwrap()
});
}
pub fn main() {
relaxed();
seq_cst();
initialization_write(false);
initialization_write(true);
faa_replaced_by_load();
release_sequence();
weaker_release_sequences();
old_release_store();
}