auto merge of #15944 : alexcrichton/rust/task-dont-die, r=brson

Previously both spawning mechanisms were not resilient to task failures which were initiated from the task spawning infrastructure.

Closes #15895
This commit is contained in:
bors 2014-07-30 17:06:18 +00:00
commit 7a25cf3f30
7 changed files with 111 additions and 16 deletions

View file

@ -219,7 +219,7 @@ impl Scheduler {
let message = stask.sched.get_mut_ref().message_queue.pop();
rtassert!(match message { msgq::Empty => true, _ => false });
stask.task.get_mut_ref().destroyed = true;
stask.task.take().unwrap().drop();
}
// This does not return a scheduler, as the scheduler is placed

View file

@ -442,15 +442,30 @@ impl Runtime for GreenTask {
f: proc():Send) {
self.put_task(cur_task);
// First, set up a bomb which when it goes off will restore the local
// task unless its disarmed. This will allow us to gracefully fail from
// inside of `configure` which allocates a new task.
struct Bomb { inner: Option<Box<GreenTask>> }
impl Drop for Bomb {
fn drop(&mut self) {
let _ = self.inner.take().map(|task| task.put());
}
}
let mut bomb = Bomb { inner: Some(self) };
// Spawns a task into the current scheduler. We allocate the new task's
// stack from the scheduler's stack pool, and then configure it
// accordingly to `opts`. Afterwards we bootstrap it immediately by
// switching to it.
//
// Upon returning, our task is back in TLS and we're good to return.
let mut sched = self.sched.take_unwrap();
let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
sched.run_task(self, sibling)
let sibling = {
let sched = bomb.inner.get_mut_ref().sched.get_mut_ref();
GreenTask::configure(&mut sched.stack_pool, opts, f)
};
let mut me = bomb.inner.take().unwrap();
let sched = me.sched.take().unwrap();
sched.run_task(me, sibling)
}
// Local I/O is provided by the scheduler's event loop

View file

@ -71,7 +71,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
// Note that this increment must happen *before* the spawn in order to
// guarantee that if this task exits it will always end up waiting for the
// spawned task to exit.
bookkeeping::increment();
let token = bookkeeping::increment();
// Spawning a new OS thread guarantees that __morestack will never get
// triggered, but we must manually set up the actual stack bounds once this
@ -93,7 +93,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
let mut task = task;
task.put_runtime(ops);
drop(task.run(|| { f.take_unwrap()() }).destroy());
bookkeeping::decrement();
drop(token);
})
}

View file

@ -19,14 +19,24 @@
//! decrement() manually.
use core::atomics;
use core::ops::Drop;
use mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
pub fn increment() {
pub struct Token { _private: () }
impl Drop for Token {
fn drop(&mut self) { decrement() }
}
/// Increment the number of live tasks, returning a token which will decrement
/// the count when dropped.
pub fn increment() -> Token {
let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) };
Token { _private: () }
}
pub fn decrement() {

View file

@ -125,8 +125,8 @@ mod test {
}).join();
}
fn cleanup_task(mut t: Box<Task>) {
t.destroyed = true;
fn cleanup_task(t: Box<Task>) {
t.drop();
}
}

View file

@ -100,12 +100,21 @@ pub struct Task {
pub storage: LocalStorage,
pub unwinder: Unwinder,
pub death: Death,
pub destroyed: bool,
pub name: Option<SendStr>,
state: TaskState,
imp: Option<Box<Runtime + Send>>,
}
// Once a task has entered the `Armed` state it must be destroyed via `drop`,
// and no other method. This state is used to track this transition.
#[deriving(PartialEq)]
enum TaskState {
New,
Armed,
Destroyed,
}
pub struct TaskOpts {
/// Invoke this procedure with the result of the task when it finishes.
pub on_exit: Option<proc(Result): Send>,
@ -159,7 +168,7 @@ impl Task {
storage: LocalStorage(None),
unwinder: Unwinder::new(),
death: Death::new(),
destroyed: false,
state: New,
name: None,
imp: None,
}
@ -203,7 +212,7 @@ impl Task {
/// }).destroy();
/// # }
/// ```
pub fn run(self: Box<Task>, f: ||) -> Box<Task> {
pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
// First, make sure that no one else is in TLS. This does not allow
@ -212,6 +221,7 @@ impl Task {
if Local::exists(None::<Task>) {
fail!("cannot run a task recursively inside another");
}
self.state = Armed;
Local::put(self);
// There are two primary reasons that general try/catch is unsafe. The
@ -333,12 +343,12 @@ impl Task {
// Now that we're done, we remove the task from TLS and flag it for
// destruction.
let mut task: Box<Task> = Local::take();
task.destroyed = true;
task.state = Destroyed;
return task;
}
/// Queries whether this can be destroyed or not.
pub fn is_destroyed(&self) -> bool { self.destroyed }
pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
/// Inserts a runtime object into this task, transferring ownership to the
/// task. It is illegal to replace a previous runtime object in this task
@ -453,12 +463,20 @@ impl Task {
pub fn can_block(&self) -> bool {
self.imp.get_ref().can_block()
}
/// Consume this task, flagging it as a candidate for destruction.
///
/// This function is required to be invoked to destroy a task. A task
/// destroyed through a normal drop will abort.
pub fn drop(mut self) {
self.state = Destroyed;
}
}
impl Drop for Task {
fn drop(&mut self) {
rtdebug!("called drop for a task: {}", self as *mut Task as uint);
rtassert!(self.destroyed);
rtassert!(self.state != Armed);
}
}
@ -634,12 +652,17 @@ mod test {
begin_unwind("cause", file!(), line!())
}
#[test]
fn drop_new_task_ok() {
drop(Task::new());
}
// Task blocking tests
#[test]
fn block_and_wake() {
let task = box Task::new();
let mut task = BlockedTask::block(task).wake().unwrap();
task.destroyed = true;
task.drop();
}
}

View file

@ -0,0 +1,47 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// ignore-macos apparently gargantuan mmap requests are ok?
#![feature(phase)]
#[phase(plugin)]
extern crate green;
extern crate native;
use std::task::TaskBuilder;
use native::NativeTaskBuilder;
green_start!(main)
fn main() {
test();
let (tx, rx) = channel();
TaskBuilder::new().native().spawn(proc() {
tx.send(test());
});
rx.recv();
}
#[cfg(not(target_word_size = "64"))]
fn test() {}
#[cfg(target_word_size = "64")]
fn test() {
let (tx, rx) = channel();
spawn(proc() {
TaskBuilder::new().stack_size(1024 * 1024 * 1024 * 64).spawn(proc() {
});
tx.send(());
});
assert!(rx.recv_opt().is_err());
}