Implement support for synchronization primitives.

This commit is contained in:
Vytautas Astrauskas 2020-04-21 16:38:14 -07:00 committed by Vytautas Astrauskas
parent 726373fcaa
commit 679245769b
18 changed files with 1290 additions and 286 deletions

View file

@ -210,6 +210,12 @@ pub fn eval_main<'tcx>(tcx: TyCtxt<'tcx>, main_id: DefId, config: MiriConfig) ->
SchedulingAction::ExecuteStep => {
assert!(ecx.step()?, "a terminated thread was scheduled for execution");
}
SchedulingAction::ExecuteCallback => {
assert!(ecx.machine.communicate,
"scheduler callbacks require disabled isolation, but the code \
that created the callback did not check it");
ecx.run_scheduler_callback()?;
}
SchedulingAction::ExecuteDtors => {
// This will either enable the thread again (so we go back
// to `ExecuteStep`), or determine that this thread is done

View file

@ -31,6 +31,7 @@ mod operator;
mod range_map;
mod shims;
mod stacked_borrows;
mod sync;
mod thread;
// Make all those symbols available in the same place as our own.
@ -45,7 +46,7 @@ pub use crate::shims::fs::{DirHandler, EvalContextExt as FileEvalContextExt, Fil
pub use crate::shims::intrinsics::EvalContextExt as IntrinsicsEvalContextExt;
pub use crate::shims::os_str::EvalContextExt as OsStrEvalContextExt;
pub use crate::shims::panic::{CatchUnwindData, EvalContextExt as PanicEvalContextExt};
pub use crate::shims::sync::{EvalContextExt as SyncEvalContextExt};
pub use crate::shims::sync::{EvalContextExt as SyncShimsEvalContextExt};
pub use crate::shims::thread::EvalContextExt as ThreadShimsEvalContextExt;
pub use crate::shims::time::EvalContextExt as TimeEvalContextExt;
pub use crate::shims::tls::{EvalContextExt as TlsEvalContextExt, TlsData};
@ -70,6 +71,9 @@ pub use crate::stacked_borrows::{
pub use crate::thread::{
EvalContextExt as ThreadsEvalContextExt, SchedulingAction, ThreadId, ThreadManager, ThreadState,
};
pub use crate::sync::{
EvalContextExt as SyncEvalContextExt, CondvarId, MutexId, RwLockId
};
/// Insert rustc arguments at the beginning of the argument list that Miri wants to be
/// set per default, for maximal validation power.

View file

@ -5,7 +5,7 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU64;
use std::rc::Rc;
use std::time::Instant;
use std::time::{Instant, SystemTime};
use std::fmt;
use log::trace;
@ -251,6 +251,11 @@ pub struct Evaluator<'mir, 'tcx> {
/// The "time anchor" for this machine's monotone clock (for `Instant` simulation).
pub(crate) time_anchor: Instant,
/// The approximate system time when "time anchor" was created. This is used
/// for converting system time to monotone time so that we can simplify the
/// thread scheduler to deal only with a single representation of time.
pub(crate) time_anchor_timestamp: SystemTime,
/// The set of threads.
pub(crate) threads: ThreadManager<'mir, 'tcx>,
@ -281,6 +286,7 @@ impl<'mir, 'tcx> Evaluator<'mir, 'tcx> {
dir_handler: Default::default(),
panic_payload: None,
time_anchor: Instant::now(),
time_anchor_timestamp: SystemTime::now(),
layouts,
threads: ThreadManager::default(),
}

View file

@ -330,6 +330,45 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
let result = this.pthread_rwlock_destroy(rwlock)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_init" => {
let result = this.pthread_condattr_init(args[0])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_setclock" => {
let result = this.pthread_condattr_setclock(args[0], args[1])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_getclock" => {
let result = this.pthread_condattr_getclock(args[0], args[1])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_destroy" => {
let result = this.pthread_condattr_destroy(args[0])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_init" => {
let result = this.pthread_cond_init(args[0], args[1])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_signal" => {
let result = this.pthread_cond_signal(args[0])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_broadcast" => {
let result = this.pthread_cond_broadcast(args[0])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_wait" => {
let result = this.pthread_cond_wait(args[0], args[1])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_timedwait" => {
this.pthread_cond_timedwait(args[0], args[1], args[2], dest)?;
}
"pthread_cond_destroy" => {
let result = this.pthread_cond_destroy(args[0])?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
// Threading
"pthread_create" => {
@ -391,16 +430,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
| "pthread_attr_init"
| "pthread_attr_destroy"
| "pthread_condattr_init"
| "pthread_condattr_destroy"
| "pthread_cond_destroy"
if this.frame().instance.to_string().starts_with("std::sys::unix::") => {
let &[_] = check_arg_count(args)?;
this.write_null(dest)?;
}
| "pthread_cond_init"
| "pthread_attr_setstacksize"
| "pthread_condattr_setclock"
if this.frame().instance.to_string().starts_with("std::sys::unix::") => {
let &[_, _] = check_arg_count(args)?;
this.write_null(dest)?;

View file

@ -1,8 +1,10 @@
use std::time::{Duration, SystemTime};
use rustc_middle::ty::{layout::TyAndLayout, TyKind, TypeAndMut};
use rustc_target::abi::{LayoutOf, Size};
use crate::stacked_borrows::Tag;
use crate::thread::BlockSetId;
use crate::*;
fn assert_ptr_target_min_size<'mir, 'tcx: 'mir>(
@ -76,45 +78,12 @@ fn mutexattr_set_kind<'mir, 'tcx: 'mir>(
// Our chosen memory layout for the emulated mutex (does not have to match the platform layout!):
// bytes 0-3: reserved for signature on macOS
// (need to avoid this because it is set by static initializer macros)
// bytes 4-7: count of how many times this mutex has been locked, as a u32
// bytes 8-11: when count > 0, id of the owner thread as a u32
// bytes 4-7: mutex id as u32 or 0 if id is not assigned yet.
// bytes 12-15 or 16-19 (depending on platform): mutex kind, as an i32
// (the kind has to be at its offset for compatibility with static initializer macros)
// bytes 20-23: when count > 0, id of the blockset in which the blocked threads
// are waiting or 0 if blockset is not yet assigned.
const PTHREAD_MUTEX_T_MIN_SIZE: u64 = 24;
// Reads the mutex's lock count from bytes 4-7 of the emulated
// pthread_mutex_t; 0 means the mutex is currently unlocked.
fn mutex_get_locked_count<'mir, 'tcx: 'mir>(
    ecx: &MiriEvalContext<'mir, 'tcx>,
    mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
    get_at_offset(ecx, mutex_op, 4, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
// Stores `locked_count` into bytes 4-7 of the emulated pthread_mutex_t
// (the lock count field read by `mutex_get_locked_count`).
fn mutex_set_locked_count<'mir, 'tcx: 'mir>(
    ecx: &mut MiriEvalContext<'mir, 'tcx>,
    mutex_op: OpTy<'tcx, Tag>,
    locked_count: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
    set_at_offset(ecx, mutex_op, 4, locked_count, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
// Reads the owner-thread id from bytes 8-11 of the emulated pthread_mutex_t.
// Only meaningful while the lock count is > 0.
fn mutex_get_owner<'mir, 'tcx: 'mir>(
    ecx: &MiriEvalContext<'mir, 'tcx>,
    mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
    get_at_offset(ecx, mutex_op, 8, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
// Stores `owner` (a thread id as u32) into bytes 8-11 of the emulated
// pthread_mutex_t.
fn mutex_set_owner<'mir, 'tcx: 'mir>(
    ecx: &mut MiriEvalContext<'mir, 'tcx>,
    mutex_op: OpTy<'tcx, Tag>,
    owner: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
    set_at_offset(ecx, mutex_op, 8, owner, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_kind<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
@ -132,34 +101,34 @@ fn mutex_set_kind<'mir, 'tcx: 'mir>(
set_at_offset(ecx, mutex_op, offset, kind, ecx.machine.layouts.i32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_blockset<'mir, 'tcx: 'mir>(
fn mutex_get_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, mutex_op, 20, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
) -> InterpResult<'tcx, ScalarMaybeUndef<Tag>> {
get_at_offset(ecx, mutex_op, 4, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_set_blockset<'mir, 'tcx: 'mir>(
fn mutex_set_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
blockset: impl Into<ScalarMaybeUninit<Tag>>,
id: impl Into<ScalarMaybeUndef<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, mutex_op, 20, blockset, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
set_at_offset(ecx, mutex_op, 4, id, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_or_create_blockset<'mir, 'tcx: 'mir>(
fn mutex_get_or_create_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, BlockSetId> {
let blockset = mutex_get_blockset(ecx, mutex_op)?.to_u32()?;
if blockset == 0 {
// 0 is a default value and also not a valid blockset id. Need to
// allocate a new blockset.
let blockset = ecx.create_blockset()?;
mutex_set_blockset(ecx, mutex_op, blockset.to_u32_scalar())?;
Ok(blockset)
) -> InterpResult<'tcx, MutexId> {
let id = mutex_get_id(ecx, mutex_op)?.to_u32()?;
if id == 0 {
// 0 is a default value and also not a valid mutex id. Need to allocate
// a new mutex.
let id = ecx.mutex_create();
mutex_set_id(ecx, mutex_op, id.to_u32_scalar())?;
Ok(id)
} else {
Ok(BlockSetId::new(blockset))
Ok(id.into())
}
}
@ -168,107 +137,162 @@ fn mutex_get_or_create_blockset<'mir, 'tcx: 'mir>(
// Our chosen memory layout for the emulated rwlock (does not have to match the platform layout!):
// bytes 0-3: reserved for signature on macOS
// (need to avoid this because it is set by static initializer macros)
// bytes 4-7: reader count, as a u32
// bytes 8-11: writer count, as a u32
// bytes 12-15: when writer or reader count > 0, id of the blockset in which the
// blocked writers are waiting or 0 if blockset is not yet assigned.
// bytes 16-20: when writer count > 0, id of the blockset in which the blocked
// readers are waiting or 0 if blockset is not yet assigned.
// bytes 4-7: rwlock id as u32 or 0 if id is not assigned yet.
const PTHREAD_RWLOCK_T_MIN_SIZE: u64 = 20;
const PTHREAD_RWLOCK_T_MIN_SIZE: u64 = 32;
fn rwlock_get_readers<'mir, 'tcx: 'mir>(
fn rwlock_get_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 4, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_set_readers<'mir, 'tcx: 'mir>(
fn rwlock_set_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
readers: impl Into<ScalarMaybeUninit<Tag>>,
id: impl Into<ScalarMaybeUndef<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 4, readers, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
set_at_offset(ecx, rwlock_op, 4, id, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_get_writers<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 8, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_set_writers<'mir, 'tcx: 'mir>(
fn rwlock_get_or_create_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
writers: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 8, writers, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_get_writer_blockset<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 12, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_set_writer_blockset<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
blockset: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 12, blockset, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_get_or_create_writer_blockset<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, BlockSetId> {
let blockset = rwlock_get_writer_blockset(ecx, rwlock_op)?.to_u32()?;
if blockset == 0 {
// 0 is a default value and also not a valid blockset id. Need to
// allocate a new blockset.
let blockset = ecx.create_blockset()?;
rwlock_set_writer_blockset(ecx, rwlock_op, blockset.to_u32_scalar())?;
Ok(blockset)
) -> InterpResult<'tcx, RwLockId> {
let id = rwlock_get_id(ecx, rwlock_op)?.to_u32()?;
if id == 0 {
// 0 is a default value and also not a valid rwlock id. Need to allocate
// a new read-write lock.
let id = ecx.rwlock_create();
rwlock_set_id(ecx, rwlock_op, id.to_u32_scalar())?;
Ok(id)
} else {
Ok(BlockSetId::new(blockset))
Ok(id.into())
}
}
fn rwlock_get_reader_blockset<'mir, 'tcx: 'mir>(
// pthread_condattr_t
// Our chosen memory layout for emulation (does not have to match the platform layout!):
// store an i32 in the first four bytes equal to the corresponding libc clock id constant
// (e.g. CLOCK_REALTIME).
const PTHREAD_CONDATTR_T_MIN_SIZE: u64 = 4;
fn condattr_get_clock_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 16, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
attr_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUndef<Tag>> {
get_at_offset(ecx, attr_op, 0, ecx.machine.layouts.i32, PTHREAD_CONDATTR_T_MIN_SIZE)
}
fn rwlock_set_reader_blockset<'mir, 'tcx: 'mir>(
fn condattr_set_clock_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
blockset: impl Into<ScalarMaybeUninit<Tag>>,
attr_op: OpTy<'tcx, Tag>,
clock_id: impl Into<ScalarMaybeUndef<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 16, blockset, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
set_at_offset(ecx, attr_op, 0, clock_id, ecx.machine.layouts.i32, PTHREAD_CONDATTR_T_MIN_SIZE)
}
fn rwlock_get_or_create_reader_blockset<'mir, 'tcx: 'mir>(
// pthread_cond_t
// Our chosen memory layout for the emulated condition variable (does not have
// to match the platform layout!):
// bytes 4-7: the condition variable id as u32 or 0 if id is not assigned yet.
// bytes 8-11: the clock id constant as i32
const PTHREAD_COND_T_MIN_SIZE: u64 = 12;
// Reads the condvar id from bytes 4-7 of the emulated pthread_cond_t;
// 0 means no id has been assigned yet (see `cond_get_or_create_id`).
fn cond_get_id<'mir, 'tcx: 'mir>(
    ecx: &MiriEvalContext<'mir, 'tcx>,
    cond_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUndef<Tag>> {
    get_at_offset(ecx, cond_op, 4, ecx.machine.layouts.u32, PTHREAD_COND_T_MIN_SIZE)
}
fn cond_set_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, BlockSetId> {
let blockset = rwlock_get_reader_blockset(ecx, rwlock_op)?.to_u32()?;
if blockset == 0 {
// 0 is a default value and also not a valid blockset id. Need to
// allocate a new blockset.
let blockset = ecx.create_blockset()?;
rwlock_set_reader_blockset(ecx, rwlock_op, blockset.to_u32_scalar())?;
Ok(blockset)
cond_op: OpTy<'tcx, Tag>,
id: impl Into<ScalarMaybeUndef<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, cond_op, 4, id, ecx.machine.layouts.u32, PTHREAD_COND_T_MIN_SIZE)
}
fn cond_get_or_create_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
cond_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, CondvarId> {
let id = cond_get_id(ecx, cond_op)?.to_u32()?;
if id == 0 {
// 0 is a default value and also not a valid conditional variable id.
// Need to allocate a new id.
let id = ecx.condvar_create();
cond_set_id(ecx, cond_op, id.to_u32_scalar())?;
Ok(id)
} else {
Ok(BlockSetId::new(blockset))
Ok(id.into())
}
}
// Reads the clock id constant (e.g. CLOCK_REALTIME) stored in bytes 8-11 of
// the emulated pthread_cond_t, as an i32.
fn cond_get_clock_id<'mir, 'tcx: 'mir>(
    ecx: &MiriEvalContext<'mir, 'tcx>,
    cond_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUndef<Tag>> {
    get_at_offset(ecx, cond_op, 8, ecx.machine.layouts.i32, PTHREAD_COND_T_MIN_SIZE)
}
// Stores `clock_id` into bytes 8-11 of the emulated pthread_cond_t.
fn cond_set_clock_id<'mir, 'tcx: 'mir>(
    ecx: &mut MiriEvalContext<'mir, 'tcx>,
    cond_op: OpTy<'tcx, Tag>,
    clock_id: impl Into<ScalarMaybeUndef<Tag>>,
) -> InterpResult<'tcx, ()> {
    set_at_offset(ecx, cond_op, 8, clock_id, ecx.machine.layouts.i32, PTHREAD_COND_T_MIN_SIZE)
}
/// Try to reacquire the mutex associated with the condition variable after we
/// were signaled: if the mutex is currently held, queue `thread` on it (the
/// thread stays blocked until ownership is transferred); otherwise grant the
/// mutex to `thread` and let it run again.
fn reacquire_cond_mutex<'mir, 'tcx: 'mir>(
    ecx: &mut MiriEvalContext<'mir, 'tcx>,
    thread: ThreadId,
    mutex: MutexId,
) -> InterpResult<'tcx> {
    if ecx.mutex_is_locked(mutex) {
        // Someone else owns the mutex: wait in its queue instead of waking up.
        ecx.mutex_enqueue(mutex, thread);
    } else {
        // The mutex is free: take it and unblock the signaled thread.
        ecx.mutex_lock(mutex, thread);
        ecx.unblock_thread(thread)?;
    }
    Ok(())
}
/// Release the mutex associated with the condition variable because we are
/// entering the waiting state.
///
/// Errors: a mutex acquired more than once (recursive) is unsupported; a
/// mutex owned by a different thread, or not locked at all, is UB.
fn release_cond_mutex<'mir, 'tcx: 'mir>(
    ecx: &mut MiriEvalContext<'mir, 'tcx>,
    active_thread: ThreadId,
    mutex: MutexId,
) -> InterpResult<'tcx> {
    if let Some((owner_thread, current_locked_count)) = ecx.mutex_unlock(mutex) {
        if current_locked_count != 0 {
            // The mutex is still held after one unlock (recursive lock).
            throw_unsup_format!("awaiting on multiple times acquired lock is not supported");
        }
        if owner_thread != active_thread {
            throw_ub_format!("awaiting on a mutex owned by a different thread");
        }
        if let Some(thread) = ecx.mutex_dequeue(mutex) {
            // We have at least one thread waiting on this mutex. Transfer
            // ownership to it.
            ecx.mutex_lock(mutex, thread);
            ecx.unblock_thread(thread)?;
        }
    } else {
        throw_ub_format!("awaiting on unlocked mutex");
    }
    // The calling thread now sleeps until the condvar is signaled.
    ecx.block_thread(active_thread)?;
    Ok(())
}
impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
fn pthread_mutexattr_init(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
@ -323,7 +347,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
mutexattr_get_kind(this, attr_op)?.not_undef()?
};
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(0))?;
let _ = mutex_get_or_create_id(this, mutex_op)?;
mutex_set_kind(this, mutex_op, kind)?;
Ok(0)
@ -333,21 +357,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?.not_undef()?;
let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?;
let id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
if locked_count == 0 {
// The mutex is unlocked. Let's lock it.
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(1))?;
mutex_set_owner(this, mutex_op, active_thread.to_u32_scalar())?;
Ok(0)
} else {
// The mutex is locked. Let's check by whom.
let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into();
if this.mutex_is_locked(id) {
let owner_thread = this.mutex_get_owner(id);
if owner_thread != active_thread {
// Block the active thread.
let blockset = mutex_get_or_create_blockset(this, mutex_op)?;
this.block_active_thread(blockset)?;
this.block_thread(active_thread)?;
this.mutex_enqueue(id, active_thread);
Ok(0)
} else {
// Trying to acquire the same mutex again.
@ -356,17 +374,16 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
} else if kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")? {
this.eval_libc_i32("EDEADLK")
} else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? {
match locked_count.checked_add(1) {
Some(new_count) => {
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.mutex_lock(id, active_thread);
Ok(0)
} else {
throw_ub_format!("called pthread_mutex_lock on an unsupported type of mutex");
}
}
} else {
// The mutex is unlocked. Let's lock it.
this.mutex_lock(id, active_thread);
Ok(0)
}
}
@ -374,16 +391,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?.not_undef()?;
let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?;
let id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
if locked_count == 0 {
// The mutex is unlocked. Let's lock it.
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(1))?;
mutex_set_owner(this, mutex_op, active_thread.to_u32_scalar())?;
Ok(0)
} else {
let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into();
if this.mutex_is_locked(id) {
let owner_thread = this.mutex_get_owner(id);
if owner_thread != active_thread {
this.eval_libc_i32("EBUSY")
} else {
@ -392,19 +404,18 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
{
this.eval_libc_i32("EBUSY")
} else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? {
match locked_count.checked_add(1) {
Some(new_count) => {
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.mutex_lock(id, active_thread);
Ok(0)
} else {
throw_ub_format!(
"called pthread_mutex_trylock on an unsupported type of mutex"
);
}
}
} else {
// The mutex is unlocked. Let's lock it.
this.mutex_lock(id, active_thread);
Ok(0)
}
}
@ -412,21 +423,20 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?.not_undef()?;
let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?;
let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into();
let id = mutex_get_or_create_id(this, mutex_op)?;
if owner_thread != this.get_active_thread()? {
throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread");
} else if locked_count == 1 {
let blockset = mutex_get_or_create_blockset(this, mutex_op)?;
if let Some(new_owner) = this.unblock_some_thread(blockset)? {
// We have at least one thread waiting on this mutex. Transfer
// ownership to it.
mutex_set_owner(this, mutex_op, new_owner.to_u32_scalar())?;
} else {
// No thread is waiting on this mutex.
mutex_set_owner(this, mutex_op, Scalar::from_u32(0))?;
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(0))?;
if let Some((owner_thread, current_locked_count)) = this.mutex_unlock(id) {
if owner_thread != this.get_active_thread()? {
throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread");
}
if current_locked_count == 0 {
// The mutex is unlocked.
if let Some(thread) = this.mutex_dequeue(id) {
// We have at least one thread waiting on this mutex. Transfer
// ownership to it.
this.mutex_lock(id, thread);
this.unblock_thread(thread)?;
}
}
Ok(0)
} else {
@ -435,16 +445,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
} else if kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")? {
this.eval_libc_i32("EPERM")
} else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? {
match locked_count.checked_sub(1) {
Some(new_count) => {
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?;
Ok(0)
}
None => {
// locked_count was already zero
this.eval_libc_i32("EPERM")
}
}
this.eval_libc_i32("EPERM")
} else {
throw_ub_format!("called pthread_mutex_unlock on an unsupported type of mutex");
}
@ -454,13 +455,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
fn pthread_mutex_destroy(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
if mutex_get_locked_count(this, mutex_op)?.to_u32()? != 0 {
let id = mutex_get_or_create_id(this, mutex_op)?;
if this.mutex_is_locked(id) {
throw_ub_format!("destroyed a locked mutex");
}
mutex_set_kind(this, mutex_op, ScalarMaybeUninit::Uninit)?;
mutex_set_locked_count(this, mutex_op, ScalarMaybeUninit::Uninit)?;
mutex_set_blockset(this, mutex_op, ScalarMaybeUninit::Uninit)?;
mutex_set_kind(this, mutex_op, ScalarMaybeUndef::Undef)?;
mutex_set_id(this, mutex_op, ScalarMaybeUndef::Undef)?;
Ok(0)
}
@ -468,121 +470,305 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
fn pthread_rwlock_rdlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if writers != 0 {
// The lock is locked by a writer.
assert_eq!(writers, 1);
let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?;
this.block_active_thread(reader_blockset)?;
if this.rwlock_is_write_locked(id) {
this.rwlock_enqueue_reader(id, active_thread);
this.block_thread(active_thread)?;
Ok(0)
} else {
match readers.checked_add(1) {
Some(new_readers) => {
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.rwlock_reader_add(id, active_thread);
Ok(0)
}
}
fn pthread_rwlock_tryrdlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
if writers != 0 {
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_is_write_locked(id) {
this.eval_libc_i32("EBUSY")
} else {
match readers.checked_add(1) {
Some(new_readers) => {
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.rwlock_reader_add(id, active_thread);
Ok(0)
}
}
fn pthread_rwlock_wrlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
let writer_blockset = rwlock_get_or_create_writer_blockset(this, rwlock_op)?;
if readers != 0 || writers != 0 {
this.block_active_thread(writer_blockset)?;
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_is_locked(id) {
this.block_thread(active_thread)?;
this.rwlock_enqueue_writer(id, active_thread);
} else {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
this.rwlock_writer_set(id, active_thread);
}
Ok(0)
}
fn pthread_rwlock_trywrlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
if readers != 0 || writers != 0 {
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_is_locked(id) {
this.eval_libc_i32("EBUSY")
} else {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
this.rwlock_writer_set(id, active_thread);
Ok(0)
}
}
// FIXME: We should check that this lock was locked by the active thread.
fn pthread_rwlock_unlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
let writer_blockset = rwlock_get_or_create_writer_blockset(this, rwlock_op)?;
if let Some(new_readers) = readers.checked_sub(1) {
assert_eq!(writers, 0);
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
if new_readers == 0 {
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_reader_remove(id, active_thread) {
// The thread was a reader.
if this.rwlock_is_locked(id) {
// No more readers owning the lock. Give it to a writer if there
// is any.
if let Some(writer) = this.rwlock_dequeue_writer(id) {
this.unblock_thread(writer)?;
this.rwlock_writer_set(id, writer);
}
}
Ok(0)
} else if writers != 0 {
let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?;
} else if Some(active_thread) == this.rwlock_writer_remove(id) {
// The thread was a writer.
//
// We are prioritizing writers here against the readers. As a
// result, not only readers can starve writers, but also writers can
// starve readers.
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
assert_eq!(writers, 1);
if let Some(writer) = this.rwlock_dequeue_writer(id) {
// Give the lock to another writer.
this.unblock_thread(writer)?;
this.rwlock_writer_set(id, writer);
} else {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(0))?;
let mut readers = 0;
while let Some(_reader) = this.unblock_some_thread(reader_blockset)? {
readers += 1;
// Give the lock to all readers.
while let Some(reader) = this.rwlock_dequeue_reader(id) {
this.unblock_thread(reader)?;
this.rwlock_reader_add(id, reader);
}
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(readers))?
}
Ok(0)
} else {
throw_ub_format!("unlocked an rwlock that was not locked");
throw_ub_format!("unlocked an rwlock that was not locked by the active thread");
}
}
fn pthread_rwlock_destroy(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
if rwlock_get_readers(this, rwlock_op)?.to_u32()? != 0
|| rwlock_get_writers(this, rwlock_op)?.to_u32()? != 0
{
let id = rwlock_get_or_create_id(this, rwlock_op)?;
if this.rwlock_is_locked(id) {
throw_ub_format!("destroyed a locked rwlock");
}
rwlock_set_readers(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_writers(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_reader_blockset(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_writer_blockset(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_id(this, rwlock_op, ScalarMaybeUndef::Undef)?;
Ok(0)
}
// Initialize a pthread_condattr_t: per POSIX, a freshly initialized
// attribute object selects the default clock, CLOCK_REALTIME.
fn pthread_condattr_init(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();

    let realtime = this.eval_libc("CLOCK_REALTIME")?;
    condattr_set_clock_id(this, attr_op, realtime)?;

    Ok(0)
}
// Set the clock used by condvars created with this attribute object.
// Only CLOCK_REALTIME and CLOCK_MONOTONIC are accepted; anything else
// yields EINVAL, matching POSIX.
fn pthread_condattr_setclock(
    &mut self,
    attr_op: OpTy<'tcx, Tag>,
    clock_id_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();

    let clock_id = this.read_scalar(clock_id_op)?.not_undef()?;
    let supported = clock_id == this.eval_libc("CLOCK_REALTIME")?
        || clock_id == this.eval_libc("CLOCK_MONOTONIC")?;
    if !supported {
        // Reject unsupported clocks without touching the attribute.
        return this.eval_libc_i32("EINVAL");
    }
    condattr_set_clock_id(this, attr_op, clock_id)?;

    Ok(0)
}
// Read the clock id stored in the attribute object and write it through
// the `clk_id` output pointer.
fn pthread_condattr_getclock(
    &mut self,
    attr_op: OpTy<'tcx, Tag>,
    clk_id_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();

    let clock_id = condattr_get_clock_id(this, attr_op)?;
    let out_place = this.deref_operand(clk_id_op)?;
    this.write_scalar(clock_id, out_place.into())?;

    Ok(0)
}
// Destroy a pthread_condattr_t by resetting its clock field to undefined,
// so later reads of a destroyed attribute fail instead of yielding stale data.
fn pthread_condattr_destroy(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();

    condattr_set_clock_id(this, attr_op, ScalarMaybeUndef::Undef)?;

    Ok(0)
}
// Initialize a pthread_cond_t: assign it a condvar id and record which
// clock it uses (taken from the attribute, or CLOCK_REALTIME by default).
fn pthread_cond_init(
    &mut self,
    cond_op: OpTy<'tcx, Tag>,
    attr_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();

    let attr = this.read_scalar(attr_op)?.not_undef()?;
    // A null attribute pointer means "use the defaults", i.e. CLOCK_REALTIME.
    let clock_id = if this.is_null(attr)? {
        this.eval_libc("CLOCK_REALTIME")?
    } else {
        condattr_get_clock_id(this, attr_op)?.not_undef()?
    };

    // Eagerly allocate a condvar id; the value itself is not needed here.
    let _ = cond_get_or_create_id(this, cond_op)?;
    cond_set_clock_id(this, cond_op, clock_id)?;

    Ok(0)
}
// Wake at most one thread waiting on this condvar: let it reacquire the
// mutex it released when it started waiting, and drop any timeout callback
// that may still be registered for it (from a timedwait).
fn pthread_cond_signal(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();
    let id = cond_get_or_create_id(this, cond_op)?;
    if let Some((thread, mutex)) = this.condvar_signal(id) {
        reacquire_cond_mutex(this, thread, mutex)?;
        this.unregister_callback_if_exists(thread)?;
    }
    Ok(0)
}
// Wake every thread waiting on this condvar by repeatedly signaling until
// the waiter queue is empty; each waiter reacquires its mutex and has any
// pending timeout callback removed.
fn pthread_cond_broadcast(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();
    let id = cond_get_or_create_id(this, cond_op)?;

    while let Some((thread, mutex)) = this.condvar_signal(id) {
        reacquire_cond_mutex(this, thread, mutex)?;
        this.unregister_callback_if_exists(thread)?;
    }

    Ok(0)
}
/// Shim for `pthread_cond_wait`: release the mutex and register the current
/// thread as a waiter on the condition variable.
fn pthread_cond_wait(
    &mut self,
    cond_op: OpTy<'tcx, Tag>,
    mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();
    let id = cond_get_or_create_id(this, cond_op)?;
    let mutex_id = mutex_get_or_create_id(this, mutex_op)?;
    let active_thread = this.get_active_thread()?;
    // Release the mutex first, then record this thread as a condvar waiter;
    // the woken side (signal/broadcast) reacquires the mutex on our behalf.
    release_cond_mutex(this, active_thread, mutex_id)?;
    this.condvar_wait(id, active_thread, mutex_id);
    Ok(0)
}
/// Shim for `pthread_cond_timedwait`: like `pthread_cond_wait`, but also
/// registers a timeout callback that fires once the absolute time given in
/// `abstime_op` has passed.
///
/// The result is written to `dest` rather than returned: 0 is written now
/// (the signal/broadcast path), and the timeout callback overwrites it with
/// ETIMEDOUT if it runs first.
fn pthread_cond_timedwait(
    &mut self,
    cond_op: OpTy<'tcx, Tag>,
    mutex_op: OpTy<'tcx, Tag>,
    abstime_op: OpTy<'tcx, Tag>,
    dest: PlaceTy<'tcx, Tag>,
) -> InterpResult<'tcx> {
    let this = self.eval_context_mut();

    // Timeouts need the host clock, which is only available with isolation
    // disabled.
    this.check_no_isolation("pthread_cond_timedwait")?;

    let id = cond_get_or_create_id(this, cond_op)?;
    let mutex_id = mutex_get_or_create_id(this, mutex_op)?;
    let active_thread = this.get_active_thread()?;

    release_cond_mutex(this, active_thread, mutex_id)?;
    this.condvar_wait(id, active_thread, mutex_id);

    // We return success for now and override it in the timeout callback.
    this.write_scalar(Scalar::from_i32(0), dest)?;

    // Extract the timeout: a `timespec` is a `time_t` seconds field followed
    // by a `c_long` nanoseconds field.
    let clock_id = cond_get_clock_id(this, cond_op)?.to_i32()?;
    let duration = {
        let tp = this.deref_operand(abstime_op)?;
        let mut offset = Size::from_bytes(0);
        let layout = this.libc_ty_layout("time_t")?;
        let seconds_place = tp.offset(offset, MemPlaceMeta::None, layout, this)?;
        let seconds = this.read_scalar(seconds_place.into())?.to_u64()?;
        offset += layout.size;
        let layout = this.libc_ty_layout("c_long")?;
        let nanoseconds_place = tp.offset(offset, MemPlaceMeta::None, layout, this)?;
        let nanoseconds = this.read_scalar(nanoseconds_place.into())?.to_u64()?;
        // NOTE(review): `tv_nsec` is truncated to u32 and not range-checked
        // against 0..=999_999_999 here — POSIX specifies EINVAL for
        // out-of-range values; confirm whether that should be reported.
        Duration::new(seconds, nanoseconds as u32)
    };

    // Convert the absolute timespec into an `Instant` by expressing it
    // relative to the machine's time anchor, depending on the condvar's clock.
    let timeout_time = if clock_id == this.eval_libc_i32("CLOCK_REALTIME")? {
        let time_anchor_since_epoch =
            this.machine.time_anchor_timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
        let duration_since_time_anchor = duration.checked_sub(time_anchor_since_epoch).unwrap();
        this.machine.time_anchor.checked_add(duration_since_time_anchor).unwrap()
    } else if clock_id == this.eval_libc_i32("CLOCK_MONOTONIC")? {
        this.machine.time_anchor.checked_add(duration).unwrap()
    } else {
        throw_ub_format!("Unsupported clock id.");
    };

    // Register the timeout callback. It runs only if no signal/broadcast
    // unregistered it first.
    this.register_callback(
        active_thread,
        timeout_time,
        Box::new(move |ecx| {
            // Try to reacquire the mutex, as a timed-out wait still returns
            // with the mutex held.
            reacquire_cond_mutex(ecx, active_thread, mutex_id)?;
            // Remove the thread from the conditional variable.
            ecx.condvar_remove_waiter(id, active_thread);
            // Overwrite the success value written above with ETIMEDOUT.
            let timeout = ecx.eval_libc_i32("ETIMEDOUT")?;
            ecx.write_scalar(Scalar::from_i32(timeout), dest)?;
            Ok(())
        }),
    )?;

    Ok(())
}
/// Shim for `pthread_cond_destroy`: destroying a condvar that still has
/// waiters is undefined behavior; otherwise invalidate its stored state.
fn pthread_cond_destroy(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
    let this = self.eval_context_mut();
    let id = cond_get_or_create_id(this, cond_op)?;
    if this.condvar_is_awaited(id) {
        throw_ub_format!("destroyed an awaited conditional variable");
    }
    // Overwrite both stored fields with `Undef` so later use of the destroyed
    // condvar is detected.
    cond_set_id(this, cond_op, ScalarMaybeUndef::Undef)?;
    cond_set_clock_id(this, cond_op, ScalarMaybeUndef::Undef)?;
    Ok(0)
}

299
src/sync.rs Normal file
View file

@ -0,0 +1,299 @@
use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::convert::TryFrom;
use std::num::NonZeroU32;
use std::time::Instant;
use rustc_index::vec::{Idx, IndexVec};
use crate::*;
/// Generate a strongly-typed id newtype (`MutexId`, `RwLockId`, ...) backed
/// by a `NonZeroU32`.
///
/// Ids are 1-based: `Idx::new` maps vector index `idx` to `idx + 1` and
/// `index` maps back, so ids can be exposed to program code as nonzero `u32`
/// values while still indexing an `IndexVec` from 0.
macro_rules! declare_id {
    ($name: ident) => {
        #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
        pub struct $name(NonZeroU32);

        impl Idx for $name {
            fn new(idx: usize) -> Self {
                // Shift by one so the stored value is never zero.
                $name(NonZeroU32::new(u32::try_from(idx).unwrap() + 1).unwrap())
            }
            fn index(self) -> usize {
                // Undo the +1 shift applied in `new`.
                usize::try_from(self.0.get() - 1).unwrap()
            }
        }

        impl From<u32> for $name {
            fn from(id: u32) -> Self {
                // Panics if `id` is zero.
                Self(NonZeroU32::new(id).unwrap())
            }
        }

        impl $name {
            /// The raw id as a `u32` scalar, for handing back to program code.
            pub fn to_u32_scalar<'tcx>(&self) -> Scalar<Tag> {
                Scalar::from_u32(self.0.get())
            }
        }
    };
}
declare_id!(MutexId);

/// The mutex state.
#[derive(Default, Debug)]
struct Mutex {
    /// The thread that currently owns the lock, if any.
    owner: Option<ThreadId>,
    /// How many times the mutex was locked by the owner. The same owner may
    /// lock recursively, so this can exceed 1; it is 0 iff the mutex is
    /// unlocked.
    lock_count: usize,
    /// The queue of threads waiting for this mutex.
    queue: VecDeque<ThreadId>,
}
declare_id!(RwLockId);

/// The read-write lock state.
#[derive(Default, Debug)]
struct RwLock {
    /// The writer thread that currently owns the lock, if any. Mutually
    /// exclusive with `readers` being non-empty.
    writer: Option<ThreadId>,
    /// The readers that currently own the lock and how many times each of
    /// them acquired it.
    readers: HashMap<ThreadId, usize>,
    /// The queue of writer threads waiting for this lock.
    writer_queue: VecDeque<ThreadId>,
    /// The queue of reader threads waiting for this lock.
    reader_queue: VecDeque<ThreadId>,
}
declare_id!(CondvarId);

/// A thread waiting on a conditional variable.
#[derive(Debug)]
struct CondvarWaiter {
    /// The thread that is waiting on this variable.
    thread: ThreadId,
    /// The mutex on which the thread is waiting (and which it must reacquire
    /// when woken).
    mutex: MutexId,
    /// The moment in time when the waiter should time out, if any.
    timeout: Option<Instant>,
}

/// The conditional variable state.
#[derive(Default, Debug)]
struct Condvar {
    /// Waiting threads in FIFO order (pushed at the back, signalled from the
    /// front).
    waiters: VecDeque<CondvarWaiter>,
}
/// The state of all synchronization variables.
#[derive(Default, Debug)]
pub(super) struct SynchronizationState {
    /// State of every mutex ever created, indexed by `MutexId`.
    mutexes: IndexVec<MutexId, Mutex>,
    /// State of every read-write lock ever created, indexed by `RwLockId`.
    rwlocks: IndexVec<RwLockId, RwLock>,
    /// State of every condition variable ever created, indexed by `CondvarId`.
    condvars: IndexVec<CondvarId, Condvar>,
}
// Public interface to synchronization primitives. Please note that in most
// cases, the function calls are infallible and it is the client's (shim
// implementation's) responsibility to detect and deal with erroneous
// situations.
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
    #[inline]
    /// Create state for a new mutex.
    fn mutex_create(&mut self) -> MutexId {
        let this = self.eval_context_mut();
        this.machine.threads.sync.mutexes.push(Default::default())
    }

    #[inline]
    /// Get the id of the thread that currently owns this lock.
    ///
    /// Panics if the mutex is not locked.
    fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
        let this = self.eval_context_ref();
        this.machine.threads.sync.mutexes[id].owner.unwrap()
    }

    #[inline]
    /// Check if locked.
    fn mutex_is_locked(&mut self, id: MutexId) -> bool {
        let this = self.eval_context_mut();
        this.machine.threads.sync.mutexes[id].owner.is_some()
    }

    /// Lock by setting the mutex owner and increasing the lock count.
    ///
    /// Relocking by the current owner is allowed (recursive locking); trying
    /// to lock a mutex owned by another thread is a caller bug and panics.
    fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) {
        let this = self.eval_context_mut();
        let mutex = &mut this.machine.threads.sync.mutexes[id];
        if let Some(current_owner) = mutex.owner {
            assert_eq!(thread, current_owner, "mutex already locked by another thread");
            assert!(
                mutex.lock_count > 0,
                "invariant violation: lock_count == 0 iff the thread is unlocked"
            );
        } else {
            mutex.owner = Some(thread);
        }
        mutex.lock_count = mutex.lock_count.checked_add(1).unwrap();
    }

    /// Unlock by decreasing the lock count. If the lock count reaches 0, unset
    /// the owner.
    ///
    /// Returns the previous owner and the new lock count, or `None` if the
    /// mutex was not locked at all.
    fn mutex_unlock(&mut self, id: MutexId) -> Option<(ThreadId, usize)> {
        let this = self.eval_context_mut();
        let mutex = &mut this.machine.threads.sync.mutexes[id];
        if let Some(current_owner) = mutex.owner {
            mutex.lock_count = mutex
                .lock_count
                .checked_sub(1)
                .expect("invariant violation: lock_count == 0 iff the thread is unlocked");
            if mutex.lock_count == 0 {
                mutex.owner = None;
            }
            Some((current_owner, mutex.lock_count))
        } else {
            None
        }
    }

    #[inline]
    /// Put the thread into the queue waiting for the lock.
    fn mutex_enqueue(&mut self, id: MutexId, thread: ThreadId) {
        let this = self.eval_context_mut();
        this.machine.threads.sync.mutexes[id].queue.push_back(thread);
    }

    #[inline]
    /// Take a thread out of the queue waiting for the lock.
    fn mutex_dequeue(&mut self, id: MutexId) -> Option<ThreadId> {
        let this = self.eval_context_mut();
        this.machine.threads.sync.mutexes[id].queue.pop_front()
    }

    #[inline]
    /// Create state for a new read write lock.
    fn rwlock_create(&mut self) -> RwLockId {
        let this = self.eval_context_mut();
        this.machine.threads.sync.rwlocks.push(Default::default())
    }

    #[inline]
    /// Check if locked (by a writer or by at least one reader).
    fn rwlock_is_locked(&mut self, id: RwLockId) -> bool {
        let this = self.eval_context_mut();
        this.machine.threads.sync.rwlocks[id].writer.is_some()
            || !this.machine.threads.sync.rwlocks[id].readers.is_empty()
    }

    #[inline]
    /// Check if write locked.
    fn rwlock_is_write_locked(&mut self, id: RwLockId) -> bool {
        let this = self.eval_context_mut();
        this.machine.threads.sync.rwlocks[id].writer.is_some()
    }

    /// Add a reader that collectively with other readers owns the lock.
    /// The same reader may acquire the lock repeatedly; the per-thread count
    /// tracks the nesting depth.
    fn rwlock_reader_add(&mut self, id: RwLockId, reader: ThreadId) {
        let this = self.eval_context_mut();
        assert!(!this.rwlock_is_write_locked(id), "the lock is write locked");
        let count = this.machine.threads.sync.rwlocks[id].readers.entry(reader).or_insert(0);
        *count += 1;
    }

    /// Try removing the reader. Returns `true` if succeeded. Decrements the
    /// reader's acquisition count and drops the entry once it reaches 0.
    fn rwlock_reader_remove(&mut self, id: RwLockId, reader: ThreadId) -> bool {
        let this = self.eval_context_mut();
        match this.machine.threads.sync.rwlocks[id].readers.entry(reader) {
            Entry::Occupied(mut entry) => {
                let count = entry.get_mut();
                *count -= 1;
                if *count == 0 {
                    entry.remove();
                }
                true
            }
            Entry::Vacant(_) => false,
        }
    }

    #[inline]
    /// Put the reader in the queue waiting for the lock.
    fn rwlock_enqueue_reader(&mut self, id: RwLockId, reader: ThreadId) {
        let this = self.eval_context_mut();
        // Readers only ever queue behind a writer.
        assert!(this.rwlock_is_write_locked(id), "queueing on not write locked lock");
        this.machine.threads.sync.rwlocks[id].reader_queue.push_back(reader);
    }

    #[inline]
    /// Take the reader out of the queue waiting for the lock.
    fn rwlock_dequeue_reader(&mut self, id: RwLockId) -> Option<ThreadId> {
        let this = self.eval_context_mut();
        this.machine.threads.sync.rwlocks[id].reader_queue.pop_front()
    }

    #[inline]
    /// Lock by setting the writer that owns the lock.
    ///
    /// Panics if the lock is already held (by a writer or by readers).
    fn rwlock_writer_set(&mut self, id: RwLockId, writer: ThreadId) {
        let this = self.eval_context_mut();
        assert!(!this.rwlock_is_locked(id), "the lock is already locked");
        this.machine.threads.sync.rwlocks[id].writer = Some(writer);
    }

    #[inline]
    /// Try removing the writer; returns the previous writer, if any.
    fn rwlock_writer_remove(&mut self, id: RwLockId) -> Option<ThreadId> {
        let this = self.eval_context_mut();
        this.machine.threads.sync.rwlocks[id].writer.take()
    }

    #[inline]
    /// Put the writer in the queue waiting for the lock.
    fn rwlock_enqueue_writer(&mut self, id: RwLockId, writer: ThreadId) {
        let this = self.eval_context_mut();
        assert!(this.rwlock_is_locked(id), "queueing on unlocked lock");
        this.machine.threads.sync.rwlocks[id].writer_queue.push_back(writer);
    }

    #[inline]
    /// Take the writer out of the queue waiting for the lock.
    fn rwlock_dequeue_writer(&mut self, id: RwLockId) -> Option<ThreadId> {
        let this = self.eval_context_mut();
        this.machine.threads.sync.rwlocks[id].writer_queue.pop_front()
    }

    #[inline]
    /// Create state for a new conditional variable.
    fn condvar_create(&mut self) -> CondvarId {
        let this = self.eval_context_mut();
        this.machine.threads.sync.condvars.push(Default::default())
    }

    #[inline]
    /// Is the conditional variable awaited?
    fn condvar_is_awaited(&mut self, id: CondvarId) -> bool {
        let this = self.eval_context_mut();
        !this.machine.threads.sync.condvars[id].waiters.is_empty()
    }

    /// Mark that the thread is waiting on the conditional variable.
    ///
    /// Panics if the thread is already in the waiter queue.
    fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, mutex: MutexId) {
        let this = self.eval_context_mut();
        let waiters = &mut this.machine.threads.sync.condvars[id].waiters;
        assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
        waiters.push_back(CondvarWaiter { thread, mutex, timeout: None });
    }

    /// Wake up some thread (if there is any) sleeping on the conditional
    /// variable. Waiters are woken in FIFO order; returns the woken thread
    /// together with the mutex it must reacquire.
    fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> {
        let this = self.eval_context_mut();
        this.machine.threads.sync.condvars[id]
            .waiters
            .pop_front()
            .map(|waiter| (waiter.thread, waiter.mutex))
    }

    #[inline]
    /// Remove the thread from the queue of threads waiting on this conditional variable.
    fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) {
        let this = self.eval_context_mut();
        this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
    }
}

View file

@ -1,8 +1,10 @@
//! Implements threads.
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::convert::TryFrom;
use std::num::{NonZeroU32, TryFromIntError};
use std::time::Instant;
use log::trace;
@ -15,18 +17,24 @@ use rustc_middle::{
ty::{self, Instance},
};
use crate::sync::SynchronizationState;
use crate::*;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SchedulingAction {
    /// Execute step on the active thread.
    ExecuteStep,
    /// Execute a scheduler's callback (e.g. a condvar-timeout callback).
    ExecuteCallback,
    /// Execute destructors of the active thread.
    ExecuteDtors,
    /// Stop the program.
    Stop,
}

/// An action run by the scheduler in the interpreter context; used to
/// implement timeout callbacks.
type EventCallback<'mir, 'tcx> =
    Box<dyn FnOnce(&mut InterpCx<'mir, 'tcx, Evaluator<'mir, 'tcx>>) -> InterpResult<'tcx> + 'tcx>;
/// A thread identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(u32);
@ -94,6 +102,7 @@ pub enum ThreadState {
BlockedOnJoin(ThreadId),
/// The thread is blocked and belongs to the given blockset.
Blocked(BlockSetId),
BlockedThread,
/// The thread has terminated its execution (we do not delete terminated
/// threads).
Terminated,
@ -162,6 +171,23 @@ impl<'mir, 'tcx> Default for Thread<'mir, 'tcx> {
}
}
/// Callbacks are used to implement timeouts. For example, waiting on a
/// conditional variable with a timeout creates a callback that is called after
/// the specified time and unblocks the thread. If another thread signals on the
/// conditional variable, the signal handler deletes the callback.
struct CallBackInfo<'mir, 'tcx> {
    /// The callback should be called no earlier than this time.
    call_time: Instant,
    /// The called function.
    callback: EventCallback<'mir, 'tcx>,
}

// The callback itself is an opaque boxed `FnOnce`, so only the scheduled
// time appears in the debug output.
impl<'mir, 'tcx> std::fmt::Debug for CallBackInfo<'mir, 'tcx> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "CallBack({:?})", self.call_time)
    }
}
/// A set of threads.
#[derive(Debug)]
pub struct ThreadManager<'mir, 'tcx> {
@ -171,6 +197,8 @@ pub struct ThreadManager<'mir, 'tcx> {
///
/// Note that this vector also contains terminated threads.
threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
/// FIXME: make private.
pub(crate) sync: SynchronizationState,
/// A counter used to generate unique identifiers for blocksets.
blockset_counter: u32,
/// A mapping from a thread-local static to an allocation id of a thread
@ -178,6 +206,8 @@ pub struct ThreadManager<'mir, 'tcx> {
thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), AllocId>>,
/// A flag that indicates that we should change the active thread.
yield_active_thread: bool,
/// Callbacks that are called once the specified time passes.
callbacks: FxHashMap<ThreadId, CallBackInfo<'mir, 'tcx>>,
}
impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
@ -191,9 +221,11 @@ impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
Self {
active_thread: ThreadId::new(0),
threads: threads,
sync: SynchronizationState::default(),
blockset_counter: 0,
thread_local_alloc_ids: Default::default(),
yield_active_thread: false,
callbacks: FxHashMap::default(),
}
}
}
@ -321,30 +353,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
self.active_thread_ref().thread_name()
}
/// Allocate a new blockset id.
fn create_blockset(&mut self) -> BlockSetId {
self.blockset_counter = self.blockset_counter.checked_add(1).unwrap();
BlockSetId::new(self.blockset_counter)
}
/// Block the currently active thread and put it into the given blockset.
fn block_active_thread(&mut self, set: BlockSetId) {
let state = &mut self.active_thread_mut().state;
/// Put the thread into the blocked state.
fn block_thread(&mut self, thread: ThreadId) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::Enabled);
*state = ThreadState::Blocked(set);
*state = ThreadState::BlockedThread;
}
/// Unblock any one thread from the given blockset if it contains at least
/// one. Return the id of the unblocked thread.
fn unblock_some_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
for (id, thread) in self.threads.iter_enumerated_mut() {
if thread.state == ThreadState::Blocked(set) {
trace!("unblocking {:?} in blockset {:?}", id, set);
thread.state = ThreadState::Enabled;
return Some(id);
}
}
None
/// Put the blocked thread into the enabled state.
fn unblock_thread(&mut self, thread: ThreadId) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::BlockedThread);
*state = ThreadState::Enabled;
}
/// Change the active thread to some enabled thread.
@ -352,6 +372,39 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
self.yield_active_thread = true;
}
/// Register the given `callback` to be called once the `call_time` passes.
///
/// Panics if the thread already has a registered callback: a thread can be
/// waiting on at most one timeout at a time.
fn register_callback(
    &mut self,
    thread: ThreadId,
    call_time: Instant,
    callback: EventCallback<'mir, 'tcx>,
) {
    // `Option::unwrap_none` was an unstable API (since removed from std);
    // use a stable assertion with the same effect instead.
    let old = self.callbacks.insert(thread, CallBackInfo { call_time, callback });
    assert!(old.is_none(), "a thread can have at most one registered callback");
}
/// Unregister the callback for the `thread`, if one is registered (e.g.
/// because the thread was woken by a signal before its timeout fired).
fn unregister_callback_if_exists(&mut self, thread: ThreadId) {
    self.callbacks.remove(&thread);
}
/// Get a callback whose time has come, removing it from the table.
///
/// Returns `None` when no registered callback is due yet.
fn get_callback(&mut self) -> Option<(ThreadId, EventCallback<'mir, 'tcx>)> {
    let now = Instant::now();
    // Scan threads in index order (rather than in hash-map order) so that
    // callback delivery is deterministic.
    for thread in self.threads.indices() {
        if let Entry::Occupied(entry) = self.callbacks.entry(thread) {
            if now >= entry.get().call_time {
                return Some((thread, entry.remove().callback));
            }
        }
    }
    None
}
/// Decide which action to take next and on which thread.
///
/// The currently implemented scheduling policy is the one that is commonly
@ -407,6 +460,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
// We have not found a thread to execute.
if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
unreachable!();
} else if let Some(next_call_time) =
self.callbacks.values().min_by_key(|info| info.call_time)
{
// All threads are currently blocked, but we have unexecuted
// callbacks, which may unblock some of the threads. Hence,
// sleep until the first callback.
if let Some(sleep_time) =
next_call_time.call_time.checked_duration_since(Instant::now())
{
std::thread::sleep(sleep_time);
}
Ok(SchedulingAction::ExecuteCallback)
} else {
throw_machine_stop!(TerminationInfo::Deadlock);
}
@ -577,21 +642,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
}
#[inline]
fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> {
fn block_thread(&mut self, thread: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
Ok(this.machine.threads.create_blockset())
Ok(this.machine.threads.block_thread(thread))
}
#[inline]
fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> {
fn unblock_thread(&mut self, thread: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
Ok(this.machine.threads.block_active_thread(set))
}
#[inline]
fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
let this = self.eval_context_mut();
Ok(this.machine.threads.unblock_some_thread(set))
Ok(this.machine.threads.unblock_thread(thread))
}
#[inline]
@ -601,6 +660,36 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
Ok(())
}
#[inline]
/// Register `callback` to run on `thread` once `call_time` passes; forwards
/// to `ThreadManager::register_callback`.
fn register_callback(
    &mut self,
    thread: ThreadId,
    call_time: Instant,
    callback: EventCallback<'mir, 'tcx>,
) -> InterpResult<'tcx> {
    let this = self.eval_context_mut();
    this.machine.threads.register_callback(thread, call_time, callback);
    Ok(())
}
#[inline]
/// Drop the timeout callback of `thread`, if any; forwards to
/// `ThreadManager::unregister_callback_if_exists`.
fn unregister_callback_if_exists(&mut self, thread: ThreadId) -> InterpResult<'tcx> {
    let this = self.eval_context_mut();
    this.machine.threads.unregister_callback_if_exists(thread);
    Ok(())
}
/// Execute the callback on the callback's thread.
///
/// Temporarily makes the callback's owner the active thread so the callback
/// observes the correct interpreter state, then restores the previous one.
#[inline]
fn run_scheduler_callback(&mut self) -> InterpResult<'tcx> {
    let this = self.eval_context_mut();
    // The scheduler only returns `ExecuteCallback` when a callback is due,
    // so a missing callback here is an interpreter bug.
    let (thread, callback) = this.machine.threads.get_callback().expect("no callback found");
    let old_thread = this.set_active_thread(thread)?;
    callback(this)?;
    this.set_active_thread(old_thread)?;
    Ok(())
}
/// Decide which action to take next and on which thread.
#[inline]
fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {

View file

@ -0,0 +1,27 @@
// ignore-windows: Concurrency on Windows is not supported yet.
//! Check if Rust barriers are working.
use std::sync::{Arc, Barrier};
use std::thread;
/// Ten threads rendezvous on a shared `Barrier`.
/// (Adapted from the Rust standard library documentation.)
fn main() {
    let barrier = Arc::new(Barrier::new(10));
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let b = barrier.clone();
            thread::spawn(move || {
                // Every "before wait" line prints before any "after wait"
                // line; the barrier forbids interleaving across it.
                println!("before wait");
                b.wait();
                println!("after wait");
            })
        })
        .collect();
    // Wait for all the workers to finish.
    for worker in workers {
        worker.join().unwrap();
    }
}

View file

@ -0,0 +1,2 @@
warning: thread support is experimental. For example, Miri does not detect data races yet.

View file

@ -0,0 +1,20 @@
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait

View file

@ -0,0 +1,28 @@
// ignore-windows: Concurrency on Windows is not supported yet.
//! Check if Rust conditional variables are working.
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
/// A spawned thread flips a mutex-protected flag and signals a condvar;
/// the main thread waits until it observes the flag set.
/// (Adapted from the Rust standard library documentation.)
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let worker_pair = Arc::clone(&pair);
    thread::spawn(move || {
        let (lock, cvar) = &*worker_pair;
        let mut started = lock.lock().unwrap();
        *started = true;
        // Notify while still holding the lock.
        cvar.notify_one();
    });
    // Block until the flag is set; loop to guard against spurious wakeups.
    let (lock, cvar) = &*pair;
    let mut guard = lock.lock().unwrap();
    while !*guard {
        guard = cvar.wait(guard).unwrap();
    }
}

View file

@ -0,0 +1,2 @@
warning: thread support is experimental. For example, Miri does not detect data races yet.

View file

@ -0,0 +1,199 @@
// ignore-windows: No libc on Windows
// compile-flags: -Zmiri-disable-isolation
#![feature(rustc_private)]
extern crate libc;
use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit};
use std::sync::Arc;
use std::thread;
struct Mutex {
inner: UnsafeCell<libc::pthread_mutex_t>,
}
unsafe impl Sync for Mutex {}
impl std::fmt::Debug for Mutex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Mutex")
}
}
struct Cond {
inner: UnsafeCell<libc::pthread_cond_t>,
}
unsafe impl Sync for Cond {}
impl std::fmt::Debug for Cond {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Cond")
}
}
/// Build a `pthread_condattr_t` whose clock is `CLOCK_MONOTONIC`
/// (the default clock would be `CLOCK_REALTIME`).
unsafe fn create_cond_attr_monotonic() -> libc::pthread_condattr_t {
    let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
    assert_eq!(libc::pthread_condattr_init(attr.as_mut_ptr()), 0);
    assert_eq!(libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC), 0);
    attr.assume_init()
}
/// Initialize a condition variable, either with the given attributes (which
/// are destroyed afterwards, as allowed once the condvar is initialized) or
/// with the defaults when `attr` is `None`.
unsafe fn create_cond(attr: Option<libc::pthread_condattr_t>) -> Cond {
    let cond: Cond = mem::zeroed();
    if let Some(mut attr) = attr {
        assert_eq!(libc::pthread_cond_init(cond.inner.get() as *mut _, &attr as *const _), 0);
        assert_eq!(libc::pthread_condattr_destroy(&mut attr as *mut _), 0);
    } else {
        // Use an explicit null pointer rather than `0 as *const _`.
        assert_eq!(libc::pthread_cond_init(cond.inner.get() as *mut _, std::ptr::null()), 0);
    }
    cond
}
/// A zero-initialized mutex; assumes the all-zeroes bit pattern is a valid
/// statically-initialized `pthread_mutex_t` on the tested targets — TODO
/// confirm for each supported platform.
unsafe fn create_mutex() -> Mutex {
    mem::zeroed()
}
/// An absolute `timespec` that is `seconds` from now on the monotonic clock.
unsafe fn create_timeout(seconds: i64) -> libc::timespec {
    let mut now: libc::timespec = mem::zeroed();
    assert_eq!(libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut now), 0);
    libc::timespec { tv_sec: now.tv_sec + seconds, tv_nsec: now.tv_nsec }
}
/// Check that a clock set via `pthread_condattr_setclock` can be read back
/// via `pthread_condattr_getclock`.
fn test_pthread_condattr_t() {
    unsafe {
        let mut attr = create_cond_attr_monotonic();
        let mut clock_id = MaybeUninit::<libc::clockid_t>::uninit();
        assert_eq!(libc::pthread_condattr_getclock(&attr as *const _, clock_id.as_mut_ptr()), 0);
        assert_eq!(clock_id.assume_init(), libc::CLOCK_MONOTONIC);
        assert_eq!(libc::pthread_condattr_destroy(&mut attr as *mut _), 0);
    }
}
/// One thread blocks in `pthread_cond_wait` while another locks the same
/// mutex and signals; the waiter must wake and reacquire the mutex.
fn test_signal() {
    unsafe {
        let cond = Arc::new(create_cond(None));
        let mutex = Arc::new(create_mutex());
        // Lock before spawning: the helper cannot acquire the mutex (and so
        // cannot signal) until the main thread releases it inside cond_wait,
        // which prevents a lost wakeup.
        assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0);
        let spawn_mutex = Arc::clone(&mutex);
        let spawn_cond = Arc::clone(&cond);
        let handle = thread::spawn(move || {
            assert_eq!(libc::pthread_mutex_lock(spawn_mutex.inner.get() as *mut _), 0);
            assert_eq!(libc::pthread_cond_signal(spawn_cond.inner.get() as *mut _), 0);
            assert_eq!(libc::pthread_mutex_unlock(spawn_mutex.inner.get() as *mut _), 0);
        });
        // Releases the mutex while waiting and reacquires it before returning.
        assert_eq!(
            libc::pthread_cond_wait(cond.inner.get() as *mut _, mutex.inner.get() as *mut _),
            0
        );
        assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0);
        handle.join().unwrap();
        // `try_unwrap` proves no other references remain before destruction.
        let mutex = Arc::try_unwrap(mutex).unwrap();
        assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0);
        let cond = Arc::try_unwrap(cond).unwrap();
        assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0);
    }
}
/// Like `test_signal`, but wakes the waiter with `pthread_cond_broadcast`.
fn test_broadcast() {
    unsafe {
        let cond = Arc::new(create_cond(None));
        let mutex = Arc::new(create_mutex());
        // Lock before spawning so the helper's broadcast cannot be lost.
        assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0);
        let spawn_mutex = Arc::clone(&mutex);
        let spawn_cond = Arc::clone(&cond);
        let handle = thread::spawn(move || {
            assert_eq!(libc::pthread_mutex_lock(spawn_mutex.inner.get() as *mut _), 0);
            assert_eq!(libc::pthread_cond_broadcast(spawn_cond.inner.get() as *mut _), 0);
            assert_eq!(libc::pthread_mutex_unlock(spawn_mutex.inner.get() as *mut _), 0);
        });
        assert_eq!(
            libc::pthread_cond_wait(cond.inner.get() as *mut _, mutex.inner.get() as *mut _),
            0
        );
        assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0);
        handle.join().unwrap();
        // `try_unwrap` proves no other references remain before destruction.
        let mutex = Arc::try_unwrap(mutex).unwrap();
        assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0);
        let cond = Arc::try_unwrap(cond).unwrap();
        assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0);
    }
}
/// Wait with a 1-second timeout and no signaller: the wait must return
/// `ETIMEDOUT`, still reacquiring the mutex on the way out.
fn test_timed_wait_timeout() {
    unsafe {
        let attr = create_cond_attr_monotonic();
        let cond = create_cond(Some(attr));
        let mutex = create_mutex();
        let timeout = create_timeout(1);
        assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0);
        assert_eq!(
            libc::pthread_cond_timedwait(
                cond.inner.get() as *mut _,
                mutex.inner.get() as *mut _,
                &timeout
            ),
            libc::ETIMEDOUT
        );
        // Even on timeout the mutex is held again, so it must be unlocked.
        assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0);
        assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0);
        assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0);
    }
}
/// Wait with a generous (100 s) timeout while another thread signals
/// promptly: the wait must return 0, not `ETIMEDOUT`.
fn test_timed_wait_notimeout() {
    unsafe {
        let attr = create_cond_attr_monotonic();
        let cond = Arc::new(create_cond(Some(attr)));
        let mutex = Arc::new(create_mutex());
        let timeout = create_timeout(100);
        // Lock before spawning so the helper's signal cannot be lost.
        assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0);
        let spawn_mutex = Arc::clone(&mutex);
        let spawn_cond = Arc::clone(&cond);
        let handle = thread::spawn(move || {
            assert_eq!(libc::pthread_mutex_lock(spawn_mutex.inner.get() as *mut _), 0);
            assert_eq!(libc::pthread_cond_signal(spawn_cond.inner.get() as *mut _), 0);
            assert_eq!(libc::pthread_mutex_unlock(spawn_mutex.inner.get() as *mut _), 0);
        });
        assert_eq!(
            libc::pthread_cond_timedwait(
                cond.inner.get() as *mut _,
                mutex.inner.get() as *mut _,
                &timeout
            ),
            0
        );
        assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0);
        handle.join().unwrap();
        // `try_unwrap` proves no other references remain before destruction.
        let mutex = Arc::try_unwrap(mutex).unwrap();
        assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0);
        let cond = Arc::try_unwrap(cond).unwrap();
        assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0);
    }
}
/// Run the attribute round-trip check, then the wakeup and timeout scenarios.
fn main() {
    test_pthread_condattr_t();
    test_signal();
    test_broadcast();
    test_timed_wait_timeout();
    test_timed_wait_notimeout();
}

