diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 39577d863b8d..86cc014f714b 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -20,7 +20,8 @@ use core::prelude::*; use core::borrow; use core::comm; use core::task; -use core::unstable::sync::{Exclusive, exclusive}; +use core::unstable::sync::{Exclusive, exclusive, UnsafeAtomicRcBox}; +use core::unstable::atomics; use core::util; /**************************************************************************** @@ -37,6 +38,7 @@ type SignalEnd = comm::ChanOne<()>; struct Waitqueue { head: comm::Port, tail: comm::Chan } +#[doc(hidden)] fn new_waitqueue() -> Waitqueue { let (block_head, block_tail) = comm::stream(); Waitqueue { head: block_head, tail: block_tail } @@ -166,9 +168,12 @@ impl Sem<~[Waitqueue]> { // FIXME(#3588) should go inside of access() #[doc(hidden)] type SemRelease<'self> = SemReleaseGeneric<'self, ()>; +#[doc(hidden)] type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[Waitqueue]>; +#[doc(hidden)] struct SemReleaseGeneric<'self, Q> { sem: &'self Sem } +#[doc(hidden)] #[unsafe_destructor] impl<'self, Q:Owned> Drop for SemReleaseGeneric<'self, Q> { fn finalize(&self) { @@ -176,12 +181,14 @@ impl<'self, Q:Owned> Drop for SemReleaseGeneric<'self, Q> { } } +#[doc(hidden)] fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> { SemReleaseGeneric { sem: sem } } +#[doc(hidden)] fn SemAndSignalRelease<'r>(sem: &'r Sem<~[Waitqueue]>) -> SemAndSignalRelease<'r> { SemReleaseGeneric { @@ -465,8 +472,23 @@ impl Mutex { #[doc(hidden)] struct RWlockInner { + // You might ask, "Why don't you need to use an atomic for the mode flag?" + // This flag affects the behaviour of readers (for plain readers, they + // assert on it; for downgraders, they use it to decide which mode to + // unlock for). Consider that the flag is only unset when the very last + // reader exits; therefore, it can never be unset during a reader/reader + // (or reader/downgrader) race. + // By the way, if we didn't care about the assert in the read unlock path, + // we could instead store the mode flag in write_downgrade's stack frame, + // and have the downgrade tokens store a borrowed pointer to it. read_mode: bool, - read_count: uint + // The only way the count flag is ever accessed is with xadd. Since it is + // a read-modify-write operation, multiple xadds on different cores will + // always be consistent with respect to each other, so a monotonic/relaxed + // consistency ordering suffices (i.e., no extra barriers are needed). + // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use + // acquire/release orderings superfluously. Change these someday. + read_count: atomics::AtomicUint, } /** @@ -479,7 +501,7 @@ struct RWlockInner { pub struct RWlock { priv order_lock: Semaphore, priv access_lock: Sem<~[Waitqueue]>, - priv state: Exclusive + priv state: UnsafeAtomicRcBox, } /// Create a new rwlock, with one associated condvar. @@ -490,10 +512,13 @@ pub fn RWlock() -> RWlock { rwlock_with_condvars(1) } * Similar to mutex_with_condvars. */ pub fn rwlock_with_condvars(num_condvars: uint) -> RWlock { - RWlock { order_lock: semaphore(1), + let state = UnsafeAtomicRcBox::new(RWlockInner { + read_mode: false, + read_count: atomics::AtomicUint::new(0), + }); + RWlock { order_lock: semaphore(1), access_lock: new_sem_and_signal(1, num_condvars), - state: exclusive(RWlockInner { read_mode: false, - read_count: 0 }) } + state: state, } } impl RWlock { @@ -513,20 +538,11 @@ impl RWlock { unsafe { do task::unkillable { do (&self.order_lock).access { - let mut first_reader = false; - do self.state.with |state| { - first_reader = (state.read_count == 0); - state.read_count += 1; - } - if first_reader { + let state = &mut *self.state.get(); + let old_count = state.read_count.fetch_add(1, atomics::Acquire); + if old_count == 0 { (&self.access_lock).acquire(); - do self.state.with |state| { - // Must happen *after* getting access_lock. If - // this is set while readers are waiting, but - // while a writer holds the lock, the writer will - // be confused if they downgrade-then-unlock. - state.read_mode = true; - } + state.read_mode = true; } } release = Some(RWlockReleaseRead(self)); @@ -606,12 +622,12 @@ impl RWlock { * # Example * * ~~~ {.rust} - * do lock.write_downgrade |write_mode| { - * do (&write_mode).write_cond |condvar| { + * do lock.write_downgrade |write_token| { + * do (&write_token).write_cond |condvar| { * ... exclusive access ... * } - * let read_mode = lock.downgrade(write_mode); - * do (&read_mode).read { + * let read_token = lock.downgrade(write_token); + * do (&read_token).read { * ... shared access ... * } * } @@ -640,14 +656,15 @@ impl RWlock { } unsafe { do task::unkillable { - let mut first_reader = false; - do self.state.with |state| { - assert!(!state.read_mode); - state.read_mode = true; - first_reader = (state.read_count == 0); - state.read_count += 1; - } - if !first_reader { + let state = &mut *self.state.get(); + assert!(!state.read_mode); + state.read_mode = true; + // If a reader attempts to enter at this point, both the + // downgrader and reader will set the mode flag. This is fine. + let old_count = state.read_count.fetch_add(1, atomics::Release); + // If another reader was already blocking, we need to hand-off + // the "reader cloud" access lock to them. + if old_count != 0 { // Guaranteed not to let another writer in, because // another reader was holding the order_lock. Hence they // must be the one to get the access_lock (because all @@ -667,22 +684,22 @@ struct RWlockReleaseRead<'self> { lock: &'self RWlock, } +#[doc(hidden)] #[unsafe_destructor] impl<'self> Drop for RWlockReleaseRead<'self> { fn finalize(&self) { unsafe { do task::unkillable { - let mut last_reader = false; - do self.lock.state.with |state| { - assert!(state.read_mode); - assert!(state.read_count > 0); - state.read_count -= 1; - if state.read_count == 0 { - last_reader = true; - state.read_mode = false; - } - } - if last_reader { + let state = &mut *self.lock.state.get(); + assert!(state.read_mode); + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + state.read_mode = false; + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. (&self.lock.access_lock).release(); } } @@ -690,6 +707,7 @@ impl<'self> Drop for RWlockReleaseRead<'self> { } } +#[doc(hidden)] fn RWlockReleaseRead<'r>(lock: &'r RWlock) -> RWlockReleaseRead<'r> { RWlockReleaseRead { lock: lock @@ -703,30 +721,34 @@ struct RWlockReleaseDowngrade<'self> { lock: &'self RWlock, } +#[doc(hidden)] #[unsafe_destructor] impl<'self> Drop for RWlockReleaseDowngrade<'self> { fn finalize(&self) { unsafe { do task::unkillable { - let mut writer_or_last_reader = false; - do self.lock.state.with |state| { - if state.read_mode { - assert!(state.read_count > 0); - state.read_count -= 1; - if state.read_count == 0 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last - // reader - } - } else { - // Case 3: Writer did not downgrade + let writer_or_last_reader; + // Check if we're releasing from read mode or from write mode. + let state = &mut *self.lock.state.get(); + if state.read_mode { + // Releasing from read mode. + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + // Check if other readers remain. + if old_count == 1 { + // Case 1: Writer downgraded & was the last reader writer_or_last_reader = true; + state.read_mode = false; + } else { + // Case 2: Writer downgraded & was not the last reader + writer_or_last_reader = false; } + } else { + // Case 3: Writer did not downgrade + writer_or_last_reader = true; } if writer_or_last_reader { + // Nobody left inside; release the "reader cloud" lock. (&self.lock.access_lock).release(); } } @@ -734,6 +756,7 @@ impl<'self> Drop for RWlockReleaseDowngrade<'self> { } } +#[doc(hidden)] fn RWlockReleaseDowngrade<'r>(lock: &'r RWlock) -> RWlockReleaseDowngrade<'r> { RWlockReleaseDowngrade {