std::rt: Tasks contain a JoinLatch

This commit is contained in:
Brian Anderson 2013-06-13 23:31:19 -07:00
parent 90fbe38f00
commit 505ef7e710
2 changed files with 56 additions and 32 deletions

View file

@ -16,9 +16,11 @@
use prelude::*;
use libc::{c_void, uintptr_t};
use cast::transmute;
use option::{Option, Some, None};
use rt::local::Local;
use super::local_heap::LocalHeap;
use rt::logging::StdErrLogger;
use rt::join_latch::JoinLatch;
pub struct Task {
heap: LocalHeap,
@ -26,6 +28,8 @@ pub struct Task {
storage: LocalStorage,
logger: StdErrLogger,
unwinder: Unwinder,
join_latch: Option<~JoinLatch>,
on_exit: Option<~fn(bool)>,
destroyed: bool
}
@ -44,6 +48,8 @@ impl Task {
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Unwinder { unwinding: false },
join_latch: Some(JoinLatch::new_root()),
on_exit: None,
destroyed: false
}
}
@ -55,6 +61,8 @@ impl Task {
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Unwinder { unwinding: false },
join_latch: Some(self.join_latch.get_mut_ref().new_child()),
on_exit: None,
destroyed: false
}
}
@ -68,9 +76,22 @@ impl Task {
self.unwinder.try(f);
self.destroy();
// Wait for children. Possibly report the exit status.
let local_success = !self.unwinder.unwinding;
let join_latch = self.join_latch.swap_unwrap();
match self.on_exit {
Some(ref on_exit) => {
let success = join_latch.wait(local_success);
(*on_exit)(success);
}
None => {
join_latch.release(local_success);
}
}
}
/// Must be called manually before finalization to clean up
/// must be called manually before finalization to clean up
/// thread-local resources. Some of the routines here expect
/// Task to be available recursively so this must be
/// called unsafely, without removing Task from
@ -216,5 +237,15 @@ mod test {
assert!(port.recv() == 10);
}
}
#[test]
fn linked_failure() {
do run_in_newsched_task() {
let res = do spawntask_try {
spawntask_random(|| fail!());
};
assert!(res.is_err());
}
}
}

View file

@ -18,6 +18,7 @@ use vec::OwnedVector;
use result::{Result, Ok, Err};
use unstable::run_in_bare_thread;
use super::io::net::ip::{IpAddr, Ipv4};
use rt::comm::oneshot;
use rt::task::Task;
use rt::thread::Thread;
use rt::local::Local;
@ -47,8 +48,11 @@ pub fn run_in_newsched_task(f: ~fn()) {
do run_in_bare_thread {
let mut sched = ~new_test_uv_sched();
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
new_task.on_exit = Some(on_exit);
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::new_root(),
new_task,
f.take());
sched.enqueue_task(task);
sched.run();
@ -94,16 +98,20 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
let f_cell = Cell(f_cell.take());
let handles = Cell(handles);
let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) {
f_cell.take()();
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| {
let mut handles = handles.take();
// Tell schedulers to exit
for handles.each_mut |handle| {
handle.send(Shutdown);
}
};
rtassert!(exit_status);
};
new_task.on_exit = Some(on_exit);
let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
new_task, f_cell.take());
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
@ -213,36 +221,21 @@ pub fn spawntask_random(f: ~fn()) {
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;
use super::sched::*;
use task;
use unstable::finally::Finally;
// Our status variables will be filled in from the scheduler context
let mut failed = false;
let failed_ptr: *mut bool = &mut failed;
// Switch to the scheduler
let f = Cell(Cell(f));
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |sched, old_task| {
let old_task = Cell(old_task);
let f = f.take();
let new_task = ~do Coroutine::new_root(&mut sched.stack_pool) {
do (|| {
(f.take())()
}).finally {
// Check for failure then resume the parent task
unsafe { *failed_ptr = task::failing(); }
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| {
sched.enqueue_task(new_task);
}
}
};
sched.enqueue_task(new_task);
let (port, chan) = oneshot();
let chan = Cell(chan);
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
new_task.on_exit = Some(on_exit);
let mut sched = Local::take::<Scheduler>();
let new_task = ~Coroutine::with_task(&mut sched.stack_pool,
new_task, f);
do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
sched.enqueue_task(old_task);
}
if !failed { Ok(()) } else { Err(()) }
let exit_status = port.recv();
if exit_status { Ok(()) } else { Err(()) }
}
// Spawn a new task in a new scheduler and return a thread handle.