From 7f483459045789d6bb44671269fd9aec73dbeb63 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 13 Dec 2013 18:25:26 -0800 Subject: [PATCH] std: Remove must deferred sending functions These functions are all unnecessary now, and they only have meaning in the M:N context. Removing these functions uncovered a bug in the librustuv timer bindings, but it was fairly easy to cover (and the test is already committed). These cannot be completely removed just yet due to their usage in the WaitQueue of extra::sync, and until the mutex in libextra is rewritten it will not be possible to remove the deferred sends for channels. --- src/libextra/sync.rs | 2 +- src/libgreen/simple.rs | 2 +- src/libgreen/task.rs | 11 ++++++++--- src/librustuv/pipe.rs | 2 +- src/librustuv/signal.rs | 2 +- src/librustuv/timer.rs | 4 ++-- src/libstd/comm/mod.rs | 39 +++++---------------------------------- 7 files changed, 19 insertions(+), 43 deletions(-) diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index fb11eb6a3c46..2a53775a907d 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -79,7 +79,7 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (wait_end, signal_end) = Chan::new(); - self.tail.send_deferred(signal_end); + assert!(self.tail.try_send_deferred(signal_end)); wait_end } } diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 0db81c5fea3b..4f904ee6e6dd 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -54,7 +54,7 @@ impl Runtime for SimpleTask { } Local::put(cur_task); } - fn reawaken(mut ~self, mut to_wake: ~Task) { + fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { let me = &mut *self as *mut SimpleTask; to_wake.put_runtime(self as ~Runtime); unsafe { diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 9da9af9f50bc..eff80df2a118 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -346,7 +346,7 @@ impl Runtime for GreenTask { } } - fn reawaken(mut ~self, to_wake: ~Task) { + fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) { self.put_task(to_wake); assert!(self.sched.is_none()); @@ -372,10 +372,15 @@ impl Runtime for GreenTask { match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); - let sched = running_green_task.sched.take_unwrap(); + let mut sched = running_green_task.sched.take_unwrap(); if sched.pool_id == self.pool_id { - sched.run_task(running_green_task, self); + if can_resched { + sched.run_task(running_green_task, self); + } else { + sched.enqueue_task(self); + running_green_task.put_with_sched(sched); + } } else { self.reawaken_remotely(); diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 0bc198a4a3f5..ff4481e8b97f 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -210,7 +210,7 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - pipe.outgoing.send_deferred(msg); + pipe.outgoing.send(msg); } impl Drop for PipeListener { diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index b53acd4ebd69..0f81966b169a 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -52,7 +52,7 @@ impl SignalWatcher { extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; assert_eq!(signum as int, s.signal as int); - s.channel.try_send_deferred(s.signal); + s.channel.try_send(s.signal); } impl HomingIO for SignalWatcher { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 9c4473ead368..e87090753f59 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -140,9 +140,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { WakeTask(task) => { task.wake().map(|t| t.reawaken(true)); } - SendOnce(chan) => { chan.try_send_deferred(()); } + SendOnce(chan) => { chan.try_send(()); } SendMany(chan, id) => { - chan.try_send_deferred(()); + chan.try_send(()); // Note that the above operation could have performed some form of // scheduling. This means that the timer may have decided to insert diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index ae440894b4e2..21db234122b2 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -496,7 +496,7 @@ impl Packet { match self.channels.fetch_sub(1, SeqCst) { 1 => { match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(false); } + -1 => { self.wakeup(true); } DISCONNECTED => {} n => { assert!(n >= 0); } } @@ -537,9 +537,6 @@ impl Chan { /// port. /// /// Rust channels are infinitely buffered so this method will never block. - /// This method may trigger a rescheduling, however, in order to wake up a - /// blocked receiver (if one is present). If no scheduling is desired, then - /// the `send_deferred` guarantees that there will be no reschedulings. /// /// # Failure /// @@ -561,15 +558,6 @@ impl Chan { } } - /// This function is equivalent in the semantics of `send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. - pub fn send_deferred(&self, t: T) { - if !self.try_send_deferred(t) { - fail!("sending on a closed channel"); - } - } - /// Attempts to send a value on this channel, returning whether it was /// successfully sent. /// @@ -585,9 +573,8 @@ impl Chan { /// be tolerated, then this method should be used instead. pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - /// This function is equivalent in the semantics of `try_send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. + /// This function will not stick around for very long. The purpose of this + /// function is to guarantee that no rescheduling is performed. pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } fn try(&self, t: T, can_resched: bool) -> bool { @@ -649,25 +636,9 @@ impl SharedChan { } } - /// This function is equivalent in the semantics of `send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. - pub fn send_deferred(&self, t: T) { - if !self.try_send_deferred(t) { - fail!("sending on a closed channel"); - } - } - /// Equivalent method to `try_send` on the `Chan` type (using the same /// semantics) - pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - - /// This function is equivalent in the semantics of `try_send`, but it - /// guarantees that a rescheduling will never occur when this method is - /// called. - pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } - - fn try(&self, t: T, can_resched: bool) -> bool { + pub fn try_send(&self, t: T) -> bool { unsafe { // Note that the multiple sender case is a little tricker // semantically than the single sender case. The logic for @@ -704,7 +675,7 @@ impl SharedChan { match (*packet).increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(can_resched); } + -1 => { (*packet).wakeup(true); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take();