From fa8fc4b2b5bc66aa7b10a04a895c33de0d5e8185 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Tue, 14 Aug 2012 20:48:37 -0400 Subject: [PATCH] Add 1shot pipe chan_one/port_one type aliases and convert std::sync to use them --- src/libcore/pipes.rs | 19 ++++++++++++------- src/libstd/sync.rs | 16 ++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 4f39c42137cf..bc6d89e92b6c 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -90,7 +90,8 @@ export send_packet, recv_packet, send, recv, try_recv, peek; export select, select2, selecti, select2i, selectable; export spawn_service, spawn_service_recv; export stream, port, chan, shared_chan, port_set, channel; -export oneshot, recv_one, try_recv_one, send_one, try_send_one; +export oneshot, chan_one, port_one; +export recv_one, try_recv_one, send_one, try_send_one; #[doc(hidden)] const SPIN_COUNT: uint = 0; @@ -1144,9 +1145,13 @@ proto! oneshot { } } +/// The send end of a oneshot pipe. +type chan_one = oneshot::client::oneshot; +/// The receive end of a oneshot pipe. +type port_one = oneshot::server::oneshot; + /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. -fn oneshot() -> (oneshot::client::oneshot, - oneshot::server::oneshot) { +fn oneshot() -> (chan_one, port_one) { oneshot::init() } @@ -1154,13 +1159,13 @@ fn oneshot() -> (oneshot::client::oneshot, * Receive a message from a oneshot pipe, failing if the connection was * closed. */ -fn recv_one(+port: oneshot::server::oneshot) -> T { +fn recv_one(+port: port_one) -> T { let oneshot::send(message) = recv(port); message } /// Receive a message from a oneshot pipe unless the connection was closed. -fn try_recv_one (+port: oneshot::server::oneshot) -> option { +fn try_recv_one (+port: port_one) -> option { let message = try_recv(port); if message == none { none } @@ -1171,7 +1176,7 @@ fn try_recv_one (+port: oneshot::server::oneshot) -> option { } /// Send a message on a oneshot pipe, failing if the connection was closed. -fn send_one(+chan: oneshot::client::oneshot, +data: T) { +fn send_one(+chan: chan_one, +data: T) { oneshot::client::send(chan, data); } @@ -1179,7 +1184,7 @@ fn send_one(+chan: oneshot::client::oneshot, +data: T) { * Send a message on a oneshot pipe, or return false if the connection was * closed. */ -fn try_send_one(+chan: oneshot::client::oneshot, +data: T) +fn try_send_one(+chan: chan_one, +data: T) -> bool { oneshot::client::try_send(chan, data).is_some() } diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs index 4233c351cf1e..85043699595f 100644 --- a/src/libstd/sync.rs +++ b/src/libstd/sync.rs @@ -18,8 +18,8 @@ import unsafe::{Exclusive, exclusive}; ****************************************************************************/ // Each waiting task receives on one of these. FIXME #3125 make these oneshot. -type wait_end = pipes::port<()>; -type signal_end = pipes::chan<()>; +type wait_end = pipes::port_one<()>; +type signal_end = pipes::chan_one<()>; // A doubly-ended queue of waiting tasks. struct waitqueue { head: pipes::port; tail: pipes::chan; } @@ -30,7 +30,7 @@ fn signal_waitqueue(q: &waitqueue) -> bool { if q.head.peek() { // Pop and send a wakeup signal. If the waiter was killed, its port // will have closed. Keep trying until we get a live task. - if q.head.recv().try_send(()) { + if pipes::try_send_one(q.head.recv(), ()) { true } else { signal_waitqueue(q) @@ -43,7 +43,7 @@ fn signal_waitqueue(q: &waitqueue) -> bool { fn broadcast_waitqueue(q: &waitqueue) -> uint { let mut count = 0; while q.head.peek() { - if q.head.recv().try_send(()) { + if pipes::try_send_one(q.head.recv(), ()) { count += 1; } } @@ -80,7 +80,7 @@ impl &sem { state.count -= 1; if state.count < 0 { // Create waiter nobe. - let (signal_end, wait_end) = pipes::stream(); + let (signal_end, wait_end) = pipes::oneshot(); // Tell outer scope we need to block. waiter_nobe = some(wait_end); // Enqueue ourself. @@ -92,7 +92,7 @@ impl &sem { /* for 1000.times { task::yield(); } */ // Need to wait outside the exclusive. if waiter_nobe.is_some() { - let _ = option::unwrap(waiter_nobe).recv(); + let _ = pipes::recv_one(option::unwrap(waiter_nobe)); } } fn release() { @@ -151,7 +151,7 @@ impl &condvar { /// Atomically drop the associated lock, and block until a signal is sent. fn wait() { // Create waiter nobe. - let (signal_end, wait_end) = pipes::stream(); + let (signal_end, wait_end) = pipes::oneshot(); let mut signal_end = some(signal_end); let mut reacquire = none; unsafe { @@ -177,7 +177,7 @@ impl &condvar { } // Unconditionally "block". (Might not actually block if a signaller // did send -- I mean 'unconditionally' in contrast with acquire().) - let _ = wait_end.recv(); + let _ = pipes::recv_one(wait_end); // This is needed for a failing condition variable to reacquire the // mutex during unwinding. As long as the wrapper (mutex, etc) is