diff --git a/src/libcore/sync.rs b/src/libcore/sync.rs index 33823729662f..3e1b85441e9a 100644 --- a/src/libcore/sync.rs +++ b/src/libcore/sync.rs @@ -17,19 +17,23 @@ type signal_end = pipes::chan<()>; type waitqueue = { head: pipes::port, tail: pipes::chan }; -fn new_waiter() -> (signal_end, wait_end) { pipes::stream() } +fn waitqueue() -> waitqueue { + let (tail, head) = pipes::stream(); + { head: head, tail: tail } +} -/// A counting semaphore. -enum semaphore = exclusive<{ +/// A counting, blocking, bounded-waiting semaphore. +enum semaphore = exclusive; +type semaphore_inner = { mut count: int, waiters: waitqueue, -}>; + //blocked: waitqueue, +}; /// Create a new semaphore with the specified count. fn new_semaphore(count: int) -> semaphore { - let (tail, head) = pipes::stream(); semaphore(exclusive({ mut count: count, - waiters: { head: head, tail: tail } })) + waiters: waitqueue(), /* blocked: waitqueue() */ })) } impl semaphore for &semaphore { @@ -42,20 +46,21 @@ impl semaphore for &semaphore { * Acquires a resource represented by the semaphore. Blocks if necessary * until resource(s) become available. */ - fn wait() { + fn acquire() { let mut waiter_nobe = none; unsafe { do (**self).with |state| { state.count -= 1; if state.count < 0 { - let (signal_end,wait_end) = new_waiter(); + let (signal_end,wait_end) = pipes::stream(); waiter_nobe = some(wait_end); // Enqueue ourself. state.waiters.tail.send(signal_end); } } } - for 1000.times { task::yield(); } + // Uncomment if you wish to test for sem races. Not valgrind-friendly. + /* for 1000.times { task::yield(); } */ // Need to wait outside the exclusive. if waiter_nobe.is_some() { let _ = option::unwrap(waiter_nobe).recv(); @@ -66,7 +71,7 @@ impl semaphore for &semaphore { * Release a held resource represented by the semaphore. Wakes a blocked * contending task, if any exist. */ - fn signal() { + fn release() { unsafe { do (**self).with |state| { state.count += 1; @@ -85,7 +90,7 @@ impl semaphore for &semaphore { /// Runs a function with ownership of one of the semaphore's resources. fn access(blk: fn() -> U) -> U { - self.wait(); + self.acquire(); let _x = sem_release(self); blk() } @@ -95,7 +100,7 @@ impl semaphore for &semaphore { struct sem_release { sem: &semaphore; new(sem: &semaphore) { self.sem = sem; } - drop { self.sem.signal(); } + drop { self.sem.release(); } } #[cfg(test)] @@ -120,11 +125,11 @@ mod tests { let s = ~new_semaphore(0); let s2 = ~s.clone(); do task::spawn { - s2.wait(); + s2.acquire(); c.send(()); } for 10.times { task::yield(); } - s.signal(); + s.release(); let _ = p.recv(); /* Parent waits and child signals */ @@ -133,14 +138,16 @@ mod tests { let s2 = ~s.clone(); do task::spawn { for 10.times { task::yield(); } - s2.signal(); + s2.release(); let _ = p.recv(); } - s.wait(); + s.acquire(); c.send(()); } #[test] fn test_sem_mutual_exclusion() { + // Unsafely achieve shared state, and do the textbook + // "load tmp <- ptr; inc tmp; store ptr <- tmp" dance. let (c,p) = pipes::stream(); let s = ~new_semaphore(1); let s2 = ~s.clone(); @@ -167,7 +174,28 @@ mod tests { } } #[test] + fn test_sem_multi_resource() { + // Parent and child both get in the critical section at the same + // time, and shake hands. + let s = ~new_semaphore(2); + let s2 = ~s.clone(); + let (c1,p1) = pipes::stream(); + let (c2,p2) = pipes::stream(); + do task::spawn { + do s2.access { + let _ = p2.recv(); + c1.send(()); + } + } + do s.access { + c2.send(()); + let _ = p1.recv(); + } + } + #[test] fn test_sem_runtime_friendly_blocking() { + // Force the runtime to schedule two threads on the same sched_loop. + // When one blocks, it should schedule the other one. do task::spawn_sched(task::manual_threads(1)) { let s = ~new_semaphore(1); let s2 = ~s.clone();