View file

@ -0,0 +1,2 @@
warning: thread support is experimental. For example, Miri does not detect data races yet.

View file

@ -0,0 +1,56 @@
// ignore-windows: Concurrency on Windows is not supported yet.
//! Check if Rust channels are working.
use std::sync::mpsc::{channel, sync_channel};
use std::thread;
/// Send a single value through an asynchronous channel from a spawned thread.
/// (Adapted from the Rust standard library documentation.)
fn simple_send() {
    let (sender, receiver) = channel();
    thread::spawn(move || {
        sender.send(10).unwrap();
    });
    assert_eq!(receiver.recv().unwrap(), 10);
}
/// Ten producer threads each send one value from 0..10; the consumer checks
/// every message is in range and that the total is 0+1+...+9 = 45.
/// (Adapted from the Rust standard library documentation.)
fn multiple_send() {
    let (sender, receiver) = channel();
    for value in 0..10 {
        let sender = sender.clone();
        thread::spawn(move || {
            sender.send(value).unwrap();
        });
    }
    let mut total = 0;
    for _ in 0..10 {
        let received = receiver.recv().unwrap();
        assert!((0..10).contains(&received));
        total += received;
    }
    assert_eq!(total, 45);
}
/// Exercise a bounded (capacity-1) channel: the second send blocks until the
/// first message is received.
/// (Adapted from the Rust standard library documentation.)
fn send_on_sync() {
    let (tx, rx) = sync_channel(1);
    // The first message fits into the channel buffer, so this returns at once.
    tx.send(1).unwrap();
    thread::spawn(move || {
        // Blocks until the buffered message has been drained by the receiver.
        tx.send(2).unwrap();
    });
    assert_eq!(rx.recv().unwrap(), 1);
    assert_eq!(rx.recv().unwrap(), 2);
}
/// Run the three channel scenarios in turn.
fn main() {
    simple_send();
    multiple_send();
    send_on_sync();
}

