From 679245769b6984ec5a7edf70fb4744d8411468b8 Mon Sep 17 00:00:00 2001 From: Vytautas Astrauskas Date: Tue, 21 Apr 2020 16:38:14 -0700 Subject: [PATCH] Implement support for synchronization primitives. --- src/eval.rs | 6 + src/lib.rs | 6 +- src/machine.rs | 8 +- src/shims/foreign_items/posix.rs | 48 +- src/shims/sync.rs | 674 +++++++++++------- src/sync.rs | 299 ++++++++ src/thread.rs | 151 +++- tests/run-pass/concurrency/barrier.rs | 27 + tests/run-pass/concurrency/barrier.stderr | 2 + tests/run-pass/concurrency/barrier.stdout | 20 + tests/run-pass/concurrency/condvar.rs | 28 + tests/run-pass/concurrency/condvar.stderr | 2 + .../run-pass/concurrency/libc_pthread_cond.rs | 199 ++++++ .../concurrency/libc_pthread_cond.stderr | 2 + tests/run-pass/concurrency/mpsc.rs | 56 ++ tests/run-pass/concurrency/mpsc.stderr | 2 + tests/run-pass/concurrency/once.rs | 44 ++ tests/run-pass/concurrency/once.stderr | 2 + 18 files changed, 1290 insertions(+), 286 deletions(-) create mode 100644 src/sync.rs create mode 100644 tests/run-pass/concurrency/barrier.rs create mode 100644 tests/run-pass/concurrency/barrier.stderr create mode 100644 tests/run-pass/concurrency/barrier.stdout create mode 100644 tests/run-pass/concurrency/condvar.rs create mode 100644 tests/run-pass/concurrency/condvar.stderr create mode 100644 tests/run-pass/concurrency/libc_pthread_cond.rs create mode 100644 tests/run-pass/concurrency/libc_pthread_cond.stderr create mode 100644 tests/run-pass/concurrency/mpsc.rs create mode 100644 tests/run-pass/concurrency/mpsc.stderr create mode 100644 tests/run-pass/concurrency/once.rs create mode 100644 tests/run-pass/concurrency/once.stderr diff --git a/src/eval.rs b/src/eval.rs index 5daad7cc068b..30901a8f127f 100644 --- a/src/eval.rs +++ b/src/eval.rs @@ -210,6 +210,12 @@ pub fn eval_main<'tcx>(tcx: TyCtxt<'tcx>, main_id: DefId, config: MiriConfig) -> SchedulingAction::ExecuteStep => { assert!(ecx.step()?, "a terminated thread was scheduled for execution"); } + SchedulingAction::ExecuteCallback => { + assert!(ecx.machine.communicate, + "scheduler callbacks require disabled isolation, but the code \ + that created the callback did not check it"); + ecx.run_scheduler_callback()?; + } SchedulingAction::ExecuteDtors => { // This will either enable the thread again (so we go back // to `ExecuteStep`), or determine that this thread is done diff --git a/src/lib.rs b/src/lib.rs index 0ea0d57caccc..e79fc2add39e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,7 @@ mod operator; mod range_map; mod shims; mod stacked_borrows; +mod sync; mod thread; // Make all those symbols available in the same place as our own. @@ -45,7 +46,7 @@ pub use crate::shims::fs::{DirHandler, EvalContextExt as FileEvalContextExt, Fil pub use crate::shims::intrinsics::EvalContextExt as IntrinsicsEvalContextExt; pub use crate::shims::os_str::EvalContextExt as OsStrEvalContextExt; pub use crate::shims::panic::{CatchUnwindData, EvalContextExt as PanicEvalContextExt}; -pub use crate::shims::sync::{EvalContextExt as SyncEvalContextExt}; +pub use crate::shims::sync::{EvalContextExt as SyncShimsEvalContextExt}; pub use crate::shims::thread::EvalContextExt as ThreadShimsEvalContextExt; pub use crate::shims::time::EvalContextExt as TimeEvalContextExt; pub use crate::shims::tls::{EvalContextExt as TlsEvalContextExt, TlsData}; @@ -70,6 +71,9 @@ pub use crate::stacked_borrows::{ pub use crate::thread::{ EvalContextExt as ThreadsEvalContextExt, SchedulingAction, ThreadId, ThreadManager, ThreadState, }; +pub use crate::sync::{ + EvalContextExt as SyncEvalContextExt, CondvarId, MutexId, RwLockId +}; /// Insert rustc arguments at the beginning of the argument list that Miri wants to be /// set per default, for maximal validation power. diff --git a/src/machine.rs b/src/machine.rs index 51aa7ae31047..4fb08cd259b6 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -5,7 +5,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::num::NonZeroU64; use std::rc::Rc; -use std::time::Instant; +use std::time::{Instant, SystemTime}; use std::fmt; use log::trace; @@ -251,6 +251,11 @@ pub struct Evaluator<'mir, 'tcx> { /// The "time anchor" for this machine's monotone clock (for `Instant` simulation). pub(crate) time_anchor: Instant, + /// The approximate system time when "time anchor" was created. This is used + /// for converting system time to monotone time so that we can simplify the + /// thread scheduler to deal only with a single representation of time. + pub(crate) time_anchor_timestamp: SystemTime, + /// The set of threads. pub(crate) threads: ThreadManager<'mir, 'tcx>, @@ -281,6 +286,7 @@ impl<'mir, 'tcx> Evaluator<'mir, 'tcx> { dir_handler: Default::default(), panic_payload: None, time_anchor: Instant::now(), + time_anchor_timestamp: SystemTime::now(), layouts, threads: ThreadManager::default(), } diff --git a/src/shims/foreign_items/posix.rs b/src/shims/foreign_items/posix.rs index 09191011a4a8..352e38113abb 100644 --- a/src/shims/foreign_items/posix.rs +++ b/src/shims/foreign_items/posix.rs @@ -330,6 +330,45 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx let result = this.pthread_rwlock_destroy(rwlock)?; this.write_scalar(Scalar::from_i32(result), dest)?; } + "pthread_condattr_init" => { + let result = this.pthread_condattr_init(args[0])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_condattr_setclock" => { + let result = this.pthread_condattr_setclock(args[0], args[1])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_condattr_getclock" => { + let result = this.pthread_condattr_getclock(args[0], args[1])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_condattr_destroy" => { + let result = this.pthread_condattr_destroy(args[0])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_cond_init" => { + let result = this.pthread_cond_init(args[0], args[1])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_cond_signal" => { + let result = this.pthread_cond_signal(args[0])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_cond_broadcast" => { + let result = this.pthread_cond_broadcast(args[0])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_cond_wait" => { + let result = this.pthread_cond_wait(args[0], args[1])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } + "pthread_cond_timedwait" => { + this.pthread_cond_timedwait(args[0], args[1], args[2], dest)?; + } + "pthread_cond_destroy" => { + let result = this.pthread_cond_destroy(args[0])?; + this.write_scalar(Scalar::from_i32(result), dest)?; + } // Threading "pthread_create" => { @@ -391,16 +430,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx | "pthread_attr_init" | "pthread_attr_destroy" - | "pthread_condattr_init" - | "pthread_condattr_destroy" - | "pthread_cond_destroy" - if this.frame().instance.to_string().starts_with("std::sys::unix::") => { - let &[_] = check_arg_count(args)?; - this.write_null(dest)?; - } - | "pthread_cond_init" | "pthread_attr_setstacksize" - | "pthread_condattr_setclock" if this.frame().instance.to_string().starts_with("std::sys::unix::") => { let &[_, _] = check_arg_count(args)?; this.write_null(dest)?; diff --git a/src/shims/sync.rs b/src/shims/sync.rs index c205c5c8dddb..dfd7999457eb 100644 --- a/src/shims/sync.rs +++ b/src/shims/sync.rs @@ -1,8 +1,10 @@ +use std::time::{Duration, SystemTime}; + use rustc_middle::ty::{layout::TyAndLayout, TyKind, TypeAndMut}; use rustc_target::abi::{LayoutOf, Size}; use crate::stacked_borrows::Tag; -use crate::thread::BlockSetId; + use crate::*; fn assert_ptr_target_min_size<'mir, 'tcx: 'mir>( @@ -76,45 +78,12 @@ fn mutexattr_set_kind<'mir, 'tcx: 'mir>( // Our chosen memory layout for the emulated mutex (does not have to match the platform layout!): // bytes 0-3: reserved for signature on macOS // (need to avoid this because it is set by static initializer macros) -// bytes 4-7: count of how many times this mutex has been locked, as a u32 -// bytes 8-11: when count > 0, id of the owner thread as a u32 +// bytes 4-7: mutex id as u32 or 0 if id is not assigned yet. // bytes 12-15 or 16-19 (depending on platform): mutex kind, as an i32 // (the kind has to be at its offset for compatibility with static initializer macros) -// bytes 20-23: when count > 0, id of the blockset in which the blocked threads -// are waiting or 0 if blockset is not yet assigned. const PTHREAD_MUTEX_T_MIN_SIZE: u64 = 24; -fn mutex_get_locked_count<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, - mutex_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, ScalarMaybeUninit> { - get_at_offset(ecx, mutex_op, 4, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) -} - -fn mutex_set_locked_count<'mir, 'tcx: 'mir>( - ecx: &mut MiriEvalContext<'mir, 'tcx>, - mutex_op: OpTy<'tcx, Tag>, - locked_count: impl Into>, -) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, mutex_op, 4, locked_count, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) -} - -fn mutex_get_owner<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, - mutex_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, ScalarMaybeUninit> { - get_at_offset(ecx, mutex_op, 8, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) -} - -fn mutex_set_owner<'mir, 'tcx: 'mir>( - ecx: &mut MiriEvalContext<'mir, 'tcx>, - mutex_op: OpTy<'tcx, Tag>, - owner: impl Into>, -) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, mutex_op, 8, owner, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) -} - fn mutex_get_kind<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, mutex_op: OpTy<'tcx, Tag>, @@ -132,34 +101,34 @@ fn mutex_set_kind<'mir, 'tcx: 'mir>( set_at_offset(ecx, mutex_op, offset, kind, ecx.machine.layouts.i32, PTHREAD_MUTEX_T_MIN_SIZE) } -fn mutex_get_blockset<'mir, 'tcx: 'mir>( +fn mutex_get_id<'mir, 'tcx: 'mir>( ecx: &MiriEvalContext<'mir, 'tcx>, mutex_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, ScalarMaybeUninit> { - get_at_offset(ecx, mutex_op, 20, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) +) -> InterpResult<'tcx, ScalarMaybeUndef> { + get_at_offset(ecx, mutex_op, 4, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) } -fn mutex_set_blockset<'mir, 'tcx: 'mir>( +fn mutex_set_id<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, mutex_op: OpTy<'tcx, Tag>, - blockset: impl Into>, + id: impl Into>, ) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, mutex_op, 20, blockset, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) + set_at_offset(ecx, mutex_op, 4, id, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE) } -fn mutex_get_or_create_blockset<'mir, 'tcx: 'mir>( +fn mutex_get_or_create_id<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, mutex_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, BlockSetId> { - let blockset = mutex_get_blockset(ecx, mutex_op)?.to_u32()?; - if blockset == 0 { - // 0 is a default value and also not a valid blockset id. Need to - // allocate a new blockset. - let blockset = ecx.create_blockset()?; - mutex_set_blockset(ecx, mutex_op, blockset.to_u32_scalar())?; - Ok(blockset) +) -> InterpResult<'tcx, MutexId> { + let id = mutex_get_id(ecx, mutex_op)?.to_u32()?; + if id == 0 { + // 0 is a default value and also not a valid mutex id. Need to allocate + // a new mutex. + let id = ecx.mutex_create(); + mutex_set_id(ecx, mutex_op, id.to_u32_scalar())?; + Ok(id) } else { - Ok(BlockSetId::new(blockset)) + Ok(id.into()) } } @@ -168,107 +137,162 @@ fn mutex_get_or_create_blockset<'mir, 'tcx: 'mir>( // Our chosen memory layout for the emulated rwlock (does not have to match the platform layout!): // bytes 0-3: reserved for signature on macOS // (need to avoid this because it is set by static initializer macros) -// bytes 4-7: reader count, as a u32 -// bytes 8-11: writer count, as a u32 -// bytes 12-15: when writer or reader count > 0, id of the blockset in which the -// blocked writers are waiting or 0 if blockset is not yet assigned. -// bytes 16-20: when writer count > 0, id of the blockset in which the blocked -// readers are waiting or 0 if blockset is not yet assigned. +// bytes 4-7: rwlock id as u32 or 0 if id is not assigned yet. -const PTHREAD_RWLOCK_T_MIN_SIZE: u64 = 20; +const PTHREAD_RWLOCK_T_MIN_SIZE: u64 = 32; -fn rwlock_get_readers<'mir, 'tcx: 'mir>( +fn rwlock_get_id<'mir, 'tcx: 'mir>( ecx: &MiriEvalContext<'mir, 'tcx>, rwlock_op: OpTy<'tcx, Tag>, ) -> InterpResult<'tcx, ScalarMaybeUninit> { get_at_offset(ecx, rwlock_op, 4, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) } -fn rwlock_set_readers<'mir, 'tcx: 'mir>( +fn rwlock_set_id<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, rwlock_op: OpTy<'tcx, Tag>, - readers: impl Into>, + id: impl Into>, ) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, rwlock_op, 4, readers, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) + set_at_offset(ecx, rwlock_op, 4, id, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) } -fn rwlock_get_writers<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, ScalarMaybeUninit> { - get_at_offset(ecx, rwlock_op, 8, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) -} - -fn rwlock_set_writers<'mir, 'tcx: 'mir>( +fn rwlock_get_or_create_id<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, rwlock_op: OpTy<'tcx, Tag>, - writers: impl Into>, -) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, rwlock_op, 8, writers, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) -} - -fn rwlock_get_writer_blockset<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, ScalarMaybeUninit> { - get_at_offset(ecx, rwlock_op, 12, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) -} - -fn rwlock_set_writer_blockset<'mir, 'tcx: 'mir>( - ecx: &mut MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, - blockset: impl Into>, -) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, rwlock_op, 12, blockset, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) -} - -fn rwlock_get_or_create_writer_blockset<'mir, 'tcx: 'mir>( - ecx: &mut MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, BlockSetId> { - let blockset = rwlock_get_writer_blockset(ecx, rwlock_op)?.to_u32()?; - if blockset == 0 { - // 0 is a default value and also not a valid blockset id. Need to - // allocate a new blockset. - let blockset = ecx.create_blockset()?; - rwlock_set_writer_blockset(ecx, rwlock_op, blockset.to_u32_scalar())?; - Ok(blockset) +) -> InterpResult<'tcx, RwLockId> { + let id = rwlock_get_id(ecx, rwlock_op)?.to_u32()?; + if id == 0 { + // 0 is a default value and also not a valid rwlock id. Need to allocate + // a new read-write lock. + let id = ecx.rwlock_create(); + rwlock_set_id(ecx, rwlock_op, id.to_u32_scalar())?; + Ok(id) } else { - Ok(BlockSetId::new(blockset)) + Ok(id.into()) } } -fn rwlock_get_reader_blockset<'mir, 'tcx: 'mir>( +// pthread_condattr_t + +// Our chosen memory layout for emulation (does not have to match the platform layout!): +// store an i32 in the first four bytes equal to the corresponding libc clock id constant +// (e.g. CLOCK_REALTIME). + +const PTHREAD_CONDATTR_T_MIN_SIZE: u64 = 4; + +fn condattr_get_clock_id<'mir, 'tcx: 'mir>( ecx: &MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, ScalarMaybeUninit> { - get_at_offset(ecx, rwlock_op, 16, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) + attr_op: OpTy<'tcx, Tag>, +) -> InterpResult<'tcx, ScalarMaybeUndef> { + get_at_offset(ecx, attr_op, 0, ecx.machine.layouts.i32, PTHREAD_CONDATTR_T_MIN_SIZE) } -fn rwlock_set_reader_blockset<'mir, 'tcx: 'mir>( +fn condattr_set_clock_id<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, - blockset: impl Into>, + attr_op: OpTy<'tcx, Tag>, + clock_id: impl Into>, ) -> InterpResult<'tcx, ()> { - set_at_offset(ecx, rwlock_op, 16, blockset, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE) + set_at_offset(ecx, attr_op, 0, clock_id, ecx.machine.layouts.i32, PTHREAD_CONDATTR_T_MIN_SIZE) } -fn rwlock_get_or_create_reader_blockset<'mir, 'tcx: 'mir>( +// pthread_cond_t + +// Our chosen memory layout for the emulated conditional variable (does not have +// to match the platform layout!): + +// bytes 4-7: the conditional variable id as u32 or 0 if id is not assigned yet. +// bytes 8-11: the clock id constant as i32 + +const PTHREAD_COND_T_MIN_SIZE: u64 = 12; + +fn cond_get_id<'mir, 'tcx: 'mir>( + ecx: &MiriEvalContext<'mir, 'tcx>, + cond_op: OpTy<'tcx, Tag>, +) -> InterpResult<'tcx, ScalarMaybeUndef> { + get_at_offset(ecx, cond_op, 4, ecx.machine.layouts.u32, PTHREAD_COND_T_MIN_SIZE) +} + +fn cond_set_id<'mir, 'tcx: 'mir>( ecx: &mut MiriEvalContext<'mir, 'tcx>, - rwlock_op: OpTy<'tcx, Tag>, -) -> InterpResult<'tcx, BlockSetId> { - let blockset = rwlock_get_reader_blockset(ecx, rwlock_op)?.to_u32()?; - if blockset == 0 { - // 0 is a default value and also not a valid blockset id. Need to - // allocate a new blockset. - let blockset = ecx.create_blockset()?; - rwlock_set_reader_blockset(ecx, rwlock_op, blockset.to_u32_scalar())?; - Ok(blockset) + cond_op: OpTy<'tcx, Tag>, + id: impl Into>, +) -> InterpResult<'tcx, ()> { + set_at_offset(ecx, cond_op, 4, id, ecx.machine.layouts.u32, PTHREAD_COND_T_MIN_SIZE) +} + +fn cond_get_or_create_id<'mir, 'tcx: 'mir>( + ecx: &mut MiriEvalContext<'mir, 'tcx>, + cond_op: OpTy<'tcx, Tag>, +) -> InterpResult<'tcx, CondvarId> { + let id = cond_get_id(ecx, cond_op)?.to_u32()?; + if id == 0 { + // 0 is a default value and also not a valid conditional variable id. + // Need to allocate a new id. + let id = ecx.condvar_create(); + cond_set_id(ecx, cond_op, id.to_u32_scalar())?; + Ok(id) } else { - Ok(BlockSetId::new(blockset)) + Ok(id.into()) } } +fn cond_get_clock_id<'mir, 'tcx: 'mir>( + ecx: &MiriEvalContext<'mir, 'tcx>, + cond_op: OpTy<'tcx, Tag>, +) -> InterpResult<'tcx, ScalarMaybeUndef> { + get_at_offset(ecx, cond_op, 8, ecx.machine.layouts.i32, PTHREAD_COND_T_MIN_SIZE) +} + +fn cond_set_clock_id<'mir, 'tcx: 'mir>( + ecx: &mut MiriEvalContext<'mir, 'tcx>, + cond_op: OpTy<'tcx, Tag>, + clock_id: impl Into>, +) -> InterpResult<'tcx, ()> { + set_at_offset(ecx, cond_op, 8, clock_id, ecx.machine.layouts.i32, PTHREAD_COND_T_MIN_SIZE) +} + +/// Try to reacquire the mutex associated with the condition variable after we were signaled. +fn reacquire_cond_mutex<'mir, 'tcx: 'mir>( + ecx: &mut MiriEvalContext<'mir, 'tcx>, + thread: ThreadId, + mutex: MutexId, +) -> InterpResult<'tcx> { + if ecx.mutex_is_locked(mutex) { + ecx.mutex_enqueue(mutex, thread); + } else { + ecx.mutex_lock(mutex, thread); + ecx.unblock_thread(thread)?; + } + Ok(()) +} + +/// Release the mutex associated with the condition variable because we are +/// entering the waiting state. +fn release_cond_mutex<'mir, 'tcx: 'mir>( + ecx: &mut MiriEvalContext<'mir, 'tcx>, + active_thread: ThreadId, + mutex: MutexId, +) -> InterpResult<'tcx> { + if let Some((owner_thread, current_locked_count)) = ecx.mutex_unlock(mutex) { + if current_locked_count != 0 { + throw_unsup_format!("awaiting on multiple times acquired lock is not supported"); + } + if owner_thread != active_thread { + throw_ub_format!("awaiting on a mutex owned by a different thread"); + } + if let Some(thread) = ecx.mutex_dequeue(mutex) { + // We have at least one thread waiting on this mutex. Transfer + // ownership to it. + ecx.mutex_lock(mutex, thread); + ecx.unblock_thread(thread)?; + } + } else { + throw_ub_format!("awaiting on unlocked mutex"); + } + ecx.block_thread(active_thread)?; + Ok(()) +} + impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {} pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { fn pthread_mutexattr_init(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { @@ -323,7 +347,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx mutexattr_get_kind(this, attr_op)?.not_undef()? }; - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(0))?; + let _ = mutex_get_or_create_id(this, mutex_op)?; mutex_set_kind(this, mutex_op, kind)?; Ok(0) @@ -333,21 +357,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx let this = self.eval_context_mut(); let kind = mutex_get_kind(this, mutex_op)?.not_undef()?; - let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?; + let id = mutex_get_or_create_id(this, mutex_op)?; let active_thread = this.get_active_thread()?; - if locked_count == 0 { - // The mutex is unlocked. Let's lock it. - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(1))?; - mutex_set_owner(this, mutex_op, active_thread.to_u32_scalar())?; - Ok(0) - } else { - // The mutex is locked. Let's check by whom. - let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into(); + if this.mutex_is_locked(id) { + let owner_thread = this.mutex_get_owner(id); if owner_thread != active_thread { // Block the active thread. - let blockset = mutex_get_or_create_blockset(this, mutex_op)?; - this.block_active_thread(blockset)?; + this.block_thread(active_thread)?; + this.mutex_enqueue(id, active_thread); Ok(0) } else { // Trying to acquire the same mutex again. @@ -356,17 +374,16 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } else if kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")? { this.eval_libc_i32("EDEADLK") } else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? { - match locked_count.checked_add(1) { - Some(new_count) => { - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?; - Ok(0) - } - None => this.eval_libc_i32("EAGAIN"), - } + this.mutex_lock(id, active_thread); + Ok(0) } else { throw_ub_format!("called pthread_mutex_lock on an unsupported type of mutex"); } } + } else { + // The mutex is unlocked. Let's lock it. + this.mutex_lock(id, active_thread); + Ok(0) } } @@ -374,16 +391,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx let this = self.eval_context_mut(); let kind = mutex_get_kind(this, mutex_op)?.not_undef()?; - let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?; + let id = mutex_get_or_create_id(this, mutex_op)?; let active_thread = this.get_active_thread()?; - if locked_count == 0 { - // The mutex is unlocked. Let's lock it. - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(1))?; - mutex_set_owner(this, mutex_op, active_thread.to_u32_scalar())?; - Ok(0) - } else { - let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into(); + if this.mutex_is_locked(id) { + let owner_thread = this.mutex_get_owner(id); if owner_thread != active_thread { this.eval_libc_i32("EBUSY") } else { @@ -392,19 +404,18 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx { this.eval_libc_i32("EBUSY") } else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? { - match locked_count.checked_add(1) { - Some(new_count) => { - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?; - Ok(0) - } - None => this.eval_libc_i32("EAGAIN"), - } + this.mutex_lock(id, active_thread); + Ok(0) } else { throw_ub_format!( "called pthread_mutex_trylock on an unsupported type of mutex" ); } } + } else { + // The mutex is unlocked. Let's lock it. + this.mutex_lock(id, active_thread); + Ok(0) } } @@ -412,21 +423,20 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx let this = self.eval_context_mut(); let kind = mutex_get_kind(this, mutex_op)?.not_undef()?; - let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?; - let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into(); + let id = mutex_get_or_create_id(this, mutex_op)?; - if owner_thread != this.get_active_thread()? { - throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread"); - } else if locked_count == 1 { - let blockset = mutex_get_or_create_blockset(this, mutex_op)?; - if let Some(new_owner) = this.unblock_some_thread(blockset)? { - // We have at least one thread waiting on this mutex. Transfer - // ownership to it. - mutex_set_owner(this, mutex_op, new_owner.to_u32_scalar())?; - } else { - // No thread is waiting on this mutex. - mutex_set_owner(this, mutex_op, Scalar::from_u32(0))?; - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(0))?; + if let Some((owner_thread, current_locked_count)) = this.mutex_unlock(id) { + if owner_thread != this.get_active_thread()? { + throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread"); + } + if current_locked_count == 0 { + // The mutex is unlocked. + if let Some(thread) = this.mutex_dequeue(id) { + // We have at least one thread waiting on this mutex. Transfer + // ownership to it. + this.mutex_lock(id, thread); + this.unblock_thread(thread)?; + } } Ok(0) } else { @@ -435,16 +445,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } else if kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")? { this.eval_libc_i32("EPERM") } else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? { - match locked_count.checked_sub(1) { - Some(new_count) => { - mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?; - Ok(0) - } - None => { - // locked_count was already zero - this.eval_libc_i32("EPERM") - } - } + this.eval_libc_i32("EPERM") } else { throw_ub_format!("called pthread_mutex_unlock on an unsupported type of mutex"); } @@ -454,13 +455,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx fn pthread_mutex_destroy(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - if mutex_get_locked_count(this, mutex_op)?.to_u32()? != 0 { + let id = mutex_get_or_create_id(this, mutex_op)?; + + if this.mutex_is_locked(id) { throw_ub_format!("destroyed a locked mutex"); } - mutex_set_kind(this, mutex_op, ScalarMaybeUninit::Uninit)?; - mutex_set_locked_count(this, mutex_op, ScalarMaybeUninit::Uninit)?; - mutex_set_blockset(this, mutex_op, ScalarMaybeUninit::Uninit)?; + mutex_set_kind(this, mutex_op, ScalarMaybeUndef::Undef)?; + mutex_set_id(this, mutex_op, ScalarMaybeUndef::Undef)?; Ok(0) } @@ -468,121 +470,305 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx fn pthread_rwlock_rdlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?; - let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?; + let id = rwlock_get_or_create_id(this, rwlock_op)?; + let active_thread = this.get_active_thread()?; - if writers != 0 { - // The lock is locked by a writer. - assert_eq!(writers, 1); - let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?; - this.block_active_thread(reader_blockset)?; + if this.rwlock_is_write_locked(id) { + this.rwlock_enqueue_reader(id, active_thread); + this.block_thread(active_thread)?; Ok(0) } else { - match readers.checked_add(1) { - Some(new_readers) => { - rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?; - Ok(0) - } - None => this.eval_libc_i32("EAGAIN"), - } + this.rwlock_reader_add(id, active_thread); + Ok(0) } } fn pthread_rwlock_tryrdlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?; - let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?; - if writers != 0 { + let id = rwlock_get_or_create_id(this, rwlock_op)?; + let active_thread = this.get_active_thread()?; + + if this.rwlock_is_write_locked(id) { this.eval_libc_i32("EBUSY") } else { - match readers.checked_add(1) { - Some(new_readers) => { - rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?; - Ok(0) - } - None => this.eval_libc_i32("EAGAIN"), - } + this.rwlock_reader_add(id, active_thread); + Ok(0) } } fn pthread_rwlock_wrlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?; - let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?; - let writer_blockset = rwlock_get_or_create_writer_blockset(this, rwlock_op)?; - if readers != 0 || writers != 0 { - this.block_active_thread(writer_blockset)?; + let id = rwlock_get_or_create_id(this, rwlock_op)?; + let active_thread = this.get_active_thread()?; + + if this.rwlock_is_locked(id) { + this.block_thread(active_thread)?; + this.rwlock_enqueue_writer(id, active_thread); } else { - rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?; + this.rwlock_writer_set(id, active_thread); } + Ok(0) } fn pthread_rwlock_trywrlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?; - let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?; - if readers != 0 || writers != 0 { + let id = rwlock_get_or_create_id(this, rwlock_op)?; + let active_thread = this.get_active_thread()?; + + if this.rwlock_is_locked(id) { this.eval_libc_i32("EBUSY") } else { - rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?; + this.rwlock_writer_set(id, active_thread); Ok(0) } } - // FIXME: We should check that this lock was locked by the active thread. fn pthread_rwlock_unlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?; - let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?; - let writer_blockset = rwlock_get_or_create_writer_blockset(this, rwlock_op)?; - if let Some(new_readers) = readers.checked_sub(1) { - assert_eq!(writers, 0); - rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?; - if new_readers == 0 { - if let Some(_writer) = this.unblock_some_thread(writer_blockset)? { - rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?; + let id = rwlock_get_or_create_id(this, rwlock_op)?; + let active_thread = this.get_active_thread()?; + + if this.rwlock_reader_remove(id, active_thread) { + // The thread was a reader. + if this.rwlock_is_locked(id) { + // No more readers owning the lock. Give it to a writer if there + // is any. + if let Some(writer) = this.rwlock_dequeue_writer(id) { + this.unblock_thread(writer)?; + this.rwlock_writer_set(id, writer); } } Ok(0) - } else if writers != 0 { - let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?; + } else if Some(active_thread) == this.rwlock_writer_remove(id) { + // The thread was a writer. + // // We are prioritizing writers here against the readers. As a // result, not only readers can starve writers, but also writers can // starve readers. - if let Some(_writer) = this.unblock_some_thread(writer_blockset)? { - assert_eq!(writers, 1); + if let Some(writer) = this.rwlock_dequeue_writer(id) { + // Give the lock to another writer. + this.unblock_thread(writer)?; + this.rwlock_writer_set(id, writer); } else { - rwlock_set_writers(this, rwlock_op, Scalar::from_u32(0))?; - let mut readers = 0; - while let Some(_reader) = this.unblock_some_thread(reader_blockset)? { - readers += 1; + // Give the lock to all readers. + while let Some(reader) = this.rwlock_dequeue_reader(id) { + this.unblock_thread(reader)?; + this.rwlock_reader_add(id, reader); } - rwlock_set_readers(this, rwlock_op, Scalar::from_u32(readers))? } Ok(0) } else { - throw_ub_format!("unlocked an rwlock that was not locked"); + throw_ub_format!("unlocked an rwlock that was not locked by the active thread"); } } fn pthread_rwlock_destroy(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); - if rwlock_get_readers(this, rwlock_op)?.to_u32()? != 0 - || rwlock_get_writers(this, rwlock_op)?.to_u32()? != 0 - { + let id = rwlock_get_or_create_id(this, rwlock_op)?; + + if this.rwlock_is_locked(id) { throw_ub_format!("destroyed a locked rwlock"); } - rwlock_set_readers(this, rwlock_op, ScalarMaybeUninit::Uninit)?; - rwlock_set_writers(this, rwlock_op, ScalarMaybeUninit::Uninit)?; - rwlock_set_reader_blockset(this, rwlock_op, ScalarMaybeUninit::Uninit)?; - rwlock_set_writer_blockset(this, rwlock_op, ScalarMaybeUninit::Uninit)?; + rwlock_set_id(this, rwlock_op, ScalarMaybeUndef::Undef)?; + + Ok(0) + } + + fn pthread_condattr_init(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + let default_clock_id = this.eval_libc("CLOCK_REALTIME")?; + condattr_set_clock_id(this, attr_op, default_clock_id)?; + + Ok(0) + } + + fn pthread_condattr_setclock( + &mut self, + attr_op: OpTy<'tcx, Tag>, + clock_id_op: OpTy<'tcx, Tag>, + ) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + let clock_id = this.read_scalar(clock_id_op)?.not_undef()?; + if clock_id == this.eval_libc("CLOCK_REALTIME")? + || clock_id == this.eval_libc("CLOCK_MONOTONIC")? + { + condattr_set_clock_id(this, attr_op, clock_id)?; + } else { + let einval = this.eval_libc_i32("EINVAL")?; + return Ok(einval); + } + + Ok(0) + } + + fn pthread_condattr_getclock( + &mut self, + attr_op: OpTy<'tcx, Tag>, + clk_id_op: OpTy<'tcx, Tag>, + ) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + let clock_id = condattr_get_clock_id(this, attr_op)?; + this.write_scalar(clock_id, this.deref_operand(clk_id_op)?.into())?; + + Ok(0) + } + + fn pthread_condattr_destroy(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + condattr_set_clock_id(this, attr_op, ScalarMaybeUndef::Undef)?; + + Ok(0) + } + + fn pthread_cond_init( + &mut self, + cond_op: OpTy<'tcx, Tag>, + attr_op: OpTy<'tcx, Tag>, + ) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + let attr = this.read_scalar(attr_op)?.not_undef()?; + let clock_id = if this.is_null(attr)? { + this.eval_libc("CLOCK_REALTIME")? + } else { + condattr_get_clock_id(this, attr_op)?.not_undef()? + }; + + let _ = cond_get_or_create_id(this, cond_op)?; + cond_set_clock_id(this, cond_op, clock_id)?; + + Ok(0) + } + + fn pthread_cond_signal(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + let id = cond_get_or_create_id(this, cond_op)?; + if let Some((thread, mutex)) = this.condvar_signal(id) { + reacquire_cond_mutex(this, thread, mutex)?; + this.unregister_callback_if_exists(thread)?; + } + + Ok(0) + } + + fn pthread_cond_broadcast(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + let id = cond_get_or_create_id(this, cond_op)?; + + while let Some((thread, mutex)) = this.condvar_signal(id) { + reacquire_cond_mutex(this, thread, mutex)?; + this.unregister_callback_if_exists(thread)?; + } + + Ok(0) + } + + fn pthread_cond_wait( + &mut self, + cond_op: OpTy<'tcx, Tag>, + mutex_op: OpTy<'tcx, Tag>, + ) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + let id = cond_get_or_create_id(this, cond_op)?; + let mutex_id = mutex_get_or_create_id(this, mutex_op)?; + let active_thread = this.get_active_thread()?; + + release_cond_mutex(this, active_thread, mutex_id)?; + this.condvar_wait(id, active_thread, mutex_id); + + Ok(0) + } + + fn pthread_cond_timedwait( + &mut self, + cond_op: OpTy<'tcx, Tag>, + mutex_op: OpTy<'tcx, Tag>, + abstime_op: OpTy<'tcx, Tag>, + dest: PlaceTy<'tcx, Tag>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + + this.check_no_isolation("pthread_cond_timedwait")?; + + let id = cond_get_or_create_id(this, cond_op)?; + let mutex_id = mutex_get_or_create_id(this, mutex_op)?; + let active_thread = this.get_active_thread()?; + + release_cond_mutex(this, active_thread, mutex_id)?; + this.condvar_wait(id, active_thread, mutex_id); + + // We return success for now and override it in the timeout callback. + this.write_scalar(Scalar::from_i32(0), dest)?; + + // Extract the timeout. + let clock_id = cond_get_clock_id(this, cond_op)?.to_i32()?; + let duration = { + let tp = this.deref_operand(abstime_op)?; + let mut offset = Size::from_bytes(0); + let layout = this.libc_ty_layout("time_t")?; + let seconds_place = tp.offset(offset, MemPlaceMeta::None, layout, this)?; + let seconds = this.read_scalar(seconds_place.into())?.to_u64()?; + offset += layout.size; + let layout = this.libc_ty_layout("c_long")?; + let nanoseconds_place = tp.offset(offset, MemPlaceMeta::None, layout, this)?; + let nanoseconds = this.read_scalar(nanoseconds_place.into())?.to_u64()?; + Duration::new(seconds, nanoseconds as u32) + }; + + let timeout_time = if clock_id == this.eval_libc_i32("CLOCK_REALTIME")? { + let time_anchor_since_epoch = + this.machine.time_anchor_timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let duration_since_time_anchor = duration.checked_sub(time_anchor_since_epoch).unwrap(); + this.machine.time_anchor.checked_add(duration_since_time_anchor).unwrap() + } else if clock_id == this.eval_libc_i32("CLOCK_MONOTONIC")? { + this.machine.time_anchor.checked_add(duration).unwrap() + } else { + throw_ub_format!("Unsupported clock id."); + }; + + // Register the timeout callback. + this.register_callback( + active_thread, + timeout_time, + Box::new(move |ecx| { + // Try to reacquire the mutex. + reacquire_cond_mutex(ecx, active_thread, mutex_id)?; + + // Remove the thread from the conditional variable. + ecx.condvar_remove_waiter(id, active_thread); + + // Set the timeout value. + let timeout = ecx.eval_libc_i32("ETIMEDOUT")?; + ecx.write_scalar(Scalar::from_i32(timeout), dest)?; + + Ok(()) + }), + )?; + + Ok(()) + } + + fn pthread_cond_destroy(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> { + let this = self.eval_context_mut(); + + let id = cond_get_or_create_id(this, cond_op)?; + if this.condvar_is_awaited(id) { + throw_ub_format!("destroyed an awaited conditional variable"); + } + cond_set_id(this, cond_op, ScalarMaybeUndef::Undef)?; + cond_set_clock_id(this, cond_op, ScalarMaybeUndef::Undef)?; Ok(0) } diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 000000000000..5d181692fb2a --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,299 @@ +use std::collections::{hash_map::Entry, HashMap, VecDeque}; +use std::convert::TryFrom; +use std::num::NonZeroU32; +use std::time::Instant; + +use rustc_index::vec::{Idx, IndexVec}; + +use crate::*; + +macro_rules! declare_id { + ($name: ident) => { + #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] + pub struct $name(NonZeroU32); + + impl Idx for $name { + fn new(idx: usize) -> Self { + $name(NonZeroU32::new(u32::try_from(idx).unwrap() + 1).unwrap()) + } + fn index(self) -> usize { + usize::try_from(self.0.get() - 1).unwrap() + } + } + + impl From for $name { + fn from(id: u32) -> Self { + Self(NonZeroU32::new(id).unwrap()) + } + } + + impl $name { + pub fn to_u32_scalar<'tcx>(&self) -> Scalar { + Scalar::from_u32(self.0.get()) + } + } + }; +} + +declare_id!(MutexId); + +/// The mutex state. +#[derive(Default, Debug)] +struct Mutex { + /// The thread that currently owns the lock. + owner: Option, + /// How many times the mutex was locked by the owner. + lock_count: usize, + /// The queue of threads waiting for this mutex. + queue: VecDeque, +} + +declare_id!(RwLockId); + +/// The read-write lock state. +#[derive(Default, Debug)] +struct RwLock { + /// The writer thread that currently owns the lock. + writer: Option, + /// The readers that currently own the lock and how many times they acquired + /// the lock. + readers: HashMap, + /// The queue of writer threads waiting for this lock. + writer_queue: VecDeque, + /// The queue of reader threads waiting for this lock. + reader_queue: VecDeque, +} + +declare_id!(CondvarId); + +/// A thread waiting on a conditional variable. +#[derive(Debug)] +struct CondvarWaiter { + /// The thread that is waiting on this variable. + thread: ThreadId, + /// The mutex on which the thread is waiting. + mutex: MutexId, + /// The moment in time when the waiter should time out. + timeout: Option, +} + +/// The conditional variable state. +#[derive(Default, Debug)] +struct Condvar { + waiters: VecDeque, +} + +/// The state of all synchronization variables. +#[derive(Default, Debug)] +pub(super) struct SynchronizationState { + mutexes: IndexVec, + rwlocks: IndexVec, + condvars: IndexVec, +} + +// Public interface to synchronization primitives. Please note that in most +// cases, the function calls are infallible and it is the client's (shim +// implementation's) responsibility to detect and deal with erroneous +// situations. +impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {} +pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { + #[inline] + /// Create state for a new mutex. + fn mutex_create(&mut self) -> MutexId { + let this = self.eval_context_mut(); + this.machine.threads.sync.mutexes.push(Default::default()) + } + + #[inline] + /// Get the id of the thread that currently owns this lock. + fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId { + let this = self.eval_context_ref(); + this.machine.threads.sync.mutexes[id].owner.unwrap() + } + + #[inline] + /// Check if locked. + fn mutex_is_locked(&mut self, id: MutexId) -> bool { + let this = self.eval_context_mut(); + this.machine.threads.sync.mutexes[id].owner.is_some() + } + + /// Lock by setting the mutex owner and increasing the lock count. + fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) { + let this = self.eval_context_mut(); + let mutex = &mut this.machine.threads.sync.mutexes[id]; + if let Some(current_owner) = mutex.owner { + assert_eq!(thread, current_owner, "mutex already locked by another thread"); + assert!( + mutex.lock_count > 0, + "invariant violation: lock_count == 0 iff the thread is unlocked" + ); + } else { + mutex.owner = Some(thread); + } + mutex.lock_count = mutex.lock_count.checked_add(1).unwrap(); + } + + /// Unlock by decreasing the lock count. If the lock count reaches 0, unset + /// the owner. + fn mutex_unlock(&mut self, id: MutexId) -> Option<(ThreadId, usize)> { + let this = self.eval_context_mut(); + let mutex = &mut this.machine.threads.sync.mutexes[id]; + if let Some(current_owner) = mutex.owner { + mutex.lock_count = mutex + .lock_count + .checked_sub(1) + .expect("invariant violation: lock_count == 0 iff the thread is unlocked"); + if mutex.lock_count == 0 { + mutex.owner = None; + } + Some((current_owner, mutex.lock_count)) + } else { + None + } + } + + #[inline] + /// Take a thread out the queue waiting for the lock. + fn mutex_enqueue(&mut self, id: MutexId, thread: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.sync.mutexes[id].queue.push_back(thread); + } + + #[inline] + /// Take a thread out the queue waiting for the lock. + fn mutex_dequeue(&mut self, id: MutexId) -> Option { + let this = self.eval_context_mut(); + this.machine.threads.sync.mutexes[id].queue.pop_front() + } + + #[inline] + /// Create state for a new read write lock. + fn rwlock_create(&mut self) -> RwLockId { + let this = self.eval_context_mut(); + this.machine.threads.sync.rwlocks.push(Default::default()) + } + + #[inline] + /// Check if locked. + fn rwlock_is_locked(&mut self, id: RwLockId) -> bool { + let this = self.eval_context_mut(); + this.machine.threads.sync.rwlocks[id].writer.is_some() + || !this.machine.threads.sync.rwlocks[id].readers.is_empty() + } + + #[inline] + /// Check if write locked. + fn rwlock_is_write_locked(&mut self, id: RwLockId) -> bool { + let this = self.eval_context_mut(); + this.machine.threads.sync.rwlocks[id].writer.is_some() + } + + /// Add a reader that collectively with other readers owns the lock. + fn rwlock_reader_add(&mut self, id: RwLockId, reader: ThreadId) { + let this = self.eval_context_mut(); + assert!(!this.rwlock_is_write_locked(id), "the lock is write locked"); + let count = this.machine.threads.sync.rwlocks[id].readers.entry(reader).or_insert(0); + *count += 1; + } + + /// Try removing the reader. Returns `true` if succeeded. + fn rwlock_reader_remove(&mut self, id: RwLockId, reader: ThreadId) -> bool { + let this = self.eval_context_mut(); + match this.machine.threads.sync.rwlocks[id].readers.entry(reader) { + Entry::Occupied(mut entry) => { + let count = entry.get_mut(); + *count -= 1; + if *count == 0 { + entry.remove(); + } + true + } + Entry::Vacant(_) => false, + } + } + + #[inline] + /// Put the reader in the queue waiting for the lock. + fn rwlock_enqueue_reader(&mut self, id: RwLockId, reader: ThreadId) { + let this = self.eval_context_mut(); + assert!(this.rwlock_is_write_locked(id), "queueing on not write locked lock"); + this.machine.threads.sync.rwlocks[id].reader_queue.push_back(reader); + } + + #[inline] + /// Take the reader out the queue waiting for the lock. + fn rwlock_dequeue_reader(&mut self, id: RwLockId) -> Option { + let this = self.eval_context_mut(); + this.machine.threads.sync.rwlocks[id].reader_queue.pop_front() + } + + #[inline] + /// Lock by setting the writer that owns the lock. + fn rwlock_writer_set(&mut self, id: RwLockId, writer: ThreadId) { + let this = self.eval_context_mut(); + assert!(!this.rwlock_is_locked(id), "the lock is already locked"); + this.machine.threads.sync.rwlocks[id].writer = Some(writer); + } + + #[inline] + /// Try removing the writer. + fn rwlock_writer_remove(&mut self, id: RwLockId) -> Option { + let this = self.eval_context_mut(); + this.machine.threads.sync.rwlocks[id].writer.take() + } + + #[inline] + /// Put the writer in the queue waiting for the lock. + fn rwlock_enqueue_writer(&mut self, id: RwLockId, writer: ThreadId) { + let this = self.eval_context_mut(); + assert!(this.rwlock_is_locked(id), "queueing on unlocked lock"); + this.machine.threads.sync.rwlocks[id].writer_queue.push_back(writer); + } + + #[inline] + /// Take the writer out the queue waiting for the lock. + fn rwlock_dequeue_writer(&mut self, id: RwLockId) -> Option { + let this = self.eval_context_mut(); + this.machine.threads.sync.rwlocks[id].writer_queue.pop_front() + } + + #[inline] + /// Create state for a new conditional variable. + fn condvar_create(&mut self) -> CondvarId { + let this = self.eval_context_mut(); + this.machine.threads.sync.condvars.push(Default::default()) + } + + #[inline] + /// Is the conditional variable awaited? + fn condvar_is_awaited(&mut self, id: CondvarId) -> bool { + let this = self.eval_context_mut(); + !this.machine.threads.sync.condvars[id].waiters.is_empty() + } + + /// Mark that the thread is waiting on the conditional variable. + fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, mutex: MutexId) { + let this = self.eval_context_mut(); + let waiters = &mut this.machine.threads.sync.condvars[id].waiters; + assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); + waiters.push_back(CondvarWaiter { thread, mutex, timeout: None }); + } + + /// Wake up some thread (if there is any) sleeping on the conditional + /// variable. + fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> { + let this = self.eval_context_mut(); + this.machine.threads.sync.condvars[id] + .waiters + .pop_front() + .map(|waiter| (waiter.thread, waiter.mutex)) + } + + #[inline] + /// Remove the thread from the queue of threads waiting on this conditional variable. + fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread); + } +} diff --git a/src/thread.rs b/src/thread.rs index d78beed28cfb..6ebf35a6527f 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,8 +1,10 @@ //! Implements threads. use std::cell::RefCell; +use std::collections::hash_map::Entry; use std::convert::TryFrom; use std::num::{NonZeroU32, TryFromIntError}; +use std::time::Instant; use log::trace; @@ -15,18 +17,24 @@ use rustc_middle::{ ty::{self, Instance}, }; +use crate::sync::SynchronizationState; use crate::*; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum SchedulingAction { /// Execute step on the active thread. ExecuteStep, + /// Execute a scheduler's callback. + ExecuteCallback, /// Execute destructors of the active thread. ExecuteDtors, /// Stop the program. Stop, } +type EventCallback<'mir, 'tcx> = + Box>) -> InterpResult<'tcx> + 'tcx>; + /// A thread identifier. #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub struct ThreadId(u32); @@ -94,6 +102,7 @@ pub enum ThreadState { BlockedOnJoin(ThreadId), /// The thread is blocked and belongs to the given blockset. Blocked(BlockSetId), + BlockedThread, /// The thread has terminated its execution (we do not delete terminated /// threads). Terminated, @@ -162,6 +171,23 @@ impl<'mir, 'tcx> Default for Thread<'mir, 'tcx> { } } +/// Callbacks are used to implement timeouts. For example, waiting on a +/// conditional variable with a timeout creates a callback that is called after +/// the specified time and unblocks the thread. If another thread signals on the +/// conditional variable, the signal handler deletes the callback. +struct CallBackInfo<'mir, 'tcx> { + /// The callback should be called no earlier than this time. + call_time: Instant, + /// The called function. + callback: EventCallback<'mir, 'tcx>, +} + +impl<'mir, 'tcx> std::fmt::Debug for CallBackInfo<'mir, 'tcx> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CallBack({:?})", self.call_time) + } +} + /// A set of threads. #[derive(Debug)] pub struct ThreadManager<'mir, 'tcx> { @@ -171,6 +197,8 @@ pub struct ThreadManager<'mir, 'tcx> { /// /// Note that this vector also contains terminated threads. threads: IndexVec>, + /// FIXME: make private. + pub(crate) sync: SynchronizationState, /// A counter used to generate unique identifiers for blocksets. blockset_counter: u32, /// A mapping from a thread-local static to an allocation id of a thread @@ -178,6 +206,8 @@ pub struct ThreadManager<'mir, 'tcx> { thread_local_alloc_ids: RefCell>, /// A flag that indicates that we should change the active thread. yield_active_thread: bool, + /// Callbacks that are called once the specified time passes. + callbacks: FxHashMap>, } impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { @@ -191,9 +221,11 @@ impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { Self { active_thread: ThreadId::new(0), threads: threads, + sync: SynchronizationState::default(), blockset_counter: 0, thread_local_alloc_ids: Default::default(), yield_active_thread: false, + callbacks: FxHashMap::default(), } } } @@ -321,30 +353,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { self.active_thread_ref().thread_name() } - /// Allocate a new blockset id. - fn create_blockset(&mut self) -> BlockSetId { - self.blockset_counter = self.blockset_counter.checked_add(1).unwrap(); - BlockSetId::new(self.blockset_counter) - } - - /// Block the currently active thread and put it into the given blockset. - fn block_active_thread(&mut self, set: BlockSetId) { - let state = &mut self.active_thread_mut().state; + /// Put the thread into the blocked state. + fn block_thread(&mut self, thread: ThreadId) { + let state = &mut self.threads[thread].state; assert_eq!(*state, ThreadState::Enabled); - *state = ThreadState::Blocked(set); + *state = ThreadState::BlockedThread; } - /// Unblock any one thread from the given blockset if it contains at least - /// one. Return the id of the unblocked thread. - fn unblock_some_thread(&mut self, set: BlockSetId) -> Option { - for (id, thread) in self.threads.iter_enumerated_mut() { - if thread.state == ThreadState::Blocked(set) { - trace!("unblocking {:?} in blockset {:?}", id, set); - thread.state = ThreadState::Enabled; - return Some(id); - } - } - None + /// Put the blocked thread into the enabled state. + fn unblock_thread(&mut self, thread: ThreadId) { + let state = &mut self.threads[thread].state; + assert_eq!(*state, ThreadState::BlockedThread); + *state = ThreadState::Enabled; } /// Change the active thread to some enabled thread. @@ -352,6 +372,39 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { self.yield_active_thread = true; } + /// Register the given `callback` to be called once the `call_time` passes. + fn register_callback( + &mut self, + thread: ThreadId, + call_time: Instant, + callback: EventCallback<'mir, 'tcx>, + ) { + self.callbacks + .insert(thread, CallBackInfo { call_time: call_time, callback: callback }) + .unwrap_none(); + } + + /// Unregister the callback for the `thread`. + fn unregister_callback_if_exists(&mut self, thread: ThreadId) { + self.callbacks.remove(&thread); + } + + /// Get a callback that is ready to be called. + fn get_callback(&mut self) -> Option<(ThreadId, EventCallback<'mir, 'tcx>)> { + let current_time = Instant::now(); + // We use a for loop here to make the scheduler more deterministic. + for thread in self.threads.indices() { + match self.callbacks.entry(thread) { + Entry::Occupied(entry) => + if current_time >= entry.get().call_time { + return Some((thread, entry.remove().callback)); + }, + Entry::Vacant(_) => {} + } + } + None + } + /// Decide which action to take next and on which thread. /// /// The currently implemented scheduling policy is the one that is commonly @@ -407,6 +460,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { // We have not found a thread to execute. if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { unreachable!(); + } else if let Some(next_call_time) = + self.callbacks.values().min_by_key(|info| info.call_time) + { + // All threads are currently blocked, but we have unexecuted + // callbacks, which may unblock some of the threads. Hence, + // sleep until the first callback. + if let Some(sleep_time) = + next_call_time.call_time.checked_duration_since(Instant::now()) + { + std::thread::sleep(sleep_time); + } + Ok(SchedulingAction::ExecuteCallback) } else { throw_machine_stop!(TerminationInfo::Deadlock); } @@ -577,21 +642,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } #[inline] - fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> { + fn block_thread(&mut self, thread: ThreadId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - Ok(this.machine.threads.create_blockset()) + Ok(this.machine.threads.block_thread(thread)) } #[inline] - fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> { + fn unblock_thread(&mut self, thread: ThreadId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - Ok(this.machine.threads.block_active_thread(set)) - } - - #[inline] - fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option> { - let this = self.eval_context_mut(); - Ok(this.machine.threads.unblock_some_thread(set)) + Ok(this.machine.threads.unblock_thread(thread)) } #[inline] @@ -601,6 +660,36 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx Ok(()) } + #[inline] + fn register_callback( + &mut self, + thread: ThreadId, + call_time: Instant, + callback: EventCallback<'mir, 'tcx>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + this.machine.threads.register_callback(thread, call_time, callback); + Ok(()) + } + + #[inline] + fn unregister_callback_if_exists(&mut self, thread: ThreadId) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + this.machine.threads.unregister_callback_if_exists(thread); + Ok(()) + } + + /// Execute the callback on the callback's thread. + #[inline] + fn run_scheduler_callback(&mut self) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let (thread, callback) = this.machine.threads.get_callback().expect("no callback found"); + let old_thread = this.set_active_thread(thread)?; + callback(this)?; + this.set_active_thread(old_thread)?; + Ok(()) + } + /// Decide which action to take next and on which thread. #[inline] fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { diff --git a/tests/run-pass/concurrency/barrier.rs b/tests/run-pass/concurrency/barrier.rs new file mode 100644 index 000000000000..1e976a63453d --- /dev/null +++ b/tests/run-pass/concurrency/barrier.rs @@ -0,0 +1,27 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +//! Check if Rust barriers are working. + +use std::sync::{Arc, Barrier}; +use std::thread; + + +/// This test is taken from the Rust documentation. +fn main() { + let mut handles = Vec::with_capacity(10); + let barrier = Arc::new(Barrier::new(10)); + for _ in 0..10 { + let c = barrier.clone(); + // The same messages will be printed together. + // You will NOT see any interleaving. + handles.push(thread::spawn(move|| { + println!("before wait"); + c.wait(); + println!("after wait"); + })); + } + // Wait for other threads to finish. + for handle in handles { + handle.join().unwrap(); + } +} \ No newline at end of file diff --git a/tests/run-pass/concurrency/barrier.stderr b/tests/run-pass/concurrency/barrier.stderr new file mode 100644 index 000000000000..2dbfb7721d36 --- /dev/null +++ b/tests/run-pass/concurrency/barrier.stderr @@ -0,0 +1,2 @@ +warning: thread support is experimental. For example, Miri does not detect data races yet. + diff --git a/tests/run-pass/concurrency/barrier.stdout b/tests/run-pass/concurrency/barrier.stdout new file mode 100644 index 000000000000..f2c036a1735e --- /dev/null +++ b/tests/run-pass/concurrency/barrier.stdout @@ -0,0 +1,20 @@ +before wait +before wait +before wait +before wait +before wait +before wait +before wait +before wait +before wait +before wait +after wait +after wait +after wait +after wait +after wait +after wait +after wait +after wait +after wait +after wait diff --git a/tests/run-pass/concurrency/condvar.rs b/tests/run-pass/concurrency/condvar.rs new file mode 100644 index 000000000000..ab971ee6e8c6 --- /dev/null +++ b/tests/run-pass/concurrency/condvar.rs @@ -0,0 +1,28 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +//! Check if Rust conditional variables are working. + +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; + +/// The test taken from the Rust documentation. +fn main() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move || { + let (lock, cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. + let (lock, cvar) = &*pair; + let mut started = lock.lock().unwrap(); + while !*started { + started = cvar.wait(started).unwrap(); + } +} diff --git a/tests/run-pass/concurrency/condvar.stderr b/tests/run-pass/concurrency/condvar.stderr new file mode 100644 index 000000000000..2dbfb7721d36 --- /dev/null +++ b/tests/run-pass/concurrency/condvar.stderr @@ -0,0 +1,2 @@ +warning: thread support is experimental. For example, Miri does not detect data races yet. + diff --git a/tests/run-pass/concurrency/libc_pthread_cond.rs b/tests/run-pass/concurrency/libc_pthread_cond.rs new file mode 100644 index 000000000000..83a651e6f04a --- /dev/null +++ b/tests/run-pass/concurrency/libc_pthread_cond.rs @@ -0,0 +1,199 @@ +// ignore-windows: No libc on Windows +// compile-flags: -Zmiri-disable-isolation + +#![feature(rustc_private)] + +extern crate libc; + +use std::cell::UnsafeCell; +use std::mem::{self, MaybeUninit}; +use std::sync::Arc; +use std::thread; + +struct Mutex { + inner: UnsafeCell, +} + +unsafe impl Sync for Mutex {} + +impl std::fmt::Debug for Mutex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Mutex") + } +} + +struct Cond { + inner: UnsafeCell, +} + +unsafe impl Sync for Cond {} + +impl std::fmt::Debug for Cond { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Cond") + } +} + +unsafe fn create_cond_attr_monotonic() -> libc::pthread_condattr_t { + let mut attr = MaybeUninit::::uninit(); + assert_eq!(libc::pthread_condattr_init(attr.as_mut_ptr()), 0); + assert_eq!(libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC), 0); + attr.assume_init() +} + +unsafe fn create_cond(attr: Option) -> Cond { + let cond: Cond = mem::zeroed(); + if let Some(mut attr) = attr { + assert_eq!(libc::pthread_cond_init(cond.inner.get() as *mut _, &attr as *const _), 0); + assert_eq!(libc::pthread_condattr_destroy(&mut attr as *mut _), 0); + } else { + assert_eq!(libc::pthread_cond_init(cond.inner.get() as *mut _, 0 as *const _), 0); + } + cond +} + +unsafe fn create_mutex() -> Mutex { + mem::zeroed() +} + +unsafe fn create_timeout(seconds: i64) -> libc::timespec { + let mut now: libc::timespec = mem::zeroed(); + assert_eq!(libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut now), 0); + libc::timespec { tv_sec: now.tv_sec + seconds, tv_nsec: now.tv_nsec } +} + +fn test_pthread_condattr_t() { + unsafe { + let mut attr = create_cond_attr_monotonic(); + let mut clock_id = MaybeUninit::::uninit(); + assert_eq!(libc::pthread_condattr_getclock(&attr as *const _, clock_id.as_mut_ptr()), 0); + assert_eq!(clock_id.assume_init(), libc::CLOCK_MONOTONIC); + assert_eq!(libc::pthread_condattr_destroy(&mut attr as *mut _), 0); + } +} + +fn test_signal() { + unsafe { + let cond = Arc::new(create_cond(None)); + let mutex = Arc::new(create_mutex()); + + assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0); + + let spawn_mutex = Arc::clone(&mutex); + let spawn_cond = Arc::clone(&cond); + let handle = thread::spawn(move || { + assert_eq!(libc::pthread_mutex_lock(spawn_mutex.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_cond_signal(spawn_cond.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_mutex_unlock(spawn_mutex.inner.get() as *mut _), 0); + }); + + assert_eq!( + libc::pthread_cond_wait(cond.inner.get() as *mut _, mutex.inner.get() as *mut _), + 0 + ); + assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0); + + handle.join().unwrap(); + + let mutex = Arc::try_unwrap(mutex).unwrap(); + assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0); + let cond = Arc::try_unwrap(cond).unwrap(); + assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0); + } +} + +fn test_broadcast() { + unsafe { + let cond = Arc::new(create_cond(None)); + let mutex = Arc::new(create_mutex()); + + assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0); + + let spawn_mutex = Arc::clone(&mutex); + let spawn_cond = Arc::clone(&cond); + let handle = thread::spawn(move || { + assert_eq!(libc::pthread_mutex_lock(spawn_mutex.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_cond_broadcast(spawn_cond.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_mutex_unlock(spawn_mutex.inner.get() as *mut _), 0); + }); + + assert_eq!( + libc::pthread_cond_wait(cond.inner.get() as *mut _, mutex.inner.get() as *mut _), + 0 + ); + assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0); + + handle.join().unwrap(); + + let mutex = Arc::try_unwrap(mutex).unwrap(); + assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0); + let cond = Arc::try_unwrap(cond).unwrap(); + assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0); + } +} + +fn test_timed_wait_timeout() { + unsafe { + let attr = create_cond_attr_monotonic(); + let cond = create_cond(Some(attr)); + let mutex = create_mutex(); + let timeout = create_timeout(1); + + assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0); + assert_eq!( + libc::pthread_cond_timedwait( + cond.inner.get() as *mut _, + mutex.inner.get() as *mut _, + &timeout + ), + libc::ETIMEDOUT + ); + assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0); + } +} + +fn test_timed_wait_notimeout() { + unsafe { + let attr = create_cond_attr_monotonic(); + let cond = Arc::new(create_cond(Some(attr))); + let mutex = Arc::new(create_mutex()); + let timeout = create_timeout(100); + + assert_eq!(libc::pthread_mutex_lock(mutex.inner.get() as *mut _), 0); + + let spawn_mutex = Arc::clone(&mutex); + let spawn_cond = Arc::clone(&cond); + let handle = thread::spawn(move || { + assert_eq!(libc::pthread_mutex_lock(spawn_mutex.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_cond_signal(spawn_cond.inner.get() as *mut _), 0); + assert_eq!(libc::pthread_mutex_unlock(spawn_mutex.inner.get() as *mut _), 0); + }); + + assert_eq!( + libc::pthread_cond_timedwait( + cond.inner.get() as *mut _, + mutex.inner.get() as *mut _, + &timeout + ), + 0 + ); + assert_eq!(libc::pthread_mutex_unlock(mutex.inner.get() as *mut _), 0); + + handle.join().unwrap(); + + let mutex = Arc::try_unwrap(mutex).unwrap(); + assert_eq!(libc::pthread_mutex_destroy(mutex.inner.get() as *mut _), 0); + let cond = Arc::try_unwrap(cond).unwrap(); + assert_eq!(libc::pthread_cond_destroy(cond.inner.get() as *mut _), 0); + } +} + +fn main() { + test_pthread_condattr_t(); + test_signal(); + test_broadcast(); + test_timed_wait_timeout(); + test_timed_wait_notimeout(); +} diff --git a/tests/run-pass/concurrency/libc_pthread_cond.stderr b/tests/run-pass/concurrency/libc_pthread_cond.stderr new file mode 100644 index 000000000000..2dbfb7721d36 --- /dev/null +++ b/tests/run-pass/concurrency/libc_pthread_cond.stderr @@ -0,0 +1,2 @@ +warning: thread support is experimental. For example, Miri does not detect data races yet. + diff --git a/tests/run-pass/concurrency/mpsc.rs b/tests/run-pass/concurrency/mpsc.rs new file mode 100644 index 000000000000..3558f5415d07 --- /dev/null +++ b/tests/run-pass/concurrency/mpsc.rs @@ -0,0 +1,56 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +//! Check if Rust channels are working. + +use std::sync::mpsc::{channel, sync_channel}; +use std::thread; + +/// The test taken from the Rust documentation. +fn simple_send() { + let (tx, rx) = channel(); + thread::spawn(move || { + tx.send(10).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 10); +} + +/// The test taken from the Rust documentation. +fn multiple_send() { + let (tx, rx) = channel(); + for i in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(i).unwrap(); + }); + } + + let mut sum = 0; + for _ in 0..10 { + let j = rx.recv().unwrap(); + assert!(0 <= j && j < 10); + sum += j; + } + assert_eq!(sum, 45); +} + +/// The test taken from the Rust documentation. +fn send_on_sync() { + let (sender, receiver) = sync_channel(1); + + // this returns immediately + sender.send(1).unwrap(); + + thread::spawn(move || { + // this will block until the previous message has been received + sender.send(2).unwrap(); + }); + + assert_eq!(receiver.recv().unwrap(), 1); + assert_eq!(receiver.recv().unwrap(), 2); +} + +fn main() { + simple_send(); + multiple_send(); + send_on_sync(); +} diff --git a/tests/run-pass/concurrency/mpsc.stderr b/tests/run-pass/concurrency/mpsc.stderr new file mode 100644 index 000000000000..2dbfb7721d36 --- /dev/null +++ b/tests/run-pass/concurrency/mpsc.stderr @@ -0,0 +1,2 @@ +warning: thread support is experimental. For example, Miri does not detect data races yet. + diff --git a/tests/run-pass/concurrency/once.rs b/tests/run-pass/concurrency/once.rs new file mode 100644 index 000000000000..499ceacfa8c4 --- /dev/null +++ b/tests/run-pass/concurrency/once.rs @@ -0,0 +1,44 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +//! Check if Rust once statics are working. The test taken from the Rust +//! documentation. + +use std::sync::Once; +use std::thread; + +static mut VAL: usize = 0; +static INIT: Once = Once::new(); + +fn get_cached_val() -> usize { + unsafe { + INIT.call_once(|| { + VAL = expensive_computation(); + }); + VAL + } +} + +fn expensive_computation() -> usize { + let mut i = 1; + let mut c = 1; + while i < 10000 { + i *= c; + c += 1; + } + i +} + +fn main() { + let handles: Vec<_> = (0..10) + .map(|_| { + thread::spawn(|| { + thread::yield_now(); + let val = get_cached_val(); + assert_eq!(val, 40320); + }) + }) + .collect(); + for handle in handles { + handle.join().unwrap(); + } +} diff --git a/tests/run-pass/concurrency/once.stderr b/tests/run-pass/concurrency/once.stderr new file mode 100644 index 000000000000..2dbfb7721d36 --- /dev/null +++ b/tests/run-pass/concurrency/once.stderr @@ -0,0 +1,2 @@ +warning: thread support is experimental. For example, Miri does not detect data races yet. +