diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs index 85043699595f..c8a16b58d32d 100644 --- a/src/libstd/sync.rs +++ b/src/libstd/sync.rs @@ -67,9 +67,13 @@ fn new_sem(count: int, +q: Q) -> sem { waiters: waitqueue { head: wait_head, tail: wait_tail }, blocked: q })) } -fn new_sem_and_signal(count: int) -> sem { - let (block_tail, block_head) = pipes::stream(); - new_sem(count, waitqueue { head: block_head, tail: block_tail }) +fn new_sem_and_signal(count: int, num_condvars: uint) -> sem<~[waitqueue]> { + let mut queues = ~[]; + for num_condvars.times { + let (block_tail, block_head) = pipes::stream(); + vec::push(queues, waitqueue { head: block_head, tail: block_tail }); + } + new_sem(count, queues) } impl &sem { @@ -119,7 +123,7 @@ impl &sem<()> { blk() } } -impl &sem { +impl &sem<~[waitqueue]> { fn access(blk: fn() -> U) -> U { let mut release = none; unsafe { @@ -139,52 +143,75 @@ struct sem_release { drop { self.sem.release(); } } struct sem_and_signal_release { - sem: &sem; - new(sem: &sem) { self.sem = sem; } + sem: &sem<~[waitqueue]>; + new(sem: &sem<~[waitqueue]>) { self.sem = sem; } drop { self.sem.release(); } } /// A mechanism for atomic-unlock-and-deschedule blocking and signalling. -struct condvar { priv sem: &sem; drop { } } +struct condvar { priv sem: &sem<~[waitqueue]>; drop { } } impl &condvar { /// Atomically drop the associated lock, and block until a signal is sent. - fn wait() { + fn wait() { self.wait_on(0) } + /** + * As wait(), but can specify which of multiple condition variables to + * wait on. Only a signal_on() or broadcast_on() with the same condvar_id + * will wake this thread. + * + * The associated lock must have been initialised with an appropriate + * number of condvars. The condvar_id must be between 0 and num_condvars-1 + * or else this call will fail. + * + * wait() is equivalent to wait_on(0). + */ + fn wait_on(condvar_id: uint) { // Create waiter nobe. let (signal_end, wait_end) = pipes::oneshot(); + let mut wait_end = some(wait_end); let mut signal_end = some(signal_end); let mut reacquire = none; + let mut out_of_bounds = none; unsafe { do task::unkillable { + // Release lock, 'atomically' enqueuing ourselves in so doing. + do (**self.sem).with |state| { + if condvar_id < vec::len(state.blocked) { + // Drop the lock. + state.count += 1; + if state.count <= 0 { + signal_waitqueue(&state.waiters); + } + // Enqueue ourself to be woken up by a signaller. + let signal_end = option::swap_unwrap(&mut signal_end); + state.blocked[condvar_id].tail.send(signal_end); + } else { + out_of_bounds = some(vec::len(state.blocked)); + } + } + // If yield checks start getting inserted anywhere, we can be // killed before or after enqueueing. Deciding whether to // unkillably reacquire the lock needs to happen atomically // wrt enqueuing. - reacquire = some(sem_and_signal_reacquire(self.sem)); - - // Release lock, 'atomically' enqueuing ourselves in so doing. - do (**self.sem).with |state| { - // Drop the lock. - state.count += 1; - if state.count <= 0 { - signal_waitqueue(&state.waiters); - } - // Enqueue ourself to be woken up by a signaller. - let signal_end = option::swap_unwrap(&mut signal_end); - state.blocked.tail.send(signal_end); + if out_of_bounds.is_none() { + reacquire = some(sem_and_signal_reacquire(self.sem)); } } } - // Unconditionally "block". (Might not actually block if a signaller - // did send -- I mean 'unconditionally' in contrast with acquire().) - let _ = pipes::recv_one(wait_end); + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + let _ = pipes::recv_one(option::swap_unwrap(&mut wait_end)); + } // This is needed for a failing condition variable to reacquire the // mutex during unwinding. As long as the wrapper (mutex, etc) is // bounded in when it gets released, this shouldn't hang forever. struct sem_and_signal_reacquire { - sem: &sem; - new(sem: &sem) { self.sem = sem; } + sem: &sem<~[waitqueue]>; + new(sem: &sem<~[waitqueue]>) { self.sem = sem; } drop unsafe { // Needs to succeed, instead of itself dying. do task::unkillable { @@ -195,26 +222,64 @@ impl &condvar { } /// Wake up a blocked task. Returns false if there was no blocked task. - fn signal() -> bool { + fn signal() -> bool { self.signal_on(0) } + /// As signal, but with a specified condvar_id. See wait_on. + fn signal_on(condvar_id: uint) -> bool { + let mut out_of_bounds = none; + let mut result = false; unsafe { do (**self.sem).with |state| { - signal_waitqueue(&state.blocked) + if condvar_id < vec::len(state.blocked) { + result = signal_waitqueue(&state.blocked[condvar_id]); + } else { + out_of_bounds = some(vec::len(state.blocked)); + } } } + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") { + result + } } /// Wake up all blocked tasks. Returns the number of tasks woken. - fn broadcast() -> uint { + fn broadcast() -> uint { self.broadcast_on(0) } + /// As broadcast, but with a specified condvar_id. See wait_on. + fn broadcast_on(condvar_id: uint) -> uint { + let mut out_of_bounds = none; + let mut result = 0; unsafe { do (**self.sem).with |state| { - // FIXME(#3145) fix :broadcast_heavy - broadcast_waitqueue(&state.blocked) + if condvar_id < vec::len(state.blocked) { + // FIXME(#3145) fix :broadcast_heavy + result = broadcast_waitqueue(&state.blocked[condvar_id]) + } else { + out_of_bounds = some(vec::len(state.blocked)); + } } } + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") { + result + } } } -impl &sem { +// Checks whether a condvar ID was out of bounds, and fails if so, or does +// something else next on success. +#[inline(always)] +fn check_cvar_bounds(out_of_bounds: option, id: uint, act: &str, + blk: fn() -> U) -> U { + match out_of_bounds { + some(0) => + fail fmt!("%s with illegal ID %u - this lock has no condvars!", + act, id), + some(length) => + fail fmt!("%s with illegal ID %u - ID must be less than %u", + act, id, length), + none => blk() + } +} + +impl &sem<~[waitqueue]> { // The only other place that condvars get built is rwlock_write_mode. fn access_cond(blk: fn(c: &condvar) -> U) -> U { do self.access { blk(&condvar { sem: self }) } @@ -263,10 +328,19 @@ impl &semaphore { * FIFO condition variable. * FIXME(#3145): document killability */ -struct mutex { priv sem: sem; } +struct mutex { priv sem: sem<~[waitqueue]>; } -/// Create a new mutex. -fn mutex() -> mutex { mutex { sem: new_sem_and_signal(1) } } +/// Create a new mutex, with one associated condvar. +fn mutex() -> mutex { mutex_with_condvars(1) } +/** + * Create a new mutex, with a specified number of associated condvars. This + * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between + * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but + * any operations on the condvar will fail.) + */ +fn mutex_with_condvars(num_condvars: uint) -> mutex { + mutex { sem: new_sem_and_signal(1, num_condvars) } +} impl &mutex { /// Create a new handle to the mutex. @@ -295,13 +369,20 @@ struct rwlock_inner { /// A blocking, no-starvation, reader-writer lock with an associated condvar. struct rwlock { /* priv */ order_lock: semaphore; - /* priv */ access_lock: sem; + /* priv */ access_lock: sem<~[waitqueue]>; /* priv */ state: Exclusive; } -/// Create a new rwlock. -fn rwlock() -> rwlock { - rwlock { order_lock: semaphore(1), access_lock: new_sem_and_signal(1), +/// Create a new rwlock, with one associated condvar. +fn rwlock() -> rwlock { rwlock_with_condvars(1) } + +/** + * Create a new rwlock, with a specified number of associated condvars. + * Similar to mutex_with_condvars. + */ +fn rwlock_with_condvars(num_condvars: uint) -> rwlock { + rwlock { order_lock: semaphore(1), + access_lock: new_sem_and_signal(1, num_condvars), state: exclusive(rwlock_inner { read_mode: false, read_count: 0 }) } } @@ -813,6 +894,59 @@ mod tests { drop { self.c.send(()); } } } + #[test] + fn test_mutex_cond_signal_on_0() { + // Tests that signal_on(0) is equivalent to signal(). + let m = ~mutex(); + do m.lock_cond |cond| { + let m2 = ~m.clone(); + do task::spawn { + do m2.lock_cond |cond| { + cond.signal_on(0); + } + } + cond.wait(); + } + } + #[test] #[ignore(cfg(windows))] + fn test_mutex_different_conds() { + let result = do task::try { + let m = ~mutex_with_condvars(2); + let m2 = ~m.clone(); + let (c,p) = pipes::stream(); + do task::spawn { + do m2.lock_cond |cond| { + c.send(()); + cond.wait_on(1); + } + } + let _ = p.recv(); + do m.lock_cond |cond| { + if !cond.signal_on(0) { + fail; // success; punt sibling awake. + } + } + }; + assert result.is_err(); + } + #[test] #[ignore(cfg(windows))] + fn test_mutex_no_condvars() { + let result = do task::try { + let m = ~mutex_with_condvars(0); + do m.lock_cond |cond| { cond.wait(); } + }; + assert result.is_err(); + let result = do task::try { + let m = ~mutex_with_condvars(0); + do m.lock_cond |cond| { cond.signal(); } + }; + assert result.is_err(); + let result = do task::try { + let m = ~mutex_with_condvars(0); + do m.lock_cond |cond| { cond.broadcast(); } + }; + assert result.is_err(); + } /************************************************************************ * Reader/writer lock tests ************************************************************************/