diff --git a/src/libcore/sync.rs b/src/libcore/sync.rs index 6def915b6d8c..decf34834a47 100644 --- a/src/libcore/sync.rs +++ b/src/libcore/sync.rs @@ -23,7 +23,33 @@ type signal_end = pipes::chan<()>; type waitqueue = { head: pipes::port, tail: pipes::chan }; -// The building-block used to make semaphores, lock-and-signals, and rwlocks. +// Signals one live task from the queue. +fn signal_waitqueue(q: &waitqueue) -> bool { + // The peek is mandatory to make sure recv doesn't block. + if q.head.peek() { + // Pop and send a wakeup signal. If the waiter was killed, its port + // will have closed. Keep trying until we get a live task. + if q.head.recv().try_send(()) { + true + } else { + signal_waitqueue(q) + } + } else { + false + } +} + +fn broadcast_waitqueue(q: &waitqueue) -> uint { + let mut count = 0; + while q.head.peek() { + if q.head.recv().try_send(()) { + count += 1; + } + } + count +} + +// The building-block used to make semaphores, mutexes, and rwlocks. enum sem = exclusive<{ mut count: int, waiters: waitqueue, @@ -39,6 +65,7 @@ impl &sem { do (**self).with |state| { state.count -= 1; if state.count < 0 { + // Create waiter nobe. let (signal_end, wait_end) = pipes::stream(); // Tell outer scope we need to block. waiter_nobe = some(wait_end); @@ -58,14 +85,8 @@ impl &sem { unsafe { do (**self).with |state| { state.count += 1; - // The peek is mandatory to make sure recv doesn't block. - if state.count <= 0 && state.waiters.head.peek() { - // Pop off the waitqueue and send a wakeup signal. If the - // waiter was killed, its port will have closed, and send - // will fail. Keep trying until we get a live task. - state.waiters.head.recv().send(()); - // FIXME(#3145) use kill-friendly version when ready - // while !state.waiters.head.recv().try_send(()) { } + if state.count <= 0 { + signal_waitqueue(&state.waiters); } } } @@ -74,15 +95,25 @@ impl &sem { // FIXME(#3154) move both copies of this into sem, and unify the 2 structs impl &sem<()> { fn access(blk: fn() -> U) -> U { - self.acquire(); - let _x = sem_release(self); + let mut release = none; + unsafe { + do task::unkillable { + self.acquire(); + release = some(sem_release(self)); + } + } blk() } } impl &sem { fn access(blk: fn() -> U) -> U { - self.acquire(); - let _x = sem_and_signal_release(self); + let mut release = none; + unsafe { + do task::unkillable { + self.acquire(); + release = some(sem_and_signal_release(self)); + } + } blk() } } @@ -105,39 +136,58 @@ enum condvar = &sem; impl condvar { /// Atomically drop the associated lock, and block until a signal is sent. fn wait() { + // This is needed for a failing condition variable to reacquire the + // mutex during unwinding. As long as the wrapper (mutex, etc) is + // bounded in when it gets released, this shouldn't hang forever. + struct sem_and_signal_reacquire { + sem: &sem; + new(sem: &sem) { self.sem = sem; } + drop unsafe { + do task::unkillable { + self.sem.acquire(); + } + } + } + + // Create waiter nobe. let (signal_end, wait_end) = pipes::stream(); let mut signal_end = some(signal_end); + let mut reacquire = none; unsafe { - do (***self).with |state| { - // Drop the lock. - // FIXME(#3145) investigate why factoring doesn't compile. - state.count += 1; - if state.count <= 0 && state.waiters.head.peek() { - state.waiters.head.recv().send(()); - // FIXME(#3145) use kill-friendly version when ready + do task::unkillable { + // If yield checks start getting inserted anywhere, we can be + // killed before or after enqueueing. Deciding whether to + // unkillably reacquire the lock needs to happen atomically + // wrt enqueuing. + reacquire = some(sem_and_signal_reacquire(*self)); + + // Release lock, 'atomically' enqueuing ourselves in so doing. + do (***self).with |state| { + // Drop the lock. + // FIXME(#3145) investigate why factoring doesn't compile. + state.count += 1; + if state.count <= 0 { + signal_waitqueue(&state.waiters); + } + // Enqueue ourself to be woken up by a signaller. + let signal_end = option::swap_unwrap(&mut signal_end); + state.blocked.tail.send(signal_end); } - // Enqueue ourself to be woken up by a signaller. - state.blocked.tail.send(option::swap_unwrap(&mut signal_end)); } } // Unconditionally "block". (Might not actually block if a signaller // did send -- I mean 'unconditionally' in contrast with acquire().) let _ = wait_end.recv(); - // Pick up the lock again. FIXME(#3145): unkillable? destructor? - (*self).acquire(); + // 'reacquire' will pick up the lock again in its destructor - it must + // happen whether or not we are killed, and it needs to succeed at + // reacquiring instead of itself dying. } /// Wake up a blocked task. Returns false if there was no blocked task. fn signal() -> bool { unsafe { do (***self).with |state| { - if state.blocked.head.peek() { - state.blocked.head.recv().send(()); - // FIXME(#3145) use kill-friendly version when ready - true - } else { - false - } + signal_waitqueue(&state.blocked) } } } @@ -146,13 +196,8 @@ impl condvar { fn broadcast() -> uint { unsafe { do (***self).with |state| { - let mut count = 0; - while state.blocked.head.peek() { - // This is already kill-friendly. - state.blocked.head.recv().send(()); - count += 1; - } - count + // FIXME(#3145) fix :broadcast_heavy + broadcast_waitqueue(&state.blocked) } } } @@ -191,11 +236,12 @@ impl &semaphore { /** * Release a held resource represented by the semaphore. Wakes a blocked - * contending task, if any exist. + * contending task, if any exist. Won't block the caller. */ fn release() { (&**self).release() } /// Run a function with ownership of one of the semaphore's resources. + // FIXME(#3145): figure out whether or not this should get exported. fn access(blk: fn() -> U) -> U { (&**self).access(blk) } } @@ -206,6 +252,7 @@ impl &semaphore { /** * A blocking, bounded-waiting, mutual exclusion lock with an associated * FIFO condition variable. + * FIXME(#3145): document killability */ enum mutex = sem; @@ -243,17 +290,29 @@ impl &mutex { #[cfg(test)] mod tests { + #[test] + fn test_sem_acquire_release() { + let s = ~new_semaphore(1); + s.acquire(); + s.release(); + s.acquire(); + } + #[test] + fn test_sem_basic() { + let s = ~new_semaphore(1); + do s.access { } + } #[test] fn test_sem_as_mutex() { let s = ~new_semaphore(1); let s2 = ~s.clone(); do task::spawn { do s2.access { - for 10.times { task::yield(); } + for 5.times { task::yield(); } } } do s.access { - for 10.times { task::yield(); } + for 5.times { task::yield(); } } } #[test] @@ -266,7 +325,7 @@ mod tests { s2.acquire(); c.send(()); } - for 10.times { task::yield(); } + for 5.times { task::yield(); } s.release(); let _ = p.recv(); @@ -275,7 +334,7 @@ mod tests { let s = ~new_semaphore(0); let s2 = ~s.clone(); do task::spawn { - for 10.times { task::yield(); } + for 5.times { task::yield(); } s2.release(); let _ = p.recv(); } @@ -324,7 +383,7 @@ mod tests { } } #[test] - fn test_mutex() { + fn test_mutex_lock() { // Unsafely achieve shared state, and do the textbook // "load tmp <- ptr; inc tmp; store ptr <- tmp" dance. let (c,p) = pipes::stream(); @@ -342,9 +401,9 @@ mod tests { assert *sharedstate == 20; - fn access_shared(sharedstate: &mut int, sem: &mutex, n: uint) { + fn access_shared(sharedstate: &mut int, m: &mutex, n: uint) { for n.times { - do sem.lock { + do m.lock { let oldval = *sharedstate; task::yield(); *sharedstate = oldval + 1; @@ -355,13 +414,15 @@ mod tests { #[test] fn test_mutex_cond_wait() { let m = ~new_mutex(); - let mut m2 = some(~m.clone()); // Child wakes up parent do m.lock_cond |cond| { - let m2 = option::swap_unwrap(&mut m2); + let m2 = ~m.clone(); do task::spawn { - do m2.lock_cond |cond| { cond.signal(); } + do m2.lock_cond |cond| { + let woken = cond.signal(); + assert woken; + } } cond.wait(); } @@ -377,7 +438,8 @@ mod tests { } let _ = port.recv(); // Wait until child gets in the mutex do m.lock_cond |cond| { - cond.signal(); + let woken = cond.signal(); + assert woken; } let _ = port.recv(); // Wait until child wakes up } @@ -409,4 +471,48 @@ mod tests { // wait until all children wake up for ports.each |port| { let _ = port.recv(); } } + #[test] #[ignore(cfg(windows))] + fn test_mutex_killed_simple() { + // Mutex must get automatically unlocked if failed/killed within. + let m = ~new_mutex(); + let m2 = ~m.clone(); + + let result: result::result<(),()> = do task::try { + do m2.lock { + fail; + } + }; + assert result.is_err(); + // child task must have finished by the time try returns + do m.lock { } + } + #[test] #[ignore(cfg(windows))] + fn test_mutex_killed_cond() { + // Getting killed during cond wait must not corrupt the mutex while + // unwinding (e.g. double unlock). + let m = ~new_mutex(); + let m2 = ~m.clone(); + + let result: result::result<(),()> = do task::try { + let (c,p) = pipes::stream(); + do task::spawn { // linked + let _ = p.recv(); // wait for sibling to get in the mutex + task::yield(); + fail; + } + do m2.lock_cond |cond| { + c.send(()); // tell sibling go ahead + cond.wait(); // block forever + } + }; + assert result.is_err(); + // child task must have finished by the time try returns + do m.lock_cond |cond| { + let woken = cond.signal(); + // FIXME(#3145) - The semantics of pipes are not quite what I want + // here - the pipe doesn't get 'terminated' if the child was + // punted awake during failure. + // assert !woken; + } + } }