From bcb6a68cbd928bb5d8331821272993bdde4fd388 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Thu, 9 Aug 2012 20:22:43 -0400 Subject: [PATCH] sync: Add rwlocks (half-done) and test cases --- src/libcore/sync.rs | 195 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 177 insertions(+), 18 deletions(-) diff --git a/src/libcore/sync.rs b/src/libcore/sync.rs index decf34834a47..d0092df4f16a 100644 --- a/src/libcore/sync.rs +++ b/src/libcore/sync.rs @@ -8,6 +8,7 @@ export condvar; export semaphore, new_semaphore; export mutex, new_mutex; +export rwlock; // FIXME (#3119) This shouldn't be a thing exported from core. import arc::exclusive; @@ -58,6 +59,17 @@ enum sem = exclusive<{ blocked: Q, }>; +fn new_sem(count: int, +q: Q) -> sem { + let (wait_tail, wait_head) = pipes::stream(); + sem(exclusive({ mut count: count, + waiters: { head: wait_head, tail: wait_tail }, + blocked: q })) +} +fn new_sem_and_signal(count: int) -> sem { + let (block_tail, block_head) = pipes::stream(); + new_sem(count, { head: block_head, tail: block_tail }) +} + impl &sem { fn acquire() { let mut waiter_nobe = none; @@ -217,12 +229,7 @@ impl &sem { enum semaphore = sem<()>; /// Create a new semaphore with the specified count. -fn new_semaphore(count: int) -> semaphore { - let (wait_tail, wait_head) = pipes::stream(); - semaphore(sem(exclusive({ mut count: count, - waiters: { head: wait_head, tail: wait_tail }, - blocked: () }))) -} +fn new_semaphore(count: int) -> semaphore { semaphore(new_sem(count, ())) } impl &semaphore { /// Create a new handle to the semaphore. @@ -257,13 +264,7 @@ impl &semaphore { enum mutex = sem; /// Create a new mutex. -fn new_mutex() -> mutex { - let (wait_tail, wait_head) = pipes::stream(); - let (block_tail, block_head) = pipes::stream(); - mutex(sem(exclusive({ mut count: 1, - waiters: { head: wait_head, tail: wait_tail }, - blocked: { head: block_head, tail: block_tail } }))) -} +fn new_mutex() -> mutex { mutex(new_sem_and_signal(1)) } impl &mutex { /// Create a new handle to the mutex. @@ -282,7 +283,86 @@ impl &mutex { * Reader-writer locks ****************************************************************************/ -// FIXME(#3145) implement +// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem + +struct rwlock_inner { + read_mode: bool; + read_count: uint; +} + +/// A blocking, no-starvation, reader-writer lock with an associated condvar. +struct rwlock { + order_lock: semaphore; + access_lock: sem; + state: arc::exclusive; +} + +fn rwlock() -> rwlock { + rwlock { order_lock: new_semaphore(1), access_lock: new_sem_and_signal(1), + state: arc::exclusive(rwlock_inner { read_mode: false, + read_count: 0 }) } +} + +impl &rwlock { + // Create a new handle to the rwlock. + fn clone() -> rwlock { + rwlock { order_lock: (&(self.order_lock)).clone(), + access_lock: sem((*self.access_lock).clone()), + state: self.state.clone() } + } + + /** + * Run a function with the rwlock in read mode. Calls to 'read' from other + * tasks may run concurrently with this one. + */ + fn read(blk: fn() -> U) -> U { + do (&self.order_lock).access { + let mut first_reader = false; + do self.state.with |state| { + state.read_mode = true; + first_reader = (state.read_count == 0); + state.read_count += 1; + } + if first_reader { + (&self.access_lock).acquire(); + } + } + let result = blk(); + let mut last_reader = false; + do self.state.with |state| { + assert state.read_mode; + state.read_count -= 1; + last_reader = (state.read_count == 0); + } + if last_reader { + (&self.access_lock).release(); + } + result + } + + /** + * Run a function with the rwlock in write mode. No calls to 'read' or + * 'write' from other tasks will run concurrently with this one. + */ + fn write(blk: fn() -> U) -> U { + (&self.order_lock).acquire(); + do (&self.access_lock).access { + (&self.order_lock).release(); + blk() + } + } + + /** + * As write(), but also with a handle to a condvar. Waiting on this + * condvar will allow readers and writers alike to take the rwlock before + * the waiting task is signalled. + */ + fn write_cond(_blk: fn(condvar) -> U) -> U { + fail ~"Need implement lock order lock before access lock"; + } + + // to-do implement downgrade +} /**************************************************************************** * Tests @@ -443,9 +523,8 @@ mod tests { } let _ = port.recv(); // Wait until child wakes up } - #[test] - fn test_mutex_cond_broadcast() { - let num_waiters: uint = 12; + #[cfg(test)] + fn test_mutex_cond_broadcast_helper(num_waiters: uint) { let m = ~new_mutex(); let mut ports = ~[]; @@ -471,6 +550,25 @@ mod tests { // wait until all children wake up for ports.each |port| { let _ = port.recv(); } } + #[test] + fn test_mutex_cond_broadcast() { + test_mutex_cond_broadcast_helper(12); + } + #[test] + fn test_mutex_cond_broadcast_none() { + test_mutex_cond_broadcast_helper(0); + } + #[test] + fn test_mutex_cond_no_waiter() { + let m = ~new_mutex(); + let m2 = ~m.clone(); + do task::try { + do m.lock_cond |_x| { } + }; + do m2.lock_cond |cond| { + assert !cond.signal(); + } + } #[test] #[ignore(cfg(windows))] fn test_mutex_killed_simple() { // Mutex must get automatically unlocked if failed/killed within. @@ -508,11 +606,72 @@ mod tests { assert result.is_err(); // child task must have finished by the time try returns do m.lock_cond |cond| { - let woken = cond.signal(); + let _woken = cond.signal(); // FIXME(#3145) - The semantics of pipes are not quite what I want // here - the pipe doesn't get 'terminated' if the child was // punted awake during failure. // assert !woken; } } + #[cfg(test)] + fn test_rwlock_exclusion(reader1: bool, reader2: bool) { + // Test mutual exclusion between readers and writers. Just like the + // mutex mutual exclusion test, a ways above. + let (c,p) = pipes::stream(); + let m = ~rwlock(); + let m2 = ~m.clone(); + let sharedstate = ~0; + let ptr = ptr::addr_of(*sharedstate); + do task::spawn { + let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) }; + access_shared(sharedstate, m2, reader1, 10); + c.send(()); + } + access_shared(sharedstate, m, reader2, 10); + let _ = p.recv(); + + assert *sharedstate == 20; + + fn access_shared(sharedstate: &mut int, m: &rwlock, reader: bool, + n: uint) { + let lock_fn = fn@(m: &rwlock, blk: fn()) { + if reader { m.read(blk); } else { m.write(blk); } + }; + for n.times { + do lock_fn(m) { + let oldval = *sharedstate; + task::yield(); + *sharedstate = oldval + 1; + } + } + } + } + #[test] + fn test_rwlock_readers_wont_modify_the_data() { + test_rwlock_exclusion(true, false); + test_rwlock_exclusion(false, true); + } + #[test] + fn test_rwlock_writers_and_writers() { + test_rwlock_exclusion(false, false); + } + #[test] + fn test_rwlock_readers_and_readers() { + // Much like sem_multi_resource. + let x = ~rwlock(); + let x2 = ~x.clone(); + let (c1,p1) = pipes::stream(); + let (c2,p2) = pipes::stream(); + do task::spawn { + do x2.read { + let _ = p2.recv(); + c1.send(()); + } + } + do x.read { + c2.send(()); + let _ = p1.recv(); + } + + } }