Add SendDeferred trait and use it to fix #8214.
This commit is contained in:
parent
f1c1f92d0c
commit
be7738bfa1
3 changed files with 162 additions and 28 deletions
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
use std::borrow;
|
||||
use std::comm;
|
||||
use std::comm::SendDeferred;
|
||||
use std::task;
|
||||
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
|
||||
use std::unstable::atomics;
|
||||
|
|
@ -49,7 +50,7 @@ impl WaitQueue {
|
|||
if self.head.peek() {
|
||||
// Pop and send a wakeup signal. If the waiter was killed, its port
|
||||
// will have closed. Keep trying until we get a live task.
|
||||
if comm::try_send_one(self.head.recv(), ()) {
|
||||
if self.head.recv().try_send_deferred(()) {
|
||||
true
|
||||
} else {
|
||||
self.signal()
|
||||
|
|
@ -62,7 +63,7 @@ impl WaitQueue {
|
|||
fn broadcast(&self) -> uint {
|
||||
let mut count = 0;
|
||||
while self.head.peek() {
|
||||
if comm::try_send_one(self.head.recv(), ()) {
|
||||
if self.head.recv().try_send_deferred(()) {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
|
|
@ -102,7 +103,7 @@ impl<Q:Send> Sem<Q> {
|
|||
// Tell outer scope we need to block.
|
||||
waiter_nobe = Some(WaitEnd);
|
||||
// Enqueue ourself.
|
||||
state.waiters.tail.send(SignalEnd);
|
||||
state.waiters.tail.send_deferred(SignalEnd);
|
||||
}
|
||||
}
|
||||
// Uncomment if you wish to test for sem races. Not valgrind-friendly.
|
||||
|
|
@ -256,7 +257,7 @@ impl<'self> Condvar<'self> {
|
|||
}
|
||||
// Enqueue ourself to be woken up by a signaller.
|
||||
let SignalEnd = SignalEnd.take_unwrap();
|
||||
state.blocked[condvar_id].tail.send(SignalEnd);
|
||||
state.blocked[condvar_id].tail.send_deferred(SignalEnd);
|
||||
} else {
|
||||
out_of_bounds = Some(state.blocked.len());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ use either::{Either, Left, Right};
|
|||
use kinds::Send;
|
||||
use option::{Option, Some};
|
||||
use unstable::sync::Exclusive;
|
||||
pub use rt::comm::SendDeferred;
|
||||
use rtcomm = rt::comm;
|
||||
use rt;
|
||||
|
||||
|
|
@ -105,6 +106,21 @@ impl<T: Send> GenericSmartChan<T> for Chan<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for Chan<T> {
|
||||
fn send_deferred(&self, x: T) {
|
||||
match self.inner {
|
||||
Left(ref chan) => chan.send(x),
|
||||
Right(ref chan) => chan.send_deferred(x)
|
||||
}
|
||||
}
|
||||
fn try_send_deferred(&self, x: T) -> bool {
|
||||
match self.inner {
|
||||
Left(ref chan) => chan.try_send(x),
|
||||
Right(ref chan) => chan.try_send_deferred(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericPort<T> for Port<T> {
|
||||
fn recv(&self) -> T {
|
||||
match self.inner {
|
||||
|
|
@ -250,6 +266,20 @@ impl<T: Send> ChanOne<T> {
|
|||
Right(p) => p.try_send(data)
|
||||
}
|
||||
}
|
||||
pub fn send_deferred(self, data: T) {
|
||||
let ChanOne { inner } = self;
|
||||
match inner {
|
||||
Left(p) => p.send(data),
|
||||
Right(p) => p.send_deferred(data)
|
||||
}
|
||||
}
|
||||
pub fn try_send_deferred(self, data: T) -> bool {
|
||||
let ChanOne { inner } = self;
|
||||
match inner {
|
||||
Left(p) => p.try_send(data),
|
||||
Right(p) => p.try_send_deferred(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
|
|||
use cell::Cell;
|
||||
use clone::Clone;
|
||||
use rt::{context, SchedulerContext};
|
||||
use tuple::ImmutableTuple;
|
||||
|
||||
/// A combined refcount / BlockedTask-as-uint pointer.
|
||||
///
|
||||
|
|
@ -86,12 +87,32 @@ impl<T> ChanOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Send a message on the one-shot channel. If a receiver task is blocked
|
||||
/// waiting for the message, will wake it up and reschedule to it.
|
||||
pub fn send(self, val: T) {
|
||||
self.try_send(val);
|
||||
}
|
||||
|
||||
/// As `send`, but also returns whether or not the receiver endpoint is still open.
|
||||
pub fn try_send(self, val: T) -> bool {
|
||||
self.try_send_inner(val, true)
|
||||
}
|
||||
|
||||
/// Send a message without immediately rescheduling to a blocked receiver.
|
||||
/// This can be useful in contexts where rescheduling is forbidden, or to
|
||||
/// optimize for when the sender expects to still have useful work to do.
|
||||
pub fn send_deferred(self, val: T) {
|
||||
self.try_send_deferred(val);
|
||||
}
|
||||
|
||||
/// As `send_deferred` and `try_send` together.
|
||||
pub fn try_send_deferred(self, val: T) -> bool {
|
||||
self.try_send_inner(val, false)
|
||||
}
|
||||
|
||||
// 'do_resched' configures whether the scheduler immediately switches to
|
||||
// the receiving task, or leaves the sending task still running.
|
||||
fn try_send_inner(self, val: T, do_resched: bool) -> bool {
|
||||
rtassert!(context() != SchedulerContext);
|
||||
|
||||
let mut this = self;
|
||||
|
|
@ -130,9 +151,16 @@ impl<T> ChanOne<T> {
|
|||
task_as_state => {
|
||||
// Port is blocked. Wake it up.
|
||||
let recvr = BlockedTask::cast_from_uint(task_as_state);
|
||||
do recvr.wake().map_consume |woken_task| {
|
||||
Scheduler::run_task(woken_task);
|
||||
};
|
||||
if do_resched {
|
||||
do recvr.wake().map_consume |woken_task| {
|
||||
Scheduler::run_task(woken_task);
|
||||
};
|
||||
} else {
|
||||
let recvr = Cell::new(recvr);
|
||||
do Local::borrow::<Scheduler, ()> |sched| {
|
||||
sched.enqueue_blocked_task(recvr.take());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -152,6 +180,7 @@ impl<T> PortOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Wait for a message on the one-shot port. Fails if the send end is closed.
|
||||
pub fn recv(self) -> T {
|
||||
match self.try_recv() {
|
||||
Some(val) => val,
|
||||
|
|
@ -161,6 +190,7 @@ impl<T> PortOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// As `recv`, but returns `None` if the send end is closed rather than failing.
|
||||
pub fn try_recv(self) -> Option<T> {
|
||||
let mut this = self;
|
||||
|
||||
|
|
@ -382,6 +412,12 @@ impl<T> Drop for PortOne<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
|
||||
pub trait SendDeferred<T> {
|
||||
fn send_deferred(&self, val: T);
|
||||
fn try_send_deferred(&self, val: T) -> bool;
|
||||
}
|
||||
|
||||
struct StreamPayload<T> {
|
||||
val: T,
|
||||
next: PortOne<StreamPayload<T>>
|
||||
|
|
@ -409,6 +445,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
|
|||
return (port, chan);
|
||||
}
|
||||
|
||||
impl<T: Send> Chan<T> {
|
||||
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = self.next.take();
|
||||
self.next.put_back(next_cone);
|
||||
cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericChan<T> for Chan<T> {
|
||||
fn send(&self, val: T) {
|
||||
self.try_send(val);
|
||||
|
|
@ -417,10 +462,16 @@ impl<T: Send> GenericChan<T> for Chan<T> {
|
|||
|
||||
impl<T: Send> GenericSmartChan<T> for Chan<T> {
|
||||
fn try_send(&self, val: T) -> bool {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = self.next.take();
|
||||
self.next.put_back(next_cone);
|
||||
cone.try_send(StreamPayload { val: val, next: next_pone })
|
||||
self.try_send_inner(val, true)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for Chan<T> {
|
||||
fn send_deferred(&self, val: T) {
|
||||
self.try_send_deferred(val);
|
||||
}
|
||||
fn try_send_deferred(&self, val: T) -> bool {
|
||||
self.try_send_inner(val, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -495,6 +546,17 @@ impl<T> SharedChan<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SharedChan<T> {
|
||||
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
|
||||
unsafe {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
|
||||
cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
|
||||
do_resched)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericChan<T> for SharedChan<T> {
|
||||
fn send(&self, val: T) {
|
||||
self.try_send(val);
|
||||
|
|
@ -503,11 +565,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> {
|
|||
|
||||
impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
|
||||
fn try_send(&self, val: T) -> bool {
|
||||
unsafe {
|
||||
let (next_pone, next_cone) = oneshot();
|
||||
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
|
||||
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
|
||||
}
|
||||
self.try_send_inner(val, true)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for SharedChan<T> {
|
||||
fn send_deferred(&self, val: T) {
|
||||
self.try_send_deferred(val);
|
||||
}
|
||||
fn try_send_deferred(&self, val: T) -> bool {
|
||||
self.try_send_inner(val, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -584,31 +651,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> {
|
|||
|
||||
impl<T: Send> GenericChan<T> for MegaPipe<T> {
|
||||
fn send(&self, val: T) {
|
||||
match *self {
|
||||
(_, ref c) => c.send(val)
|
||||
}
|
||||
self.second_ref().send(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
|
||||
fn try_send(&self, val: T) -> bool {
|
||||
match *self {
|
||||
(_, ref c) => c.try_send(val)
|
||||
}
|
||||
self.second_ref().try_send(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> GenericPort<T> for MegaPipe<T> {
|
||||
fn recv(&self) -> T {
|
||||
match *self {
|
||||
(ref p, _) => p.recv()
|
||||
}
|
||||
self.first_ref().recv()
|
||||
}
|
||||
|
||||
fn try_recv(&self) -> Option<T> {
|
||||
match *self {
|
||||
(ref p, _) => p.try_recv()
|
||||
}
|
||||
self.first_ref().try_recv()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> SendDeferred<T> for MegaPipe<T> {
|
||||
fn send_deferred(&self, val: T) {
|
||||
self.second_ref().send_deferred(val)
|
||||
}
|
||||
fn try_send_deferred(&self, val: T) -> bool {
|
||||
self.second_ref().try_send_deferred(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1017,4 +1085,39 @@ mod test {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_deferred() {
|
||||
use unstable::sync::atomically;
|
||||
|
||||
// Tests no-rescheduling of send_deferred on all types of channels.
|
||||
do run_in_newsched_task {
|
||||
let (pone, cone) = oneshot();
|
||||
let (pstream, cstream) = stream();
|
||||
let (pshared, cshared) = stream();
|
||||
let cshared = SharedChan::new(cshared);
|
||||
let mp = megapipe();
|
||||
|
||||
let pone = Cell::new(pone);
|
||||
do spawntask { pone.take().recv(); }
|
||||
let pstream = Cell::new(pstream);
|
||||
do spawntask { pstream.take().recv(); }
|
||||
let pshared = Cell::new(pshared);
|
||||
do spawntask { pshared.take().recv(); }
|
||||
let p_mp = Cell::new(mp.clone());
|
||||
do spawntask { p_mp.take().recv(); }
|
||||
|
||||
let cs = Cell::new((cone, cstream, cshared, mp));
|
||||
unsafe {
|
||||
do atomically {
|
||||
let (cone, cstream, cshared, mp) = cs.take();
|
||||
cone.send_deferred(());
|
||||
cstream.send_deferred(());
|
||||
cshared.send_deferred(());
|
||||
mp.send_deferred(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue