From ebefe07792caf17c03c6f90fb1979d4e6c935001 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 16:00:15 -0700 Subject: [PATCH] core::rt: Make Scheduler::unsafe_local return a fabricated region pointer Instead of taking a closure. It's unsafe either way. Rename it to unsafe_local_borrow. --- src/libcore/rt/sched/local.rs | 7 +- src/libcore/rt/sched/mod.rs | 97 +++++----- src/libcore/rt/uvio.rs | 343 ++++++++++++++++------------------ 3 files changed, 214 insertions(+), 233 deletions(-) diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs index 020d581546aa..d80010111146 100644 --- a/src/libcore/rt/sched/local.rs +++ b/src/libcore/rt/sched/local.rs @@ -43,7 +43,7 @@ pub fn take() -> ~Scheduler { /// # Safety Note /// Because this leaves the Scheduler in thread-local storage it is possible /// For the Scheduler pointer to be aliased -pub unsafe fn borrow(f: &fn(&mut Scheduler)) { +pub unsafe fn borrow() -> &mut Scheduler { unsafe { let key = tls_key(); let mut void_sched: *mut c_void = tls::get(key); @@ -54,7 +54,7 @@ pub unsafe fn borrow(f: &fn(&mut Scheduler)) { transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) }; let sched: &mut Scheduler = &mut **sched; - f(sched); + return sched; } } } @@ -93,8 +93,7 @@ fn borrow_smoke_test() { let scheduler = ~UvEventLoop::new_scheduler(); put(scheduler); unsafe { - do borrow |_sched| { - } + let _scheduler = borrow(); } let _scheduler = take(); } diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index 82f0c3592c49..a2a440ba76ea 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -95,20 +95,18 @@ pub impl Scheduler { // Give ownership of the scheduler (self) to the thread local::put(self); - do Scheduler::unsafe_local |scheduler| { - fn run_scheduler_once() { - do Scheduler::unsafe_local |scheduler| { - if scheduler.resume_task_from_queue() { - // Ok, a task ran. Nice! We'll do it again later - scheduler.event_loop.callback(run_scheduler_once); - } - } + let scheduler = Scheduler::unsafe_local_borrow(); + fn run_scheduler_once() { + let scheduler = Scheduler::unsafe_local_borrow(); + if scheduler.resume_task_from_queue() { + // Ok, a task ran. Nice! We'll do it again later + scheduler.event_loop.callback(run_scheduler_once); } - - scheduler.event_loop.callback(run_scheduler_once); - scheduler.event_loop.run(); } + scheduler.event_loop.callback(run_scheduler_once); + scheduler.event_loop.run(); + return local::take(); } @@ -116,8 +114,14 @@ pub impl Scheduler { /// # Safety Note /// This allows other mutable aliases to the scheduler, both in the current /// execution context and other execution contexts. - fn unsafe_local(f: &fn(&mut Scheduler)) { - unsafe { local::borrow(f) } + fn unsafe_local_borrow() -> &mut Scheduler { + unsafe { local::borrow() } + } + + fn local_borrow(f: &fn(&mut Scheduler)) { + let mut sched = local::take(); + f(sched); + local::put(sched); } // * Scheduler-context operations @@ -208,9 +212,8 @@ pub impl Scheduler { } // We could be executing in a different thread now - do Scheduler::unsafe_local |sched| { - sched.run_cleanup_job(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.run_cleanup_job(); } /// Switch directly to another task, without going through the scheduler. @@ -232,9 +235,8 @@ pub impl Scheduler { } // We could be executing in a different thread now - do Scheduler::unsafe_local |sched| { - sched.run_cleanup_job(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.run_cleanup_job(); } // * Other stuff @@ -331,15 +333,13 @@ pub impl Task { // This is the first code to execute after the initial // context switch to the task. The previous context may // have asked us to do some cleanup. - do Scheduler::unsafe_local |sched| { - sched.run_cleanup_job(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.run_cleanup_job(); start(); - do Scheduler::unsafe_local |sched| { - sched.terminate_current_task(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.terminate_current_task(); }; return wrapper; } @@ -398,13 +398,12 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - do Scheduler::unsafe_local |sched| { - let task2 = ~do Task::new(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - sched.resume_task_from_running_task_direct(task2); - } + let sched = Scheduler::unsafe_local_borrow(); + let task2 = ~do Task::new(&mut sched.stack_pool) { + unsafe { *count_ptr = *count_ptr + 1; } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task2); unsafe { *count_ptr = *count_ptr + 1; } }; sched.task_queue.push_back(task1); @@ -431,7 +430,7 @@ fn test_run_a_lot_of_tasks_queued() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::unsafe_local |sched| { + do Scheduler::local_borrow |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -464,18 +463,17 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::unsafe_local |sched| { - let task = ~do Task::new(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } + let sched = Scheduler::unsafe_local_borrow(); + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { + *count_ptr = *count_ptr + 1; + if *count_ptr != MAX { + run_task(count_ptr); } - }; - // Context switch directly to the new task - sched.resume_task_from_running_task_direct(task); - } + } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task); }; } } @@ -485,12 +483,11 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.task_queue.push_back(task); - } + let sched = Scheduler::unsafe_local_borrow(); + assert!(sched.in_task_context()); + do sched.deschedule_running_task_and_then() |sched, task| { + assert!(!sched.in_task_context()); + sched.task_queue.push_back(task); } }; sched.task_queue.push_back(task); diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 7dfc6fff4cdf..a43ec07c2de1 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -104,37 +104,35 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); - // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |scheduler, task| { + // Block this task and take ownership, switch to scheduler context + do scheduler.deschedule_running_task_and_then |scheduler, task| { - rtdebug!("connect: entered scheduler context"); - assert!(!scheduler.in_task_context()); - let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); - let task_cell = Cell(task); + rtdebug!("connect: entered scheduler context"); + assert!(!scheduler.in_task_context()); + let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); + let task_cell = Cell(task); - // Wait for a connection - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connect: in connect callback"); - let maybe_stream = if status.is_none() { - rtdebug!("status is none"); - Some(~UvStream(stream_watcher)) - } else { - rtdebug!("status is some"); - stream_watcher.close(||()); - None - }; + // Wait for a connection + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connect: in connect callback"); + let maybe_stream = if status.is_none() { + rtdebug!("status is none"); + Some(~UvStream(stream_watcher)) + } else { + rtdebug!("status is some"); + stream_watcher.close(||()); + None + }; - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(maybe_stream); } + // Store the stream in the task's stack + unsafe { (*result_cell_ptr).put_back(maybe_stream); } - // Context switch - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + // Context switch + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -178,33 +176,31 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell(task); - let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |server_stream_watcher, status| { - let maybe_stream = if status.is_none() { - let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = loop_from_watcher(&server_stream_watcher); - let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - // XXX: Need's to be surfaced in interface - server_stream_watcher.accept(client_tcp_watcher); - Some(~UvStream::new(client_tcp_watcher)) - } else { - None - }; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell(task); + let mut server_tcp_watcher = server_tcp_watcher; + do server_tcp_watcher.listen |server_stream_watcher, status| { + let maybe_stream = if status.is_none() { + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = loop_from_watcher(&server_stream_watcher); + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + // XXX: Need's to be surfaced in interface + server_stream_watcher.accept(client_tcp_watcher); + Some(~UvStream::new(client_tcp_watcher)) + } else { + None + }; - unsafe { (*result_cell_ptr).put_back(maybe_stream); } + unsafe { (*result_cell_ptr).put_back(maybe_stream); } - rtdebug!("resuming task from listen"); - // Context switch - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + rtdebug!("resuming task from listen"); + // Context switch + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -243,42 +239,40 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); - let watcher = self.watcher(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |scheduler, task| { - rtdebug!("read: entered scheduler context"); - assert!(!scheduler.in_task_context()); + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |scheduler, task| { + rtdebug!("read: entered scheduler context"); + assert!(!scheduler.in_task_context()); + let mut watcher = watcher; + let task_cell = Cell(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.read_start(alloc) |watcher, nread, _buf, status| { + + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? let mut watcher = watcher; - let task_cell = Cell(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) + watcher.read_stop(); + + let result = if status.is_none() { + assert!(nread >= 0); + Ok(nread as uint) + } else { + Err(()) }; - do watcher.read_start(alloc) |watcher, nread, _buf, status| { - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - let mut watcher = watcher; - watcher.read_stop(); + unsafe { (*result_cell_ptr).put_back(result); } - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(()) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -289,29 +283,27 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); - let watcher = self.watcher(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let mut watcher = watcher; - let task_cell = Cell(task); - let buf = unsafe { &*buf_ptr }; - // XXX: OMGCOPIES - let buf = buf.to_vec(); - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(()) - }; + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let mut watcher = watcher; + let task_cell = Cell(task); + let buf = unsafe { &*buf_ptr }; + // XXX: OMGCOPIES + let buf = buf.to_vec(); + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(()) + }; - unsafe { (*result_cell_ptr).put_back(result); } + unsafe { (*result_cell_ptr).put_back(result); } - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -326,12 +318,11 @@ fn test_simple_io_no_connect() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let addr = Ipv4(127, 0, 0, 1, 2926); - let maybe_chan = io.connect(addr); - assert!(maybe_chan.is_none()); - } + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let addr = Ipv4(127, 0, 0, 1, 2926); + let maybe_chan = io.connect(addr); + assert!(maybe_chan.is_none()); }; sched.task_queue.push_back(task); sched.run(); @@ -346,29 +337,27 @@ fn test_simple_tcp_server_and_client() { let addr = Ipv4(127, 0, 0, 1, 2929); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); - } + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert!(nread == 8); - for uint::range(0, nread) |i| { - rtdebug!("%u", buf[i] as uint); - assert!(buf[i] == i as u8); - } - stream.close(); - listener.close(); + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert!(nread == 8); + for uint::range(0, nread) |i| { + rtdebug!("%u", buf[i] as uint); + assert!(buf[i] == i as u8); } + stream.close(); + listener.close(); }; // Start the server first so it listens before the client connects @@ -385,53 +374,50 @@ fn test_read_and_block() { let addr = Ipv4(127, 0, 0, 1, 2930); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); - } + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; - let expected = 32; - let mut current = 0; - let mut reads = 0; + let expected = 32; + let mut current = 0; + let mut reads = 0; - while current < expected { - let nread = stream.read(buf).unwrap(); - for uint::range(0, nread) |i| { - let val = buf[i] as uint; - assert!(val == current % 8); - current += 1; - } - reads += 1; - - do Scheduler::unsafe_local |scheduler| { - // Yield to the other task in hopes that it - // will trigger a read callback while we are - // not ready for it - do scheduler.deschedule_running_task_and_then |scheduler, task| { - scheduler.task_queue.push_back(task); - } - } + while current < expected { + let nread = stream.read(buf).unwrap(); + for uint::range(0, nread) |i| { + let val = buf[i] as uint; + assert!(val == current % 8); + current += 1; } + reads += 1; - // Make sure we had multiple reads - assert!(reads > 1); - - stream.close(); - listener.close(); + let scheduler = Scheduler::unsafe_local_borrow(); + // Yield to the other task in hopes that it + // will trigger a read callback while we are + // not ready for it + do scheduler.deschedule_running_task_and_then |scheduler, task| { + scheduler.task_queue.push_back(task); + } } + + // Make sure we had multiple reads + assert!(reads > 1); + + stream.close(); + listener.close(); }; // Start the server first so it listens before the client connects @@ -448,19 +434,18 @@ fn test_read_read_read() { let addr = Ipv4(127, 0, 0, 1, 2931); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < 500000000 { - let nread = stream.read(buf).unwrap(); - rtdebug!("read %u bytes", nread as uint); - total_bytes_read += nread; - } - rtdebug_!("read %u bytes total", total_bytes_read as uint); - stream.close(); + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < 500000000 { + let nread = stream.read(buf).unwrap(); + rtdebug!("read %u bytes", nread as uint); + total_bytes_read += nread; } + rtdebug_!("read %u bytes total", total_bytes_read as uint); + stream.close(); }; sched.task_queue.push_back(client_task);