Auto merge of #1568 - fusion-engineering-forks:futex, r=RalfJung
Implement futex_wait and futex_wake. Fixes https://github.com/rust-lang/rust/issues/77406 and fixes #1562. This makes std's park(), park_timeout(), and unpark() work. That means std::sync::Once is usable again and the test pass again with the latest rustc. This also makes parking_lot work.
This commit is contained in:
commit
b0e02f0218
11 changed files with 364 additions and 3 deletions
|
|
@ -555,7 +555,7 @@ pub fn check_arg_count<'a, 'tcx, const N: usize>(args: &'a [OpTy<'tcx, Tag>]) ->
|
|||
|
||||
pub fn isolation_error(name: &str) -> InterpResult<'static> {
|
||||
throw_machine_stop!(TerminationInfo::UnsupportedInIsolation(format!(
|
||||
"`{}` not available when isolation is enabled",
|
||||
"{} not available when isolation is enabled",
|
||||
name,
|
||||
)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use rustc_middle::mir;
|
|||
use crate::*;
|
||||
use crate::helpers::check_arg_count;
|
||||
use shims::posix::fs::EvalContextExt as _;
|
||||
use shims::posix::linux::sync::futex;
|
||||
use shims::posix::sync::EvalContextExt as _;
|
||||
use shims::posix::thread::EvalContextExt as _;
|
||||
|
||||
|
|
@ -112,6 +113,17 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
|
|||
|
||||
// Dynamically invoked syscalls
|
||||
"syscall" => {
|
||||
// FIXME: The libc syscall() function is a variadic function.
|
||||
// It's valid to call it with more arguments than a syscall
|
||||
// needs, so none of these syscalls should use check_arg_count.
|
||||
// It's even valid to call it with the wrong type of arguments,
|
||||
// as long as they'd end up in the same place with the calling
|
||||
// convention used. (E.g. using a `usize` instead of a pointer.)
|
||||
// It's not directly clear which number, size, and type of arguments
|
||||
// are acceptable in which cases and which aren't. (E.g. some
|
||||
// types might take up the space of two registers.)
|
||||
// So this needs to be researched first.
|
||||
|
||||
let sys_getrandom = this
|
||||
.eval_libc("SYS_getrandom")?
|
||||
.to_machine_usize(this)?;
|
||||
|
|
@ -120,6 +132,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
|
|||
.eval_libc("SYS_statx")?
|
||||
.to_machine_usize(this)?;
|
||||
|
||||
let sys_futex = this
|
||||
.eval_libc("SYS_futex")?
|
||||
.to_machine_usize(this)?;
|
||||
|
||||
if args.is_empty() {
|
||||
throw_ub_format!("incorrect number of arguments for syscall: got 0, expected at least 1");
|
||||
}
|
||||
|
|
@ -139,6 +155,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
|
|||
let result = this.linux_statx(dirfd, pathname, flags, mask, statxbuf)?;
|
||||
this.write_scalar(Scalar::from_machine_isize(result.into(), this), dest)?;
|
||||
}
|
||||
// `futex` is used by some synchonization primitives.
|
||||
id if id == sys_futex => {
|
||||
futex(this, args, dest)?;
|
||||
}
|
||||
id => throw_unsup_format!("miri does not support syscall ID {}", id),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,2 +1,3 @@
|
|||
pub mod foreign_items;
|
||||
pub mod dlsym;
|
||||
pub mod sync;
|
||||
|
|
|
|||
133
src/shims/posix/linux/sync.rs
Normal file
133
src/shims/posix/linux/sync.rs
Normal file
|
|
@ -0,0 +1,133 @@
|
|||
use crate::thread::Time;
|
||||
use crate::*;
|
||||
use rustc_target::abi::{Align, Size};
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
/// Implementation of the SYS_futex syscall.
|
||||
pub fn futex<'tcx>(
|
||||
this: &mut MiriEvalContext<'_, 'tcx>,
|
||||
args: &[OpTy<'tcx, Tag>],
|
||||
dest: PlaceTy<'tcx, Tag>,
|
||||
) -> InterpResult<'tcx> {
|
||||
// The amount of arguments used depends on the type of futex operation.
|
||||
// The full futex syscall takes six arguments (excluding the syscall
|
||||
// number), which is also the maximum amount of arguments a linux syscall
|
||||
// can take on most architectures.
|
||||
// However, not all futex operations use all six arguments. The unused ones
|
||||
// may or may not be left out from the `syscall()` call.
|
||||
// Therefore we don't use `check_arg_count` here, but only check for the
|
||||
// number of arguments to fall within a range.
|
||||
if !(4..=7).contains(&args.len()) {
|
||||
throw_ub_format!("incorrect number of arguments for futex syscall: got {}, expected between 4 and 7 (inclusive)", args.len());
|
||||
}
|
||||
|
||||
// The first three arguments (after the syscall number itself) are the same to all futex operations:
|
||||
// (int *addr, int op, int val).
|
||||
// We checked above that these definitely exist.
|
||||
let addr = this.read_immediate(args[1])?;
|
||||
let op = this.read_scalar(args[2])?.to_i32()?;
|
||||
let val = this.read_scalar(args[3])?.to_i32()?;
|
||||
|
||||
// The raw pointer value is used to identify the mutex.
|
||||
// Not all mutex operations actually read from this address or even require this address to exist.
|
||||
// This will make FUTEX_WAKE fail on an integer cast to a pointer. But FUTEX_WAIT on
|
||||
// such a pointer can never work anyway, so that seems fine.
|
||||
let futex_ptr = this.force_ptr(addr.to_scalar()?)?;
|
||||
|
||||
let thread = this.get_active_thread();
|
||||
|
||||
let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG")?;
|
||||
let futex_wait = this.eval_libc_i32("FUTEX_WAIT")?;
|
||||
let futex_wake = this.eval_libc_i32("FUTEX_WAKE")?;
|
||||
let futex_realtime = this.eval_libc_i32("FUTEX_CLOCK_REALTIME")?;
|
||||
|
||||
// FUTEX_PRIVATE enables an optimization that stops it from working across processes.
|
||||
// Miri doesn't support that anyway, so we ignore that flag.
|
||||
match op & !futex_private {
|
||||
// FUTEX_WAIT: (int *addr, int op = FUTEX_WAIT, int val, const timespec *timeout)
|
||||
// Blocks the thread if *addr still equals val. Wakes up when FUTEX_WAKE is called on the same address,
|
||||
// or *timeout expires. `timeout == null` for an infinite timeout.
|
||||
op if op & !futex_realtime == futex_wait => {
|
||||
if args.len() < 5 {
|
||||
throw_ub_format!("incorrect number of arguments for FUTEX_WAIT syscall: got {}, expected at least 5", args.len());
|
||||
}
|
||||
let timeout = args[4];
|
||||
let timeout_time = if this.is_null(this.read_scalar(timeout)?.check_init()?)? {
|
||||
None
|
||||
} else {
|
||||
let duration = match this.read_timespec(timeout)? {
|
||||
Some(duration) => duration,
|
||||
None => {
|
||||
let einval = this.eval_libc("EINVAL")?;
|
||||
this.set_last_error(einval)?;
|
||||
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
this.check_no_isolation("FUTEX_WAIT with timeout")?;
|
||||
Some(if op & futex_realtime != 0 {
|
||||
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
|
||||
} else {
|
||||
Time::Monotonic(Instant::now().checked_add(duration).unwrap())
|
||||
})
|
||||
};
|
||||
// Check the pointer for alignment and validity.
|
||||
// The API requires `addr` to be a 4-byte aligned pointer, and will
|
||||
// use the 4 bytes at the given address as an (atomic) i32.
|
||||
this.memory.check_ptr_access(addr.to_scalar()?, Size::from_bytes(4), Align::from_bytes(4).unwrap())?;
|
||||
// Read an `i32` through the pointer, regardless of any wrapper types.
|
||||
// It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`.
|
||||
// FIXME: this fails if `addr` is not a pointer type.
|
||||
let futex_val = this.read_scalar_at_offset(addr.into(), 0, this.machine.layouts.i32)?.to_i32()?;
|
||||
if val == futex_val {
|
||||
// The value still matches, so we block the trait make it wait for FUTEX_WAKE.
|
||||
this.block_thread(thread);
|
||||
this.futex_wait(futex_ptr, thread);
|
||||
// Succesfully waking up from FUTEX_WAIT always returns zero.
|
||||
this.write_scalar(Scalar::from_machine_isize(0, this), dest)?;
|
||||
// Register a timeout callback if a timeout was specified.
|
||||
// This callback will override the return value when the timeout triggers.
|
||||
if let Some(timeout_time) = timeout_time {
|
||||
this.register_timeout_callback(
|
||||
thread,
|
||||
timeout_time,
|
||||
Box::new(move |this| {
|
||||
this.unblock_thread(thread);
|
||||
this.futex_remove_waiter(futex_ptr, thread);
|
||||
let etimedout = this.eval_libc("ETIMEDOUT")?;
|
||||
this.set_last_error(etimedout)?;
|
||||
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
|
||||
Ok(())
|
||||
}),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// The futex value doesn't match the expected value, so we return failure
|
||||
// right away without sleeping: -1 and errno set to EAGAIN.
|
||||
let eagain = this.eval_libc("EAGAIN")?;
|
||||
this.set_last_error(eagain)?;
|
||||
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
|
||||
}
|
||||
}
|
||||
// FUTEX_WAKE: (int *addr, int op = FUTEX_WAKE, int val)
|
||||
// Wakes at most `val` threads waiting on the futex at `addr`.
|
||||
// Returns the amount of threads woken up.
|
||||
// Does not access the futex value at *addr.
|
||||
op if op == futex_wake => {
|
||||
let mut n = 0;
|
||||
for _ in 0..val {
|
||||
if let Some(thread) = this.futex_wake(futex_ptr) {
|
||||
this.unblock_thread(thread);
|
||||
this.unregister_timeout_callback_if_exists(thread);
|
||||
n += 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
this.write_scalar(Scalar::from_machine_isize(n, this), dest)?;
|
||||
}
|
||||
op => throw_unsup_format!("miri does not support SYS_futex operation {}", op),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
34
src/sync.rs
34
src/sync.rs
|
|
@ -96,12 +96,26 @@ struct Condvar {
|
|||
waiters: VecDeque<CondvarWaiter>,
|
||||
}
|
||||
|
||||
/// The futex state.
|
||||
#[derive(Default, Debug)]
|
||||
struct Futex {
|
||||
waiters: VecDeque<FutexWaiter>,
|
||||
}
|
||||
|
||||
/// A thread waiting on a futex.
|
||||
#[derive(Debug)]
|
||||
struct FutexWaiter {
|
||||
/// The thread that is waiting on this futex.
|
||||
thread: ThreadId,
|
||||
}
|
||||
|
||||
/// The state of all synchronization variables.
|
||||
#[derive(Default, Debug)]
|
||||
pub(super) struct SynchronizationState {
|
||||
mutexes: IndexVec<MutexId, Mutex>,
|
||||
rwlocks: IndexVec<RwLockId, RwLock>,
|
||||
condvars: IndexVec<CondvarId, Condvar>,
|
||||
futexes: HashMap<Pointer, Futex>,
|
||||
}
|
||||
|
||||
// Private extension trait for local helper methods
|
||||
|
|
@ -403,4 +417,24 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
|
|||
let this = self.eval_context_mut();
|
||||
this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
|
||||
}
|
||||
|
||||
fn futex_wait(&mut self, addr: Pointer<stacked_borrows::Tag>, thread: ThreadId) {
|
||||
let this = self.eval_context_mut();
|
||||
let waiters = &mut this.machine.threads.sync.futexes.entry(addr.erase_tag()).or_default().waiters;
|
||||
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
|
||||
waiters.push_back(FutexWaiter { thread });
|
||||
}
|
||||
|
||||
fn futex_wake(&mut self, addr: Pointer<stacked_borrows::Tag>) -> Option<ThreadId> {
|
||||
let this = self.eval_context_mut();
|
||||
let waiters = &mut this.machine.threads.sync.futexes.get_mut(&addr.erase_tag())?.waiters;
|
||||
waiters.pop_front().map(|waiter| waiter.thread)
|
||||
}
|
||||
|
||||
fn futex_remove_waiter(&mut self, addr: Pointer<stacked_borrows::Tag>, thread: ThreadId) {
|
||||
let this = self.eval_context_mut();
|
||||
if let Some(futex) = this.machine.threads.sync.futexes.get_mut(&addr.erase_tag()) {
|
||||
futex.waiters.retain(|waiter| waiter.thread != thread);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
// ignore-windows: File handling is not implemented yet
|
||||
// error-pattern: `open` not available when isolation is enabled
|
||||
// error-pattern: open not available when isolation is enabled
|
||||
|
||||
fn main() {
|
||||
let _file = std::fs::File::open("file.txt").unwrap();
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ extern crate libc;
|
|||
fn main() -> std::io::Result<()> {
|
||||
let mut bytes = [0u8; 512];
|
||||
unsafe {
|
||||
libc::read(0, bytes.as_mut_ptr() as *mut libc::c_void, 512); //~ ERROR `read` not available when isolation is enabled
|
||||
libc::read(0, bytes.as_mut_ptr() as *mut libc::c_void, 512); //~ ERROR read not available when isolation is enabled
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
132
tests/run-pass/concurrency/linux-futex.rs
Normal file
132
tests/run-pass/concurrency/linux-futex.rs
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
// Unfortunately, the test framework does not support 'only-linux',
|
||||
// so we need to ignore Windows and macOS instead.
|
||||
// ignore-macos: Uses Linux-only APIs
|
||||
// ignore-windows: Uses Linux-only APIs
|
||||
// compile-flags: -Zmiri-disable-isolation
|
||||
|
||||
#![feature(rustc_private)]
|
||||
extern crate libc;
|
||||
|
||||
use std::ptr;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
fn wake_nobody() {
|
||||
let futex = 0;
|
||||
|
||||
// Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting.
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&futex as *const i32,
|
||||
libc::FUTEX_WAKE,
|
||||
1,
|
||||
), 0);
|
||||
}
|
||||
|
||||
// Same, but without omitting the unused arguments.
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&futex as *const i32,
|
||||
libc::FUTEX_WAKE,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
), 0);
|
||||
}
|
||||
}
|
||||
|
||||
fn wake_dangling() {
|
||||
let futex = Box::new(0);
|
||||
let ptr: *const i32 = &*futex;
|
||||
drop(futex);
|
||||
|
||||
// Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting.
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
ptr,
|
||||
libc::FUTEX_WAKE,
|
||||
1,
|
||||
), 0);
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_wrong_val() {
|
||||
let futex: i32 = 123;
|
||||
|
||||
// Only wait if the futex value is 456.
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&futex as *const i32,
|
||||
libc::FUTEX_WAIT,
|
||||
456,
|
||||
ptr::null::<libc::timespec>(),
|
||||
), -1);
|
||||
assert_eq!(*libc::__errno_location(), libc::EAGAIN);
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_timeout() {
|
||||
let start = Instant::now();
|
||||
|
||||
let futex: i32 = 123;
|
||||
|
||||
// Wait for 200ms, with nobody waking us up early.
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&futex as *const i32,
|
||||
libc::FUTEX_WAIT,
|
||||
123,
|
||||
&libc::timespec {
|
||||
tv_sec: 0,
|
||||
tv_nsec: 200_000_000,
|
||||
},
|
||||
), -1);
|
||||
assert_eq!(*libc::__errno_location(), libc::ETIMEDOUT);
|
||||
}
|
||||
|
||||
assert!((200..500).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn wait_wake() {
|
||||
let start = Instant::now();
|
||||
|
||||
static FUTEX: i32 = 0;
|
||||
|
||||
thread::spawn(move || {
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&FUTEX as *const i32,
|
||||
libc::FUTEX_WAKE,
|
||||
10, // Wake up at most 10 threads.
|
||||
), 1); // Woken up one thread.
|
||||
}
|
||||
});
|
||||
|
||||
unsafe {
|
||||
assert_eq!(libc::syscall(
|
||||
libc::SYS_futex,
|
||||
&FUTEX as *const i32,
|
||||
libc::FUTEX_WAIT,
|
||||
0,
|
||||
ptr::null::<libc::timespec>(),
|
||||
), 0);
|
||||
}
|
||||
|
||||
assert!((200..500).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn main() {
|
||||
wake_nobody();
|
||||
wake_dangling();
|
||||
wait_wrong_val();
|
||||
wait_timeout();
|
||||
wait_wake();
|
||||
}
|
||||
2
tests/run-pass/concurrency/linux-futex.stderr
Normal file
2
tests/run-pass/concurrency/linux-futex.stderr
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
warning: thread support is experimental. For example, Miri does not detect data races yet.
|
||||
|
||||
37
tests/run-pass/concurrency/parking.rs
Normal file
37
tests/run-pass/concurrency/parking.rs
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
// ignore-windows: Concurrency on Windows is not supported yet.
|
||||
// compile-flags: -Zmiri-disable-isolation
|
||||
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
// Normally, waiting in park/park_timeout may spuriously wake up early, but we
|
||||
// know Miri's timed synchronization primitives do not do that.
|
||||
|
||||
fn park_timeout() {
|
||||
let start = Instant::now();
|
||||
|
||||
thread::park_timeout(Duration::from_millis(200));
|
||||
|
||||
assert!((200..500).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn park_unpark() {
|
||||
let t1 = thread::current();
|
||||
let t2 = thread::spawn(move || {
|
||||
thread::park();
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
t1.unpark();
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
t2.thread().unpark();
|
||||
thread::park();
|
||||
|
||||
assert!((200..500).contains(&start.elapsed().as_millis()));
|
||||
}
|
||||
|
||||
fn main() {
|
||||
park_timeout();
|
||||
park_unpark();
|
||||
}
|
||||
2
tests/run-pass/concurrency/parking.stderr
Normal file
2
tests/run-pass/concurrency/parking.stderr
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
warning: thread support is experimental. For example, Miri does not detect data races yet.
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue