Merge pull request #4106 from shamb0/generalize-callback-miri-concurrency

Concurrency: Generalize UnblockCallback to MachineCallback
This commit is contained in:
Ralf Jung 2025-01-02 10:36:31 +00:00 committed by GitHub
commit 6118879441
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 171 additions and 131 deletions

View file

@ -422,7 +422,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
mutex_ref: MutexRef,
retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
assert!(!this.mutex_is_locked(&mutex_ref));
this.mutex_lock(&mutex_ref);
@ -538,7 +540,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
this.rwlock_reader_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
@ -623,7 +626,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
this.rwlock_writer_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
@ -677,25 +681,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[condvar].clock,
&this.machine.threads,
);
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[condvar].clock,
&this.machine.threads,
);
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
}
UnblockKind::TimedOut => {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
}
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
}
@timeout = |this| {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
}
),
);
@ -752,25 +760,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
}
@unblock = |this| {
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
},
UnblockKind::TimedOut => {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
},
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
}
@timeout = |this| {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
}
),
);

View file

@ -19,7 +19,7 @@ use crate::concurrency::data_race;
use crate::shims::tls;
use crate::*;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchedulingAction {
/// Execute step on the active thread.
ExecuteStep,
@ -30,6 +30,7 @@ enum SchedulingAction {
}
/// What to do with TLS allocations from terminated threads
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TlsAllocAction {
/// Deallocate backing memory of thread-local statics as usual
Deallocate,
@ -38,71 +39,18 @@ pub enum TlsAllocAction {
Leak,
}
/// Trait for callbacks that are executed when a thread gets unblocked.
pub trait UnblockCallback<'tcx>: VisitProvenance {
/// Will be invoked when the thread was unblocked the "regular" way,
/// i.e. whatever event it was blocking on has happened.
fn unblock(self: Box<Self>, ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>) -> InterpResult<'tcx>;
/// Will be invoked when the timeout ellapsed without the event the
/// thread was blocking on having occurred.
fn timeout(self: Box<Self>, _ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>)
-> InterpResult<'tcx>;
/// The argument type for the "unblock" callback, indicating why the thread got unblocked.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UnblockKind {
/// Operation completed successfully, thread continues normal execution.
Ready,
/// The operation did not complete within its specified duration.
TimedOut,
}
pub type DynUnblockCallback<'tcx> = Box<dyn UnblockCallback<'tcx> + 'tcx>;
#[macro_export]
macro_rules! callback {
(
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident| $unblock:block
) => {
callback!(
@capture<$tcx, $($lft),*> { $($name: $type),* }
@unblock = |$this| $unblock
@timeout = |_this| {
unreachable!(
"timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)"
)
}
)
};
(
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident| $unblock:block
@timeout = |$this_timeout:ident| $timeout:block
) => {{
struct Callback<$tcx, $($lft),*> {
$($name: $type,)*
_phantom: std::marker::PhantomData<&$tcx ()>,
}
impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
#[allow(unused_variables)]
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
$(
self.$name.visit_provenance(visit);
)*
}
}
impl<$tcx, $($lft),*> UnblockCallback<$tcx> for Callback<$tcx, $($lft),*> {
fn unblock(self: Box<Self>, $this: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$unblock
}
fn timeout(self: Box<Self>, $this_timeout: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$timeout
}
}
Box::new(Callback { $($name,)* _phantom: std::marker::PhantomData })
}}
}
/// Type alias for unblock callbacks, i.e. machine callbacks invoked when
/// a thread gets unblocked.
pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
/// A thread identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
@ -656,7 +604,8 @@ impl<'tcx> ThreadManager<'tcx> {
@capture<'tcx> {
joined_thread_id: ThreadId,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(&this.machine.threads, joined_thread_id);
}
@ -842,7 +791,7 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.timeout(this)?;
callback.call(this, UnblockKind::TimedOut)?;
this.machine.threads.set_active_thread_id(old_thread);
}
// found_callback can remain None if the computer's clock
@ -1084,7 +1033,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
};
// The callback must be executed in the previously blocked thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.unblock(this)?;
callback.call(this, UnblockKind::Ready)?;
this.machine.threads.set_active_thread_id(old_thread);
interp_ok(())
}

View file

@ -128,8 +128,8 @@ pub use crate::concurrency::sync::{
CondvarId, EvalContextExt as _, MutexRef, RwLockId, SynchronizationObjects,
};
pub use crate::concurrency::thread::{
BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, TimeoutAnchor,
TimeoutClock, UnblockCallback,
BlockReason, DynUnblockCallback, EvalContextExt as _, StackEmptyCallback, ThreadId,
ThreadManager, TimeoutAnchor, TimeoutClock, UnblockKind,
};
pub use crate::diagnostics::{
EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo, report_error,
@ -141,8 +141,8 @@ pub use crate::eval::{
pub use crate::helpers::{AccessKind, EvalContextExt as _};
pub use crate::intrinsics::EvalContextExt as _;
pub use crate::machine::{
AllocExtra, FrameExtra, MemoryKind, MiriInterpCx, MiriInterpCxExt, MiriMachine, MiriMemoryKind,
PrimitiveLayouts, Provenance, ProvenanceExtra,
AllocExtra, DynMachineCallback, FrameExtra, MachineCallback, MemoryKind, MiriInterpCx,
MiriInterpCxExt, MiriMachine, MiriMemoryKind, PrimitiveLayouts, Provenance, ProvenanceExtra,
};
pub use crate::mono_hash_map::MonoHashMap;
pub use crate::operator::EvalContextExt as _;

View file

@ -1723,3 +1723,69 @@ impl<'tcx> Machine<'tcx> for MiriMachine<'tcx> {
Cow::Borrowed(ecx.machine.union_data_ranges.entry(ty).or_insert_with(compute_range))
}
}
/// Trait for callbacks handling asynchronous machine operations.
pub trait MachineCallback<'tcx, T>: VisitProvenance {
/// The function to be invoked when the callback is fired.
fn call(
self: Box<Self>,
ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
arg: T,
) -> InterpResult<'tcx>;
}
/// Type alias for boxed machine callbacks with generic argument type.
pub type DynMachineCallback<'tcx, T> = Box<dyn MachineCallback<'tcx, T> + 'tcx>;
/// Creates a `DynMachineCallback`:
///
/// ```rust
/// callback!(
/// @capture<'tcx> {
/// var1: Ty1,
/// var2: Ty2<'tcx>,
/// }
/// |this, arg: ArgTy| {
/// // Implement the callback here.
/// todo!()
/// }
/// )
/// ```
///
/// All the argument types must implement `VisitProvenance`.
#[macro_export]
macro_rules! callback {
(@capture<$tcx:lifetime $(,)? $($lft:lifetime),*>
{ $($name:ident: $type:ty),* $(,)? }
|$this:ident, $arg:ident: $arg_ty:ty| $body:expr $(,)?) => {{
struct Callback<$tcx, $($lft),*> {
$($name: $type,)*
_phantom: std::marker::PhantomData<&$tcx ()>,
}
impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
$(
self.$name.visit_provenance(_visit);
)*
}
}
impl<$tcx, $($lft),*> MachineCallback<$tcx, $arg_ty> for Callback<$tcx, $($lft),*> {
fn call(
self: Box<Self>,
$this: &mut MiriInterpCx<$tcx>,
$arg: $arg_ty
) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$body
}
}
Box::new(Callback {
$($name,)*
_phantom: std::marker::PhantomData
})
}};
}

View file

@ -331,8 +331,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)),
callback!(
@capture<'tcx> {}
@unblock = |_this| { panic!("sleeping thread unblocked before time is up") }
@timeout = |_this| { interp_ok(()) }
|_this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::TimedOut);
interp_ok(())
}
),
);
interp_ok(Scalar::from_i32(0))
@ -353,8 +355,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)),
callback!(
@capture<'tcx> {}
@unblock = |_this| { panic!("sleeping thread unblocked before time is up") }
@timeout = |_this| { interp_ok(()) }
|_this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::TimedOut);
interp_ok(())
}
),
);
interp_ok(())

View file

@ -493,17 +493,21 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
dest: MPlaceTy<'tcx>,
event: MPlaceTy<'tcx>,
}
@unblock = |this| {
return_ready_list(&epfd, &dest, &event, this)?;
interp_ok(())
}
@timeout = |this| {
// Remove the current active thread_id from the blocked thread_id list.
epfd
.blocked_tid.borrow_mut()
.retain(|&id| id != this.active_thread());
this.write_int(0, &dest)?;
interp_ok(())
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
return_ready_list(&epfd, &dest, &event, this)?;
interp_ok(())
},
UnblockKind::TimedOut => {
// Remove the current active thread_id from the blocked thread_id list.
epfd
.blocked_tid.borrow_mut()
.retain(|&id| id != this.active_thread());
this.write_int(0, &dest)?;
interp_ok(())
},
}
}
),
);

View file

@ -242,7 +242,8 @@ fn eventfd_write<'tcx>(
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef<EventFd>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
// When we get unblocked, try again. We know the ref is still valid,
// otherwise there couldn't be a `write` that unblocks us.
let eventfd_ref = weak_eventfd.upgrade().unwrap();
@ -285,7 +286,8 @@ fn eventfd_read<'tcx>(
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef<EventFd>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
// When we get unblocked, try again. We know the ref is still valid,
// otherwise there couldn't be a `write` that unblocks us.
let eventfd_ref = weak_eventfd.upgrade().unwrap();

View file

@ -64,7 +64,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
None,
callback!(
@capture<'tcx> {}
@unblock = |_this| {
|_this, _unblock: UnblockKind| {
panic!("we shouldn't wake up ever")
}
),

View file

@ -162,7 +162,8 @@ fn anonsocket_write<'tcx>(
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
// If we got unblocked, then our peer successfully upgraded its weak
// ref to us. That means we can also upgrade our weak ref.
let self_ref = weak_self_ref.upgrade().unwrap();
@ -248,7 +249,8 @@ fn anonsocket_read<'tcx>(
ptr: Pointer,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
// If we got unblocked, then our peer successfully upgraded its weak
// ref to us. That means we can also upgrade our weak ref.
let self_ref = weak_self_ref.upgrade().unwrap();

View file

@ -111,7 +111,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
pending_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
let ret = this.init_once_try_begin(id, &pending_place, &dest)?;
assert!(ret, "we were woken up but init_once_try_begin still failed");
interp_ok(())