Convert sync to the new struct syntax
This commit is contained in:
parent
770a21272b
commit
4c9f168372
2 changed files with 51 additions and 50 deletions
|
|
@ -5,10 +5,7 @@
|
|||
* in std.
|
||||
*/
|
||||
|
||||
export condvar;
|
||||
export semaphore, new_semaphore;
|
||||
export mutex, new_mutex;
|
||||
export rwlock;
|
||||
export condvar, semaphore, mutex, rwlock;
|
||||
|
||||
// FIXME (#3119) This shouldn't be a thing exported from core.
|
||||
import arc::exclusive;
|
||||
|
|
@ -21,8 +18,8 @@ import arc::exclusive;
|
|||
type wait_end = pipes::port<()>;
|
||||
type signal_end = pipes::chan<()>;
|
||||
// A doubly-ended queue of waiting tasks.
|
||||
type waitqueue = { head: pipes::port<signal_end>,
|
||||
tail: pipes::chan<signal_end> };
|
||||
struct waitqueue { head: pipes::port<signal_end>;
|
||||
tail: pipes::chan<signal_end>; }
|
||||
|
||||
// Signals one live task from the queue.
|
||||
fn signal_waitqueue(q: &waitqueue) -> bool {
|
||||
|
|
@ -51,23 +48,25 @@ fn broadcast_waitqueue(q: &waitqueue) -> uint {
|
|||
}
|
||||
|
||||
// The building-block used to make semaphores, mutexes, and rwlocks.
|
||||
enum sem<Q: send> = exclusive<{
|
||||
mut count: int,
|
||||
waiters: waitqueue,
|
||||
struct sem_inner<Q> {
|
||||
mut count: int;
|
||||
waiters: waitqueue;
|
||||
// Can be either unit or another waitqueue. Some sems shouldn't come with
|
||||
// a condition variable attached, others should.
|
||||
blocked: Q,
|
||||
}>;
|
||||
blocked: Q;
|
||||
}
|
||||
enum sem<Q: send> = exclusive<sem_inner<Q>>;
|
||||
|
||||
fn new_sem<Q: send>(count: int, +q: Q) -> sem<Q> {
|
||||
let (wait_tail, wait_head) = pipes::stream();
|
||||
sem(exclusive({ mut count: count,
|
||||
waiters: { head: wait_head, tail: wait_tail },
|
||||
blocked: q }))
|
||||
sem(exclusive(sem_inner {
|
||||
mut count: count,
|
||||
waiters: waitqueue { head: wait_head, tail: wait_tail },
|
||||
blocked: q }))
|
||||
}
|
||||
fn new_sem_and_signal(count: int) -> sem<waitqueue> {
|
||||
let (block_tail, block_head) = pipes::stream();
|
||||
new_sem(count, { head: block_head, tail: block_tail })
|
||||
new_sem(count, waitqueue { head: block_head, tail: block_tail })
|
||||
}
|
||||
|
||||
impl<Q: send> &sem<Q> {
|
||||
|
|
@ -143,7 +142,7 @@ struct sem_and_signal_release {
|
|||
}
|
||||
|
||||
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
|
||||
enum condvar = &sem<waitqueue>;
|
||||
struct condvar { priv sem: &sem<waitqueue>; }
|
||||
|
||||
impl condvar {
|
||||
/// Atomically drop the associated lock, and block until a signal is sent.
|
||||
|
|
@ -158,10 +157,10 @@ impl condvar {
|
|||
// killed before or after enqueueing. Deciding whether to
|
||||
// unkillably reacquire the lock needs to happen atomically
|
||||
// wrt enqueuing.
|
||||
reacquire = some(sem_and_signal_reacquire(*self));
|
||||
reacquire = some(sem_and_signal_reacquire(self.sem));
|
||||
|
||||
// Release lock, 'atomically' enqueuing ourselves in so doing.
|
||||
do (***self).with |state| {
|
||||
do (**self.sem).with |state| {
|
||||
// Drop the lock.
|
||||
// FIXME(#3145) investigate why factoring doesn't compile.
|
||||
state.count += 1;
|
||||
|
|
@ -196,7 +195,7 @@ impl condvar {
|
|||
/// Wake up a blocked task. Returns false if there was no blocked task.
|
||||
fn signal() -> bool {
|
||||
unsafe {
|
||||
do (***self).with |state| {
|
||||
do (**self.sem).with |state| {
|
||||
signal_waitqueue(&state.blocked)
|
||||
}
|
||||
}
|
||||
|
|
@ -205,7 +204,7 @@ impl condvar {
|
|||
/// Wake up all blocked tasks. Returns the number of tasks woken.
|
||||
fn broadcast() -> uint {
|
||||
unsafe {
|
||||
do (***self).with |state| {
|
||||
do (**self.sem).with |state| {
|
||||
// FIXME(#3145) fix :broadcast_heavy
|
||||
broadcast_waitqueue(&state.blocked)
|
||||
}
|
||||
|
|
@ -215,7 +214,7 @@ impl condvar {
|
|||
|
||||
impl &sem<waitqueue> {
|
||||
fn access_cond<U>(blk: fn(condvar) -> U) -> U {
|
||||
do self.access { blk(condvar(self)) }
|
||||
do self.access { blk(condvar { sem: self }) }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -224,30 +223,32 @@ impl &sem<waitqueue> {
|
|||
****************************************************************************/
|
||||
|
||||
/// A counting, blocking, bounded-waiting semaphore.
|
||||
enum semaphore = sem<()>;
|
||||
struct semaphore { priv sem: sem<()>; }
|
||||
|
||||
/// Create a new semaphore with the specified count.
|
||||
fn new_semaphore(count: int) -> semaphore { semaphore(new_sem(count, ())) }
|
||||
fn semaphore(count: int) -> semaphore {
|
||||
semaphore { sem: new_sem(count, ()) }
|
||||
}
|
||||
|
||||
impl &semaphore {
|
||||
/// Create a new handle to the semaphore.
|
||||
fn clone() -> semaphore { semaphore(sem((***self).clone())) }
|
||||
fn clone() -> semaphore { semaphore { sem: sem((*self.sem).clone()) } }
|
||||
|
||||
/**
|
||||
* Acquire a resource represented by the semaphore. Blocks if necessary
|
||||
* until resource(s) become available.
|
||||
*/
|
||||
fn acquire() { (&**self).acquire() }
|
||||
fn acquire() { (&self.sem).acquire() }
|
||||
|
||||
/**
|
||||
* Release a held resource represented by the semaphore. Wakes a blocked
|
||||
* contending task, if any exist. Won't block the caller.
|
||||
*/
|
||||
fn release() { (&**self).release() }
|
||||
fn release() { (&self.sem).release() }
|
||||
|
||||
/// Run a function with ownership of one of the semaphore's resources.
|
||||
// FIXME(#3145): figure out whether or not this should get exported.
|
||||
fn access<U>(blk: fn() -> U) -> U { (&**self).access(blk) }
|
||||
fn access<U>(blk: fn() -> U) -> U { (&self.sem).access(blk) }
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
|
|
@ -259,21 +260,21 @@ impl &semaphore {
|
|||
* FIFO condition variable.
|
||||
* FIXME(#3145): document killability
|
||||
*/
|
||||
enum mutex = sem<waitqueue>;
|
||||
struct mutex { priv sem: sem<waitqueue>; }
|
||||
|
||||
/// Create a new mutex.
|
||||
fn new_mutex() -> mutex { mutex(new_sem_and_signal(1)) }
|
||||
fn mutex() -> mutex { mutex { sem: new_sem_and_signal(1) } }
|
||||
|
||||
impl &mutex {
|
||||
/// Create a new handle to the mutex.
|
||||
fn clone() -> mutex { mutex(sem((***self).clone())) }
|
||||
fn clone() -> mutex { mutex { sem: sem((*self.sem).clone()) } }
|
||||
|
||||
/// Run a function with ownership of the mutex.
|
||||
fn lock<U>(blk: fn() -> U) -> U { (&**self).access(blk) }
|
||||
fn lock<U>(blk: fn() -> U) -> U { (&self.sem).access(blk) }
|
||||
|
||||
/// Run a function with ownership of the mutex and a handle to a condvar.
|
||||
fn lock_cond<U>(blk: fn(condvar) -> U) -> U {
|
||||
(&**self).access_cond(blk)
|
||||
(&self.sem).access_cond(blk)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -290,14 +291,14 @@ struct rwlock_inner {
|
|||
|
||||
/// A blocking, no-starvation, reader-writer lock with an associated condvar.
|
||||
struct rwlock {
|
||||
order_lock: semaphore;
|
||||
access_lock: sem<waitqueue>;
|
||||
state: arc::exclusive<rwlock_inner>;
|
||||
/* priv */ order_lock: semaphore;
|
||||
/* priv */ access_lock: sem<waitqueue>;
|
||||
/* priv */ state: arc::exclusive<rwlock_inner>;
|
||||
}
|
||||
|
||||
/// Create a new rwlock.
|
||||
fn rwlock() -> rwlock {
|
||||
rwlock { order_lock: new_semaphore(1), access_lock: new_sem_and_signal(1),
|
||||
rwlock { order_lock: semaphore(1), access_lock: new_sem_and_signal(1),
|
||||
state: arc::exclusive(rwlock_inner { read_mode: false,
|
||||
read_count: 0 }) }
|
||||
}
|
||||
|
|
@ -399,19 +400,19 @@ mod tests {
|
|||
************************************************************************/
|
||||
#[test]
|
||||
fn test_sem_acquire_release() {
|
||||
let s = ~new_semaphore(1);
|
||||
let s = ~semaphore(1);
|
||||
s.acquire();
|
||||
s.release();
|
||||
s.acquire();
|
||||
}
|
||||
#[test]
|
||||
fn test_sem_basic() {
|
||||
let s = ~new_semaphore(1);
|
||||
let s = ~semaphore(1);
|
||||
do s.access { }
|
||||
}
|
||||
#[test]
|
||||
fn test_sem_as_mutex() {
|
||||
let s = ~new_semaphore(1);
|
||||
let s = ~semaphore(1);
|
||||
let s2 = ~s.clone();
|
||||
do task::spawn {
|
||||
do s2.access {
|
||||
|
|
@ -426,7 +427,7 @@ mod tests {
|
|||
fn test_sem_as_cvar() {
|
||||
/* Child waits and parent signals */
|
||||
let (c,p) = pipes::stream();
|
||||
let s = ~new_semaphore(0);
|
||||
let s = ~semaphore(0);
|
||||
let s2 = ~s.clone();
|
||||
do task::spawn {
|
||||
s2.acquire();
|
||||
|
|
@ -438,7 +439,7 @@ mod tests {
|
|||
|
||||
/* Parent waits and child signals */
|
||||
let (c,p) = pipes::stream();
|
||||
let s = ~new_semaphore(0);
|
||||
let s = ~semaphore(0);
|
||||
let s2 = ~s.clone();
|
||||
do task::spawn {
|
||||
for 5.times { task::yield(); }
|
||||
|
|
@ -452,7 +453,7 @@ mod tests {
|
|||
fn test_sem_multi_resource() {
|
||||
// Parent and child both get in the critical section at the same
|
||||
// time, and shake hands.
|
||||
let s = ~new_semaphore(2);
|
||||
let s = ~semaphore(2);
|
||||
let s2 = ~s.clone();
|
||||
let (c1,p1) = pipes::stream();
|
||||
let (c2,p2) = pipes::stream();
|
||||
|
|
@ -472,7 +473,7 @@ mod tests {
|
|||
// Force the runtime to schedule two threads on the same sched_loop.
|
||||
// When one blocks, it should schedule the other one.
|
||||
do task::spawn_sched(task::manual_threads(1)) {
|
||||
let s = ~new_semaphore(1);
|
||||
let s = ~semaphore(1);
|
||||
let s2 = ~s.clone();
|
||||
let (c,p) = pipes::stream();
|
||||
let child_data = ~mut some((s2,c));
|
||||
|
|
@ -497,7 +498,7 @@ mod tests {
|
|||
// Unsafely achieve shared state, and do the textbook
|
||||
// "load tmp <- ptr; inc tmp; store ptr <- tmp" dance.
|
||||
let (c,p) = pipes::stream();
|
||||
let m = ~new_mutex();
|
||||
let m = ~mutex();
|
||||
let m2 = ~m.clone();
|
||||
let sharedstate = ~0;
|
||||
let ptr = ptr::addr_of(*sharedstate);
|
||||
|
|
@ -523,7 +524,7 @@ mod tests {
|
|||
}
|
||||
#[test]
|
||||
fn test_mutex_cond_wait() {
|
||||
let m = ~new_mutex();
|
||||
let m = ~mutex();
|
||||
|
||||
// Child wakes up parent
|
||||
do m.lock_cond |cond| {
|
||||
|
|
@ -555,7 +556,7 @@ mod tests {
|
|||
}
|
||||
#[cfg(test)]
|
||||
fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
|
||||
let m = ~new_mutex();
|
||||
let m = ~mutex();
|
||||
let mut ports = ~[];
|
||||
|
||||
for num_waiters.times {
|
||||
|
|
@ -590,7 +591,7 @@ mod tests {
|
|||
}
|
||||
#[test]
|
||||
fn test_mutex_cond_no_waiter() {
|
||||
let m = ~new_mutex();
|
||||
let m = ~mutex();
|
||||
let m2 = ~m.clone();
|
||||
do task::try {
|
||||
do m.lock_cond |_x| { }
|
||||
|
|
@ -602,7 +603,7 @@ mod tests {
|
|||
#[test] #[ignore(cfg(windows))]
|
||||
fn test_mutex_killed_simple() {
|
||||
// Mutex must get automatically unlocked if failed/killed within.
|
||||
let m = ~new_mutex();
|
||||
let m = ~mutex();
|
||||
let m2 = ~m.clone();
|
||||
|
||||
let result: result::result<(),()> = do task::try {
|
||||
|
|
@ -618,7 +619,7 @@ mod tests {
|
|||
fn test_mutex_killed_cond() {
|
||||
// Getting killed during cond wait must not corrupt the mutex while
|
||||
// unwinding (e.g. double unlock).
|
||||
let m = ~new_mutex();
|
||||
let m = ~mutex();
|
||||
let m2 = ~m.clone();
|
||||
|
||||
let result: result::result<(),()> = do task::try {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
// error-pattern: reference is not valid outside of its lifetime
|
||||
fn main() {
|
||||
let m = ~sync::new_mutex();
|
||||
let m = ~sync::mutex();
|
||||
let mut cond = none;
|
||||
do m.lock_cond |c| {
|
||||
cond = some(c);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue