diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index e539b067edd1..045aeb0feda0 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -18,6 +18,7 @@ use std::borrow; use std::comm; +use std::comm::SendDeferred; use std::task; use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox}; use std::unstable::atomics; @@ -49,7 +50,7 @@ impl WaitQueue { if self.head.peek() { // Pop and send a wakeup signal. If the waiter was killed, its port // will have closed. Keep trying until we get a live task. - if comm::try_send_one(self.head.recv(), ()) { + if self.head.recv().try_send_deferred(()) { true } else { self.signal() @@ -62,7 +63,7 @@ impl WaitQueue { fn broadcast(&self) -> uint { let mut count = 0; while self.head.peek() { - if comm::try_send_one(self.head.recv(), ()) { + if self.head.recv().try_send_deferred(()) { count += 1; } } @@ -102,7 +103,7 @@ impl Sem { // Tell outer scope we need to block. waiter_nobe = Some(WaitEnd); // Enqueue ourself. - state.waiters.tail.send(SignalEnd); + state.waiters.tail.send_deferred(SignalEnd); } } // Uncomment if you wish to test for sem races. Not valgrind-friendly. @@ -256,7 +257,7 @@ impl<'self> Condvar<'self> { } // Enqueue ourself to be woken up by a signaller. let SignalEnd = SignalEnd.take_unwrap(); - state.blocked[condvar_id].tail.send(SignalEnd); + state.blocked[condvar_id].tail.send_deferred(SignalEnd); } else { out_of_bounds = Some(state.blocked.len()); } diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index acdf2cee841f..a0731dc3494c 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -19,6 +19,7 @@ use either::{Either, Left, Right}; use kinds::Send; use option::{Option, Some}; use unstable::sync::Exclusive; +pub use rt::comm::SendDeferred; use rtcomm = rt::comm; use rt; @@ -105,6 +106,21 @@ impl GenericSmartChan for Chan { } } +impl SendDeferred for Chan { + fn send_deferred(&self, x: T) { + match self.inner { + Left(ref chan) => chan.send(x), + Right(ref chan) => chan.send_deferred(x) + } + } + fn try_send_deferred(&self, x: T) -> bool { + match self.inner { + Left(ref chan) => chan.try_send(x), + Right(ref chan) => chan.try_send_deferred(x) + } + } +} + impl GenericPort for Port { fn recv(&self) -> T { match self.inner { @@ -250,6 +266,20 @@ impl ChanOne { Right(p) => p.try_send(data) } } + pub fn send_deferred(self, data: T) { + let ChanOne { inner } = self; + match inner { + Left(p) => p.send(data), + Right(p) => p.send_deferred(data) + } + } + pub fn try_send_deferred(self, data: T) -> bool { + let ChanOne { inner } = self; + match inner { + Left(p) => p.try_send(data), + Right(p) => p.try_send_deferred(data) + } + } } pub fn recv_one(port: PortOne) -> T { diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 00e1aaa21932..c19ac8aa3371 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; use rt::{context, SchedulerContext}; +use tuple::ImmutableTuple; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -86,12 +87,32 @@ impl ChanOne { } } + /// Send a message on the one-shot channel. If a receiver task is blocked + /// waiting for the message, will wake it up and reschedule to it. pub fn send(self, val: T) { self.try_send(val); } + /// As `send`, but also returns whether or not the receiver endpoint is still open. pub fn try_send(self, val: T) -> bool { + self.try_send_inner(val, true) + } + /// Send a message without immediately rescheduling to a blocked receiver. + /// This can be useful in contexts where rescheduling is forbidden, or to + /// optimize for when the sender expects to still have useful work to do. + pub fn send_deferred(self, val: T) { + self.try_send_deferred(val); + } + + /// As `send_deferred` and `try_send` together. + pub fn try_send_deferred(self, val: T) -> bool { + self.try_send_inner(val, false) + } + + // 'do_resched' configures whether the scheduler immediately switches to + // the receiving task, or leaves the sending task still running. + fn try_send_inner(self, val: T, do_resched: bool) -> bool { rtassert!(context() != SchedulerContext); let mut this = self; @@ -130,9 +151,16 @@ impl ChanOne { task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { - Scheduler::run_task(woken_task); - }; + if do_resched { + do recvr.wake().map_consume |woken_task| { + Scheduler::run_task(woken_task); + }; + } else { + let recvr = Cell::new(recvr); + do Local::borrow:: |sched| { + sched.enqueue_blocked_task(recvr.take()); + } + } } } } @@ -152,6 +180,7 @@ impl PortOne { } } + /// Wait for a message on the one-shot port. Fails if the send end is closed. pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -161,6 +190,7 @@ impl PortOne { } } + /// As `recv`, but returns `None` if the send end is closed rather than failing. pub fn try_recv(self) -> Option { let mut this = self; @@ -382,6 +412,12 @@ impl Drop for PortOne { } } +/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. +pub trait SendDeferred { + fn send_deferred(&self, val: T); + fn try_send_deferred(&self, val: T) -> bool; +} + struct StreamPayload { val: T, next: PortOne> @@ -409,6 +445,15 @@ pub fn stream() -> (Port, Chan) { return (port, chan); } +impl Chan { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) + } +} + impl GenericChan for Chan { fn send(&self, val: T) { self.try_send(val); @@ -417,10 +462,16 @@ impl GenericChan for Chan { impl GenericSmartChan for Chan { fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload { val: val, next: next_pone }) + self.try_send_inner(val, true) + } +} + +impl SendDeferred for Chan { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -495,6 +546,17 @@ impl SharedChan { } } +impl SharedChan { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, + do_resched) + } + } +} + impl GenericChan for SharedChan { fn send(&self, val: T) { self.try_send(val); @@ -503,11 +565,16 @@ impl GenericChan for SharedChan { impl GenericSmartChan for SharedChan { fn try_send(&self, val: T) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) - } + self.try_send_inner(val, true) + } +} + +impl SendDeferred for SharedChan { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -584,31 +651,32 @@ pub fn megapipe() -> MegaPipe { impl GenericChan for MegaPipe { fn send(&self, val: T) { - match *self { - (_, ref c) => c.send(val) - } + self.second_ref().send(val) } } impl GenericSmartChan for MegaPipe { fn try_send(&self, val: T) -> bool { - match *self { - (_, ref c) => c.try_send(val) - } + self.second_ref().try_send(val) } } impl GenericPort for MegaPipe { fn recv(&self) -> T { - match *self { - (ref p, _) => p.recv() - } + self.first_ref().recv() } fn try_recv(&self) -> Option { - match *self { - (ref p, _) => p.try_recv() - } + self.first_ref().try_recv() + } +} + +impl SendDeferred for MegaPipe { + fn send_deferred(&self, val: T) { + self.second_ref().send_deferred(val) + } + fn try_send_deferred(&self, val: T) -> bool { + self.second_ref().try_send_deferred(val) } } @@ -1017,4 +1085,39 @@ mod test { } } } + + #[test] + fn send_deferred() { + use unstable::sync::atomically; + + // Tests no-rescheduling of send_deferred on all types of channels. + do run_in_newsched_task { + let (pone, cone) = oneshot(); + let (pstream, cstream) = stream(); + let (pshared, cshared) = stream(); + let cshared = SharedChan::new(cshared); + let mp = megapipe(); + + let pone = Cell::new(pone); + do spawntask { pone.take().recv(); } + let pstream = Cell::new(pstream); + do spawntask { pstream.take().recv(); } + let pshared = Cell::new(pshared); + do spawntask { pshared.take().recv(); } + let p_mp = Cell::new(mp.clone()); + do spawntask { p_mp.take().recv(); } + + let cs = Cell::new((cone, cstream, cshared, mp)); + unsafe { + do atomically { + let (cone, cstream, cshared, mp) = cs.take(); + cone.send_deferred(()); + cstream.send_deferred(()); + cshared.send_deferred(()); + mp.send_deferred(()); + } + } + } + } + }