Add 1shot pipe chan_one/port_one type aliases and convert std::sync to use them

This commit is contained in:
Ben Blum 2012-08-14 20:48:37 -04:00
parent a63f85ce8c
commit fa8fc4b2b5
2 changed files with 20 additions and 15 deletions

View file

@ -90,7 +90,8 @@ export send_packet, recv_packet, send, recv, try_recv, peek;
export select, select2, selecti, select2i, selectable;
export spawn_service, spawn_service_recv;
export stream, port, chan, shared_chan, port_set, channel;
export oneshot, recv_one, try_recv_one, send_one, try_send_one;
export oneshot, chan_one, port_one;
export recv_one, try_recv_one, send_one, try_send_one;
#[doc(hidden)]
const SPIN_COUNT: uint = 0;
@ -1144,9 +1145,13 @@ proto! oneshot {
}
}
/// The send end of a oneshot pipe.
type chan_one<T: send> = oneshot::client::oneshot<T>;
/// The receive end of a oneshot pipe.
type port_one<T: send> = oneshot::server::oneshot<T>;
/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
fn oneshot<T: send>() -> (oneshot::client::oneshot<T>,
oneshot::server::oneshot<T>) {
fn oneshot<T: send>() -> (chan_one<T>, port_one<T>) {
oneshot::init()
}
@ -1154,13 +1159,13 @@ fn oneshot<T: send>() -> (oneshot::client::oneshot<T>,
* Receive a message from a oneshot pipe, failing if the connection was
* closed.
*/
fn recv_one<T: send>(+port: oneshot::server::oneshot<T>) -> T {
fn recv_one<T: send>(+port: port_one<T>) -> T {
let oneshot::send(message) = recv(port);
message
}
/// Receive a message from a oneshot pipe unless the connection was closed.
fn try_recv_one<T: send> (+port: oneshot::server::oneshot<T>) -> option<T> {
fn try_recv_one<T: send> (+port: port_one<T>) -> option<T> {
let message = try_recv(port);
if message == none { none }
@ -1171,7 +1176,7 @@ fn try_recv_one<T: send> (+port: oneshot::server::oneshot<T>) -> option<T> {
}
/// Send a message on a oneshot pipe, failing if the connection was closed.
fn send_one<T: send>(+chan: oneshot::client::oneshot<T>, +data: T) {
fn send_one<T: send>(+chan: chan_one<T>, +data: T) {
oneshot::client::send(chan, data);
}
@ -1179,7 +1184,7 @@ fn send_one<T: send>(+chan: oneshot::client::oneshot<T>, +data: T) {
* Send a message on a oneshot pipe, or return false if the connection was
* closed.
*/
fn try_send_one<T: send>(+chan: oneshot::client::oneshot<T>, +data: T)
fn try_send_one<T: send>(+chan: chan_one<T>, +data: T)
-> bool {
oneshot::client::try_send(chan, data).is_some()
}

View file

@ -18,8 +18,8 @@ import unsafe::{Exclusive, exclusive};
****************************************************************************/
// Each waiting task receives on one of these. FIXME #3125 make these oneshot.
type wait_end = pipes::port<()>;
type signal_end = pipes::chan<()>;
type wait_end = pipes::port_one<()>;
type signal_end = pipes::chan_one<()>;
// A doubly-ended queue of waiting tasks.
struct waitqueue { head: pipes::port<signal_end>;
tail: pipes::chan<signal_end>; }
@ -30,7 +30,7 @@ fn signal_waitqueue(q: &waitqueue) -> bool {
if q.head.peek() {
// Pop and send a wakeup signal. If the waiter was killed, its port
// will have closed. Keep trying until we get a live task.
if q.head.recv().try_send(()) {
if pipes::try_send_one(q.head.recv(), ()) {
true
} else {
signal_waitqueue(q)
@ -43,7 +43,7 @@ fn signal_waitqueue(q: &waitqueue) -> bool {
fn broadcast_waitqueue(q: &waitqueue) -> uint {
let mut count = 0;
while q.head.peek() {
if q.head.recv().try_send(()) {
if pipes::try_send_one(q.head.recv(), ()) {
count += 1;
}
}
@ -80,7 +80,7 @@ impl<Q: send> &sem<Q> {
state.count -= 1;
if state.count < 0 {
// Create waiter nobe.
let (signal_end, wait_end) = pipes::stream();
let (signal_end, wait_end) = pipes::oneshot();
// Tell outer scope we need to block.
waiter_nobe = some(wait_end);
// Enqueue ourself.
@ -92,7 +92,7 @@ impl<Q: send> &sem<Q> {
/* for 1000.times { task::yield(); } */
// Need to wait outside the exclusive.
if waiter_nobe.is_some() {
let _ = option::unwrap(waiter_nobe).recv();
let _ = pipes::recv_one(option::unwrap(waiter_nobe));
}
}
fn release() {
@ -151,7 +151,7 @@ impl &condvar {
/// Atomically drop the associated lock, and block until a signal is sent.
fn wait() {
// Create waiter nobe.
let (signal_end, wait_end) = pipes::stream();
let (signal_end, wait_end) = pipes::oneshot();
let mut signal_end = some(signal_end);
let mut reacquire = none;
unsafe {
@ -177,7 +177,7 @@ impl &condvar {
}
// Unconditionally "block". (Might not actually block if a signaller
// did send -- I mean 'unconditionally' in contrast with acquire().)
let _ = wait_end.recv();
let _ = pipes::recv_one(wait_end);
// This is needed for a failing condition variable to reacquire the
// mutex during unwinding. As long as the wrapper (mutex, etc) is