Make rwlocks fail-proof

This commit is contained in:
Ben Blum 2012-08-09 22:07:37 -04:00
parent 1b2d91c79d
commit 758dd786f6

View file

@ -148,19 +148,6 @@ enum condvar = &sem<waitqueue>;
impl condvar {
/// Atomically drop the associated lock, and block until a signal is sent.
fn wait() {
// This is needed for a failing condition variable to reacquire the
// mutex during unwinding. As long as the wrapper (mutex, etc) is
// bounded in when it gets released, this shouldn't hang forever.
struct sem_and_signal_reacquire {
sem: &sem<waitqueue>;
new(sem: &sem<waitqueue>) { self.sem = sem; }
drop unsafe {
do task::unkillable {
self.sem.acquire();
}
}
}
// Create waiter nobe.
let (signal_end, wait_end) = pipes::stream();
let mut signal_end = some(signal_end);
@ -190,9 +177,20 @@ impl condvar {
// Unconditionally "block". (Might not actually block if a signaller
// did send -- I mean 'unconditionally' in contrast with acquire().)
let _ = wait_end.recv();
// 'reacquire' will pick up the lock again in its destructor - it must
// happen whether or not we are killed, and it needs to succeed at
// reacquiring instead of itself dying.
// This is needed for a failing condition variable to reacquire the
// mutex during unwinding. As long as the wrapper (mutex, etc) is
// bounded in when it gets released, this shouldn't hang forever.
struct sem_and_signal_reacquire {
sem: &sem<waitqueue>;
new(sem: &sem<waitqueue>) { self.sem = sem; }
drop unsafe {
// Needs to succeed, instead of itself dying.
do task::unkillable {
self.sem.acquire();
}
}
}
}
/// Wake up a blocked task. Returns false if there was no blocked task.
@ -297,6 +295,7 @@ struct rwlock {
state: arc::exclusive<rwlock_inner>;
}
/// Create a new rwlock.
fn rwlock() -> rwlock {
rwlock { order_lock: new_semaphore(1), access_lock: new_sem_and_signal(1),
state: arc::exclusive(rwlock_inner { read_mode: false,
@ -304,7 +303,7 @@ fn rwlock() -> rwlock {
}
impl &rwlock {
// Create a new handle to the rwlock.
/// Create a new handle to the rwlock.
fn clone() -> rwlock {
rwlock { order_lock: (&(self.order_lock)).clone(),
access_lock: sem((*self.access_lock).clone()),
@ -316,28 +315,23 @@ impl &rwlock {
* tasks may run concurrently with this one.
*/
fn read<U>(blk: fn() -> U) -> U {
do (&self.order_lock).access {
let mut first_reader = false;
do self.state.with |state| {
state.read_mode = true;
first_reader = (state.read_count == 0);
state.read_count += 1;
}
if first_reader {
(&self.access_lock).acquire();
unsafe {
do task::unkillable {
do (&self.order_lock).access {
let mut first_reader = false;
do self.state.with |state| {
state.read_mode = true;
first_reader = (state.read_count == 0);
state.read_count += 1;
}
if first_reader {
(&self.access_lock).acquire();
}
}
}
}
let result = blk();
let mut last_reader = false;
do self.state.with |state| {
assert state.read_mode;
state.read_count -= 1;
last_reader = (state.read_count == 0);
}
if last_reader {
(&self.access_lock).release();
}
result
let _z = rwlock_release_read(self);
blk()
}
/**
@ -345,11 +339,15 @@ impl &rwlock {
* 'write' from other tasks will run concurrently with this one.
*/
fn write<U>(blk: fn() -> U) -> U {
(&self.order_lock).acquire();
do (&self.access_lock).access {
(&self.order_lock).release();
blk()
unsafe {
do task::unkillable {
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
}
}
let _z = rwlock_release_write(self);
blk()
}
/**
@ -364,12 +362,41 @@ impl &rwlock {
// to-do implement downgrade
}
// FIXME(#3136) should go inside of write() and read() respectively
struct rwlock_release_write {
lock: &rwlock;
new(lock: &rwlock) { self.lock = lock; }
drop unsafe {
do task::unkillable { (&self.lock.access_lock).release(); }
}
}
struct rwlock_release_read {
lock: &rwlock;
new(lock: &rwlock) { self.lock = lock; }
drop unsafe {
do task::unkillable {
let mut last_reader = false;
do self.lock.state.with |state| {
assert state.read_mode;
state.read_count -= 1;
last_reader = (state.read_count == 0);
}
if last_reader {
(&self.lock.access_lock).release();
}
}
}
}
/****************************************************************************
* Tests
****************************************************************************/
#[cfg(test)]
mod tests {
/************************************************************************
* Semaphore tests
************************************************************************/
#[test]
fn test_sem_acquire_release() {
let s = ~new_semaphore(1);
@ -462,6 +489,9 @@ mod tests {
let _ = p.recv(); // wait for child to be done
}
}
/************************************************************************
* Mutex tests
************************************************************************/
#[test]
fn test_mutex_lock() {
// Unsafely achieve shared state, and do the textbook
@ -613,32 +643,36 @@ mod tests {
// assert !woken;
}
}
/************************************************************************
* Reader/writer lock tests
************************************************************************/
#[cfg(test)]
fn lock_rwlock_in_mode(x: &rwlock, reader: bool, blk: fn()) {
if reader { x.read(blk); } else { x.write(blk); }
}
#[cfg(test)]
fn test_rwlock_exclusion(reader1: bool, reader2: bool) {
// Test mutual exclusion between readers and writers. Just like the
// mutex mutual exclusion test, a ways above.
let (c,p) = pipes::stream();
let m = ~rwlock();
let m2 = ~m.clone();
let x = ~rwlock();
let x2 = ~x.clone();
let sharedstate = ~0;
let ptr = ptr::addr_of(*sharedstate);
do task::spawn {
let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
access_shared(sharedstate, m2, reader1, 10);
access_shared(sharedstate, x2, reader1, 10);
c.send(());
}
access_shared(sharedstate, m, reader2, 10);
access_shared(sharedstate, x, reader2, 10);
let _ = p.recv();
assert *sharedstate == 20;
fn access_shared(sharedstate: &mut int, m: &rwlock, reader: bool,
fn access_shared(sharedstate: &mut int, x: &rwlock, reader: bool,
n: uint) {
let lock_fn = fn@(m: &rwlock, blk: fn()) {
if reader { m.read(blk); } else { m.write(blk); }
};
for n.times {
do lock_fn(m) {
do lock_rwlock_in_mode(x, reader) {
let oldval = *sharedstate;
task::yield();
*sharedstate = oldval + 1;
@ -672,6 +706,28 @@ mod tests {
c2.send(());
let _ = p1.recv();
}
}
#[cfg(test)] #[ignore(cfg(windows))]
fn rwlock_kill_helper(reader1: bool, reader2: bool) {
// Mutex must get automatically unlocked if failed/killed within.
let x = ~rwlock();
let x2 = ~x.clone();
let result: result::result<(),()> = do task::try {
do lock_rwlock_in_mode(x2, reader1) {
fail;
}
};
assert result.is_err();
// child task must have finished by the time try returns
do lock_rwlock_in_mode(x, reader2) { }
}
#[test] #[ignore(cfg(windows))]
fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(true, false); }
#[test] #[ignore(cfg(windows))]
fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(false,true ); }
#[test] #[ignore(cfg(windows))]
fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(true, true ); }
#[test] #[ignore(cfg(windows))]
fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(false,false); }
}