View file

@ -0,0 +1,2 @@
warning: thread support is experimental. For example, Miri does not detect data races yet.

View file

@ -0,0 +1,44 @@
// ignore-windows: Concurrency on Windows is not supported yet.
//! Check if Rust once statics are working. The test taken from the Rust
//! documentation.
use std::sync::Once;
use std::thread;
// The lazily-initialized value; written only inside `INIT.call_once`.
static mut VAL: usize = 0;
static INIT: Once = Once::new();

/// Return the lazily-computed value, initializing it exactly once even when
/// called from many threads concurrently.
fn get_cached_val() -> usize {
    unsafe {
        INIT.call_once(|| {
            VAL = expensive_computation();
        });
        // `call_once` has returned by this point, so `VAL` is initialized
        // and no further writes to it can occur.
        VAL
    }
}
/// Multiply successive integers (1, 2, 3, ...) into a running product until
/// it reaches at least 10000; the result is 8! = 40320.
fn expensive_computation() -> usize {
    let mut product = 1;
    let mut factor = 1;
    while product < 10000 {
        product *= factor;
        factor += 1;
    }
    product
}
/// Spawn ten threads that race to read the cached value; every one must
/// observe the same fully-initialized result.
fn main() {
    let handles: Vec<_> = (0..10)
        .map(|_| {
            thread::spawn(|| {
                // Yield first to make the initialization race more likely.
                thread::yield_now();
                let val = get_cached_val();
                assert_eq!(val, 40320);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

View file

@ -0,0 +1,2 @@
warning: thread support is experimental. For example, Miri does not detect data races yet.