diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index fb11eb6a3c46..2a53775a907d 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -79,7 +79,7 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (wait_end, signal_end) = Chan::new(); - self.tail.send_deferred(signal_end); + assert!(self.tail.try_send_deferred(signal_end)); wait_end } } diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 0db81c5fea3b..4f904ee6e6dd 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -54,7 +54,7 @@ impl Runtime for SimpleTask { } Local::put(cur_task); } - fn reawaken(mut ~self, mut to_wake: ~Task) { + fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { let me = &mut *self as *mut SimpleTask; to_wake.put_runtime(self as ~Runtime); unsafe { diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 9da9af9f50bc..eff80df2a118 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -346,7 +346,7 @@ impl Runtime for GreenTask { } } - fn reawaken(mut ~self, to_wake: ~Task) { + fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) { self.put_task(to_wake); assert!(self.sched.is_none()); @@ -372,10 +372,15 @@ impl Runtime for GreenTask { match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); - let sched = running_green_task.sched.take_unwrap(); + let mut sched = running_green_task.sched.take_unwrap(); if sched.pool_id == self.pool_id { - sched.run_task(running_green_task, self); + if can_resched { + sched.run_task(running_green_task, self); + } else { + sched.enqueue_task(self); + running_green_task.put_with_sched(sched); + } } else { self.reawaken_remotely(); diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 0bc198a4a3f5..ff4481e8b97f 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -210,7 +210,7 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - pipe.outgoing.send_deferred(msg); + pipe.outgoing.send(msg); } impl Drop for PipeListener { diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index b53acd4ebd69..0f81966b169a 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -52,7 +52,7 @@ impl SignalWatcher { extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; assert_eq!(signum as int, s.signal as int); - s.channel.try_send_deferred(s.signal); + s.channel.try_send(s.signal); } impl HomingIO for SignalWatcher { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 9c4473ead368..e87090753f59 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -140,9 +140,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { WakeTask(task) => { task.wake().map(|t| t.reawaken(true)); } - SendOnce(chan) => { chan.try_send_deferred(()); } + SendOnce(chan) => { chan.try_send(()); } SendMany(chan, id) => { - chan.try_send_deferred(()); + chan.try_send(()); // Note that the above operation could have performed some form of // scheduling. This means that the timer may have decided to insert diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index ae440894b4e2..21db234122b2 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -496,7 +496,7 @@ impl Packet { match self.channels.fetch_sub(1, SeqCst) { 1 => { match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(false); } + -1 => { self.wakeup(true); } DISCONNECTED => {} n => { assert!(n >= 0); } } @@ -537,9 +537,6 @@ impl Chan { /// port. /// /// Rust channels are infinitely buffered so this method will never block. - /// This method may trigger a rescheduling, however, in order to wake up a - /// blocked receiver (if one is present). If no scheduling is desired, then - /// the `send_deferred` guarantees that there will be no reschedulings. /// /// # Failure /// @@ -561,15 +558,6 @@ impl Chan { } } - /// This function is equivalent in the semantics of `send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. - pub fn send_deferred(&self, t: T) { - if !self.try_send_deferred(t) { - fail!("sending on a closed channel"); - } - } - /// Attempts to send a value on this channel, returning whether it was /// successfully sent. /// @@ -585,9 +573,8 @@ impl Chan { /// be tolerated, then this method should be used instead. pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - /// This function is equivalent in the semantics of `try_send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. + /// This function will not stick around for very long. The purpose of this + /// function is to guarantee that no rescheduling is performed. pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } fn try(&self, t: T, can_resched: bool) -> bool { @@ -649,25 +636,9 @@ impl SharedChan { } } - /// This function is equivalent in the semantics of `send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. - pub fn send_deferred(&self, t: T) { - if !self.try_send_deferred(t) { - fail!("sending on a closed channel"); - } - } - /// Equivalent method to `try_send` on the `Chan` type (using the same /// semantics) - pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - - /// This function is equivalent in the semantics of `try_send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. - pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } - - fn try(&self, t: T, can_resched: bool) -> bool { + pub fn try_send(&self, t: T) -> bool { unsafe { // Note that the multiple sender case is a little tricker // semantically than the single sender case. The logic for @@ -704,7 +675,7 @@ impl SharedChan { match (*packet).increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(can_resched); } + -1 => { (*packet).wakeup(true); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take();