auto merge of #8566 : toddaaro/rust/idle-opt+cleaning, r=catamorphism,brson
Instead of a furious storm of idle callbacks we just have one. This is a major performance gain - around 40% on my machine for the ping pong bench. Also in this PR is a cleanup commit for the scheduler code. Was previously up as a separate PR, but bors load + imminent merge hell led me to roll them together. Was #8549.
This commit is contained in:
commit
67c954e365
6 changed files with 390 additions and 306 deletions
|
|
@ -24,10 +24,12 @@ pub type RtioTcpStreamObject = uvio::UvTcpStream;
|
|||
pub type RtioTcpListenerObject = uvio::UvTcpListener;
|
||||
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
|
||||
pub type RtioTimerObject = uvio::UvTimer;
|
||||
pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
|
||||
|
||||
pub trait EventLoop {
|
||||
fn run(&mut self);
|
||||
fn callback(&mut self, ~fn());
|
||||
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
|
||||
fn callback_ms(&mut self, ms: u64, ~fn());
|
||||
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
|
||||
/// The asynchronous I/O services. Not all event loops may provide one
|
||||
|
|
@ -35,11 +37,12 @@ pub trait EventLoop {
|
|||
}
|
||||
|
||||
pub trait RemoteCallback {
|
||||
/// Trigger the remote callback. Note that the number of times the callback
|
||||
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
|
||||
/// the callback will be called at least once, but multiple callbacks may be coalesced
|
||||
/// and callbacks may be called more often requested. Destruction also triggers the
|
||||
/// callback.
|
||||
/// Trigger the remote callback. Note that the number of times the
|
||||
/// callback is run is not guaranteed. All that is guaranteed is
|
||||
/// that, after calling 'fire', the callback will be called at
|
||||
/// least once, but multiple callbacks may be coalesced and
|
||||
/// callbacks may be called more often requested. Destruction also
|
||||
/// triggers the callback.
|
||||
fn fire(&mut self);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ use super::message_queue::MessageQueue;
|
|||
use rt::kill::BlockedTask;
|
||||
use rt::local_ptr;
|
||||
use rt::local::Local;
|
||||
use rt::rtio::RemoteCallback;
|
||||
use rt::rtio::{RemoteCallback, PausibleIdleCallback};
|
||||
use rt::metrics::SchedMetrics;
|
||||
use borrow::{to_uint};
|
||||
use cell::Cell;
|
||||
|
|
@ -31,10 +31,11 @@ use rand::{XorShiftRng, RngUtil};
|
|||
use iterator::{range};
|
||||
use vec::{OwnedVector};
|
||||
|
||||
/// The Scheduler is responsible for coordinating execution of Coroutines
|
||||
/// on a single thread. When the scheduler is running it is owned by
|
||||
/// thread local storage and the running task is owned by the
|
||||
/// scheduler.
|
||||
/// A scheduler is responsible for coordinating the execution of Tasks
|
||||
/// on a single thread. The scheduler runs inside a slightly modified
|
||||
/// Rust Task. When not running this task is stored in the scheduler
|
||||
/// struct. The scheduler struct acts like a baton, all scheduling
|
||||
/// actions are transfers of the baton.
|
||||
///
|
||||
/// XXX: This creates too many callbacks to run_sched_once, resulting
|
||||
/// in too much allocation and too many events.
|
||||
|
|
@ -64,11 +65,12 @@ pub struct Scheduler {
|
|||
stack_pool: StackPool,
|
||||
/// The event loop used to drive the scheduler and perform I/O
|
||||
event_loop: ~EventLoopObject,
|
||||
/// The scheduler runs on a special task.
|
||||
/// The scheduler runs on a special task. When it is not running
|
||||
/// it is stored here instead of the work queue.
|
||||
sched_task: Option<~Task>,
|
||||
/// An action performed after a context switch on behalf of the
|
||||
/// code running before the context switch
|
||||
priv cleanup_job: Option<CleanupJob>,
|
||||
cleanup_job: Option<CleanupJob>,
|
||||
metrics: SchedMetrics,
|
||||
/// Should this scheduler run any task, or only pinned tasks?
|
||||
run_anything: bool,
|
||||
|
|
@ -76,31 +78,14 @@ pub struct Scheduler {
|
|||
/// them to.
|
||||
friend_handle: Option<SchedHandle>,
|
||||
/// A fast XorShift rng for scheduler use
|
||||
rng: XorShiftRng
|
||||
|
||||
}
|
||||
|
||||
pub struct SchedHandle {
|
||||
priv remote: ~RemoteCallbackObject,
|
||||
priv queue: MessageQueue<SchedMessage>,
|
||||
sched_id: uint
|
||||
}
|
||||
|
||||
pub enum SchedMessage {
|
||||
Wake,
|
||||
Shutdown,
|
||||
PinnedTask(~Task),
|
||||
TaskFromFriend(~Task)
|
||||
}
|
||||
|
||||
enum CleanupJob {
|
||||
DoNothing,
|
||||
GiveTask(~Task, UnsafeTaskReceiver)
|
||||
rng: XorShiftRng,
|
||||
/// A toggleable idle callback
|
||||
idle_callback: ~PausibleIdleCallback
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
|
||||
pub fn sched_id(&self) -> uint { to_uint(self) }
|
||||
// * Initialization Functions
|
||||
|
||||
pub fn new(event_loop: ~EventLoopObject,
|
||||
work_queue: WorkQueue<~Task>,
|
||||
|
|
@ -114,8 +99,6 @@ impl Scheduler {
|
|||
|
||||
}
|
||||
|
||||
// When you create a scheduler it isn't yet "in" a task, so the
|
||||
// task field is None.
|
||||
pub fn new_special(event_loop: ~EventLoopObject,
|
||||
work_queue: WorkQueue<~Task>,
|
||||
work_queues: ~[WorkQueue<~Task>],
|
||||
|
|
@ -124,6 +107,9 @@ impl Scheduler {
|
|||
friend: Option<SchedHandle>)
|
||||
-> Scheduler {
|
||||
|
||||
let mut event_loop = event_loop;
|
||||
let idle_callback = event_loop.pausible_idle_callback();
|
||||
|
||||
Scheduler {
|
||||
sleeper_list: sleeper_list,
|
||||
message_queue: MessageQueue::new(),
|
||||
|
|
@ -138,7 +124,8 @@ impl Scheduler {
|
|||
metrics: SchedMetrics::new(),
|
||||
run_anything: run_anything,
|
||||
friend_handle: friend,
|
||||
rng: XorShiftRng::new()
|
||||
rng: XorShiftRng::new(),
|
||||
idle_callback: idle_callback
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -151,6 +138,8 @@ impl Scheduler {
|
|||
// scheduler task and bootstrap into it.
|
||||
pub fn bootstrap(~self, task: ~Task) {
|
||||
|
||||
let mut this = self;
|
||||
|
||||
// Initialize the TLS key.
|
||||
local_ptr::init_tls_key();
|
||||
|
||||
|
|
@ -161,10 +150,15 @@ impl Scheduler {
|
|||
// task, put it in TLS.
|
||||
Local::put::(sched_task);
|
||||
|
||||
// Before starting our first task, make sure the idle callback
|
||||
// is active. As we do not start in the sleep state this is
|
||||
// important.
|
||||
this.idle_callback.start(Scheduler::run_sched_once);
|
||||
|
||||
// Now, as far as all the scheduler state is concerned, we are
|
||||
// inside the "scheduler" context. So we can act like the
|
||||
// scheduler and resume the provided task.
|
||||
self.resume_task_immediately(task);
|
||||
this.resume_task_immediately(task);
|
||||
|
||||
// Now we are back in the scheduler context, having
|
||||
// successfully run the input task. Start by running the
|
||||
|
|
@ -173,7 +167,6 @@ impl Scheduler {
|
|||
let sched = Local::take::<Scheduler>();
|
||||
|
||||
rtdebug!("starting scheduler %u", sched.sched_id());
|
||||
|
||||
sched.run();
|
||||
|
||||
// Now that we are done with the scheduler, clean up the
|
||||
|
|
@ -189,6 +182,9 @@ impl Scheduler {
|
|||
let message = stask.sched.get_mut_ref().message_queue.pop();
|
||||
assert!(message.is_none());
|
||||
|
||||
// Close the idle callback.
|
||||
stask.sched.get_mut_ref().idle_callback.close();
|
||||
|
||||
stask.destroyed = true;
|
||||
}
|
||||
|
||||
|
|
@ -198,11 +194,6 @@ impl Scheduler {
|
|||
|
||||
let mut self_sched = self;
|
||||
|
||||
// Always run through the scheduler loop at least once so that
|
||||
// we enter the sleep state and can then be woken up by other
|
||||
// schedulers.
|
||||
self_sched.event_loop.callback(Scheduler::run_sched_once);
|
||||
|
||||
// This is unsafe because we need to place the scheduler, with
|
||||
// the event_loop inside, inside our task. But we still need a
|
||||
// mutable reference to the event_loop to give it the "run"
|
||||
|
|
@ -221,11 +212,11 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// One iteration of the scheduler loop, always run at least once.
|
||||
// * Execution Functions - Core Loop Logic
|
||||
|
||||
// The model for this function is that you continue through it
|
||||
// until you either use the scheduler while performing a schedule
|
||||
// action, in which case you give it away and do not return, or
|
||||
// action, in which case you give it away and return early, or
|
||||
// you reach the end and sleep. In the case that a scheduler
|
||||
// action is performed the loop is evented such that this function
|
||||
// is called again.
|
||||
|
|
@ -235,41 +226,24 @@ impl Scheduler {
|
|||
// already have a scheduler stored in our local task, so we
|
||||
// start off by taking it. This is the only path through the
|
||||
// scheduler where we get the scheduler this way.
|
||||
let sched = Local::take::<Scheduler>();
|
||||
let mut sched = Local::take::<Scheduler>();
|
||||
|
||||
// Our first task is to read mail to see if we have important
|
||||
// messages.
|
||||
// Assume that we need to continue idling unless we reach the
|
||||
// end of this function without performing an action.
|
||||
sched.idle_callback.resume();
|
||||
|
||||
// 1) A wake message is easy, mutate sched struct and return
|
||||
// it.
|
||||
// 2) A shutdown is also easy, shutdown.
|
||||
// 3) A pinned task - we resume immediately and do not return
|
||||
// here.
|
||||
// 4) A message from another scheduler with a non-homed task
|
||||
// to run here.
|
||||
|
||||
let result = sched.interpret_message_queue();
|
||||
let sched = match result {
|
||||
Some(sched) => {
|
||||
// We did not resume a task, so we returned.
|
||||
sched
|
||||
}
|
||||
None => {
|
||||
return;
|
||||
}
|
||||
// First we check for scheduler messages, these are higher
|
||||
// priority than regular tasks.
|
||||
let sched = match sched.interpret_message_queue() {
|
||||
Some(sched) => sched,
|
||||
None => return
|
||||
};
|
||||
|
||||
// Second activity is to try resuming a task from the queue.
|
||||
|
||||
let result = sched.do_work();
|
||||
let mut sched = match result {
|
||||
Some(sched) => {
|
||||
// Failed to dequeue a task, so we return.
|
||||
sched
|
||||
}
|
||||
None => {
|
||||
return;
|
||||
}
|
||||
// This helper will use a randomized work-stealing algorithm
|
||||
// to find work.
|
||||
let mut sched = match sched.do_work() {
|
||||
Some(sched) => sched,
|
||||
None => return
|
||||
};
|
||||
|
||||
// If we got here then there was no work to do.
|
||||
|
|
@ -282,8 +256,13 @@ impl Scheduler {
|
|||
sched.sleepy = true;
|
||||
let handle = sched.make_handle();
|
||||
sched.sleeper_list.push(handle);
|
||||
// Since we are sleeping, deactivate the idle callback.
|
||||
sched.idle_callback.pause();
|
||||
} else {
|
||||
rtdebug!("not sleeping, already doing so or no_sleep set");
|
||||
// We may not be sleeping, but we still need to deactivate
|
||||
// the idle callback.
|
||||
sched.idle_callback.pause();
|
||||
}
|
||||
|
||||
// Finished a cycle without using the Scheduler. Place it back
|
||||
|
|
@ -291,85 +270,33 @@ impl Scheduler {
|
|||
Local::put(sched);
|
||||
}
|
||||
|
||||
pub fn make_handle(&mut self) -> SchedHandle {
|
||||
let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
|
||||
|
||||
return SchedHandle {
|
||||
remote: remote,
|
||||
queue: self.message_queue.clone(),
|
||||
sched_id: self.sched_id()
|
||||
};
|
||||
}
|
||||
|
||||
/// Schedule a task to be executed later.
|
||||
///
|
||||
/// Pushes the task onto the work stealing queue and tells the
|
||||
/// event loop to run it later. Always use this instead of pushing
|
||||
/// to the work queue directly.
|
||||
pub fn enqueue_task(&mut self, task: ~Task) {
|
||||
|
||||
let this = self;
|
||||
|
||||
// We push the task onto our local queue clone.
|
||||
this.work_queue.push(task);
|
||||
this.event_loop.callback(Scheduler::run_sched_once);
|
||||
|
||||
// We've made work available. Notify a
|
||||
// sleeping scheduler.
|
||||
|
||||
// XXX: perf. Check for a sleeper without
|
||||
// synchronizing memory. It's not critical
|
||||
// that we always find it.
|
||||
|
||||
// XXX: perf. If there's a sleeper then we
|
||||
// might as well just send it the task
|
||||
// directly instead of pushing it to the
|
||||
// queue. That is essentially the intent here
|
||||
// and it is less work.
|
||||
match this.sleeper_list.pop() {
|
||||
Some(handle) => {
|
||||
let mut handle = handle;
|
||||
handle.send(Wake)
|
||||
}
|
||||
None => { (/* pass */) }
|
||||
};
|
||||
}
|
||||
|
||||
/// As enqueue_task, but with the possibility for the blocked task to
|
||||
/// already have been killed.
|
||||
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
|
||||
do blocked_task.wake().map_move |task| {
|
||||
self.enqueue_task(task);
|
||||
};
|
||||
}
|
||||
|
||||
// * Scheduler-context operations
|
||||
|
||||
// This function returns None if the scheduler is "used", or it
|
||||
// returns the still-available scheduler.
|
||||
// returns the still-available scheduler. At this point all
|
||||
// message-handling will count as a turn of work, and as a result
|
||||
// return None.
|
||||
fn interpret_message_queue(~self) -> Option<~Scheduler> {
|
||||
|
||||
let mut this = self;
|
||||
match this.message_queue.pop() {
|
||||
Some(PinnedTask(task)) => {
|
||||
this.event_loop.callback(Scheduler::run_sched_once);
|
||||
let mut task = task;
|
||||
task.give_home(Sched(this.make_handle()));
|
||||
this.resume_task_immediately(task);
|
||||
return None;
|
||||
}
|
||||
Some(TaskFromFriend(task)) => {
|
||||
this.event_loop.callback(Scheduler::run_sched_once);
|
||||
rtdebug!("got a task from a friend. lovely!");
|
||||
return this.sched_schedule_task(task);
|
||||
this.process_task(task,
|
||||
Scheduler::resume_task_immediately_cl).map_move(Local::put);
|
||||
return None;
|
||||
}
|
||||
Some(Wake) => {
|
||||
this.event_loop.callback(Scheduler::run_sched_once);
|
||||
this.sleepy = false;
|
||||
return Some(this);
|
||||
Local::put(this);
|
||||
return None;
|
||||
}
|
||||
Some(Shutdown) => {
|
||||
this.event_loop.callback(Scheduler::run_sched_once);
|
||||
rtdebug!("shutting down");
|
||||
if this.sleepy {
|
||||
// There may be an outstanding handle on the
|
||||
// sleeper list. Pop them all to make sure that's
|
||||
|
|
@ -388,11 +315,8 @@ impl Scheduler {
|
|||
// event loop references we will shut down.
|
||||
this.no_sleep = true;
|
||||
this.sleepy = false;
|
||||
// YYY: Does a shutdown count as a "use" of the
|
||||
// scheduler? This seems to work - so I'm leaving it
|
||||
// this way despite not having a solid rational for
|
||||
// why I should return the scheduler here.
|
||||
return Some(this);
|
||||
Local::put(this);
|
||||
return None;
|
||||
}
|
||||
None => {
|
||||
return Some(this);
|
||||
|
|
@ -400,30 +324,19 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
/// Given an input Coroutine sends it back to its home scheduler.
|
||||
fn send_task_home(task: ~Task) {
|
||||
let mut task = task;
|
||||
let mut home = task.take_unwrap_home();
|
||||
match home {
|
||||
Sched(ref mut home_handle) => {
|
||||
home_handle.send(PinnedTask(task));
|
||||
}
|
||||
AnySched => {
|
||||
rtabort!("error: cannot send anysched task home");
|
||||
}
|
||||
}
|
||||
}
|
||||
fn do_work(~self) -> Option<~Scheduler> {
|
||||
let mut this = self;
|
||||
|
||||
/// Take a non-homed task we aren't allowed to run here and send
|
||||
/// it to the designated friend scheduler to execute.
|
||||
fn send_to_friend(&mut self, task: ~Task) {
|
||||
rtdebug!("sending a task to friend");
|
||||
match self.friend_handle {
|
||||
Some(ref mut handle) => {
|
||||
handle.send(TaskFromFriend(task));
|
||||
rtdebug!("scheduler calling do work");
|
||||
match this.find_work() {
|
||||
Some(task) => {
|
||||
rtdebug!("found some work! processing the task");
|
||||
return this.process_task(task,
|
||||
Scheduler::resume_task_immediately_cl);
|
||||
}
|
||||
None => {
|
||||
rtabort!("tried to send task to a friend but scheduler has no friends");
|
||||
rtdebug!("no work was found, returning the scheduler struct");
|
||||
return Some(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -447,8 +360,8 @@ impl Scheduler {
|
|||
None => {
|
||||
// Our naive stealing, try kinda hard.
|
||||
rtdebug!("scheduler trying to steal");
|
||||
let _len = self.work_queues.len();
|
||||
return self.try_steals(2);
|
||||
let len = self.work_queues.len();
|
||||
return self.try_steals(len/2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -462,7 +375,8 @@ impl Scheduler {
|
|||
let work_queues = &mut self.work_queues;
|
||||
match work_queues[index].steal() {
|
||||
Some(task) => {
|
||||
rtdebug!("found task by stealing"); return Some(task)
|
||||
rtdebug!("found task by stealing");
|
||||
return Some(task)
|
||||
}
|
||||
None => ()
|
||||
}
|
||||
|
|
@ -471,8 +385,11 @@ impl Scheduler {
|
|||
return None;
|
||||
}
|
||||
|
||||
// Given a task, execute it correctly.
|
||||
fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
|
||||
// * Task Routing Functions - Make sure tasks send up in the right
|
||||
// place.
|
||||
|
||||
fn process_task(~self, task: ~Task,
|
||||
schedule_fn: SchedulingFn) -> Option<~Scheduler> {
|
||||
let mut this = self;
|
||||
let mut task = task;
|
||||
|
||||
|
|
@ -489,15 +406,13 @@ impl Scheduler {
|
|||
} else {
|
||||
rtdebug!("running task here");
|
||||
task.give_home(Sched(home_handle));
|
||||
this.resume_task_immediately(task);
|
||||
return None;
|
||||
return schedule_fn(this, task);
|
||||
}
|
||||
}
|
||||
AnySched if this.run_anything => {
|
||||
rtdebug!("running anysched task here");
|
||||
task.give_home(AnySched);
|
||||
this.resume_task_immediately(task);
|
||||
return None;
|
||||
return schedule_fn(this, task);
|
||||
}
|
||||
AnySched => {
|
||||
rtdebug!("sending task to friend");
|
||||
|
|
@ -508,98 +423,71 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// Bundle the helpers together.
|
||||
fn do_work(~self) -> Option<~Scheduler> {
|
||||
let mut this = self;
|
||||
fn send_task_home(task: ~Task) {
|
||||
let mut task = task;
|
||||
let mut home = task.take_unwrap_home();
|
||||
match home {
|
||||
Sched(ref mut home_handle) => {
|
||||
home_handle.send(PinnedTask(task));
|
||||
}
|
||||
AnySched => {
|
||||
rtabort!("error: cannot send anysched task home");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rtdebug!("scheduler calling do work");
|
||||
match this.find_work() {
|
||||
Some(task) => {
|
||||
rtdebug!("found some work! processing the task");
|
||||
return this.process_task(task);
|
||||
/// Take a non-homed task we aren't allowed to run here and send
|
||||
/// it to the designated friend scheduler to execute.
|
||||
fn send_to_friend(&mut self, task: ~Task) {
|
||||
rtdebug!("sending a task to friend");
|
||||
match self.friend_handle {
|
||||
Some(ref mut handle) => {
|
||||
handle.send(TaskFromFriend(task));
|
||||
}
|
||||
None => {
|
||||
rtdebug!("no work was found, returning the scheduler struct");
|
||||
return Some(this);
|
||||
rtabort!("tried to send task to a friend but scheduler has no friends");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Called by a running task to end execution, after which it will
|
||||
/// be recycled by the scheduler for reuse in a new task.
|
||||
pub fn terminate_current_task(~self) {
|
||||
// Similar to deschedule running task and then, but cannot go through
|
||||
// the task-blocking path. The task is already dying.
|
||||
let mut this = self;
|
||||
let stask = this.sched_task.take_unwrap();
|
||||
do this.change_task_context(stask) |sched, mut dead_task| {
|
||||
let coroutine = dead_task.coroutine.take_unwrap();
|
||||
coroutine.recycle(&mut sched.stack_pool);
|
||||
}
|
||||
}
|
||||
/// Schedule a task to be executed later.
|
||||
///
|
||||
/// Pushes the task onto the work stealing queue and tells the
|
||||
/// event loop to run it later. Always use this instead of pushing
|
||||
/// to the work queue directly.
|
||||
pub fn enqueue_task(&mut self, task: ~Task) {
|
||||
|
||||
// Scheduling a task requires a few checks to make sure the task
|
||||
// ends up in the appropriate location. The run_anything flag on
|
||||
// the scheduler and the home on the task need to be checked. This
|
||||
// helper performs that check. It takes a function that specifies
|
||||
// how to queue the the provided task if that is the correct
|
||||
// action. This is a "core" function that requires handling the
|
||||
// returned Option correctly.
|
||||
let this = self;
|
||||
|
||||
pub fn schedule_task(~self, task: ~Task,
|
||||
schedule_fn: ~fn(sched: ~Scheduler, task: ~Task))
|
||||
-> Option<~Scheduler> {
|
||||
// We push the task onto our local queue clone.
|
||||
this.work_queue.push(task);
|
||||
this.idle_callback.resume();
|
||||
|
||||
// is the task home?
|
||||
let is_home = task.is_home_no_tls(&self);
|
||||
// We've made work available. Notify a
|
||||
// sleeping scheduler.
|
||||
|
||||
// does the task have a home?
|
||||
let homed = task.homed();
|
||||
|
||||
let mut this = self;
|
||||
|
||||
if is_home || (!homed && this.run_anything) {
|
||||
// here we know we are home, execute now OR we know we
|
||||
// aren't homed, and that this sched doesn't care
|
||||
rtdebug!("task: %u is on ok sched, executing", to_uint(task));
|
||||
schedule_fn(this, task);
|
||||
return None;
|
||||
} else if !homed && !this.run_anything {
|
||||
// the task isn't homed, but it can't be run here
|
||||
this.send_to_friend(task);
|
||||
return Some(this);
|
||||
} else {
|
||||
// task isn't home, so don't run it here, send it home
|
||||
Scheduler::send_task_home(task);
|
||||
return Some(this);
|
||||
}
|
||||
}
|
||||
|
||||
// There are two contexts in which schedule_task can be called:
|
||||
// inside the scheduler, and inside a task. These contexts handle
|
||||
// executing the task slightly differently. In the scheduler
|
||||
// context case we want to receive the scheduler as an input, and
|
||||
// manually deal with the option. In the task context case we want
|
||||
// to use TLS to find the scheduler, and deal with the option
|
||||
// inside the helper.
|
||||
|
||||
pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
|
||||
do self.schedule_task(task) |sched, next_task| {
|
||||
sched.resume_task_immediately(next_task);
|
||||
}
|
||||
}
|
||||
|
||||
// Task context case - use TLS.
|
||||
pub fn run_task(task: ~Task) {
|
||||
let sched = Local::take::<Scheduler>();
|
||||
let opt = do sched.schedule_task(task) |sched, next_task| {
|
||||
do sched.switch_running_tasks_and_then(next_task) |sched, last_task| {
|
||||
sched.enqueue_blocked_task(last_task);
|
||||
// XXX: perf. Check for a sleeper without
|
||||
// synchronizing memory. It's not critical
|
||||
// that we always find it.
|
||||
match this.sleeper_list.pop() {
|
||||
Some(handle) => {
|
||||
let mut handle = handle;
|
||||
handle.send(Wake)
|
||||
}
|
||||
None => { (/* pass */) }
|
||||
};
|
||||
opt.map_move(Local::put);
|
||||
}
|
||||
|
||||
/// As enqueue_task, but with the possibility for the blocked task to
|
||||
/// already have been killed.
|
||||
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
|
||||
do blocked_task.wake().map_move |task| {
|
||||
self.enqueue_task(task);
|
||||
};
|
||||
}
|
||||
|
||||
// * Core Context Switching Functions
|
||||
|
||||
// The primary function for changing contexts. In the current
|
||||
// design the scheduler is just a slightly modified GreenTask, so
|
||||
// all context swaps are from Task to Task. The only difference
|
||||
|
|
@ -629,7 +517,7 @@ impl Scheduler {
|
|||
|
||||
// The current task is placed inside an enum with the cleanup
|
||||
// function. This enum is then placed inside the scheduler.
|
||||
this.enqueue_cleanup_job(GiveTask(current_task, f_opaque));
|
||||
this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
|
||||
|
||||
// The scheduler is then placed inside the next task.
|
||||
let mut next_task = next_task;
|
||||
|
|
@ -645,12 +533,9 @@ impl Scheduler {
|
|||
transmute_mut_region(*next_task.sched.get_mut_ref());
|
||||
|
||||
let current_task: &mut Task = match sched.cleanup_job {
|
||||
Some(GiveTask(ref task, _)) => {
|
||||
Some(CleanupJob { task: ref task, _ }) => {
|
||||
transmute_mut_region(*transmute_mut_unsafe(task))
|
||||
}
|
||||
Some(DoNothing) => {
|
||||
rtabort!("no next task");
|
||||
}
|
||||
None => {
|
||||
rtabort!("no cleanup job");
|
||||
}
|
||||
|
|
@ -684,19 +569,42 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// Old API for task manipulation implemented over the new core
|
||||
// function.
|
||||
|
||||
pub fn resume_task_immediately(~self, task: ~Task) {
|
||||
do self.change_task_context(task) |sched, stask| {
|
||||
sched.sched_task = Some(stask);
|
||||
// Returns a mutable reference to both contexts involved in this
|
||||
// swap. This is unsafe - we are getting mutable internal
|
||||
// references to keep even when we don't own the tasks. It looks
|
||||
// kinda safe because we are doing transmutes before passing in
|
||||
// the arguments.
|
||||
pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
|
||||
(&'a mut Context, &'a mut Context) {
|
||||
let current_task_context =
|
||||
&mut current_task.coroutine.get_mut_ref().saved_context;
|
||||
let next_task_context =
|
||||
&mut next_task.coroutine.get_mut_ref().saved_context;
|
||||
unsafe {
|
||||
(transmute_mut_region(current_task_context),
|
||||
transmute_mut_region(next_task_context))
|
||||
}
|
||||
}
|
||||
|
||||
// * Context Swapping Helpers - Here be ugliness!
|
||||
|
||||
pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
|
||||
do self.change_task_context(task) |sched, stask| {
|
||||
sched.sched_task = Some(stask);
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
fn resume_task_immediately_cl(sched: ~Scheduler,
|
||||
task: ~Task) -> Option<~Scheduler> {
|
||||
sched.resume_task_immediately(task)
|
||||
}
|
||||
|
||||
|
||||
pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
|
||||
match blocked_task.wake() {
|
||||
Some(task) => self.resume_task_immediately(task),
|
||||
None => Local::put(self),
|
||||
Some(task) => { self.resume_task_immediately(task); }
|
||||
None => Local::put(self)
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -735,54 +643,75 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// A helper that looks up the scheduler and runs a task later by
|
||||
// enqueuing it.
|
||||
fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
|
||||
do sched.switch_running_tasks_and_then(task) |sched, last_task| {
|
||||
sched.enqueue_blocked_task(last_task);
|
||||
};
|
||||
return None;
|
||||
}
|
||||
|
||||
// * Task Context Helpers
|
||||
|
||||
/// Called by a running task to end execution, after which it will
|
||||
/// be recycled by the scheduler for reuse in a new task.
|
||||
pub fn terminate_current_task(~self) {
|
||||
// Similar to deschedule running task and then, but cannot go through
|
||||
// the task-blocking path. The task is already dying.
|
||||
let mut this = self;
|
||||
let stask = this.sched_task.take_unwrap();
|
||||
do this.change_task_context(stask) |sched, mut dead_task| {
|
||||
let coroutine = dead_task.coroutine.take_unwrap();
|
||||
coroutine.recycle(&mut sched.stack_pool);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_task(task: ~Task) {
|
||||
let sched = Local::take::<Scheduler>();
|
||||
sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
|
||||
}
|
||||
|
||||
pub fn run_task_later(next_task: ~Task) {
|
||||
// We aren't performing a scheduler operation, so we want to
|
||||
// put the Scheduler back when we finish.
|
||||
let next_task = Cell::new(next_task);
|
||||
do Local::borrow::<Scheduler,()> |sched| {
|
||||
sched.enqueue_task(next_task.take());
|
||||
};
|
||||
}
|
||||
|
||||
// Returns a mutable reference to both contexts involved in this
|
||||
// swap. This is unsafe - we are getting mutable internal
|
||||
// references to keep even when we don't own the tasks. It looks
|
||||
// kinda safe because we are doing transmutes before passing in
|
||||
// the arguments.
|
||||
pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
|
||||
(&'a mut Context, &'a mut Context) {
|
||||
let current_task_context =
|
||||
&mut current_task.coroutine.get_mut_ref().saved_context;
|
||||
let next_task_context =
|
||||
&mut next_task.coroutine.get_mut_ref().saved_context;
|
||||
unsafe {
|
||||
(transmute_mut_region(current_task_context),
|
||||
transmute_mut_region(next_task_context))
|
||||
}
|
||||
}
|
||||
// * Utility Functions
|
||||
|
||||
pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
|
||||
self.cleanup_job = Some(job);
|
||||
}
|
||||
pub fn sched_id(&self) -> uint { to_uint(self) }
|
||||
|
||||
pub fn run_cleanup_job(&mut self) {
|
||||
rtdebug!("running cleanup job");
|
||||
let cleanup_job = self.cleanup_job.take_unwrap();
|
||||
match cleanup_job {
|
||||
DoNothing => { }
|
||||
GiveTask(task, f) => f.to_fn()(self, task)
|
||||
}
|
||||
cleanup_job.run(self);
|
||||
}
|
||||
|
||||
pub fn make_handle(&mut self) -> SchedHandle {
|
||||
let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
|
||||
|
||||
return SchedHandle {
|
||||
remote: remote,
|
||||
queue: self.message_queue.clone(),
|
||||
sched_id: self.sched_id()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// The cases for the below function.
|
||||
enum ResumeAction {
|
||||
SendHome,
|
||||
Requeue,
|
||||
ResumeNow,
|
||||
Homeless
|
||||
// Supporting types
|
||||
|
||||
type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
|
||||
|
||||
pub enum SchedMessage {
|
||||
Wake,
|
||||
Shutdown,
|
||||
PinnedTask(~Task),
|
||||
TaskFromFriend(~Task)
|
||||
}
|
||||
|
||||
pub struct SchedHandle {
|
||||
priv remote: ~RemoteCallbackObject,
|
||||
priv queue: MessageQueue<SchedMessage>,
|
||||
sched_id: uint
|
||||
}
|
||||
|
||||
impl SchedHandle {
|
||||
|
|
@ -792,6 +721,25 @@ impl SchedHandle {
|
|||
}
|
||||
}
|
||||
|
||||
struct CleanupJob {
|
||||
task: ~Task,
|
||||
f: UnsafeTaskReceiver
|
||||
}
|
||||
|
||||
impl CleanupJob {
|
||||
pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
|
||||
CleanupJob {
|
||||
task: task,
|
||||
f: f
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(self, sched: &mut Scheduler) {
|
||||
let CleanupJob { task: task, f: f } = self;
|
||||
f.to_fn()(sched, task)
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: Some hacks to put a &fn in Scheduler without borrowck
|
||||
// complaining
|
||||
type UnsafeTaskReceiver = raw::Closure;
|
||||
|
|
@ -1098,6 +1046,51 @@ mod test {
|
|||
}
|
||||
}
|
||||
|
||||
// A regression test that the final message is always handled.
|
||||
// Used to deadlock because Shutdown was never recvd.
|
||||
#[test]
|
||||
fn no_missed_messages() {
|
||||
use rt::work_queue::WorkQueue;
|
||||
use rt::sleeper_list::SleeperList;
|
||||
use rt::stack::StackPool;
|
||||
use rt::uv::uvio::UvEventLoop;
|
||||
use rt::sched::{Shutdown, TaskFromFriend};
|
||||
use util;
|
||||
|
||||
do run_in_bare_thread {
|
||||
do stress_factor().times {
|
||||
let sleepers = SleeperList::new();
|
||||
let queue = WorkQueue::new();
|
||||
let queues = ~[queue.clone()];
|
||||
|
||||
let mut sched = ~Scheduler::new(
|
||||
~UvEventLoop::new(),
|
||||
queue,
|
||||
queues.clone(),
|
||||
sleepers.clone());
|
||||
|
||||
let mut handle = sched.make_handle();
|
||||
|
||||
let sched = Cell::new(sched);
|
||||
|
||||
let thread = do Thread::start {
|
||||
let mut sched = sched.take();
|
||||
let bootstrap_task = ~Task::new_root(&mut sched.stack_pool, None, ||());
|
||||
sched.bootstrap(bootstrap_task);
|
||||
};
|
||||
|
||||
let mut stack_pool = StackPool::new();
|
||||
let task = ~Task::new_root(&mut stack_pool, None, ||());
|
||||
handle.send(TaskFromFriend(task));
|
||||
|
||||
handle.send(Shutdown);
|
||||
util::ignore(handle);
|
||||
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multithreading() {
|
||||
use rt::comm::*;
|
||||
|
|
|
|||
|
|
@ -370,7 +370,7 @@ impl Coroutine {
|
|||
|
||||
// Again - might work while safe, or it might not.
|
||||
do Local::borrow::<Scheduler,()> |sched| {
|
||||
(sched).run_cleanup_job();
|
||||
sched.run_cleanup_job();
|
||||
}
|
||||
|
||||
// To call the run method on a task we need a direct
|
||||
|
|
|
|||
|
|
@ -66,8 +66,7 @@ pub fn default_sched_threads() -> uint {
|
|||
pub fn dumb_println(s: &str) {
|
||||
use io::WriterUtil;
|
||||
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
|
||||
dbg.write_str(s);
|
||||
dbg.write_str("\n");
|
||||
dbg.write_str(s + "\n");
|
||||
}
|
||||
|
||||
pub fn abort(msg: &str) -> ! {
|
||||
|
|
|
|||
|
|
@ -48,6 +48,20 @@ impl IdleWatcher {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn restart(&mut self) {
|
||||
unsafe {
|
||||
assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
|
||||
};
|
||||
|
||||
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
|
||||
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
|
||||
let data = idle_watcher.get_watcher_data();
|
||||
let cb: &IdleCallback = data.idle_cb.get_ref();
|
||||
let status = status_to_maybe_uv_error(idle_watcher, status);
|
||||
(*cb)(idle_watcher, status);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&mut self) {
|
||||
// NB: Not resetting the Rust idle_cb to None here because `stop` is
|
||||
// likely called from *within* the idle callback, causing a use after
|
||||
|
|
|
|||
|
|
@ -117,6 +117,15 @@ impl EventLoop for UvEventLoop {
|
|||
}
|
||||
}
|
||||
|
||||
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
|
||||
let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
|
||||
return ~UvPausibleIdleCallback {
|
||||
watcher: idle_watcher,
|
||||
idle_flag: false,
|
||||
closed: false
|
||||
};
|
||||
}
|
||||
|
||||
fn callback_ms(&mut self, ms: u64, f: ~fn()) {
|
||||
let mut timer = TimerWatcher::new(self.uvio.uv_loop());
|
||||
do timer.start(ms, 0) |timer, status| {
|
||||
|
|
@ -135,6 +144,44 @@ impl EventLoop for UvEventLoop {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct UvPausibleIdleCallback {
|
||||
watcher: IdleWatcher,
|
||||
idle_flag: bool,
|
||||
closed: bool
|
||||
}
|
||||
|
||||
impl UvPausibleIdleCallback {
|
||||
#[inline]
|
||||
pub fn start(&mut self, f: ~fn()) {
|
||||
do self.watcher.start |_idle_watcher, _status| {
|
||||
f();
|
||||
};
|
||||
self.idle_flag = true;
|
||||
}
|
||||
#[inline]
|
||||
pub fn pause(&mut self) {
|
||||
if self.idle_flag == true {
|
||||
self.watcher.stop();
|
||||
self.idle_flag = false;
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
pub fn resume(&mut self) {
|
||||
if self.idle_flag == false {
|
||||
self.watcher.restart();
|
||||
self.idle_flag = true;
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
pub fn close(&mut self) {
|
||||
self.pause();
|
||||
if !self.closed {
|
||||
self.closed = true;
|
||||
self.watcher.close(||());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_callback_run_once() {
|
||||
do run_in_bare_thread {
|
||||
|
|
@ -163,14 +210,39 @@ impl UvRemoteCallback {
|
|||
let exit_flag_clone = exit_flag.clone();
|
||||
let async = do AsyncWatcher::new(loop_) |watcher, status| {
|
||||
assert!(status.is_none());
|
||||
|
||||
// The synchronization logic here is subtle. To review,
|
||||
// the uv async handle type promises that, after it is
|
||||
// triggered the remote callback is definitely called at
|
||||
// least once. UvRemoteCallback needs to maintain those
|
||||
// semantics while also shutting down cleanly from the
|
||||
// dtor. In our case that means that, when the
|
||||
// UvRemoteCallback dtor calls `async.send()`, here `f` is
|
||||
// always called later.
|
||||
|
||||
// In the dtor both the exit flag is set and the async
|
||||
// callback fired under a lock. Here, before calling `f`,
|
||||
// we take the lock and check the flag. Because we are
|
||||
// checking the flag before calling `f`, and the flag is
|
||||
// set under the same lock as the send, then if the flag
|
||||
// is set then we're guaranteed to call `f` after the
|
||||
// final send.
|
||||
|
||||
// If the check was done after `f()` then there would be a
|
||||
// period between that call and the check where the dtor
|
||||
// could be called in the other thread, missing the final
|
||||
// callback while still destroying the handle.
|
||||
|
||||
let should_exit = unsafe {
|
||||
exit_flag_clone.with_imm(|&should_exit| should_exit)
|
||||
};
|
||||
|
||||
f();
|
||||
unsafe {
|
||||
do exit_flag_clone.with_imm |&should_exit| {
|
||||
if should_exit {
|
||||
watcher.close(||());
|
||||
}
|
||||
}
|
||||
|
||||
if should_exit {
|
||||
watcher.close(||());
|
||||
}
|
||||
|
||||
};
|
||||
UvRemoteCallback {
|
||||
async: async,
|
||||
|
|
@ -219,7 +291,10 @@ mod test_remote {
|
|||
let tube_clone = tube_clone.clone();
|
||||
let tube_clone_cell = Cell::new(tube_clone);
|
||||
let remote = do sched.event_loop.remote_callback {
|
||||
tube_clone_cell.take().send(1);
|
||||
// This could be called multiple times
|
||||
if !tube_clone_cell.is_empty() {
|
||||
tube_clone_cell.take().send(1);
|
||||
}
|
||||
};
|
||||
remote_cell.put_back(remote);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue