Change the HOF context switchers to pass a BlockedTask instead of a ~Task.

This commit is contained in:
Ben Blum 2013-07-11 14:29:33 -04:00
parent 0101f35f27
commit 9ad1997549
7 changed files with 102 additions and 60 deletions

View file

@ -19,7 +19,7 @@ use option::*;
use cast;
use util;
use ops::Drop;
use rt::task::Task;
use rt::kill::BlockedTask;
use kinds::Send;
use rt::sched::Scheduler;
use rt::local::Local;
@ -30,13 +30,13 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
/// A combined refcount / ~Task pointer.
/// A combined refcount / BlockedTask-as-uint pointer.
///
/// Can be equal to the following values:
///
/// * 2 - both endpoints are alive
/// * 1 - either the sender or the receiver is dead, determined by context
/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
type State = uint;
static STATE_BOTH: State = 2;
@ -137,11 +137,13 @@ impl<T> ChanOne<T> {
}
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Task = cast::transmute(task_as_state);
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(recvr);
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(woken_task);
};
}
}
}
@ -177,7 +179,7 @@ impl<T> PortOne<T> {
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state: State = cast::transmute(task);
let task_as_state = task.cast_to_uint();
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
@ -193,8 +195,8 @@ impl<T> PortOne<T> {
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let task: ~Task = cast::transmute(task_as_state);
sched.enqueue_task(task);
let recvr = BlockedTask::cast_from_uint(task_as_state);
sched.enqueue_blocked_task(recvr);
}
_ => util::unreachable()
}
@ -258,9 +260,11 @@ impl<T> Drop for ChanOneHack<T> {
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Task = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let sched = Local::take::<Scheduler>();
sched.schedule_task(woken_task);
};
}
}
}

View file

@ -367,7 +367,7 @@ fn test_context() {
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
};
sched.enqueue_task(task);

View file

