core::rt: Make Scheduler::unsafe_local return a fabricated region pointer

Return a borrowed pointer instead of taking a closure — it's unsafe either way. Rename it to unsafe_local_borrow.
This commit is contained in:
Brian Anderson 2013-04-15 16:00:15 -07:00
parent 473b4d19ad
commit ebefe07792
3 changed files with 214 additions and 233 deletions

View file

@ -43,7 +43,7 @@ pub fn take() -> ~Scheduler {
/// # Safety Note
/// Because this leaves the Scheduler in thread-local storage it is possible
/// for the Scheduler pointer to be aliased
pub unsafe fn borrow(f: &fn(&mut Scheduler)) {
pub unsafe fn borrow() -> &mut Scheduler {
unsafe {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
@ -54,7 +54,7 @@ pub unsafe fn borrow(f: &fn(&mut Scheduler)) {
transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
};
let sched: &mut Scheduler = &mut **sched;
f(sched);
return sched;
}
}
}
@ -93,8 +93,7 @@ fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
unsafe {
do borrow |_sched| {
}
let _scheduler = borrow();
}
let _scheduler = take();
}

View file

@ -95,20 +95,18 @@ pub impl Scheduler {
// Give ownership of the scheduler (self) to the thread
local::put(self);
do Scheduler::unsafe_local |scheduler| {
fn run_scheduler_once() {
do Scheduler::unsafe_local |scheduler| {
if scheduler.resume_task_from_queue() {
// Ok, a task ran. Nice! We'll do it again later
scheduler.event_loop.callback(run_scheduler_once);
}
}
let scheduler = Scheduler::unsafe_local_borrow();
fn run_scheduler_once() {
let scheduler = Scheduler::unsafe_local_borrow();
if scheduler.resume_task_from_queue() {
// Ok, a task ran. Nice! We'll do it again later
scheduler.event_loop.callback(run_scheduler_once);
}
scheduler.event_loop.callback(run_scheduler_once);
scheduler.event_loop.run();
}
scheduler.event_loop.callback(run_scheduler_once);
scheduler.event_loop.run();
return local::take();
}
@ -116,8 +114,14 @@ pub impl Scheduler {
/// # Safety Note
/// This allows other mutable aliases to the scheduler, both in the current
/// execution context and other execution contexts.
fn unsafe_local(f: &fn(&mut Scheduler)) {
unsafe { local::borrow(f) }
fn unsafe_local_borrow() -> &mut Scheduler {
unsafe { local::borrow() }
}
fn local_borrow(f: &fn(&mut Scheduler)) {
let mut sched = local::take();
f(sched);
local::put(sched);
}
// * Scheduler-context operations
@ -208,9 +212,8 @@ pub impl Scheduler {
}
// We could be executing in a different thread now
do Scheduler::unsafe_local |sched| {
sched.run_cleanup_job();
}
let sched = Scheduler::unsafe_local_borrow();
sched.run_cleanup_job();
}
/// Switch directly to another task, without going through the scheduler.
@ -232,9 +235,8 @@ pub impl Scheduler {
}
// We could be executing in a different thread now
do Scheduler::unsafe_local |sched| {
sched.run_cleanup_job();
}
let sched = Scheduler::unsafe_local_borrow();
sched.run_cleanup_job();
}
// * Other stuff
@ -331,15 +333,13 @@ pub impl Task {
// This is the first code to execute after the initial
// context switch to the task. The previous context may
// have asked us to do some cleanup.
do Scheduler::unsafe_local |sched| {
sched.run_cleanup_job();
}
let sched = Scheduler::unsafe_local_borrow();
sched.run_cleanup_job();
start();
do Scheduler::unsafe_local |sched| {
sched.terminate_current_task();
}
let sched = Scheduler::unsafe_local_borrow();
sched.terminate_current_task();
};
return wrapper;
}
@ -398,13 +398,12 @@ fn test_swap_tasks() {
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
do Scheduler::unsafe_local |sched| {
let task2 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
sched.resume_task_from_running_task_direct(task2);
}
let sched = Scheduler::unsafe_local_borrow();
let task2 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
sched.resume_task_from_running_task_direct(task2);
unsafe { *count_ptr = *count_ptr + 1; }
};
sched.task_queue.push_back(task1);
@ -431,7 +430,7 @@ fn test_run_a_lot_of_tasks_queued() {
assert!(count == MAX);
fn run_task(count_ptr: *mut int) {
do Scheduler::unsafe_local |sched| {
do Scheduler::local_borrow |sched| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
@ -464,18 +463,17 @@ fn test_run_a_lot_of_tasks_direct() {
assert!(count == MAX);
fn run_task(count_ptr: *mut int) {
do Scheduler::unsafe_local |sched| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
run_task(count_ptr);
}
let sched = Scheduler::unsafe_local_borrow();
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
run_task(count_ptr);
}
};
// Context switch directly to the new task
sched.resume_task_from_running_task_direct(task);
}
}
};
// Context switch directly to the new task
sched.resume_task_from_running_task_direct(task);
};
}
}
@ -485,12 +483,11 @@ fn test_block_task() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |sched, task| {
assert!(!sched.in_task_context());
sched.task_queue.push_back(task);
}
let sched = Scheduler::unsafe_local_borrow();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |sched, task| {
assert!(!sched.in_task_context());
sched.task_queue.push_back(task);
}
};
sched.task_queue.push_back(task);

View file

