From 758dd786f65a86a160c76889ebd3d5fc1b206445 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Thu, 9 Aug 2012 22:07:37 -0400 Subject: [PATCH] Make rwlocks fail-proof --- src/libcore/sync.rs | 158 ++++++++++++++++++++++++++++++-------------- 1 file changed, 107 insertions(+), 51 deletions(-) diff --git a/src/libcore/sync.rs b/src/libcore/sync.rs index d0092df4f16a..3d5699890bd8 100644 --- a/src/libcore/sync.rs +++ b/src/libcore/sync.rs @@ -148,19 +148,6 @@ enum condvar = &sem; impl condvar { /// Atomically drop the associated lock, and block until a signal is sent. fn wait() { - // This is needed for a failing condition variable to reacquire the - // mutex during unwinding. As long as the wrapper (mutex, etc) is - // bounded in when it gets released, this shouldn't hang forever. - struct sem_and_signal_reacquire { - sem: &sem; - new(sem: &sem) { self.sem = sem; } - drop unsafe { - do task::unkillable { - self.sem.acquire(); - } - } - } - // Create waiter nobe. let (signal_end, wait_end) = pipes::stream(); let mut signal_end = some(signal_end); @@ -190,9 +177,20 @@ impl condvar { // Unconditionally "block". (Might not actually block if a signaller // did send -- I mean 'unconditionally' in contrast with acquire().) let _ = wait_end.recv(); - // 'reacquire' will pick up the lock again in its destructor - it must - // happen whether or not we are killed, and it needs to succeed at - // reacquiring instead of itself dying. + + // This is needed for a failing condition variable to reacquire the + // mutex during unwinding. As long as the wrapper (mutex, etc) is + // bounded in when it gets released, this shouldn't hang forever. + struct sem_and_signal_reacquire { + sem: &sem; + new(sem: &sem) { self.sem = sem; } + drop unsafe { + // Needs to succeed, instead of itself dying. + do task::unkillable { + self.sem.acquire(); + } + } + } } /// Wake up a blocked task. Returns false if there was no blocked task. @@ -297,6 +295,7 @@ struct rwlock { state: arc::exclusive; } +/// Create a new rwlock. fn rwlock() -> rwlock { rwlock { order_lock: new_semaphore(1), access_lock: new_sem_and_signal(1), state: arc::exclusive(rwlock_inner { read_mode: false, @@ -304,7 +303,7 @@ fn rwlock() -> rwlock { } impl &rwlock { - // Create a new handle to the rwlock. + /// Create a new handle to the rwlock. fn clone() -> rwlock { rwlock { order_lock: (&(self.order_lock)).clone(), access_lock: sem((*self.access_lock).clone()), @@ -316,28 +315,23 @@ impl &rwlock { * tasks may run concurrently with this one. */ fn read(blk: fn() -> U) -> U { - do (&self.order_lock).access { - let mut first_reader = false; - do self.state.with |state| { - state.read_mode = true; - first_reader = (state.read_count == 0); - state.read_count += 1; - } - if first_reader { - (&self.access_lock).acquire(); + unsafe { + do task::unkillable { + do (&self.order_lock).access { + let mut first_reader = false; + do self.state.with |state| { + state.read_mode = true; + first_reader = (state.read_count == 0); + state.read_count += 1; + } + if first_reader { + (&self.access_lock).acquire(); + } + } } } - let result = blk(); - let mut last_reader = false; - do self.state.with |state| { - assert state.read_mode; - state.read_count -= 1; - last_reader = (state.read_count == 0); - } - if last_reader { - (&self.access_lock).release(); - } - result + let _z = rwlock_release_read(self); + blk() } /** @@ -345,11 +339,15 @@ impl &rwlock { * 'write' from other tasks will run concurrently with this one. */ fn write(blk: fn() -> U) -> U { - (&self.order_lock).acquire(); - do (&self.access_lock).access { - (&self.order_lock).release(); - blk() + unsafe { + do task::unkillable { + (&self.order_lock).acquire(); + (&self.access_lock).acquire(); + (&self.order_lock).release(); + } } + let _z = rwlock_release_write(self); + blk() } /** @@ -364,12 +362,41 @@ impl &rwlock { // to-do implement downgrade } +// FIXME(#3136) should go inside of write() and read() respectively +struct rwlock_release_write { + lock: &rwlock; + new(lock: &rwlock) { self.lock = lock; } + drop unsafe { + do task::unkillable { (&self.lock.access_lock).release(); } + } +} +struct rwlock_release_read { + lock: &rwlock; + new(lock: &rwlock) { self.lock = lock; } + drop unsafe { + do task::unkillable { + let mut last_reader = false; + do self.lock.state.with |state| { + assert state.read_mode; + state.read_count -= 1; + last_reader = (state.read_count == 0); + } + if last_reader { + (&self.lock.access_lock).release(); + } + } + } +} + /**************************************************************************** * Tests ****************************************************************************/ #[cfg(test)] mod tests { + /************************************************************************ + * Semaphore tests + ************************************************************************/ #[test] fn test_sem_acquire_release() { let s = ~new_semaphore(1); @@ -462,6 +489,9 @@ mod tests { let _ = p.recv(); // wait for child to be done } } + /************************************************************************ + * Mutex tests + ************************************************************************/ #[test] fn test_mutex_lock() { // Unsafely achieve shared state, and do the textbook @@ -613,32 +643,36 @@ mod tests { // assert !woken; } } + /************************************************************************ + * Reader/writer lock tests + ************************************************************************/ + #[cfg(test)] + fn lock_rwlock_in_mode(x: &rwlock, reader: bool, blk: fn()) { + if reader { x.read(blk); } else { x.write(blk); } + } #[cfg(test)] fn test_rwlock_exclusion(reader1: bool, reader2: bool) { // Test mutual exclusion between readers and writers. Just like the // mutex mutual exclusion test, a ways above. let (c,p) = pipes::stream(); - let m = ~rwlock(); - let m2 = ~m.clone(); + let x = ~rwlock(); + let x2 = ~x.clone(); let sharedstate = ~0; let ptr = ptr::addr_of(*sharedstate); do task::spawn { let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) }; - access_shared(sharedstate, m2, reader1, 10); + access_shared(sharedstate, x2, reader1, 10); c.send(()); } - access_shared(sharedstate, m, reader2, 10); + access_shared(sharedstate, x, reader2, 10); let _ = p.recv(); assert *sharedstate == 20; - fn access_shared(sharedstate: &mut int, m: &rwlock, reader: bool, + fn access_shared(sharedstate: &mut int, x: &rwlock, reader: bool, n: uint) { - let lock_fn = fn@(m: &rwlock, blk: fn()) { - if reader { m.read(blk); } else { m.write(blk); } - }; for n.times { - do lock_fn(m) { + do lock_rwlock_in_mode(x, reader) { let oldval = *sharedstate; task::yield(); *sharedstate = oldval + 1; @@ -672,6 +706,28 @@ mod tests { c2.send(()); let _ = p1.recv(); } - } + #[cfg(test)] #[ignore(cfg(windows))] + fn rwlock_kill_helper(reader1: bool, reader2: bool) { + // Mutex must get automatically unlocked if failed/killed within. + let x = ~rwlock(); + let x2 = ~x.clone(); + + let result: result::result<(),()> = do task::try { + do lock_rwlock_in_mode(x2, reader1) { + fail; + } + }; + assert result.is_err(); + // child task must have finished by the time try returns + do lock_rwlock_in_mode(x, reader2) { } + } + #[test] #[ignore(cfg(windows))] + fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(true, false); } + #[test] #[ignore(cfg(windows))] + fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(false,true ); } + #[test] #[ignore(cfg(windows))] + fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(true, true ); } + #[test] #[ignore(cfg(windows))] + fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(false,false); } }