@ -8,7 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use either::{Left, Right};
use option::{Option, Some, None};
use sys;
use cast::transmute;
use clone::Clone;
@ -20,6 +21,7 @@ use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
use super::context::Context;
use super::task::{Task, AnySched, Sched};
use super::message_queue::MessageQueue;
use rt::kill::BlockedTask;
use rt::local_ptr;
use rt::local::Local;
use rt::rtio::RemoteCallback;
@ -271,6 +273,14 @@ impl Scheduler {
};
}
/// As enqueue_task, but with the possibility for the blocked task to
/// already have been killed.
pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
do blocked_task.wake().map_consume |task| {
self.enqueue_task(task);
};
}
// * Scheduler-context operations
fn interpret_message_queue(~self) -> bool {
@ -412,14 +422,26 @@ impl Scheduler {
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
pub fn terminate_current_task(~self) {
assert!(self.in_task_context());
let mut this = self;
assert!(this.in_task_context());
rtdebug!("ending running task");
do self.deschedule_running_task_and_then |sched, dead_task| {
let mut dead_task = dead_task;
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
// This task is post-cleanup, so it must be unkillable. This sequence
// of descheduling and recycling must not get interrupted by a kill.
// FIXME(#7544): Make this use an inner descheduler, like yield should.
this.current_task.get_mut_ref().death.unkillable += 1;
do this.deschedule_running_task_and_then |sched, dead_task| {
match dead_task.wake() {
Some(dead_task) => {
let mut dead_task = dead_task;
dead_task.death.unkillable -= 1; // FIXME(#7544) ugh
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
}
None => rtabort!("dead task killed before recycle"),
}
}
rtabort!("control reached end of task");
@ -440,7 +462,7 @@ impl Scheduler {
// here we know we are home, execute now OR we know we
// aren't homed, and that this sched doesn't care
do this.switch_running_tasks_and_then(task) |sched, last_task| {
sched.enqueue_task(last_task);
sched.enqueue_blocked_task(last_task);
}
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
@ -491,6 +513,13 @@ impl Scheduler {
}
}
pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
match blocked_task.wake() {
Some(task) => self.resume_task_immediately(task),
None => Local::put(self),
};
}
/// Block a running task, context switch to the scheduler, then pass the
/// blocked task to a closure.
///
@ -503,7 +532,7 @@ impl Scheduler {
/// This passes a Scheduler pointer to the fn after the context switch
/// in order to prevent that fn from performing further scheduling operations.
/// Doing further scheduling could easily result in infinite recursion.
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) {
pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
let mut this = self;
assert!(this.in_task_context());
@ -512,8 +541,8 @@ impl Scheduler {
unsafe {
let blocked_task = this.current_task.take_unwrap();
let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task),
&fn(&mut Scheduler, ~Task)>(f);
let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask),
&fn(&mut Scheduler, BlockedTask)>(f);
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
}
@ -539,7 +568,7 @@ impl Scheduler {
/// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea.
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
f: &fn(&mut Scheduler, ~Task)) {
f: &fn(&mut Scheduler, BlockedTask)) {
let mut this = self;
assert!(this.in_task_context());
@ -548,8 +577,8 @@ impl Scheduler {
let old_running_task = this.current_task.take_unwrap();
let f_fake_region = unsafe {
transmute::<&fn(&mut Scheduler, ~Task),
&fn(&mut Scheduler, ~Task)>(f)
transmute::<&fn(&mut Scheduler, BlockedTask),
&fn(&mut Scheduler, BlockedTask)>(f)
};
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
@ -590,7 +619,15 @@ impl Scheduler {
let cleanup_job = self.cleanup_job.take_unwrap();
match cleanup_job {
DoNothing => { }
GiveTask(task, f) => (f.to_fn())(self, task)
GiveTask(task, f) => {
let f = f.to_fn();
// Task might need to receive a kill signal instead of blocking.
// We can call the "and_then" only if it blocks successfully.
match BlockedTask::try_block(task) {
Left(killed_task) => self.enqueue_task(killed_task),
Right(blocked_task) => f(self, blocked_task),
}
}
}
}
@ -663,12 +700,14 @@ impl SchedHandle {
// complaining
type UnsafeTaskReceiver = sys::Closure;
trait ClosureConverter {
fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self;
fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask);
}
impl ClosureConverter for UnsafeTaskReceiver {
fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver {
unsafe { transmute(f) }
}
fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } }
}
@ -928,8 +967,7 @@ mod test {
};
// Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |sched, task1| {
let task1 = Cell::new(task1);
sched.enqueue_task(task1.take());
sched.enqueue_blocked_task(task1);
}
unsafe { *count_ptr = *count_ptr + 1; }
};
@ -980,9 +1018,8 @@ mod test {
let sched = Local::take::<Scheduler>();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |sched, task| {
let task = Cell::new(task);
assert!(!sched.in_task_context());
sched.enqueue_task(task.take());
sched.enqueue_blocked_task(task);
}
};
sched.enqueue_task(task);
@ -1004,7 +1041,7 @@ mod test {
do sched.event_loop.callback_ms(10) {
rtdebug!("in callback");
let mut sched = Local::take::<Scheduler>();
sched.enqueue_task(task.take());
sched.enqueue_blocked_task(task.take());
Local::put(sched);
}
}

View file

@ -170,7 +170,7 @@ pub fn spawntask_immediately(f: ~fn()) {
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
}
@ -214,7 +214,7 @@ pub fn spawntask_random(f: ~fn()) {
if run_now {
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
} else {
sched.enqueue_task(task);
@ -284,7 +284,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
sched.enqueue_task(old_task);
sched.enqueue_blocked_task(old_task);
}
rtdebug!("enqueued the new task, now waiting on exit_status");

View file

@ -18,13 +18,13 @@ use clone::Clone;
use super::rc::RC;
use rt::sched::Scheduler;
use rt::{context, TaskContext, SchedulerContext};
use rt::kill::BlockedTask;
use rt::local::Local;
use rt::task::Task;
use vec::OwnedVector;
use container::Container;
struct TubeState<T> {
blocked_task: Option<~Task>,
blocked_task: Option<BlockedTask>,
buf: ~[T]
}
@ -55,7 +55,7 @@ impl<T> Tube<T> {
rtdebug!("waking blocked tube");
let task = (*state).blocked_task.take_unwrap();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.resume_blocked_task_immediately(task);
}
}
}
@ -111,7 +111,7 @@ mod test {
do sched.deschedule_running_task_and_then |sched, task| {
let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1);
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
assert!(tube.recv() == 1);
@ -133,7 +133,7 @@ mod test {
// sending will wake it up.
tube_clone.send(1);
}
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
assert!(tube.recv() == 1);
@ -168,7 +168,7 @@ mod test {
}
}
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
for int::range(0, MAX) |i| {

View file

@ -227,7 +227,7 @@ impl IoFactory for UvIoFactory {
// Context switch
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
} else {
rtdebug!("status is some");
let task_cell = Cell::new(task_cell.take());
@ -235,7 +235,7 @@ impl IoFactory for UvIoFactory {
let res = Err(uv_error_to_io_error(status.get()));
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
};
}
@ -255,7 +255,7 @@ impl IoFactory for UvIoFactory {
let task_cell = Cell::new(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
@ -273,7 +273,7 @@ impl IoFactory for UvIoFactory {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
@ -309,7 +309,7 @@ impl Drop for UvTcpListener {
let task_cell = Cell::new(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -372,7 +372,7 @@ impl Drop for UvTcpStream {
let task_cell = Cell::new(task);
do self.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -419,7 +419,7 @@ impl RtioTcpStream for UvTcpStream {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
@ -447,7 +447,7 @@ impl RtioTcpStream for UvTcpStream {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
@ -473,7 +473,7 @@ impl Drop for UvUdpSocket {
let task_cell = Cell::new(task);
do self.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -513,7 +513,7 @@ impl RtioUdpSocket for UvUdpSocket {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
@ -540,7 +540,7 @@ impl RtioUdpSocket for UvUdpSocket {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
@ -678,7 +678,7 @@ fn test_read_and_block() {
// not ready for it
do scheduler.deschedule_running_task_and_then |sched, task| {
let task = Cell::new(task);
sched.enqueue_task(task.take());
sched.enqueue_blocked_task(task.take());
}
}

View file

@ -515,9 +515,10 @@ pub fn yield() {
}
_ => {
// XXX: What does yield really mean in newsched?
// FIXME(#7544): Optimize this, since we know we won't block.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |sched, task| {
sched.enqueue_task(task);
sched.enqueue_blocked_task(task);
}
}
}