Add task::task_builder interface for improved spawning (related #2585)

This commit is contained in:
Ben Blum 2012-07-23 19:27:44 -04:00
parent 7680f504c2
commit e6efb24f3f

View file

@ -35,6 +35,7 @@ export sched_mode;
export sched_opts;
export task_opts;
export builder;
export task_builder;
export default_task_opts;
export get_opts;
@ -46,7 +47,6 @@ export run;
export future_result;
export future_task;
export unsupervise;
export parent;
export run_listener;
export run_with;
@ -77,7 +77,7 @@ export osmain;
/* Data types */
/// A handle to a task
enum task = task_id;
enum task { task_handle(task_id) }
/**
* Indicates the manner in which a task exited.
@ -192,6 +192,141 @@ enum builder {
})
}
class dummy { let x: (); new() { self.x = (); } drop { } }
// FIXME (#2585): Replace the 'consumed' bit with move mode on self
enum task_builder = {
opts: task_opts,
gen_body: fn@(+fn~()) -> fn~(),
can_not_copy: option<dummy>,
mut consumed: bool,
};
/**
* Generate the base configuration for spawning a task, off of which more
* configuration methods can be chained.
* For example, task().unlinked().spawn is equivalent to spawn_unlinked.
*/
fn task() -> task_builder {
task_builder({
opts: default_task_opts(),
gen_body: |body| body, // Identity function
can_not_copy: none,
mut consumed: false,
})
}
impl private_methods for task_builder {
fn consume() -> task_builder {
if self.consumed {
fail ~"Cannot copy a task_builder"; // Fake move mode on self
}
self.consumed = true;
task_builder({ can_not_copy: none, mut consumed: false, with *self })
}
}
impl task_builder for task_builder {
/**
* Decouple the child task's failure from the parent's. If either fails,
* the other will not be killed.
*/
fn unlinked() -> task_builder {
task_builder({
opts: { linked: false with self.opts },
can_not_copy: none,
with *self.consume()
})
}
/**
* Unidirectionally link the child task's failure with the parent's. The
* child's failure will not kill the parent, but the parent's will kill
* the child.
*/
fn supervised() -> task_builder {
task_builder({
opts: { linked: false, parented: true with self.opts },
can_not_copy: none,
with *self.consume()
})
}
/**
* Link the child task's and parent task's failures. If either fails, the
* other will be killed.
*/
fn linked() -> task_builder {
task_builder({
opts: { linked: true, parented: false with self.opts },
can_not_copy: none,
with *self.consume()
})
}
/// Configure a future result notification for this task.
fn future_result(blk: fn(-future::future<task_result>)) -> task_builder {
// Construct the future and give it to the caller.
let po = comm::port::<notification>();
let ch = comm::chan(po);
blk(do future::from_fn {
alt comm::recv(po) {
exit(_, result) { result }
}
});
// Reconfigure self to use a notify channel.
task_builder({
opts: { notify_chan: some(ch) with self.opts },
can_not_copy: none,
with *self.consume()
})
}
/// Configure a custom scheduler mode for the task.
fn sched_mode(mode: sched_mode) -> task_builder {
task_builder({
opts: { sched: some({ mode: mode, foreign_stack_size: none})
with self.opts },
can_not_copy: none,
with *self.consume()
})
}
fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> task_builder {
let prev_gen_body = self.gen_body;
task_builder({
gen_body: |body| { wrapper(prev_gen_body(body)) },
can_not_copy: none,
with *self.consume()
})
}
/// Run the task.
fn spawn(+f: fn~()) {
let x = self.consume();
spawn_raw(x.opts, x.gen_body(f));
}
/// Runs a task, while transfering ownership of one argument to the child.
fn spawn_with<A: send>(+arg: A, +f: fn~(+A)) {
let arg = ~mut some(arg);
do self.spawn {
let mut my_arg = none;
my_arg <-> *arg;
f(option::unwrap(my_arg))
}
}
/// Runs a task with a listening port, returning the associated channel.
fn spawn_listener<A: send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
let setup_po = comm::port();
let setup_ch = comm::chan(setup_po);
do self.spawn {
let po = comm::port();
let ch = comm::chan(po);
comm::send(setup_ch, ch);
f(po);
}
comm::recv(setup_po)
}
}
/* Task construction */
@ -362,11 +497,6 @@ fn unsupervise(builder: builder) {
});
}
fn parent(builder: builder) {
//! Configures the new task to be killed if the parent group is killed.
set_opts(builder, { parented: true with get_opts(builder) });
}
fn run_with<A:send>(-builder: builder,
+arg: A,
+f: fn~(+A)) {
@ -428,7 +558,7 @@ fn spawn(+f: fn~()) {
* This function is equivalent to `run(new_builder(), f)`.
*/
run(builder(), f);
task().spawn(f)
}
fn spawn_unlinked(+f: fn~()) {
@ -437,9 +567,16 @@ fn spawn_unlinked(+f: fn~()) {
* task or the child task fails, the other will not be killed.
*/
let b = builder();
unsupervise(b);
run(b, f);
task().unlinked().spawn(f)
}
fn spawn_supervised(+f: fn~()) {
/*!
* Creates a child task unlinked from the current one. If either this
* task or the child task fails, the other will not be killed.
*/
task().supervised().spawn(f)
}
fn spawn_with<A:send>(+arg: A, +f: fn~(+A)) {
@ -453,7 +590,7 @@ fn spawn_with<A:send>(+arg: A, +f: fn~(+A)) {
* This function is equivalent to `run_with(builder(), arg, f)`.
*/
run_with(builder(), arg, f)
task().spawn_with(arg, f)
}
fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
@ -482,7 +619,7 @@ fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
* This function is equivalent to `run_listener(builder(), f)`.
*/
run_listener(builder(), f)
task().spawn_listener(f)
}
fn spawn_sched(mode: sched_mode, +f: fn~()) {
@ -499,9 +636,7 @@ fn spawn_sched(mode: sched_mode, +f: fn~()) {
* greater than zero.
*/
let mut builder = builder();
set_sched_mode(builder, mode);
run(builder, f);
task().sched_mode(mode).spawn(f)
}
fn try<T:send>(+f: fn~() -> T) -> result<T,()> {
@ -518,13 +653,13 @@ fn try<T:send>(+f: fn~() -> T) -> result<T,()> {
let po = comm::port();
let ch = comm::chan(po);
let mut builder = builder();
unsupervise(builder);
let result = future_result(builder);
do run(builder) {
let mut result = none;
do task().unlinked().future_result(|-r| { result = some(r); }).spawn {
comm::send(ch, f());
}
alt future::get(result) {
alt future::get(option::unwrap(result)) {
success { result::ok(comm::recv(po)) }
failure { result::err(()) }
}
@ -553,7 +688,7 @@ fn failing() -> bool {
fn get_task() -> task {
//! Get a handle to the running task
task(rustrt::get_task_id())
task_handle(rustrt::get_task_id())
}
/**
@ -1161,6 +1296,16 @@ fn test_spawn_raw_unsupervise() {
}
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_cant_dup_task_builder() {
let b = task().unlinked();
do b.spawn { }
// FIXME(#2585): For now, this is a -runtime- failure, because we haven't
// got modes on self. When 2585 is fixed, this test should fail to compile
// instead, and should go in tests/compile-fail.
do b.spawn { } // b should have been consumed by the previous call
}
// The following 8 tests test the following 2^3 combinations:
// {un,}linked {un,}supervised failure propagation {up,down}wards.
@ -1171,8 +1316,8 @@ fn test_spawn_raw_unsupervise() {
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
let po = comm::port();
let ch = comm::chan(po);
do task::spawn_unlinked {
do task::spawn_unlinked {
do spawn_unlinked {
do spawn_unlinked {
// Give middle task a chance to fail-but-not-kill-us.
for iter::repeat(8192) { task::yield(); }
comm::send(ch, ()); // If killed first, grandparent hangs.
@ -1183,23 +1328,17 @@ fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
do task::spawn_unlinked { fail; }
do spawn_unlinked { fail; }
}
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
let builder = task::builder();
task::unsupervise(builder);
task::parent(builder);
do task::run(builder) { fail; }
do spawn_supervised { fail; }
// Give child a chance to fail-but-not-kill-us.
for iter::repeat(8192) { task::yield(); }
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_unlinked_sup_fail_down() {
let builder = task::builder();
task::unsupervise(builder);
task::parent(builder);
do task::run(builder) { loop { task::yield(); } }
do spawn_supervised { loop { task::yield(); } }
fail; // Shouldn't leave a child hanging around.
}
@ -1207,17 +1346,29 @@ fn test_spawn_unlinked_sup_fail_down() {
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
let po = comm::port::<()>();
let _ch = comm::chan(po);
let builder = task::builder();
task::parent(builder);
// Unidirectional "parenting" shouldn't override bidirectional linked.
do task::run(builder) { fail; }
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let b0 = task();
let b1 = task_builder({
opts: { linked: true, parented: true with b0.opts },
can_not_copy: none,
with *b0
});
do b1.spawn { fail; }
comm::recv(po); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
let builder = task::builder();
task::parent(builder);
do task::run(builder) { loop { task::yield(); } }
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let b0 = task();
let b1 = task_builder({
opts: { linked: true, parented: true with b0.opts },
can_not_copy: none,
with *b0
});
do b1.spawn { loop { task::yield(); } }
fail; // *both* mechanisms would be wrong if this didn't kill the child...
}
#[test] #[should_fail] #[ignore(cfg(windows))]
@ -1225,13 +1376,19 @@ fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
let po = comm::port::<()>();
let _ch = comm::chan(po);
// Default options are to spawn linked & unsupervised.
do task::spawn { fail; }
do spawn { fail; }
comm::recv(po); // We should get punted awake
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
// Default options are to spawn linked & unsupervised.
do task::spawn { loop { task::yield(); } }
do spawn { loop { task::yield(); } }
fail;
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails
// Make sure the above test is the same as this one.
do task().linked().spawn { loop { task::yield(); } }
fail;
}
@ -1240,14 +1397,8 @@ fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
#[test] #[should_fail] // #[ignore(cfg(windows))]
#[ignore] // FIXME (#1868) (bblum) make this work
fn test_spawn_unlinked_sup_propagate_grandchild() {
let builder = task::builder();
task::unsupervise(builder);
task::parent(builder);
do task::run(builder) {
let builder = task::builder();
task::unsupervise(builder);
task::parent(builder);
do task::run(builder) {
do spawn_supervised {
do spawn_supervised {
loop { task::yield(); }
}
}
@ -1290,8 +1441,7 @@ fn test_spawn_raw_notify() {
fn test_run_basic() {
let po = comm::port();
let ch = comm::chan(po);
let buildr = builder();
do run(buildr) {
do task().spawn {
comm::send(ch, ());
}
comm::recv(po);
@ -1301,30 +1451,29 @@ fn test_run_basic() {
fn test_add_wrapper() {
let po = comm::port();
let ch = comm::chan(po);
let buildr = builder();
do add_wrapper(buildr) |body| {
let b0 = task();
let b1 = do b0.add_wrapper |body| {
fn~() {
body();
comm::send(ch, ());
}
}
do run(buildr) { }
};
do b1.spawn { }
comm::recv(po);
}
#[test]
#[ignore(cfg(windows))]
fn test_future_result() {
let buildr = builder();
let result = future_result(buildr);
do run(buildr) { }
assert future::get(result) == success;
let mut result = none;
do task().future_result(|-r| { result = some(r); }).spawn { }
assert future::get(option::unwrap(result)) == success;
let buildr = builder();
let result = future_result(buildr);
unsupervise(buildr);
do run(buildr) { fail }
assert future::get(result) == failure;
result = none;
do task().future_result(|-r| { result = some(r); }).unlinked().spawn {
fail;
}
assert future::get(option::unwrap(result)) == failure;
}
#[test]
@ -1525,20 +1674,18 @@ fn test_avoid_copying_the_body_spawn_listener() {
}
#[test]
fn test_avoid_copying_the_body_run() {
fn test_avoid_copying_the_body_task_spawn() {
do avoid_copying_the_body |f| {
let buildr = builder();
do run(buildr) {
do task().spawn {
f();
}
}
}
#[test]
fn test_avoid_copying_the_body_run_listener() {
fn test_avoid_copying_the_body_spawn_listener() {
do avoid_copying_the_body |f| {
let buildr = builder();
run_listener(buildr, fn~(move f, _po: comm::port<int>) {
task().spawn_listener(fn~(move f, _po: comm::port<int>) {
f();
});
}
@ -1565,11 +1712,9 @@ fn test_avoid_copying_the_body_future_task() {
}
#[test]
fn test_avoid_copying_the_body_unsupervise() {
fn test_avoid_copying_the_body_unlinked() {
do avoid_copying_the_body |f| {
let buildr = builder();
unsupervise(buildr);
do run(buildr) {
do spawn_unlinked {
f();
}
}
@ -1577,12 +1722,9 @@ fn test_avoid_copying_the_body_unsupervise() {
#[test]
fn test_osmain() {
let buildr = builder();
set_sched_mode(buildr, osmain);
let po = comm::port();
let ch = comm::chan(po);
do run(buildr) {
do task().sched_mode(osmain).spawn {
comm::send(ch, ());
}
comm::recv(po);