diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 50c6a894093f..3f7b332e184b 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -15,7 +15,7 @@ use cell::Cell; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; use rt::local_ptr; @@ -41,16 +41,19 @@ pub struct Scheduler { priv cleanup_job: Option } -// XXX: Some hacks to put a &fn in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = sys::Closure; -trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); +pub struct Coroutine { + /// The segment of stack on which the task is currently running or, + /// if the task is blocked, on which the task will resume execution + priv current_stack_segment: StackSegment, + /// These are always valid when the task is not running, unless + /// the task is dead + priv saved_context: Context, + /// The heap, GC, unwinding, local storage, logging + task: ~Task } -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject } enum CleanupJob { @@ -103,6 +106,17 @@ pub impl Scheduler { return sched; } + fn make_handle(&mut self) -> SchedHandle { + let remote = self.event_loop.remote_callback(wake_up); + + return SchedHandle { + remote: remote + }; + + fn wake_up() { + } + } + /// Schedule a task to be executed later. /// /// Pushes the task onto the work stealing queue and tells the event loop @@ -337,19 +351,6 @@ pub impl Scheduler { } } -static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack - -pub struct Coroutine { - /// The segment of stack on which the task is currently running or, - /// if the task is blocked, on which the task will resume execution - priv current_stack_segment: StackSegment, - /// These are always valid when the task is not running, unless - /// the task is dead - priv saved_context: Context, - /// The heap, GC, unwinding, local storage, logging - task: ~Task -} - pub impl Coroutine { fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { Coroutine::with_task(stack_pool, ~Task::new(), start) @@ -358,6 +359,9 @@ pub impl Coroutine { fn with_task(stack_pool: &mut StackPool, task: ~Task, start: ~fn()) -> Coroutine { + + static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack + let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn @@ -401,6 +405,18 @@ pub impl Coroutine { } } +// XXX: Some hacks to put a &fn in Scheduler without borrowck +// complaining +type UnsafeTaskReceiver = sys::Closure; +trait ClosureConverter { + fn from_fn(&fn(~Coroutine)) -> Self; + fn to_fn(self) -> &fn(~Coroutine); +} +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } +} + #[cfg(test)] mod test { use int; @@ -411,6 +427,7 @@ mod test { use rt::local::Local; use rt::test::*; use super::*; + use rt::thread::Thread; #[test] fn test_simple_scheduling() { @@ -551,4 +568,42 @@ mod test { } } } + + #[test] + fn handle() { + use rt::comm::*; + + do run_in_bare_thread { + let (port, chan) = oneshot::<()>(); + let port_cell = Cell(port); + let chan_cell = Cell(chan); + let mut sched1 = ~UvEventLoop::new_scheduler(); + let handle1 = sched1.make_handle(); + let handle1_cell = Cell(handle1); + let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + chan_cell.take().send(()); + }; + sched1.enqueue_task(task1); + + let mut sched2 = ~UvEventLoop::new_scheduler(); + let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + port_cell.take().recv(); + // Release the other scheduler's handle so it can exit + handle1_cell.take(); + }; + sched2.enqueue_task(task2); + + let sched1_cell = Cell(sched1); + let _thread1 = do Thread::start { + let mut sched1 = sched1_cell.take(); + sched1.run(); + }; + + let sched2_cell = Cell(sched2); + let _thread2 = do Thread::start { + let mut sched2 = sched2_cell.take(); + sched2.run(); + }; + } + } }