auto merge of #8234 : bblum/rust/assorted-fixes, r=brson
This fixes 4 bugs that prevented the extra::arc and extra::sync tests from passing on the new runtime. * In ```Add SendDeferred trait``` I add a non-rescheduling ```send_deferred``` method to our various channel types. The ```extra::sync``` concurrency primitives need this guarantee so they can send while inside of an exclusive. (This fixes deterministic deadlocks seen with ```RUST_THREADS=1```.) * In "Fix nasty double-free bug" I make sure that a ```ChanOne``` suppresses_finalize *before* rescheduling away to the receiver, so in case it gets a kill signal upon coming back, the destructor is inhibited as desired. (This is pretty uncommon on multiple CPUs but showed up always with ```RUST_THREADS=1```.) * In ```Fix embarrassing bug where 'unkillable' would unwind improperly``` I make sure the task's unkillable counter stays consistent when a kill signal is received right at the start of an unkillable section. (This is a very uncommon race and can only occur with multiple CPUs.) * In ```Don't fail from kill signals if already unwinding``` I do pretty much what it says on the tin. Surprising that it took the whole suite of sync/arc tests to expose this. The other two commits are cleanup. r @brson
This commit is contained in:
commit
3ddc72f69b
7 changed files with 328 additions and 242 deletions
|
|
@ -18,10 +18,13 @@
|
|||
|
||||
use std::borrow;
|
||||
use std::comm;
|
||||
use std::comm::SendDeferred;
|
||||
use std::task;
|
||||
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
|
||||
use std::unstable::atomics;
|
||||
use std::unstable::finally::Finally;
|
||||
use std::util;
|
||||
use std::util::NonCopyable;
|
||||
|
||||
/****************************************************************************
|
||||
* Internals
|
||||
|
|
@ -49,7 +52,7 @@ impl WaitQueue {
|
|||
if self.head.peek() {
|
||||
// Pop and send a wakeup signal. If the waiter was killed, its port
|
||||
// will have closed. Keep trying until we get a live task.
|
||||
if comm::try_send_one(self.head.recv(), ()) {
|
||||
if self.head.recv().try_send_deferred(()) {
|
||||
true
|
||||
} else {
|
||||
self.signal()
|
||||
|
|
@ -62,7 +65,7 @@ impl WaitQueue {
|
|||
fn broadcast(&self) -> uint {
|
||||
let mut count = 0;
|
||||
while self.head.peek() {
|
||||
if comm::try_send_one(self.head.recv(), ()) {
|
||||
if self.head.recv().try_send_deferred(()) {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
|
|
@ -83,7 +86,6 @@ struct SemInner<Q> {
|
|||
#[doc(hidden)]
|
||||
struct Sem<Q>(Exclusive<SemInner<Q>>);
|
||||
|
||||
|
||||
#[doc(hidden)]
|
||||
impl<Q:Send> Sem<Q> {
|
||||
fn new(count: int, q: Q) -> Sem<Q> {
|
||||
|
|
@ -102,7 +104,7 @@ impl<Q:Send> Sem<Q> {
|
|||
// Tell outer scope we need to block.
|
||||
waiter_nobe = Some(WaitEnd);
|
||||
// Enqueue ourself.
|
||||
state.waiters.tail.send(SignalEnd);
|
||||
state.waiters.tail.send_deferred(SignalEnd);
|
||||
}
|
||||
}
|
||||
// Uncomment if you wish to test for sem races. Not valgrind-friendly.
|
||||
|
|
@ -124,17 +126,18 @@ impl<Q:Send> Sem<Q> {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs
|
||||
#[doc(hidden)]
|
||||
impl Sem<()> {
|
||||
|
||||
pub fn access<U>(&self, blk: &fn() -> U) -> U {
|
||||
let mut release = None;
|
||||
do task::unkillable {
|
||||
self.acquire();
|
||||
release = Some(SemRelease(self));
|
||||
do (|| {
|
||||
self.acquire();
|
||||
unsafe {
|
||||
do task::rekillable { blk() }
|
||||
}
|
||||
}).finally {
|
||||
self.release();
|
||||
}
|
||||
}
|
||||
blk()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -148,46 +151,6 @@ impl Sem<~[WaitQueue]> {
|
|||
}
|
||||
Sem::new(count, queues)
|
||||
}
|
||||
|
||||
pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U {
|
||||
let mut release = None;
|
||||
do task::unkillable {
|
||||
self.acquire();
|
||||
release = Some(SemAndSignalRelease(self));
|
||||
}
|
||||
blk()
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#3588) should go inside of access()
|
||||
#[doc(hidden)]
|
||||
type SemRelease<'self> = SemReleaseGeneric<'self, ()>;
|
||||
#[doc(hidden)]
|
||||
type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>;
|
||||
#[doc(hidden)]
|
||||
struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> }
|
||||
|
||||
#[doc(hidden)]
|
||||
#[unsafe_destructor]
|
||||
impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> {
|
||||
fn drop(&self) {
|
||||
self.sem.release();
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> {
|
||||
SemReleaseGeneric {
|
||||
sem: sem
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>)
|
||||
-> SemAndSignalRelease<'r> {
|
||||
SemReleaseGeneric {
|
||||
sem: sem
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#3598): Want to use an Option down below, but we need a custom enum
|
||||
|
|
@ -210,11 +173,10 @@ pub struct Condvar<'self> {
|
|||
// writer waking up from a cvar wait can't race with a reader to steal it,
|
||||
// See the comment in write_cond for more detail.
|
||||
priv order: ReacquireOrderLock<'self>,
|
||||
// Make sure condvars are non-copyable.
|
||||
priv token: util::NonCopyable,
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<'self> Drop for Condvar<'self> { fn drop(&self) {} }
|
||||
|
||||
impl<'self> Condvar<'self> {
|
||||
/**
|
||||
* Atomically drop the associated lock, and block until a signal is sent.
|
||||
|
|
@ -242,11 +204,10 @@ impl<'self> Condvar<'self> {
|
|||
let (WaitEnd, SignalEnd) = comm::oneshot();
|
||||
let mut WaitEnd = Some(WaitEnd);
|
||||
let mut SignalEnd = Some(SignalEnd);
|
||||
let mut reacquire = None;
|
||||
let mut out_of_bounds = None;
|
||||
unsafe {
|
||||
do task::unkillable {
|
||||
// Release lock, 'atomically' enqueuing ourselves in so doing.
|
||||
do task::unkillable {
|
||||
// Release lock, 'atomically' enqueuing ourselves in so doing.
|
||||
unsafe {
|
||||
do (**self.sem).with |state| {
|
||||
if condvar_id < state.blocked.len() {
|
||||
// Drop the lock.
|
||||
|
|
@ -256,42 +217,30 @@ impl<'self> Condvar<'self> {
|
|||
}
|
||||
// Enqueue ourself to be woken up by a signaller.
|
||||
let SignalEnd = SignalEnd.take_unwrap();
|
||||
state.blocked[condvar_id].tail.send(SignalEnd);
|
||||
state.blocked[condvar_id].tail.send_deferred(SignalEnd);
|
||||
} else {
|
||||
out_of_bounds = Some(state.blocked.len());
|
||||
}
|
||||
}
|
||||
|
||||
// If yield checks start getting inserted anywhere, we can be
|
||||
// killed before or after enqueueing. Deciding whether to
|
||||
// unkillably reacquire the lock needs to happen atomically
|
||||
// wrt enqueuing.
|
||||
if out_of_bounds.is_none() {
|
||||
reacquire = Some(CondvarReacquire { sem: self.sem,
|
||||
order: self.order });
|
||||
}
|
||||
}
|
||||
}
|
||||
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
|
||||
// Unconditionally "block". (Might not actually block if a
|
||||
// signaller already sent -- I mean 'unconditionally' in contrast
|
||||
// with acquire().)
|
||||
let _ = comm::recv_one(WaitEnd.take_unwrap());
|
||||
}
|
||||
|
||||
// This is needed for a failing condition variable to reacquire the
|
||||
// mutex during unwinding. As long as the wrapper (mutex, etc) is
|
||||
// bounded in when it gets released, this shouldn't hang forever.
|
||||
struct CondvarReacquire<'self> {
|
||||
sem: &'self Sem<~[WaitQueue]>,
|
||||
order: ReacquireOrderLock<'self>,
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<'self> Drop for CondvarReacquire<'self> {
|
||||
fn drop(&self) {
|
||||
// Needs to succeed, instead of itself dying.
|
||||
do task::unkillable {
|
||||
// If yield checks start getting inserted anywhere, we can be
|
||||
// killed before or after enqueueing. Deciding whether to
|
||||
// unkillably reacquire the lock needs to happen atomically
|
||||
// wrt enqueuing.
|
||||
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
|
||||
// Unconditionally "block". (Might not actually block if a
|
||||
// signaller already sent -- I mean 'unconditionally' in contrast
|
||||
// with acquire().)
|
||||
do (|| {
|
||||
unsafe {
|
||||
do task::rekillable {
|
||||
let _ = comm::recv_one(WaitEnd.take_unwrap());
|
||||
}
|
||||
}
|
||||
}).finally {
|
||||
// Reacquire the condvar. Note this is back in the unkillable
|
||||
// section; it needs to succeed, instead of itself dying.
|
||||
match self.order {
|
||||
Just(lock) => do lock.access {
|
||||
self.sem.acquire();
|
||||
|
|
@ -373,8 +322,8 @@ impl Sem<~[WaitQueue]> {
|
|||
// The only other places that condvars get built are rwlock.write_cond()
|
||||
// and rwlock_write_mode.
|
||||
pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
|
||||
do self.access_waitqueue {
|
||||
blk(&Condvar { sem: self, order: Nothing })
|
||||
do self.access {
|
||||
blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -452,7 +401,7 @@ impl Mutex {
|
|||
|
||||
/// Run a function with ownership of the mutex.
|
||||
pub fn lock<U>(&self, blk: &fn() -> U) -> U {
|
||||
(&self.sem).access_waitqueue(blk)
|
||||
(&self.sem).access(blk)
|
||||
}
|
||||
|
||||
/// Run a function with ownership of the mutex and a handle to a condvar.
|
||||
|
|
@ -531,7 +480,6 @@ impl RWLock {
|
|||
* tasks may run concurrently with this one.
|
||||
*/
|
||||
pub fn read<U>(&self, blk: &fn() -> U) -> U {
|
||||
let mut release = None;
|
||||
unsafe {
|
||||
do task::unkillable {
|
||||
do (&self.order_lock).access {
|
||||
|
|
@ -542,10 +490,24 @@ impl RWLock {
|
|||
state.read_mode = true;
|
||||
}
|
||||
}
|
||||
release = Some(RWLockReleaseRead(self));
|
||||
do (|| {
|
||||
do task::rekillable { blk() }
|
||||
}).finally {
|
||||
let state = &mut *self.state.get();
|
||||
assert!(state.read_mode);
|
||||
let old_count = state.read_count.fetch_sub(1, atomics::Release);
|
||||
assert!(old_count > 0);
|
||||
if old_count == 1 {
|
||||
state.read_mode = false;
|
||||
// Note: this release used to be outside of a locked access
|
||||
// to exclusive-protected state. If this code is ever
|
||||
// converted back to such (instead of using atomic ops),
|
||||
// this access MUST NOT go inside the exclusive access.
|
||||
(&self.access_lock).release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
blk()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -556,7 +518,7 @@ impl RWLock {
|
|||
unsafe {
|
||||
do task::unkillable {
|
||||
(&self.order_lock).acquire();
|
||||
do (&self.access_lock).access_waitqueue {
|
||||
do (&self.access_lock).access {
|
||||
(&self.order_lock).release();
|
||||
do task::rekillable {
|
||||
blk()
|
||||
|
|
@ -606,7 +568,8 @@ impl RWLock {
|
|||
(&self.order_lock).release();
|
||||
do task::rekillable {
|
||||
let opt_lock = Just(&self.order_lock);
|
||||
blk(&Condvar { order: opt_lock, ..*cond })
|
||||
blk(&Condvar { sem: cond.sem, order: opt_lock,
|
||||
token: NonCopyable::new() })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -637,14 +600,43 @@ impl RWLock {
|
|||
pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
|
||||
// Implementation slightly different from the slicker 'write's above.
|
||||
// The exit path is conditional on whether the caller downgrades.
|
||||
let mut _release = None;
|
||||
do task::unkillable {
|
||||
(&self.order_lock).acquire();
|
||||
(&self.access_lock).acquire();
|
||||
(&self.order_lock).release();
|
||||
do (|| {
|
||||
unsafe {
|
||||
do task::rekillable {
|
||||
blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
|
||||
}
|
||||
}
|
||||
}).finally {
|
||||
let writer_or_last_reader;
|
||||
// Check if we're releasing from read mode or from write mode.
|
||||
let state = unsafe { &mut *self.state.get() };
|
||||
if state.read_mode {
|
||||
// Releasing from read mode.
|
||||
let old_count = state.read_count.fetch_sub(1, atomics::Release);
|
||||
assert!(old_count > 0);
|
||||
// Check if other readers remain.
|
||||
if old_count == 1 {
|
||||
// Case 1: Writer downgraded & was the last reader
|
||||
writer_or_last_reader = true;
|
||||
state.read_mode = false;
|
||||
} else {
|
||||
// Case 2: Writer downgraded & was not the last reader
|
||||
writer_or_last_reader = false;
|
||||
}
|
||||
} else {
|
||||
// Case 3: Writer did not downgrade
|
||||
writer_or_last_reader = true;
|
||||
}
|
||||
if writer_or_last_reader {
|
||||
// Nobody left inside; release the "reader cloud" lock.
|
||||
(&self.access_lock).release();
|
||||
}
|
||||
}
|
||||
}
|
||||
_release = Some(RWLockReleaseDowngrade(self));
|
||||
blk(RWLockWriteMode { lock: self })
|
||||
}
|
||||
|
||||
/// To be called inside of the write_downgrade block.
|
||||
|
|
@ -673,105 +665,16 @@ impl RWLock {
|
|||
}
|
||||
}
|
||||
}
|
||||
RWLockReadMode { lock: token.lock }
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#3588) should go inside of read()
|
||||
#[doc(hidden)]
|
||||
struct RWLockReleaseRead<'self> {
|
||||
lock: &'self RWLock,
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[unsafe_destructor]
|
||||
impl<'self> Drop for RWLockReleaseRead<'self> {
|
||||
fn drop(&self) {
|
||||
unsafe {
|
||||
do task::unkillable {
|
||||
let state = &mut *self.lock.state.get();
|
||||
assert!(state.read_mode);
|
||||
let old_count = state.read_count.fetch_sub(1, atomics::Release);
|
||||
assert!(old_count > 0);
|
||||
if old_count == 1 {
|
||||
state.read_mode = false;
|
||||
// Note: this release used to be outside of a locked access
|
||||
// to exclusive-protected state. If this code is ever
|
||||
// converted back to such (instead of using atomic ops),
|
||||
// this access MUST NOT go inside the exclusive access.
|
||||
(&self.lock.access_lock).release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> {
|
||||
RWLockReleaseRead {
|
||||
lock: lock
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#3588) should go inside of downgrade()
|
||||
#[doc(hidden)]
|
||||
#[unsafe_destructor]
|
||||
struct RWLockReleaseDowngrade<'self> {
|
||||
lock: &'self RWLock,
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[unsafe_destructor]
|
||||
impl<'self> Drop for RWLockReleaseDowngrade<'self> {
|
||||
fn drop(&self) {
|
||||
unsafe {
|
||||
do task::unkillable {
|
||||
let writer_or_last_reader;
|
||||
// Check if we're releasing from read mode or from write mode.
|
||||
let state = &mut *self.lock.state.get();
|
||||
if state.read_mode {
|
||||
// Releasing from read mode.
|
||||
let old_count = state.read_count.fetch_sub(1, atomics::Release);
|
||||
assert!(old_count > 0);
|
||||
// Check if other readers remain.
|
||||
if old_count == 1 {
|
||||
// Case 1: Writer downgraded & was the last reader
|
||||
writer_or_last_reader = true;
|
||||
state.read_mode = false;
|
||||
} else {
|
||||
// Case 2: Writer downgraded & was not the last reader
|
||||
writer_or_last_reader = false;
|
||||
}
|
||||
} else {
|
||||
// Case 3: Writer did not downgrade
|
||||
writer_or_last_reader = true;
|
||||
}
|
||||
if writer_or_last_reader {
|
||||
// Nobody left inside; release the "reader cloud" lock.
|
||||
(&self.lock.access_lock).release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock)
|
||||
-> RWLockReleaseDowngrade<'r> {
|
||||
RWLockReleaseDowngrade {
|
||||
lock: lock
|
||||
RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
|
||||
}
|
||||
}
|
||||
|
||||
/// The "write permission" token used for rwlock.write_downgrade().
|
||||
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock }
|
||||
#[unsafe_destructor]
|
||||
impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} }
|
||||
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
|
||||
|
||||
/// The "read permission" token used for rwlock.write_downgrade().
|
||||
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock }
|
||||
#[unsafe_destructor]
|
||||
impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} }
|
||||
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
|
||||
priv token: NonCopyable }
|
||||
|
||||
impl<'self> RWLockWriteMode<'self> {
|
||||
/// Access the pre-downgrade rwlock in write mode.
|
||||
|
|
@ -781,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> {
|
|||
// Need to make the condvar use the order lock when reacquiring the
|
||||
// access lock. See comment in RWLock::write_cond for why.
|
||||
blk(&Condvar { sem: &self.lock.access_lock,
|
||||
order: Just(&self.lock.order_lock), })
|
||||
order: Just(&self.lock.order_lock),
|
||||
token: NonCopyable::new() })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1059,6 +963,8 @@ mod tests {
|
|||
}
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn test_mutex_killed_broadcast() {
|
||||
use std::unstable::finally::Finally;
|
||||
|
||||
let m = ~Mutex::new();
|
||||
let m2 = ~m.clone();
|
||||
let (p,c) = comm::stream();
|
||||
|
|
@ -1075,8 +981,13 @@ mod tests {
|
|||
do mi.lock_cond |cond| {
|
||||
let c = c.take();
|
||||
c.send(()); // tell sibling to go ahead
|
||||
let _z = SendOnFailure(c);
|
||||
cond.wait(); // block forever
|
||||
do (|| {
|
||||
cond.wait(); // block forever
|
||||
}).finally {
|
||||
error!("task unwinding and sending");
|
||||
c.send(());
|
||||
error!("task unwinding and done sending");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1095,21 +1006,6 @@ mod tests {
|
|||
let woken = cond.broadcast();
|
||||
assert_eq!(woken, 0);
|
||||
}
|
||||
struct SendOnFailure {
|
||||
c: comm::Chan<()>,
|
||||
}
|
||||
|
||||
impl Drop for SendOnFailure {
|
||||
fn drop(&self) {
|
||||
self.c.send(());
|
||||
}
|
||||
}
|
||||
|
||||
fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
|
||||
SendOnFailure {
|
||||
c: c
|
||||
}
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_mutex_cond_signal_on_0() {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ use either::{Either, Left, Right};
|
|||
use kinds::Send;
|
||||
use option::{Option, Some};
|
||||
use unstable::sync::Exclusive;
|
||||
pub use rt::comm::SendDeferred;
|
||||
use rtcomm = rt::comm;
|
||||
use rt;
|
||||
|
||||
|
|
@ -105,6 +106,21 @@ impl<T: Send> GenericSmartChan<T> for Chan<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for Chan<T> {
|
||||
fn send_deferred(&self, x: T) {
|
||||
match self.inner {
|
||||
Left(ref chan) => chan.send(x),
|
||||
Right(ref chan) => chan.send_deferred(x)
|
||||
}
|
||||
}
|
||||
fn try_send_deferred(&self, x: T) -> bool {
|
||||
match self.inner {
|
||||
Left(ref chan) => chan.try_send(x),
|
||||
Right(ref chan) => chan.try_send_deferred(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericPort<T> for Port<T> {
|
||||
fn recv(&self) -> T {
|
||||
match self.inner {
|
||||
|
|
@ -250,6 +266,20 @@ impl<T: Send> ChanOne<T> {
|
|||
Right(p) => p.try_send(data)
|
||||
}
|
||||
}
|
||||
pub fn send_deferred(self, data: T) {
|
||||
let ChanOne { inner } = self;
|
||||
match inner {
|
||||
Left(p) => p.send(data),
|
||||
Right(p) => p.send_deferred(data)
|
||||
}
|
||||
}
|
||||
pub fn try_send_deferred(self, data: T) -> bool {
|
||||
let ChanOne { inner } = self;
|
||||
match inner {
|
||||
Left(p) => p.try_send(data),
|
||||
Right(p) => p.try_send_deferred(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
|
|||
use cell::Cell;
|
||||
use clone::Clone;
|
||||
use rt::{context, SchedulerContext};
|
||||
use tuple::ImmutableTuple;
|
||||
|
||||
/// A combined refcount / BlockedTask-as-uint pointer.
|
||||
///
|
||||
|
|
@ -86,12 +87,32 @@ impl<T> ChanOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Send a message on the one-shot channel. If a receiver task is blocked
|
||||
/// waiting for the message, will wake it up and reschedule to it.
|
||||
pub fn send(self, val: T) {
|
||||
self.try_send(val);
|
||||
}
|
||||
|
||||
/// As `send`, but also returns whether or not the receiver endpoint is still open.
|
||||
pub fn try_send(self, val: T) -> bool {
|
||||
self.try_send_inner(val, true)
|
||||
}
|
||||
|
||||
/// Send a message without immediately rescheduling to a blocked receiver.
|
||||
/// This can be useful in contexts where rescheduling is forbidden, or to
|
||||
/// optimize for when the sender expects to still have useful work to do.
|
||||
pub fn send_deferred(self, val: T) {
|
||||
self.try_send_deferred(val);
|
||||
}
|
||||
|
||||
/// As `send_deferred` and `try_send` together.
|
||||
pub fn try_send_deferred(self, val: T) -> bool {
|
||||
self.try_send_inner(val, false)
|
||||
}
|
||||
|
||||
// 'do_resched' configures whether the scheduler immediately switches to
|
||||
// the receiving task, or leaves the sending task still running.
|
||||
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
|
||||
rtassert!(context() != SchedulerContext);
|
||||
|
||||
let mut this = self;
|
||||
|
|
@ -110,6 +131,13 @@ impl<T> ChanOne<T> {
|
|||
// acquire barrier that keeps the subsequent access of the
|
||||
// ~Task pointer from being reordered.
|
||||
let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
|
||||
|
||||
// Suppress the synchronizing actions in the finalizer. We're
|
||||
// done with the packet. NB: In case of do_resched, this *must*
|
||||
// happen before waking up a blocked task (or be unkillable),
|
||||
// because we might get a kill signal during the reschedule.
|
||||
this.suppress_finalize = true;
|
||||
|
||||
match oldstate {
|
||||
STATE_BOTH => {
|
||||
// Port is not waiting yet. Nothing to do
|
||||
|
|
@ -130,15 +158,20 @@ impl<T> ChanOne<T> {
|
|||
task_as_state => {
|
||||
// Port is blocked. Wake it up.
|
||||
let recvr = BlockedTask::cast_from_uint(task_as_state);
|
||||
do recvr.wake().map_consume |woken_task| {
|
||||
Scheduler::run_task(woken_task);
|
||||
};
|
||||
if do_resched {
|
||||
do recvr.wake().map_consume |woken_task| {
|
||||
Scheduler::run_task(woken_task);
|
||||
};
|
||||
} else {
|
||||
let recvr = Cell::new(recvr);
|
||||
do Local::borrow::<Scheduler, ()> |sched| {
|
||||
sched.enqueue_blocked_task(recvr.take());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
|
||||
this.suppress_finalize = true;
|
||||
return recvr_active;
|
||||
}
|
||||
}
|
||||
|
|
@ -152,6 +185,7 @@ impl<T> PortOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Wait for a message on the one-shot port. Fails if the send end is closed.
|
||||
pub fn recv(self) -> T {
|
||||
match self.try_recv() {
|
||||
Some(val) => val,
|
||||
|
|
@ -161,6 +195,7 @@ impl<T> PortOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// As `recv`, but returns `None` if the send end is closed rather than failing.
|
||||
pub fn try_recv(self) -> Option<T> {
|
||||
let mut this = self;
|
||||
|
||||
|
|
@ -382,6 +417,12 @@ impl<T> Drop for PortOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
|
||||
pub trait SendDeferred<T> {
|
||||
fn send_deferred(&self, val: T);
|
||||
fn try_send_deferred(&self, val: T) -> bool;
|
||||
}
|
||||
|
||||
struct StreamPayload<T> {
|
||||
val: T,
|
||||
next: PortOne<StreamPayload<T>>
|
||||
|
|
@ -409,6 +450,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
|
|||
return (port, chan);
|
||||
}
|
||||
|
||||
impl<T: Send> Chan<T> {
|
||||
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = self.next.take();
|
||||
self.next.put_back(next_cone);
|
||||
cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericChan<T> for Chan<T> {
|
||||
fn send(&self, val: T) {
|
||||
self.try_send(val);
|
||||
|
|
@ -417,10 +467,16 @@ impl<T: Send> GenericChan<T> for Chan<T> {
|
|||
|
||||
impl<T: Send> GenericSmartChan<T> for Chan<T> {
|
||||
fn try_send(&self, val: T) -> bool {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = self.next.take();
|
||||
self.next.put_back(next_cone);
|
||||
cone.try_send(StreamPayload { val: val, next: next_pone })
|
||||
self.try_send_inner(val, true)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for Chan<T> {
|
||||
fn send_deferred(&self, val: T) {
|
||||
self.try_send_deferred(val);
|
||||
}
|
||||
fn try_send_deferred(&self, val: T) -> bool {
|
||||
self.try_send_inner(val, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -495,6 +551,17 @@ impl<T> SharedChan<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SharedChan<T> {
|
||||
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
|
||||
unsafe {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
|
||||
cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
|
||||
do_resched)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericChan<T> for SharedChan<T> {
|
||||
fn send(&self, val: T) {
|
||||
self.try_send(val);
|
||||
|
|
@ -503,11 +570,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> {
|
|||
|
||||
impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
|
||||
fn try_send(&self, val: T) -> bool {
|
||||
unsafe {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
|
||||
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
|
||||
}
|
||||
self.try_send_inner(val, true)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for SharedChan<T> {
|
||||
fn send_deferred(&self, val: T) {
|
||||
self.try_send_deferred(val);
|
||||
}
|
||||
fn try_send_deferred(&self, val: T) -> bool {
|
||||
self.try_send_inner(val, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -584,31 +656,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> {
|
|||
|
||||
impl<T: Send> GenericChan<T> for MegaPipe<T> {
|
||||
fn send(&self, val: T) {
|
||||
match *self {
|
||||
(_, ref c) => c.send(val)
|
||||
}
|
||||
self.second_ref().send(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
|
||||
fn try_send(&self, val: T) -> bool {
|
||||
match *self {
|
||||
(_, ref c) => c.try_send(val)
|
||||
}
|
||||
self.second_ref().try_send(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericPort<T> for MegaPipe<T> {
|
||||
fn recv(&self) -> T {
|
||||
match *self {
|
||||
(ref p, _) => p.recv()
|
||||
}
|
||||
self.first_ref().recv()
|
||||
}
|
||||
|
||||
fn try_recv(&self) -> Option<T> {
|
||||
match *self {
|
||||
(ref p, _) => p.try_recv()
|
||||
}
|
||||
self.first_ref().try_recv()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for MegaPipe<T> {
|
||||
fn send_deferred(&self, val: T) {
|
||||
self.second_ref().send_deferred(val)
|
||||
}
|
||||
fn try_send_deferred(&self, val: T) -> bool {
|
||||
self.second_ref().try_send_deferred(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1017,4 +1090,39 @@ mod test {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_deferred() {
|
||||
use unstable::sync::atomically;
|
||||
|
||||
// Tests no-rescheduling of send_deferred on all types of channels.
|
||||
do run_in_newsched_task {
|
||||
let (pone, cone) = oneshot();
|
||||
let (pstream, cstream) = stream();
|
||||
let (pshared, cshared) = stream();
|
||||
let cshared = SharedChan::new(cshared);
|
||||
let mp = megapipe();
|
||||
|
||||
let pone = Cell::new(pone);
|
||||
do spawntask { pone.take().recv(); }
|
||||
let pstream = Cell::new(pstream);
|
||||
do spawntask { pstream.take().recv(); }
|
||||
let pshared = Cell::new(pshared);
|
||||
do spawntask { pshared.take().recv(); }
|
||||
let p_mp = Cell::new(mp.clone());
|
||||
do spawntask { p_mp.take().recv(); }
|
||||
|
||||
let cs = Cell::new((cone, cstream, cshared, mp));
|
||||
unsafe {
|
||||
do atomically {
|
||||
let (cone, cstream, cshared, mp) = cs.take();
|
||||
cone.send_deferred(());
|
||||
cstream.send_deferred(());
|
||||
cshared.send_deferred(());
|
||||
mp.send_deferred(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -530,13 +530,13 @@ impl Death {
|
|||
|
||||
/// Fails if a kill signal was received.
|
||||
#[inline]
|
||||
pub fn check_killed(&self) {
|
||||
pub fn check_killed(&self, already_failing: bool) {
|
||||
match self.kill_handle {
|
||||
Some(ref kill_handle) =>
|
||||
// The task may be both unkillable and killed if it does some
|
||||
// synchronization during unwinding or cleanup (for example,
|
||||
// sending on a notify port). In that case failing won't help.
|
||||
if self.unkillable == 0 && kill_handle.killed() {
|
||||
if self.unkillable == 0 && (!already_failing) && kill_handle.killed() {
|
||||
fail!(KILLED_MSG);
|
||||
},
|
||||
// This may happen during task death (see comments in collect_failure).
|
||||
|
|
@ -548,11 +548,12 @@ impl Death {
|
|||
/// All calls must be paired with a subsequent call to allow_kill.
|
||||
#[inline]
|
||||
pub fn inhibit_kill(&mut self, already_failing: bool) {
|
||||
if self.unkillable == 0 {
|
||||
self.unkillable += 1;
|
||||
// May fail, hence must happen *after* incrementing the counter
|
||||
if self.unkillable == 1 {
|
||||
rtassert!(self.kill_handle.is_some());
|
||||
self.kill_handle.get_mut_ref().inhibit_kill(already_failing);
|
||||
}
|
||||
self.unkillable += 1;
|
||||
}
|
||||
|
||||
/// Exit a possibly-nested unkillable section of code.
|
||||
|
|
|
|||
|
|
@ -540,6 +540,10 @@ impl Scheduler {
|
|||
// The current task is grabbed from TLS, not taken as an input.
|
||||
let current_task: ~Task = Local::take::<Task>();
|
||||
|
||||
// Check that the task is not in an atomically() section (e.g.,
|
||||
// holding a pthread mutex, which could deadlock the scheduler).
|
||||
current_task.death.assert_may_sleep();
|
||||
|
||||
// These transmutes do something fishy with a closure.
|
||||
let f_fake_region = unsafe {
|
||||
transmute::<&fn(&mut Scheduler, ~Task),
|
||||
|
|
@ -600,7 +604,7 @@ impl Scheduler {
|
|||
|
||||
// Must happen after running the cleanup job (of course).
|
||||
let task = Local::unsafe_borrow::<Task>();
|
||||
(*task).death.check_killed();
|
||||
(*task).death.check_killed((*task).unwinder.unwinding);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -655,6 +655,47 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
|
|||
}
|
||||
}
|
||||
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn test_kill_unkillable_task() {
|
||||
use rt::test::*;
|
||||
|
||||
// Attempt to test that when a kill signal is received at the start of an
|
||||
// unkillable section, 'unkillable' unwinds correctly. This is actually
|
||||
// quite a difficult race to expose, as the kill has to happen on a second
|
||||
// CPU, *after* the spawner is already switched-back-to (and passes the
|
||||
// killed check at the start of its timeslice). As far as I know, it's not
|
||||
// possible to make this race deterministic, or even more likely to happen.
|
||||
do run_in_newsched_task {
|
||||
do task::try {
|
||||
do task::spawn {
|
||||
fail!();
|
||||
}
|
||||
do task::unkillable { }
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn test_kill_rekillable_task() {
|
||||
use rt::test::*;
|
||||
|
||||
// Tests that when a kill signal is received, 'rekillable' and
|
||||
// 'unkillable' unwind correctly in conjunction with each other.
|
||||
do run_in_newsched_task {
|
||||
do task::try {
|
||||
do task::unkillable {
|
||||
unsafe {
|
||||
do task::rekillable {
|
||||
do task::spawn {
|
||||
fail!();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[test] #[should_fail] #[ignore(cfg(windows))]
|
||||
fn test_cant_dup_task_builder() {
|
||||
let mut builder = task();
|
||||
|
|
|
|||
|
|
@ -79,6 +79,12 @@ pub fn replace<T>(dest: &mut T, mut src: T) -> T {
|
|||
#[unsafe_no_drop_flag]
|
||||
pub struct NonCopyable;
|
||||
|
||||
impl NonCopyable {
|
||||
// FIXME(#8233) should not be necessary
|
||||
/// Create a new noncopyable token.
|
||||
pub fn new() -> NonCopyable { NonCopyable }
|
||||
}
|
||||
|
||||
impl Drop for NonCopyable {
|
||||
fn drop(&self) { }
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue