core::rt: Change the signature of context switching methods to avoid infinite recursion

This commit is contained in:
Brian Anderson 2013-05-29 17:52:00 -07:00
parent f343e6172b
commit 134bb0f3ee
6 changed files with 66 additions and 100 deletions

View file

@@ -159,7 +159,7 @@ impl<T> PortOne<T> {
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
@@ -178,12 +178,10 @@ impl<T> PortOne<T> {
// triggering infinite recursion on the scheduler's stack.
let task: ~Coroutine = cast::transmute(task_as_state);
let task = Cell(task);
let mut sched = Local::take::<Scheduler>();
do sched.event_loop.callback {
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task.take());
}
Local::put(sched);
}
_ => util::unreachable()
}

View file

@@ -238,12 +238,9 @@ fn test_context() {
let task = ~do Coroutine::new(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| {
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
let task = Cell(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
sched.enqueue_task(task);
}
};
sched.enqueue_task(task);

View file

@@ -280,11 +280,9 @@ pub impl Scheduler {
rtdebug!("ending running task");
do self.deschedule_running_task_and_then |dead_task| {
do self.deschedule_running_task_and_then |sched, dead_task| {
let dead_task = Cell(dead_task);
do Local::borrow::<Scheduler> |sched| {
dead_task.take().recycle(&mut sched.stack_pool);
}
dead_task.take().recycle(&mut sched.stack_pool);
}
abort!("control reached end of task");
@@ -293,22 +291,18 @@ pub impl Scheduler {
fn schedule_new_task(~self, task: ~Coroutine) {
assert!(self.in_task_context());
do self.switch_running_tasks_and_then(task) |last_task| {
do self.switch_running_tasks_and_then(task) |sched, last_task| {
let last_task = Cell(last_task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(last_task.take());
}
sched.enqueue_task(last_task.take());
}
}
fn schedule_task(~self, task: ~Coroutine) {
assert!(self.in_task_context());
do self.switch_running_tasks_and_then(task) |last_task| {
do self.switch_running_tasks_and_then(task) |sched, last_task| {
let last_task = Cell(last_task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(last_task.take());
}
sched.enqueue_task(last_task.take());
}
}
@@ -352,7 +346,11 @@ pub impl Scheduler {
/// The closure here is a *stack* closure that lives in the
/// running task. It gets transmuted to the scheduler's lifetime
/// and called while the task is blocked.
fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
///
/// This passes a Scheduler pointer to the fn after the context switch
/// in order to prevent that fn from performing further scheduling operations.
/// Doing further scheduling could easily result in infinite recursion.
fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) {
let mut this = self;
assert!(this.in_task_context());
@@ -360,7 +358,8 @@ pub impl Scheduler {
unsafe {
let blocked_task = this.current_task.swap_unwrap();
let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine),
&fn(&mut Scheduler, ~Coroutine)>(f);
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
}
@@ -382,14 +381,18 @@ pub impl Scheduler {
/// Switch directly to another task, without going through the scheduler.
/// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea.
fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
fn switch_running_tasks_and_then(~self, next_task: ~Coroutine,
f: &fn(&mut Scheduler, ~Coroutine)) {
let mut this = self;
assert!(this.in_task_context());
rtdebug!("switching tasks");
let old_running_task = this.current_task.swap_unwrap();
let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
let f_fake_region = unsafe {
transmute::<&fn(&mut Scheduler, ~Coroutine),
&fn(&mut Scheduler, ~Coroutine)>(f)
};
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
this.current_task = Some(next_task);
@@ -426,7 +429,7 @@ pub impl Scheduler {
let cleanup_job = self.cleanup_job.swap_unwrap();
match cleanup_job {
DoNothing => { }
GiveTask(task, f) => (f.to_fn())(task)
GiveTask(task, f) => (f.to_fn())(self, task)
}
}
@@ -535,12 +538,12 @@ pub impl Coroutine {
// complaining
type UnsafeTaskReceiver = sys::Closure;
trait ClosureConverter {
fn from_fn(&fn(~Coroutine)) -> Self;
fn to_fn(self) -> &fn(~Coroutine);
fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self;
fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine);
}
impl ClosureConverter for UnsafeTaskReceiver {
fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } }
}
#[cfg(test)]
@@ -604,11 +607,9 @@ mod test {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |task1| {
do sched.switch_running_tasks_and_then(task2) |sched, task1| {
let task1 = Cell(task1);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task1.take());
}
sched.enqueue_task(task1.take());
}
unsafe { *count_ptr = *count_ptr + 1; }
};
@@ -658,12 +659,10 @@ mod test {
let task = ~do Coroutine::new(&mut sched.stack_pool) {
let sched = Local::take::<Scheduler>();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |task| {
do sched.deschedule_running_task_and_then() |sched, task| {
let task = Cell(task);
do Local::borrow::<Scheduler> |sched| {
assert!(!sched.in_task_context());
sched.enqueue_task(task.take());
}
assert!(!sched.in_task_context());
sched.enqueue_task(task.take());
}
};
sched.enqueue_task(task);
@@ -680,8 +679,7 @@ mod test {
do run_in_newsched_task {
do spawn {
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
let mut sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |sched, task| {
let task = Cell(task);
do sched.event_loop.callback_ms(10) {
rtdebug!("in callback");
@@ -689,7 +687,6 @@ mod test {
sched.enqueue_task(task.take());
Local::put(sched);
}
Local::put(sched);
}
}
}

View file

@@ -122,11 +122,7 @@ pub fn spawntask(f: ~fn()) {
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
let sched = Local::take::<Scheduler>();
sched.schedule_new_task(task.take());
}
sched.schedule_new_task(task);
}
/// Create a new task and run it right now. Aborts on failure
@@ -137,11 +133,8 @@ pub fn spawntask_immediately(f: ~fn()) {
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
}
}
@@ -172,11 +165,8 @@ pub fn spawntask_random(f: ~fn()) {
f);
if run_now {
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
}
} else {
sched.enqueue_task(task);
@@ -199,10 +189,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
// Switch to the scheduler
let f = Cell(Cell(f));
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |old_task| {
do sched.deschedule_running_task_and_then() |sched, old_task| {
let old_task = Cell(old_task);
let f = f.take();
let mut sched = Local::take::<Scheduler>();
let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
do (|| {
(f.take())()
@@ -210,16 +199,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
// Check for failure then resume the parent task
unsafe { *failed_ptr = task::failing(); }
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
let new_task = Cell(new_task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(new_task.take());
}
do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| {
sched.enqueue_task(new_task);
}
}
};
sched.resume_task_immediately(new_task);
sched.enqueue_task(new_task);
}
if !failed { Ok(()) } else { Err(()) }

View file

@@ -72,7 +72,7 @@ impl<T> Tube<T> {
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none());
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |_, task| {
(*state).blocked_task = Some(task);
}
rtdebug!("waking after tube recv");
@@ -107,11 +107,10 @@ mod test {
let tube_clone = tube.clone();
let tube_clone_cell = Cell(tube_clone);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1);
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
assert!(tube.recv() == 1);
@@ -123,21 +122,17 @@ mod test {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(Cell(Cell(tube_clone)));
let tube_clone = Cell(tube_clone);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
let tube_clone = tube_clone.take();
do Local::borrow::<Scheduler> |sched| {
let tube_clone = tube_clone.take();
do sched.event_loop.callback {
let mut tube_clone = tube_clone.take();
// The task should be blocked on this now and
// sending will wake it up.
tube_clone.send(1);
}
do sched.deschedule_running_task_and_then |sched, task| {
let tube_clone = Cell(tube_clone.take());
do sched.event_loop.callback {
let mut tube_clone = tube_clone.take();
// The task should be blocked on this now and
// sending will wake it up.
tube_clone.send(1);
}
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
assert!(tube.recv() == 1);
@@ -153,7 +148,7 @@ mod test {
let tube_clone = tube.clone();
let tube_clone = Cell(tube_clone);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
callback_send(tube_clone.take(), 0);
fn callback_send(tube: Tube<int>, i: int) {
@@ -172,8 +167,7 @@ mod test {
}
}
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
for int::range(0, MAX) |i| {

View file

@@ -205,12 +205,10 @@ impl IoFactory for UvIoFactory {
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("connect: entered scheduler context");
do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context());
}
assert!(!sched.in_task_context());
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell(task);
@@ -250,7 +248,7 @@ impl IoFactory for UvIoFactory {
Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
@@ -286,7 +284,7 @@ impl Drop for UvTcpListener {
fn finalize(&self) {
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
@@ -348,7 +346,7 @@ impl Drop for UvTcpStream {
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
@@ -367,11 +365,9 @@ impl RtioTcpStream for UvTcpStream {
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("read: entered scheduler context");
do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context());
}
assert!(!sched.in_task_context());
let mut watcher = watcher;
let task_cell = Cell(task);
// XXX: We shouldn't reallocate these callbacks every
@@ -413,7 +409,7 @@ impl RtioTcpStream for UvTcpStream {
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let mut watcher = watcher;
let task_cell = Cell(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -507,11 +503,9 @@ fn test_read_and_block() {
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |sched, task| {
let task = Cell(task);
do Local::borrow::<Scheduler> |scheduler| {
scheduler.enqueue_task(task.take());
}
sched.enqueue_task(task.take());
}
}