From d261bb32d95732ef8aa74b010bb8c98f058785b2 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 18 Apr 2013 19:32:32 -0700 Subject: [PATCH] core: More tweaks to the thread-local scheduler interface --- src/libcore/rt/mod.rs | 14 +- .../rt/sched/{local.rs => local_sched.rs} | 22 ++- src/libcore/rt/sched/mod.rs | 136 ++++++++---------- src/libcore/rt/uvio.rs | 68 ++++----- src/libcore/task/spawn.rs | 2 +- 5 files changed, 122 insertions(+), 120 deletions(-) rename src/libcore/rt/sched/{local.rs => local_sched.rs} (87%) diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 2b9f147bf627..e93e0c6fc6cc 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -106,16 +106,16 @@ pub enum RuntimeContext { pub fn context() -> RuntimeContext { use task::rt::rust_task; - use self::sched::Scheduler; + use self::sched::local_sched; // XXX: Hitting TLS twice to check if the scheduler exists // then to check for the task is not good for perf if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; } else { - if Scheduler::have_local() { + if local_sched::exists() { let context = ::cell::empty_cell(); - do Scheduler::borrow_local |sched| { + do local_sched::borrow |sched| { if sched.in_task_context() { context.put_back(TaskContext); } else { @@ -137,7 +137,7 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler, Task}; + use self::sched::{local_sched, Task}; use self::uvio::UvEventLoop; use cell::Cell; @@ -147,11 +147,11 @@ fn test_context() { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { assert!(context() == TaskContext); - let sched = Scheduler::take_local(); + let sched = local_sched::take(); do sched.deschedule_running_task_and_then() |task| { assert!(context() == SchedulerContext); let task = Cell(task); - do Scheduler::borrow_local |sched| { + do local_sched::borrow |sched| { sched.task_queue.push_back(task.take()); } } @@ -166,7 +166,7 @@ fn test_context() { pub fn run_in_newsched_task(f: ~fn()) { use cell::Cell; use unstable::run_in_bare_thread; - use self::sched::{Scheduler, Task}; + use self::sched::Task; use self::uvio::UvEventLoop; let f = Cell(Cell(f)); diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local_sched.rs similarity index 87% rename from src/libcore/rt/sched/local.rs rename to src/libcore/rt/sched/local_sched.rs index 0eb97ee67ec8..2ab50252ac69 100644 --- a/src/libcore/rt/sched/local.rs +++ b/src/libcore/rt/sched/local_sched.rs @@ -16,6 +16,7 @@ use libc::c_void; use cast::transmute; use super::Scheduler; +use super::super::rtio::IoFactoryObject; use tls = super::super::thread_local_storage; #[cfg(test)] use super::super::uvio::UvEventLoop; @@ -50,11 +51,21 @@ pub fn exists() -> bool { } } +/// Borrow the thread-local scheduler from thread-local storage. +/// While the scheduler is borrowed it is not available in TLS. +pub fn borrow(f: &fn(&mut Scheduler)) { + let mut sched = take(); + f(sched); + put(sched); +} + /// Borrow a mutable reference to the thread-local Scheduler +/// /// # Safety Note +/// /// Because this leaves the Scheduler in thread-local storage it is possible /// For the Scheduler pointer to be aliased -pub unsafe fn borrow() -> &mut Scheduler { +pub unsafe fn unsafe_borrow() -> &mut Scheduler { unsafe { let key = tls_key(); let mut void_sched: *mut c_void = tls::get(key); @@ -70,6 +81,13 @@ pub unsafe fn borrow() -> &mut Scheduler { } } +pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject { + unsafe { + let sched = unsafe_borrow(); + return sched.event_loop.io().unwrap(); + } +} + fn tls_key() -> tls::Key { maybe_tls_key().get() } @@ -125,7 +143,7 @@ fn borrow_smoke_test() { let scheduler = ~UvEventLoop::new_scheduler(); put(scheduler); unsafe { - let _scheduler = borrow(); + let _scheduler = unsafe_borrow(); } let _scheduler = take(); } diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index 1141ea480c95..fe443437e367 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -14,7 +14,7 @@ use cast::transmute; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject}; +use super::rtio::{EventLoop, EventLoopObject}; use super::context::Context; #[cfg(test)] use super::uvio::UvEventLoop; @@ -22,7 +22,8 @@ use super::context::Context; #[cfg(test)] use int; #[cfg(test)] use cell::Cell; -mod local; +// A more convenient name for external callers, e.g. `local_sched::take()` +pub mod local_sched; /// The Scheduler is responsible for coordinating execution of Tasks /// on a single thread. When the scheduler is running it is owned by @@ -90,52 +91,25 @@ pub impl Scheduler { assert!(!self.in_task_context()); // Give ownership of the scheduler (self) to the thread - local::put(self); + local_sched::put(self); - let scheduler = unsafe { local::borrow() }; - fn run_scheduler_once() { - let scheduler = Scheduler::take_local(); - if scheduler.resume_task_from_queue() { - // Ok, a task ran. Nice! We'll do it again later - do Scheduler::borrow_local |scheduler| { - scheduler.event_loop.callback(run_scheduler_once); + unsafe { + let scheduler = local_sched::unsafe_borrow(); + fn run_scheduler_once() { + let scheduler = local_sched::take(); + if scheduler.resume_task_from_queue() { + // Ok, a task ran. Nice! We'll do it again later + do local_sched::borrow |scheduler| { + scheduler.event_loop.callback(run_scheduler_once); + } } } + + scheduler.event_loop.callback(run_scheduler_once); + scheduler.event_loop.run(); } - scheduler.event_loop.callback(run_scheduler_once); - scheduler.event_loop.run(); - - return local::take(); - } - - /// Get a mutable pointer to the thread-local I/O - /// # Safety Note - /// This allows other mutable aliases to the scheduler, both in the current - /// execution context and other execution contexts. - unsafe fn borrow_local_io() -> &mut IoFactoryObject { - unsafe { - let io = local::borrow().event_loop.io().unwrap(); - transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io) - } - } - - /// Borrow the thread-local scheduler from thread-local storage. - /// While the scheduler is borrowed it is not available in TLS. - fn borrow_local(f: &fn(&mut Scheduler)) { - let mut sched = local::take(); - f(sched); - local::put(sched); - } - - /// Take ownership of the scheduler from thread local storage - fn take_local() -> ~Scheduler { - local::take() - } - - /// Just check whether there is a local scheduler - fn have_local() -> bool { - local::exists() + return local_sched::take(); } // * Scheduler-context operations @@ -151,7 +125,7 @@ pub impl Scheduler { } None => { rtdebug!("no tasks in queue"); - local::put(self); + local_sched::put(self); return false; } } @@ -167,22 +141,24 @@ pub impl Scheduler { self.current_task = Some(task); self.enqueue_cleanup_job(DoNothing); - local::put(self); + local_sched::put(self); // Take pointers to both the task and scheduler's saved registers. - let sched = unsafe { local::borrow() }; - let (sched_context, _, next_task_context) = sched.get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); + unsafe { + let sched = local_sched::unsafe_borrow(); + let (sched_context, _, next_task_context) = sched.get_contexts(); + let next_task_context = next_task_context.unwrap(); + // Context switch to the task, restoring it's registers + // and saving the scheduler's + Context::swap(sched_context, next_task_context); - let sched = unsafe { local::borrow() }; - // The running task should have passed ownership elsewhere - assert!(sched.current_task.is_none()); + let sched = local_sched::unsafe_borrow(); + // The running task should have passed ownership elsewhere + assert!(sched.current_task.is_none()); - // Running tasks may have asked us to do some cleanup - sched.run_cleanup_job(); + // Running tasks may have asked us to do some cleanup + sched.run_cleanup_job(); + } } @@ -199,9 +175,9 @@ pub impl Scheduler { let dead_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RecycleTask(dead_task)); - local::put(self); + local_sched::put(self); - let sched = unsafe { local::borrow() }; + let sched = unsafe { local_sched::unsafe_borrow() }; let (sched_context, last_task_context, _) = sched.get_contexts(); let last_task_context = last_task_context.unwrap(); Context::swap(last_task_context, sched_context); @@ -228,15 +204,15 @@ pub impl Scheduler { let f_opaque = ClosureConverter::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - local::put(self); + local_sched::put(self); - let sched = unsafe { local::borrow() }; + let sched = unsafe { local_sched::unsafe_borrow() }; let (sched_context, last_task_context, _) = sched.get_contexts(); let last_task_context = last_task_context.unwrap(); Context::swap(last_task_context, sched_context); // We could be executing in a different thread now - let sched = unsafe { local::borrow() }; + let sched = unsafe { local_sched::unsafe_borrow() }; sched.run_cleanup_job(); } @@ -253,17 +229,19 @@ pub impl Scheduler { self.enqueue_cleanup_job(RescheduleTask(old_running_task)); self.current_task = Some(next_task); - local::put(self); + local_sched::put(self); - let sched = unsafe { local::borrow() }; - let (_, last_task_context, next_task_context) = sched.get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); + unsafe { + let sched = local_sched::unsafe_borrow(); + let (_, last_task_context, next_task_context) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + let next_task_context = next_task_context.unwrap(); + Context::swap(last_task_context, next_task_context); - // We could be executing in a different thread now - let sched = unsafe { local::borrow() }; - sched.run_cleanup_job(); + // We could be executing in a different thread now + let sched = local_sched::unsafe_borrow(); + sched.run_cleanup_job(); + } } // * Other stuff @@ -363,12 +341,14 @@ pub impl Task { // This is the first code to execute after the initial // context switch to the task. The previous context may // have asked us to do some cleanup. - let sched = unsafe { local::borrow() }; - sched.run_cleanup_job(); + unsafe { + let sched = local_sched::unsafe_borrow(); + sched.run_cleanup_job(); + } start(); - let sched = Scheduler::take_local(); + let sched = local_sched::take(); sched.terminate_current_task(); }; return wrapper; @@ -428,7 +408,7 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Scheduler::take_local(); + let mut sched = local_sched::take(); let task2 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; @@ -460,7 +440,7 @@ fn test_run_a_lot_of_tasks_queued() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::borrow_local |sched| { + do local_sched::borrow |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -493,7 +473,7 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - let mut sched = Scheduler::take_local(); + let mut sched = local_sched::take(); let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -513,11 +493,11 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::take_local(); + let sched = local_sched::take(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |task| { let task = Cell(task); - do Scheduler::borrow_local |sched| { + do local_sched::borrow |sched| { assert!(!sched.in_task_context()); sched.task_queue.push_back(task.take()); } diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index b069c67a5f7f..ff5397398354 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -17,7 +17,7 @@ use super::rtio::*; use ops::Drop; use cell::{Cell, empty_cell}; use cast::transmute; -use super::sched::Scheduler; +use super::sched::{Scheduler, local_sched}; #[cfg(test)] use super::sched::Task; #[cfg(test)] use unstable::run_in_bare_thread; @@ -121,14 +121,14 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context do scheduler.deschedule_running_task_and_then |task| { rtdebug!("connect: entered scheduler context"); - do Scheduler::borrow_local |scheduler| { + do local_sched::borrow |scheduler| { assert!(!scheduler.in_task_context()); } let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); @@ -150,7 +150,7 @@ impl IoFactory for UvIoFactory { unsafe { (*result_cell_ptr).put_back(maybe_stream); } // Context switch - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -195,7 +195,7 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); do scheduler.deschedule_running_task_and_then |task| { @@ -218,7 +218,7 @@ impl TcpListener for UvTcpListener { rtdebug!("resuming task from listen"); // Context switch - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -258,13 +258,13 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |task| { rtdebug!("read: entered scheduler context"); - do Scheduler::borrow_local |scheduler| { + do local_sched::borrow |scheduler| { assert!(!scheduler.in_task_context()); } let mut watcher = watcher; @@ -292,7 +292,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -304,7 +304,7 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; @@ -323,7 +323,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -339,7 +339,7 @@ fn test_simple_io_no_connect() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - let io = unsafe { Scheduler::borrow_local_io() }; + let io = unsafe { local_sched::unsafe_borrow_io() }; let addr = Ipv4(127, 0, 0, 1, 2926); let maybe_chan = io.connect(addr); assert!(maybe_chan.is_none()); @@ -357,25 +357,29 @@ fn test_simple_tcp_server_and_client() { let addr = Ipv4(127, 0, 0, 1, 2929); let client_task = ~do Task::new(&mut sched.stack_pool) { - let io = unsafe { Scheduler::borrow_local_io() }; - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); + unsafe { + let io = local_sched::unsafe_borrow_io(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); + } }; let server_task = ~do Task::new(&mut sched.stack_pool) { - let io = unsafe { Scheduler::borrow_local_io() }; - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert!(nread == 8); - for uint::range(0, nread) |i| { - rtdebug!("%u", buf[i] as uint); - assert!(buf[i] == i as u8); + unsafe { + let io = local_sched::unsafe_borrow_io(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert!(nread == 8); + for uint::range(0, nread) |i| { + rtdebug!("%u", buf[i] as uint); + assert!(buf[i] == i as u8); + } + stream.close(); + listener.close(); } - stream.close(); - listener.close(); }; // Start the server first so it listens before the client connects @@ -392,7 +396,7 @@ fn test_read_and_block() { let addr = Ipv4(127, 0, 0, 1, 2930); let client_task = ~do Task::new(&mut sched.stack_pool) { - let io = unsafe { Scheduler::borrow_local_io() }; + let io = unsafe { local_sched::unsafe_borrow_io() }; let mut stream = io.connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -402,7 +406,7 @@ fn test_read_and_block() { }; let server_task = ~do Task::new(&mut sched.stack_pool) { - let io = unsafe { Scheduler::borrow_local_io() }; + let io = unsafe { local_sched::unsafe_borrow_io() }; let mut listener = io.bind(addr).unwrap(); let mut stream = listener.listen().unwrap(); let mut buf = [0, .. 2048]; @@ -420,13 +424,13 @@ fn test_read_and_block() { } reads += 1; - let scheduler = Scheduler::take_local(); + let scheduler = local_sched::take(); // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it do scheduler.deschedule_running_task_and_then |task| { let task = Cell(task); - do Scheduler::borrow_local |scheduler| { + do local_sched::borrow |scheduler| { scheduler.task_queue.push_back(task.take()); } } @@ -453,7 +457,7 @@ fn test_read_read_read() { let addr = Ipv4(127, 0, 0, 1, 2931); let client_task = ~do Task::new(&mut sched.stack_pool) { - let io = unsafe { Scheduler::borrow_local_io() }; + let io = unsafe { local_sched::unsafe_borrow_io() }; let mut stream = io.connect(addr).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index 47e386029955..5b45f498319f 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -553,7 +553,7 @@ fn spawn_raw_newsched(opts: TaskOpts, f: ~fn()) { use rt::sched::*; // XXX: How to schedule a new task is a policy decision that shouldn't be made here - let mut sched = Scheduler::take_local(); + let mut sched = local_sched::take(); let task = ~Task::new(&mut sched.stack_pool, f); sched.resume_task_from_running_task_direct(task); }