From 94cbe88e8073381dbf7aeed2f0cf720b08f05785 Mon Sep 17 00:00:00 2001 From: Vytautas Astrauskas Date: Sun, 19 Apr 2020 14:21:18 -0700 Subject: [PATCH] Many small changes to thread management. --- src/shims/sync.rs | 8 ++-- src/thread.rs | 112 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 33 deletions(-) diff --git a/src/shims/sync.rs b/src/shims/sync.rs index 6a1ea108dbb0..97afbbe98f6a 100644 --- a/src/shims/sync.rs +++ b/src/shims/sync.rs @@ -419,7 +419,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread"); } else if locked_count == 1 { let blockset = mutex_get_or_create_blockset(this, mutex_op)?; - if let Some(new_owner) = this.unblock_random_thread(blockset)? { + if let Some(new_owner) = this.unblock_some_thread(blockset)? { // We have at least one thread waiting on this mutex. Transfer // ownership to it. mutex_set_owner(this, mutex_op, new_owner.to_u32_scalar())?; @@ -543,7 +543,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx assert_eq!(writers, 0); rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?; if new_readers == 0 { - if let Some(_writer) = this.unblock_random_thread(writer_blockset)? { + if let Some(_writer) = this.unblock_some_thread(writer_blockset)? { rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?; } } @@ -551,11 +551,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } else if writers != 0 { let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?; rwlock_set_writers(this, rwlock_op, Scalar::from_u32(0))?; - if let Some(_writer) = this.unblock_random_thread(writer_blockset)? { + if let Some(_writer) = this.unblock_some_thread(writer_blockset)? { rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?; } else { let mut readers = 0; - while let Some(_reader) = this.unblock_random_thread(reader_blockset)? { + while let Some(_reader) = this.unblock_some_thread(reader_blockset)? { readers += 1; } rwlock_set_readers(this, rwlock_op, Scalar::from_u32(readers))? diff --git a/src/thread.rs b/src/thread.rs index ab6a4c94db83..5eb6560a09e6 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -31,6 +31,9 @@ pub enum SchedulingAction { #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub struct ThreadId(usize); +/// The main thread. When it terminates, the whole application terminates. +const MAIN_THREAD: ThreadId = ThreadId(0); + impl Idx for ThreadId { fn new(idx: usize) -> Self { ThreadId(idx) @@ -42,13 +45,13 @@ impl Idx for ThreadId { impl From for ThreadId { fn from(id: u64) -> Self { - Self(id as usize) + Self(usize::try_from(id).unwrap()) } } impl From for ThreadId { fn from(id: u32) -> Self { - Self(id as usize) + Self(usize::try_from(id).unwrap()) } } @@ -82,10 +85,10 @@ pub enum ThreadState { /// The thread tried to join the specified thread and is blocked until that /// thread terminates. BlockedOnJoin(ThreadId), - /// The thread is blocked and belongs to the given blockset.. + /// The thread is blocked and belongs to the given blockset. Blocked(BlockSetId), /// The thread has terminated its execution (we do not delete terminated - /// threads.) + /// threads). Terminated, } @@ -150,6 +153,7 @@ pub struct ThreadManager<'mir, 'tcx> { impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { fn default() -> Self { let mut threads = IndexVec::new(); + // Create the main thread and add it to the list of threads. threads.push(Default::default()); Self { active_thread: ThreadId::new(0), @@ -170,14 +174,13 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { /// Set the allocation id as the allocation id of the given thread local /// static for the active thread. + /// + /// Panics if a thread local is initialized twice for the same thread. fn set_thread_local_alloc_id(&self, def_id: DefId, new_alloc_id: AllocId) { - assert!( - self.thread_local_alloc_ids - .borrow_mut() - .insert((def_id, self.active_thread), new_alloc_id) - .is_none(), - "a thread local initialized twice for the same thread" - ); + self.thread_local_alloc_ids + .borrow_mut() + .insert((def_id, self.active_thread), new_alloc_id) + .unwrap_none(); } /// Borrow the stack of the active thread. @@ -227,15 +230,20 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { } /// Mark that the active thread tries to join the thread with `joined_thread_id`. - fn join_thread(&mut self, joined_thread_id: ThreadId) { - assert!(!self.threads[joined_thread_id].detached, "Bug: trying to join a detached thread."); - assert_ne!(joined_thread_id, self.active_thread, "Bug: trying to join itself"); - assert!( - self.threads - .iter() - .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)), - "Bug: multiple threads try to join the same thread." - ); + fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { + if self.threads[joined_thread_id].detached { + throw_ub_format!("trying to join a detached thread"); + } + if joined_thread_id == self.active_thread { + throw_ub_format!("trying to join itself"); + } + if self + .threads + .iter() + .any(|thread| thread.state == ThreadState::BlockedOnJoin(joined_thread_id)) + { + throw_ub_format!("multiple threads try to join the same thread"); + } if self.threads[joined_thread_id].state != ThreadState::Terminated { // The joined thread is still running, we need to wait for it. self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id); @@ -245,6 +253,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { joined_thread_id ); } + Ok(()) } /// Set the name of the active thread. @@ -252,6 +261,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { self.active_thread_mut().thread_name = Some(new_thread_name); } + /// Get the name of the active thread. + fn get_thread_name(&mut self) -> InterpResult<'tcx, Vec> { + if let Some(ref thread_name) = self.active_thread_mut().thread_name { + Ok(thread_name.clone()) + } else { + throw_ub_format!("thread {:?} has no name set", self.active_thread) + } + } + /// Allocate a new blockset id. fn create_blockset(&mut self) -> BlockSetId { self.blockset_counter = self.blockset_counter.checked_add(1).unwrap(); @@ -267,7 +285,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { /// Unblock any one thread from the given blockset if it contains at least /// one. Return the id of the unblocked thread. - fn unblock_random_thread(&mut self, set: BlockSetId) -> Option { + fn unblock_some_thread(&mut self, set: BlockSetId) -> Option { for (id, thread) in self.threads.iter_enumerated_mut() { if thread.state == ThreadState::Blocked(set) { trace!("unblocking {:?} in blockset {:?}", id, set); @@ -284,6 +302,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { } /// Decide which action to take next and on which thread. + /// + /// The currently implemented scheduling policy is the one that is commonly + /// used in stateless model checkers such as Loom: run the active thread as + /// long as we can and switch only when we have to (the active thread was + /// blocked, terminated, or was explicitly asked to be preempted). fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { if self.threads[self.active_thread].check_terminated() { // Check if we need to unblock any threads. @@ -295,14 +318,24 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { } return Ok(SchedulingAction::ExecuteDtors); } + if self.threads[MAIN_THREAD].state == ThreadState::Terminated { + // The main thread terminated; stop the program. + if self.threads.iter().any(|thread| thread.state != ThreadState::Terminated) { + // FIXME: This check should be either configurable or just emit a warning. + throw_unsup_format!("the main thread terminated without waiting for other threads"); + } + return Ok(SchedulingAction::Stop); + } if self.threads[self.active_thread].state == ThreadState::Enabled && !self.yield_active_thread { + // The currently active thread is still enabled, just continue with it. return Ok(SchedulingAction::ExecuteStep); } + // We need to pick a new thread for execution. for (id, thread) in self.threads.iter_enumerated() { if thread.state == ThreadState::Enabled { - if !(self.yield_active_thread && id == self.active_thread) { + if !self.yield_active_thread || id != self.active_thread { self.active_thread = id; break; } @@ -312,14 +345,16 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { if self.threads[self.active_thread].state == ThreadState::Enabled { return Ok(SchedulingAction::ExecuteStep); } + // We have not found a thread to execute. if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { - Ok(SchedulingAction::Stop) + unreachable!(); } else { throw_machine_stop!(TerminationInfo::Deadlock); } } } +// Public interface to thread management. impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {} pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { /// A workaround for thread-local statics until @@ -331,8 +366,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx val: &mut mir::interpret::ConstValue<'tcx>, ) -> InterpResult<'tcx> { let this = self.eval_context_ref(); - match val { - mir::interpret::ConstValue::Scalar(Scalar::Ptr(ptr)) => { + match *val { + mir::interpret::ConstValue::Scalar(Scalar::Ptr(ref mut ptr)) => { let alloc_id = ptr.alloc_id; let alloc = this.tcx.alloc_map.lock().get(alloc_id); let tcx = this.tcx; @@ -407,68 +442,86 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } } + #[inline] fn create_thread(&mut self) -> InterpResult<'tcx, ThreadId> { let this = self.eval_context_mut(); Ok(this.machine.threads.create_thread()) } + #[inline] fn detach_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); this.machine.threads.detach_thread(thread_id); Ok(()) } + #[inline] fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - this.machine.threads.join_thread(joined_thread_id); - Ok(()) + this.machine.threads.join_thread(joined_thread_id) } + #[inline] fn set_active_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx, ThreadId> { let this = self.eval_context_mut(); Ok(this.machine.threads.set_active_thread_id(thread_id)) } + #[inline] fn get_active_thread(&self) -> InterpResult<'tcx, ThreadId> { let this = self.eval_context_ref(); Ok(this.machine.threads.get_active_thread_id()) } + #[inline] fn has_terminated(&self, thread_id: ThreadId) -> InterpResult<'tcx, bool> { let this = self.eval_context_ref(); Ok(this.machine.threads.has_terminated(thread_id)) } + #[inline] fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Tag, FrameData<'tcx>>] { let this = self.eval_context_ref(); this.machine.threads.active_thread_stack() } + #[inline] fn active_thread_stack_mut(&mut self) -> &mut Vec>> { let this = self.eval_context_mut(); this.machine.threads.active_thread_stack_mut() } + #[inline] fn set_active_thread_name(&mut self, new_thread_name: Vec) -> InterpResult<'tcx, ()> { let this = self.eval_context_mut(); Ok(this.machine.threads.set_thread_name(new_thread_name)) } + #[inline] + fn get_active_thread_name(&mut self) -> InterpResult<'tcx, Vec> { + let this = self.eval_context_mut(); + this.machine.threads.get_thread_name() + } + + #[inline] fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> { let this = self.eval_context_mut(); Ok(this.machine.threads.create_blockset()) } + #[inline] fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); Ok(this.machine.threads.block_active_thread(set)) } - fn unblock_random_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option> { + #[inline] + fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option> { let this = self.eval_context_mut(); - Ok(this.machine.threads.unblock_random_thread(set)) + Ok(this.machine.threads.unblock_some_thread(set)) } + #[inline] fn yield_active_thread(&mut self) -> InterpResult<'tcx> { let this = self.eval_context_mut(); this.machine.threads.yield_active_thread(); @@ -476,6 +529,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } /// Decide which action to take next and on which thread. + #[inline] fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { let this = self.eval_context_mut(); this.machine.threads.schedule()