From 7cf21e52ebd8ac2b3ce6d0aac8292bf55c39477e Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Mon, 13 Aug 2012 15:22:32 -0400 Subject: [PATCH] Implement rwlock.downgrade and tests --- src/libstd/sync.rs | 238 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 202 insertions(+), 36 deletions(-) diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs index 3ca0d43d9853..2650c1f734d5 100644 --- a/src/libstd/sync.rs +++ b/src/libstd/sync.rs @@ -142,9 +142,9 @@ struct sem_and_signal_release { } /// A mechanism for atomic-unlock-and-deschedule blocking and signalling. -struct condvar { priv sem: &sem; } +struct condvar { priv sem: &sem; drop { } } -impl condvar { +impl &condvar { /// Atomically drop the associated lock, and block until a signal is sent. fn wait() { // Create waiter nobe. @@ -212,8 +212,9 @@ impl condvar { } impl &sem { - fn access_cond(blk: fn(condvar) -> U) -> U { - do self.access { blk(condvar { sem: self }) } + // The only other place that condvars get built is rwlock_write_mode. + fn access_cond(blk: fn(c: &condvar) -> U) -> U { + do self.access { blk(&condvar { sem: self }) } } } @@ -272,7 +273,7 @@ impl &mutex { fn lock(blk: fn() -> U) -> U { (&self.sem).access(blk) } /// Run a function with ownership of the mutex and a handle to a condvar. - fn lock_cond(blk: fn(condvar) -> U) -> U { + fn lock_cond(blk: fn(c: &condvar) -> U) -> U { (&self.sem).access_cond(blk) } } @@ -321,12 +322,18 @@ impl &rwlock { do (&self.order_lock).access { let mut first_reader = false; do self.state.with |state| { - state.read_mode = true; first_reader = (state.read_count == 0); state.read_count += 1; } if first_reader { (&self.access_lock).acquire(); + do self.state.with |state| { + // Must happen *after* getting access_lock. If + // this is set while readers are waiting, but + // while a writer holds the lock, the writer will + // be confused if they downgrade-then-unlock. + state.read_mode = true; + } } } release = some(rwlock_release_read(self)); @@ -357,7 +364,7 @@ impl &rwlock { * the waiting task is signalled. (Note: a writer that waited and then * was signalled might reacquire the lock before other waiting writers.) */ - fn write_cond(blk: fn(condvar) -> U) -> U { + fn write_cond(blk: fn(c: &condvar) -> U) -> U { // NB: You might think I should thread the order_lock into the cond // wait call, so that it gets waited on before access_lock gets // reacquired upon being woken up. However, (a) this would be not @@ -374,7 +381,62 @@ impl &rwlock { } } - // to-do implement downgrade + /** + * As write(), but with the ability to atomically 'downgrade' the lock; + * i.e., to become a reader without letting other writers get the lock in + * the meantime (such as unlocking and then re-locking as a reader would + * do). The block takes a "write mode token" argument, which can be + * transformed into a "read mode token" by calling downgrade(). Example: + * + * do lock.write_downgrade |write_mode| { + * do (&write_mode).write_cond |condvar| { + * ... exclusive access ... + * } + * let read_mode = lock.downgrade(write_mode); + * do (&read_mode).read { + * ... shared access ... + * } + * } + */ + fn write_downgrade(blk: fn(+rwlock_write_mode) -> U) -> U { + // Implementation slightly different from the slicker 'write's above. + // The exit path is conditional on whether the caller downgrades. + let mut _release = none; + unsafe { + do task::unkillable { + (&self.order_lock).acquire(); + (&self.access_lock).acquire(); + (&self.order_lock).release(); + } + _release = some(rwlock_release_downgrade(self)); + } + blk(rwlock_write_mode { lock: self }) + } + + fn downgrade(+token: rwlock_write_mode) -> rwlock_read_mode { + if !ptr::ref_eq(self, token.lock) { + fail ~"Can't downgrade() with a different rwlock's write_mode!"; + } + unsafe { + do task::unkillable { + let mut first_reader = false; + do self.state.with |state| { + assert !state.read_mode; + state.read_mode = true; + first_reader = (state.read_count == 0); + state.read_count += 1; + } + if !first_reader { + // Guaranteed not to let another writer in, because + // another reader was holding the order_lock. Hence they + // must be the one to get the access_lock (because all + // access_locks are acquired with order_lock held). + (&self.access_lock).release(); + } + } + } + rwlock_read_mode { lock: token.lock } + } } // FIXME(#3136) should go inside of read() @@ -386,8 +448,12 @@ struct rwlock_release_read { let mut last_reader = false; do self.lock.state.with |state| { assert state.read_mode; + assert state.read_count > 0; state.read_count -= 1; - last_reader = (state.read_count == 0); + if state.read_count == 0 { + last_reader = true; + state.read_mode = false; + } } if last_reader { (&self.lock.access_lock).release(); @@ -396,6 +462,56 @@ struct rwlock_release_read { } } +// FIXME(#3136) should go inside of downgrade() +struct rwlock_release_downgrade { + lock: &rwlock; + new(lock: &rwlock) { self.lock = lock; } + drop unsafe { + do task::unkillable { + let mut writer_or_last_reader = false; + do self.lock.state.with |state| { + if state.read_mode { + assert state.read_count > 0; + state.read_count -= 1; + if state.read_count == 0 { + // Case 1: Writer downgraded & was the last reader + writer_or_last_reader = true; + state.read_mode = false; + } else { + // Case 2: Writer downgraded & was not the last reader + } + } else { + // Case 3: Writer did not downgrade + writer_or_last_reader = true; + } + } + if writer_or_last_reader { + (&self.lock.access_lock).release(); + } + } + } +} + +/// The "write permission" token used for rwlock.write_downgrade(). +// FIXME(#3145): make lock priv somehow +struct rwlock_write_mode { lock: &rwlock; drop { } } +/// The "read permission" token used for rwlock.write_downgrade(). +struct rwlock_read_mode { priv lock: &rwlock; drop { } } + +// FIXME(#3145) XXX Region invariance forbids "mode.write(blk)" +impl rwlock_write_mode { + /// Access the pre-downgrade rwlock in write mode. + fn write(blk: fn() -> U) -> U { blk() } + /// Access the pre-downgrade rwlock in write mode with a condvar. + fn write_cond(blk: fn(c: &condvar) -> U) -> U { + blk(&condvar { sem: &self.lock.access_lock }) + } +} +impl rwlock_read_mode { + /// Access the post-downgrade rwlock in read mode. + fn read(blk: fn() -> U) -> U { blk() } +} + /**************************************************************************** * Tests ****************************************************************************/ @@ -510,9 +626,11 @@ mod tests { let sharedstate = ~0; let ptr = ptr::addr_of(*sharedstate); do task::spawn { - let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) }; + let sharedstate: &mut int = + unsafe { unsafe::reinterpret_cast(ptr) }; access_shared(sharedstate, m2, 10); c.send(()); + } access_shared(sharedstate, m, 10); let _ = p.recv(); @@ -645,21 +763,27 @@ mod tests { // child task must have finished by the time try returns do m.lock_cond |cond| { let _woken = cond.signal(); - // FIXME(#3145) - The semantics of pipes are not quite what I want - // here - the pipe doesn't get 'terminated' if the child was - // punted awake during failure. - // assert !woken; + // FIXME(#3145) this doesn't work + //assert !woken; } } /************************************************************************ * Reader/writer lock tests ************************************************************************/ #[cfg(test)] - fn lock_rwlock_in_mode(x: &rwlock, reader: bool, blk: fn()) { - if reader { x.read(blk); } else { x.write(blk); } + enum rwlock_mode { read, write, downgrade, downgrade_read } + #[cfg(test)] + fn lock_rwlock_in_mode(x: &rwlock, mode: rwlock_mode, blk: fn()) { + match mode { + read => x.read(blk), + write => x.write(blk), + downgrade => do x.write_downgrade |mode| { mode.write(blk); }, + downgrade_read => + do x.write_downgrade |mode| { x.downgrade(mode).read(blk); }, + } } #[cfg(test)] - fn test_rwlock_exclusion(reader1: bool, reader2: bool) { + fn test_rwlock_exclusion(mode1: rwlock_mode, mode2: rwlock_mode) { // Test mutual exclusion between readers and writers. Just like the // mutex mutual exclusion test, a ways above. let (c,p) = pipes::stream(); @@ -668,19 +792,20 @@ mod tests { let sharedstate = ~0; let ptr = ptr::addr_of(*sharedstate); do task::spawn { - let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) }; - access_shared(sharedstate, x2, reader1, 10); + let sharedstate: &mut int = + unsafe { unsafe::reinterpret_cast(ptr) }; + access_shared(sharedstate, x2, mode1, 10); c.send(()); } - access_shared(sharedstate, x, reader2, 10); + access_shared(sharedstate, x, mode2, 10); let _ = p.recv(); assert *sharedstate == 20; - fn access_shared(sharedstate: &mut int, x: &rwlock, reader: bool, + fn access_shared(sharedstate: &mut int, x: &rwlock, mode: rwlock_mode, n: uint) { for n.times { - do lock_rwlock_in_mode(x, reader) { + do lock_rwlock_in_mode(x, mode) { let oldval = *sharedstate; task::yield(); *sharedstate = oldval + 1; @@ -690,32 +815,59 @@ mod tests { } #[test] fn test_rwlock_readers_wont_modify_the_data() { - test_rwlock_exclusion(true, false); - test_rwlock_exclusion(false, true); + test_rwlock_exclusion(read, write); + test_rwlock_exclusion(write, read); + test_rwlock_exclusion(read, downgrade); + test_rwlock_exclusion(downgrade, read); } #[test] fn test_rwlock_writers_and_writers() { - test_rwlock_exclusion(false, false); + test_rwlock_exclusion(write, write); + test_rwlock_exclusion(write, downgrade); + test_rwlock_exclusion(downgrade, write); + test_rwlock_exclusion(downgrade, downgrade); } - #[test] - fn test_rwlock_readers_and_readers() { + #[cfg(test)] + fn test_rwlock_handshake(mode1: rwlock_mode, mode2: rwlock_mode, + make_mode2_go_first: bool) { // Much like sem_multi_resource. let x = ~rwlock(); let x2 = ~x.clone(); let (c1,p1) = pipes::stream(); let (c2,p2) = pipes::stream(); do task::spawn { - do x2.read { + if !make_mode2_go_first { + let _ = p2.recv(); // parent sends to us once it locks, or ... + } + do lock_rwlock_in_mode(x2, mode2) { + if make_mode2_go_first { + c1.send(()); // ... we send to it once we lock + } let _ = p2.recv(); c1.send(()); } } - do x.read { + if make_mode2_go_first { + let _ = p1.recv(); // child sends to us once it locks, or ... + } + do lock_rwlock_in_mode(x, mode1) { + if !make_mode2_go_first { + c2.send(()); // ... we send to it once we lock + } c2.send(()); let _ = p1.recv(); } } #[test] + fn test_rwlock_readers_and_readers() { + test_rwlock_handshake(read, read, false); + // The downgrader needs to get in before the reader gets in, otherwise + // they cannot end up reading at the same time. + test_rwlock_handshake(downgrade_read, read, false); + test_rwlock_handshake(read, downgrade_read, true); + // Two downgrade_reads can never both end up reading at the same time. + } + #[test] fn test_rwlock_cond_wait() { // As test_mutex_cond_wait above. let x = ~rwlock(); @@ -751,26 +903,40 @@ mod tests { do x.read { } // Just for good measure } #[cfg(test)] #[ignore(cfg(windows))] - fn rwlock_kill_helper(reader1: bool, reader2: bool) { + fn rwlock_kill_helper(mode1: rwlock_mode, mode2: rwlock_mode) { // Mutex must get automatically unlocked if failed/killed within. let x = ~rwlock(); let x2 = ~x.clone(); let result: result::result<(),()> = do task::try { - do lock_rwlock_in_mode(x2, reader1) { + do lock_rwlock_in_mode(x2, mode1) { fail; } }; assert result.is_err(); // child task must have finished by the time try returns - do lock_rwlock_in_mode(x, reader2) { } + do lock_rwlock_in_mode(x, mode2) { } } #[test] #[ignore(cfg(windows))] - fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(true, false); } + fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(read, write); } #[test] #[ignore(cfg(windows))] - fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(false,true ); } + fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(write,read ); } #[test] #[ignore(cfg(windows))] - fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(true, true ); } + fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(read, read ); } #[test] #[ignore(cfg(windows))] - fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(false,false); } + fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(write,write); } + #[test] #[should_fail] #[ignore(cfg(windows))] + fn test_rwlock_downgrade_cant_swap() { + // Tests that you can't downgrade with a different rwlock's token. + let x = ~rwlock(); + let y = ~rwlock(); + do x.write_downgrade |xwrite| { + let mut xopt = some(xwrite); + do y.write_downgrade |_ywrite| { + do y.downgrade(option::swap_unwrap(&mut xopt)).read { + error!("oops, y.downgrade(x) should have failed!"); + } + } + } + } }