@ -104,37 +104,35 @@ impl IoFactory for UvIoFactory {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
do Scheduler::unsafe_local |scheduler| {
assert!(scheduler.in_task_context());
let scheduler = Scheduler::unsafe_local_borrow();
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |scheduler, task| {
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |scheduler, task| {
rtdebug!("connect: entered scheduler context");
assert!(!scheduler.in_task_context());
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell(task);
rtdebug!("connect: entered scheduler context");
assert!(!scheduler.in_task_context());
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell(task);
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
} else {
rtdebug!("status is some");
stream_watcher.close(||());
None
};
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
} else {
rtdebug!("status is some");
stream_watcher.close(||());
None
};
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
do Scheduler::unsafe_local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
// Context switch
let scheduler = Scheduler::unsafe_local_borrow();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -178,33 +176,31 @@ impl TcpListener for UvTcpListener {
let server_tcp_watcher = self.watcher();
do Scheduler::unsafe_local |scheduler| {
assert!(scheduler.in_task_context());
let scheduler = Scheduler::unsafe_local_borrow();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell(task);
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Needs to be surfaced in the interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
} else {
None
};
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell(task);
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Needs to be surfaced in the interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
} else {
None
};
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
rtdebug!("resuming task from listen");
// Context switch
do Scheduler::unsafe_local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
rtdebug!("resuming task from listen");
// Context switch
let scheduler = Scheduler::unsafe_local_borrow();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -243,42 +239,40 @@ impl Stream for UvStream {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
do Scheduler::unsafe_local |scheduler| {
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |scheduler, task| {
rtdebug!("read: entered scheduler context");
assert!(!scheduler.in_task_context());
let scheduler = Scheduler::unsafe_local_borrow();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |scheduler, task| {
rtdebug!("read: entered scheduler context");
assert!(!scheduler.in_task_context());
let mut watcher = watcher;
let task_cell = Cell(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
let mut watcher = watcher;
let task_cell = Cell(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
watcher.read_stop();
let result = if status.is_none() {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
};
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
let mut watcher = watcher;
watcher.read_stop();
unsafe { (*result_cell_ptr).put_back(result); }
let result = if status.is_none() {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
};
unsafe { (*result_cell_ptr).put_back(result); }
do Scheduler::unsafe_local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
let scheduler = Scheduler::unsafe_local_borrow();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -289,29 +283,27 @@ impl Stream for UvStream {
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
do Scheduler::unsafe_local |scheduler| {
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let mut watcher = watcher;
let task_cell = Cell(task);
let buf = unsafe { &*buf_ptr };
// XXX: OMGCOPIES
let buf = buf.to_vec();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(())
};
let scheduler = Scheduler::unsafe_local_borrow();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let mut watcher = watcher;
let task_cell = Cell(task);
let buf = unsafe { &*buf_ptr };
// XXX: OMGCOPIES
let buf = buf.to_vec();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(())
};
unsafe { (*result_cell_ptr).put_back(result); }
unsafe { (*result_cell_ptr).put_back(result); }
do Scheduler::unsafe_local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
let scheduler = Scheduler::unsafe_local_borrow();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -326,12 +318,11 @@ fn test_simple_io_no_connect() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
let io = sched.event_loop.io().unwrap();
let addr = Ipv4(127, 0, 0, 1, 2926);
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
}
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let addr = Ipv4(127, 0, 0, 1, 2926);
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
};
sched.task_queue.push_back(task);
sched.run();
@ -346,29 +337,27 @@ fn test_simple_tcp_server_and_client() {
let addr = Ipv4(127, 0, 0, 1, 2929);
let client_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
let io = sched.event_loop.io().unwrap();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert!(nread == 8);
for uint::range(0, nread) |i| {
rtdebug!("%u", buf[i] as uint);
assert!(buf[i] == i as u8);
}
stream.close();
listener.close();
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert!(nread == 8);
for uint::range(0, nread) |i| {
rtdebug!("%u", buf[i] as uint);
assert!(buf[i] == i as u8);
}
stream.close();
listener.close();
};
// Start the server first so it listens before the client connects
@ -385,53 +374,50 @@ fn test_read_and_block() {
let addr = Ipv4(127, 0, 0, 1, 2930);
let client_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
let io = sched.event_loop.io().unwrap();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
let mut current = 0;
let mut reads = 0;
let expected = 32;
let mut current = 0;
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
for uint::range(0, nread) |i| {
let val = buf[i] as uint;
assert!(val == current % 8);
current += 1;
}
reads += 1;
do Scheduler::unsafe_local |scheduler| {
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |scheduler, task| {
scheduler.task_queue.push_back(task);
}
}
while current < expected {
let nread = stream.read(buf).unwrap();
for uint::range(0, nread) |i| {
let val = buf[i] as uint;
assert!(val == current % 8);
current += 1;
}
reads += 1;
// Make sure we had multiple reads
assert!(reads > 1);
stream.close();
listener.close();
let scheduler = Scheduler::unsafe_local_borrow();
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |scheduler, task| {
scheduler.task_queue.push_back(task);
}
}
// Make sure we had multiple reads
assert!(reads > 1);
stream.close();
listener.close();
};
// Start the server first so it listens before the client connects
@ -448,19 +434,18 @@ fn test_read_read_read() {
let addr = Ipv4(127, 0, 0, 1, 2931);
let client_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::unsafe_local |sched| {
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < 500000000 {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
}
rtdebug_!("read %u bytes total", total_bytes_read as uint);
stream.close();
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < 500000000 {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
}
rtdebug_!("read %u bytes total", total_bytes_read as uint);
stream.close();
};
sched.task_queue.push_back(client_task);