Enable multiple condvars on a single mutex/rwlock.

This commit is contained in:
Ben Blum 2012-08-14 23:21:39 -04:00
parent f6f9333d5c
commit d1fc7368c8

View file

@ -67,9 +67,13 @@ fn new_sem<Q: send>(count: int, +q: Q) -> sem<Q> {
waiters: waitqueue { head: wait_head, tail: wait_tail },
blocked: q }))
}
fn new_sem_and_signal(count: int) -> sem<waitqueue> {
let (block_tail, block_head) = pipes::stream();
new_sem(count, waitqueue { head: block_head, tail: block_tail })
fn new_sem_and_signal(count: int, num_condvars: uint) -> sem<~[waitqueue]> {
let mut queues = ~[];
for num_condvars.times {
let (block_tail, block_head) = pipes::stream();
vec::push(queues, waitqueue { head: block_head, tail: block_tail });
}
new_sem(count, queues)
}
impl<Q: send> &sem<Q> {
@ -119,7 +123,7 @@ impl &sem<()> {
blk()
}
}
impl &sem<waitqueue> {
impl &sem<~[waitqueue]> {
fn access<U>(blk: fn() -> U) -> U {
let mut release = none;
unsafe {
@ -139,52 +143,75 @@ struct sem_release {
drop { self.sem.release(); }
}
struct sem_and_signal_release {
sem: &sem<waitqueue>;
new(sem: &sem<waitqueue>) { self.sem = sem; }
sem: &sem<~[waitqueue]>;
new(sem: &sem<~[waitqueue]>) { self.sem = sem; }
drop { self.sem.release(); }
}
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
struct condvar { priv sem: &sem<waitqueue>; drop { } }
struct condvar { priv sem: &sem<~[waitqueue]>; drop { } }
impl &condvar {
/// Atomically drop the associated lock, and block until a signal is sent.
fn wait() {
fn wait() { self.wait_on(0) }
/**
* As wait(), but can specify which of multiple condition variables to
* wait on. Only a signal_on() or broadcast_on() with the same condvar_id
* will wake this thread.
*
* The associated lock must have been initialised with an appropriate
* number of condvars. The condvar_id must be between 0 and num_condvars-1
* or else this call will fail.
*
* wait() is equivalent to wait_on(0).
*/
fn wait_on(condvar_id: uint) {
// Create waiter nobe.
let (signal_end, wait_end) = pipes::oneshot();
let mut wait_end = some(wait_end);
let mut signal_end = some(signal_end);
let mut reacquire = none;
let mut out_of_bounds = none;
unsafe {
do task::unkillable {
// Release lock, 'atomically' enqueuing ourselves in so doing.
do (**self.sem).with |state| {
if condvar_id < vec::len(state.blocked) {
// Drop the lock.
state.count += 1;
if state.count <= 0 {
signal_waitqueue(&state.waiters);
}
// Enqueue ourself to be woken up by a signaller.
let signal_end = option::swap_unwrap(&mut signal_end);
state.blocked[condvar_id].tail.send(signal_end);
} else {
out_of_bounds = some(vec::len(state.blocked));
}
}
// If yield checks start getting inserted anywhere, we can be
// killed before or after enqueueing. Deciding whether to
// unkillably reacquire the lock needs to happen atomically
// wrt enqueuing.
reacquire = some(sem_and_signal_reacquire(self.sem));
// Release lock, 'atomically' enqueuing ourselves in so doing.
do (**self.sem).with |state| {
// Drop the lock.
state.count += 1;
if state.count <= 0 {
signal_waitqueue(&state.waiters);
}
// Enqueue ourself to be woken up by a signaller.
let signal_end = option::swap_unwrap(&mut signal_end);
state.blocked.tail.send(signal_end);
if out_of_bounds.is_none() {
reacquire = some(sem_and_signal_reacquire(self.sem));
}
}
}
// Unconditionally "block". (Might not actually block if a signaller
// did send -- I mean 'unconditionally' in contrast with acquire().)
let _ = pipes::recv_one(wait_end);
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
let _ = pipes::recv_one(option::swap_unwrap(&mut wait_end));
}
// This is needed for a failing condition variable to reacquire the
// mutex during unwinding. As long as the wrapper (mutex, etc) is
// bounded in when it gets released, this shouldn't hang forever.
struct sem_and_signal_reacquire {
sem: &sem<waitqueue>;
new(sem: &sem<waitqueue>) { self.sem = sem; }
sem: &sem<~[waitqueue]>;
new(sem: &sem<~[waitqueue]>) { self.sem = sem; }
drop unsafe {
// Needs to succeed, instead of itself dying.
do task::unkillable {
@ -195,26 +222,64 @@ impl &condvar {
}
/// Wake up a blocked task. Returns false if there was no blocked task.
fn signal() -> bool {
fn signal() -> bool { self.signal_on(0) }
/// As signal, but with a specified condvar_id. See wait_on.
fn signal_on(condvar_id: uint) -> bool {
let mut out_of_bounds = none;
let mut result = false;
unsafe {
do (**self.sem).with |state| {
signal_waitqueue(&state.blocked)
if condvar_id < vec::len(state.blocked) {
result = signal_waitqueue(&state.blocked[condvar_id]);
} else {
out_of_bounds = some(vec::len(state.blocked));
}
}
}
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
result
}
}
/// Wake up all blocked tasks. Returns the number of tasks woken.
fn broadcast() -> uint {
fn broadcast() -> uint { self.broadcast_on(0) }
/// As broadcast, but with a specified condvar_id. See wait_on.
fn broadcast_on(condvar_id: uint) -> uint {
let mut out_of_bounds = none;
let mut result = 0;
unsafe {
do (**self.sem).with |state| {
// FIXME(#3145) fix :broadcast_heavy
broadcast_waitqueue(&state.blocked)
if condvar_id < vec::len(state.blocked) {
// FIXME(#3145) fix :broadcast_heavy
result = broadcast_waitqueue(&state.blocked[condvar_id])
} else {
out_of_bounds = some(vec::len(state.blocked));
}
}
}
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
result
}
}
}
impl &sem<waitqueue> {
// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
#[inline(always)]
fn check_cvar_bounds<U>(out_of_bounds: option<uint>, id: uint, act: &str,
blk: fn() -> U) -> U {
match out_of_bounds {
some(0) =>
fail fmt!("%s with illegal ID %u - this lock has no condvars!",
act, id),
some(length) =>
fail fmt!("%s with illegal ID %u - ID must be less than %u",
act, id, length),
none => blk()
}
}
impl &sem<~[waitqueue]> {
// The only other place that condvars get built is rwlock_write_mode.
fn access_cond<U>(blk: fn(c: &condvar) -> U) -> U {
do self.access { blk(&condvar { sem: self }) }
@ -263,10 +328,19 @@ impl &semaphore {
* FIFO condition variable.
* FIXME(#3145): document killability
*/
struct mutex { priv sem: sem<waitqueue>; }
struct mutex { priv sem: sem<~[waitqueue]>; }
/// Create a new mutex.
fn mutex() -> mutex { mutex { sem: new_sem_and_signal(1) } }
/// Create a new mutex, with one associated condvar.
fn mutex() -> mutex { mutex_with_condvars(1) }
/**
* Create a new mutex, with a specified number of associated condvars. This
* will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
* 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
* any operations on the condvar will fail.)
*/
fn mutex_with_condvars(num_condvars: uint) -> mutex {
mutex { sem: new_sem_and_signal(1, num_condvars) }
}
impl &mutex {
/// Create a new handle to the mutex.
@ -295,13 +369,20 @@ struct rwlock_inner {
/// A blocking, no-starvation, reader-writer lock with an associated condvar.
struct rwlock {
/* priv */ order_lock: semaphore;
/* priv */ access_lock: sem<waitqueue>;
/* priv */ access_lock: sem<~[waitqueue]>;
/* priv */ state: Exclusive<rwlock_inner>;
}
/// Create a new rwlock.
fn rwlock() -> rwlock {
rwlock { order_lock: semaphore(1), access_lock: new_sem_and_signal(1),
/// Create a new rwlock, with one associated condvar.
fn rwlock() -> rwlock { rwlock_with_condvars(1) }
/**
* Create a new rwlock, with a specified number of associated condvars.
* Similar to mutex_with_condvars.
*/
fn rwlock_with_condvars(num_condvars: uint) -> rwlock {
rwlock { order_lock: semaphore(1),
access_lock: new_sem_and_signal(1, num_condvars),
state: exclusive(rwlock_inner { read_mode: false,
read_count: 0 }) }
}
@ -813,6 +894,59 @@ mod tests {
drop { self.c.send(()); }
}
}
#[test]
fn test_mutex_cond_signal_on_0() {
// Tests that signal_on(0) is equivalent to signal().
let m = ~mutex();
do m.lock_cond |cond| {
let m2 = ~m.clone();
do task::spawn {
do m2.lock_cond |cond| {
cond.signal_on(0);
}
}
cond.wait();
}
}
#[test] #[ignore(cfg(windows))]
fn test_mutex_different_conds() {
let result = do task::try {
let m = ~mutex_with_condvars(2);
let m2 = ~m.clone();
let (c,p) = pipes::stream();
do task::spawn {
do m2.lock_cond |cond| {
c.send(());
cond.wait_on(1);
}
}
let _ = p.recv();
do m.lock_cond |cond| {
if !cond.signal_on(0) {
fail; // success; punt sibling awake.
}
}
};
assert result.is_err();
}
#[test] #[ignore(cfg(windows))]
fn test_mutex_no_condvars() {
let result = do task::try {
let m = ~mutex_with_condvars(0);
do m.lock_cond |cond| { cond.wait(); }
};
assert result.is_err();
let result = do task::try {
let m = ~mutex_with_condvars(0);
do m.lock_cond |cond| { cond.signal(); }
};
assert result.is_err();
let result = do task::try {
let m = ~mutex_with_condvars(0);
do m.lock_cond |cond| { cond.broadcast(); }
};
assert result.is_err();
}
/************************************************************************
* Reader/writer lock tests
************************************************************************/