diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 4d187a01e7fe..288f52919ac6 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -35,6 +35,7 @@ export sched_mode; export sched_opts; export task_opts; export builder; +export task_builder; export default_task_opts; export get_opts; @@ -46,7 +47,6 @@ export run; export future_result; export future_task; export unsupervise; -export parent; export run_listener; export run_with; @@ -77,7 +77,7 @@ export osmain; /* Data types */ /// A handle to a task -enum task = task_id; +enum task { task_handle(task_id) } /** * Indicates the manner in which a task exited. @@ -192,6 +192,141 @@ enum builder { }) } +class dummy { let x: (); new() { self.x = (); } drop { } } + +// FIXME (#2585): Replace the 'consumed' bit with move mode on self +enum task_builder = { + opts: task_opts, + gen_body: fn@(+fn~()) -> fn~(), + can_not_copy: option, + mut consumed: bool, +}; + +/** + * Generate the base configuration for spawning a task, off of which more + * configuration methods can be chained. + * For example, task().unlinked().spawn is equivalent to spawn_unlinked. + */ +fn task() -> task_builder { + task_builder({ + opts: default_task_opts(), + gen_body: |body| body, // Identity function + can_not_copy: none, + mut consumed: false, + }) +} + +impl private_methods for task_builder { + fn consume() -> task_builder { + if self.consumed { + fail ~"Cannot copy a task_builder"; // Fake move mode on self + } + self.consumed = true; + task_builder({ can_not_copy: none, mut consumed: false, with *self }) + } +} + +impl task_builder for task_builder { + /** + * Decouple the child task's failure from the parent's. If either fails, + * the other will not be killed. + */ + fn unlinked() -> task_builder { + task_builder({ + opts: { linked: false with self.opts }, + can_not_copy: none, + with *self.consume() + }) + } + /** + * Unidirectionally link the child task's failure with the parent's. The + * child's failure will not kill the parent, but the parent's will kill + * the child. + */ + fn supervised() -> task_builder { + task_builder({ + opts: { linked: false, parented: true with self.opts }, + can_not_copy: none, + with *self.consume() + }) + } + /** + * Link the child task's and parent task's failures. If either fails, the + * other will be killed. + */ + fn linked() -> task_builder { + task_builder({ + opts: { linked: true, parented: false with self.opts }, + can_not_copy: none, + with *self.consume() + }) + } + + /// Configure a future result notification for this task. + fn future_result(blk: fn(-future::future)) -> task_builder { + // Construct the future and give it to the caller. + let po = comm::port::(); + let ch = comm::chan(po); + + blk(do future::from_fn { + alt comm::recv(po) { + exit(_, result) { result } + } + }); + + // Reconfigure self to use a notify channel. + task_builder({ + opts: { notify_chan: some(ch) with self.opts }, + can_not_copy: none, + with *self.consume() + }) + } + /// Configure a custom scheduler mode for the task. + fn sched_mode(mode: sched_mode) -> task_builder { + task_builder({ + opts: { sched: some({ mode: mode, foreign_stack_size: none}) + with self.opts }, + can_not_copy: none, + with *self.consume() + }) + } + fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> task_builder { + let prev_gen_body = self.gen_body; + task_builder({ + gen_body: |body| { wrapper(prev_gen_body(body)) }, + can_not_copy: none, + with *self.consume() + }) + } + + /// Run the task. + fn spawn(+f: fn~()) { + let x = self.consume(); + spawn_raw(x.opts, x.gen_body(f)); + } + /// Runs a task, while transfering ownership of one argument to the child. + fn spawn_with(+arg: A, +f: fn~(+A)) { + let arg = ~mut some(arg); + do self.spawn { + let mut my_arg = none; + my_arg <-> *arg; + f(option::unwrap(my_arg)) + } + } + /// Runs a task with a listening port, returning the associated channel. + fn spawn_listener(+f: fn~(comm::port)) -> comm::chan { + let setup_po = comm::port(); + let setup_ch = comm::chan(setup_po); + do self.spawn { + let po = comm::port(); + let ch = comm::chan(po); + comm::send(setup_ch, ch); + f(po); + } + comm::recv(setup_po) + } +} + /* Task construction */ @@ -362,11 +497,6 @@ fn unsupervise(builder: builder) { }); } -fn parent(builder: builder) { - //! Configures the new task to be killed if the parent group is killed. - set_opts(builder, { parented: true with get_opts(builder) }); -} - fn run_with(-builder: builder, +arg: A, +f: fn~(+A)) { @@ -428,7 +558,7 @@ fn spawn(+f: fn~()) { * This function is equivalent to `run(new_builder(), f)`. */ - run(builder(), f); + task().spawn(f) } fn spawn_unlinked(+f: fn~()) { @@ -437,9 +567,16 @@ fn spawn_unlinked(+f: fn~()) { * task or the child task fails, the other will not be killed. */ - let b = builder(); - unsupervise(b); - run(b, f); + task().unlinked().spawn(f) +} + +fn spawn_supervised(+f: fn~()) { + /*! + * Creates a child task unlinked from the current one. If either this + * task or the child task fails, the other will not be killed. + */ + + task().supervised().spawn(f) } fn spawn_with(+arg: A, +f: fn~(+A)) { @@ -453,7 +590,7 @@ fn spawn_with(+arg: A, +f: fn~(+A)) { * This function is equivalent to `run_with(builder(), arg, f)`. */ - run_with(builder(), arg, f) + task().spawn_with(arg, f) } fn spawn_listener(+f: fn~(comm::port)) -> comm::chan { @@ -482,7 +619,7 @@ fn spawn_listener(+f: fn~(comm::port)) -> comm::chan { * This function is equivalent to `run_listener(builder(), f)`. */ - run_listener(builder(), f) + task().spawn_listener(f) } fn spawn_sched(mode: sched_mode, +f: fn~()) { @@ -499,9 +636,7 @@ fn spawn_sched(mode: sched_mode, +f: fn~()) { * greater than zero. */ - let mut builder = builder(); - set_sched_mode(builder, mode); - run(builder, f); + task().sched_mode(mode).spawn(f) } fn try(+f: fn~() -> T) -> result { @@ -518,13 +653,13 @@ fn try(+f: fn~() -> T) -> result { let po = comm::port(); let ch = comm::chan(po); - let mut builder = builder(); - unsupervise(builder); - let result = future_result(builder); - do run(builder) { + + let mut result = none; + + do task().unlinked().future_result(|-r| { result = some(r); }).spawn { comm::send(ch, f()); } - alt future::get(result) { + alt future::get(option::unwrap(result)) { success { result::ok(comm::recv(po)) } failure { result::err(()) } } @@ -553,7 +688,7 @@ fn failing() -> bool { fn get_task() -> task { //! Get a handle to the running task - task(rustrt::get_task_id()) + task_handle(rustrt::get_task_id()) } /** @@ -1161,6 +1296,16 @@ fn test_spawn_raw_unsupervise() { } } +#[test] #[should_fail] #[ignore(cfg(windows))] +fn test_cant_dup_task_builder() { + let b = task().unlinked(); + do b.spawn { } + // FIXME(#2585): For now, this is a -runtime- failure, because we haven't + // got modes on self. When 2585 is fixed, this test should fail to compile + // instead, and should go in tests/compile-fail. + do b.spawn { } // b should have been consumed by the previous call +} + // The following 8 tests test the following 2^3 combinations: // {un,}linked {un,}supervised failure propagation {up,down}wards. @@ -1171,8 +1316,8 @@ fn test_spawn_raw_unsupervise() { fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port let po = comm::port(); let ch = comm::chan(po); - do task::spawn_unlinked { - do task::spawn_unlinked { + do spawn_unlinked { + do spawn_unlinked { // Give middle task a chance to fail-but-not-kill-us. for iter::repeat(8192) { task::yield(); } comm::send(ch, ()); // If killed first, grandparent hangs. @@ -1183,23 +1328,17 @@ fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails - do task::spawn_unlinked { fail; } + do spawn_unlinked { fail; } } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails - let builder = task::builder(); - task::unsupervise(builder); - task::parent(builder); - do task::run(builder) { fail; } + do spawn_supervised { fail; } // Give child a chance to fail-but-not-kill-us. for iter::repeat(8192) { task::yield(); } } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_unlinked_sup_fail_down() { - let builder = task::builder(); - task::unsupervise(builder); - task::parent(builder); - do task::run(builder) { loop { task::yield(); } } + do spawn_supervised { loop { task::yield(); } } fail; // Shouldn't leave a child hanging around. } @@ -1207,17 +1346,29 @@ fn test_spawn_unlinked_sup_fail_down() { fn test_spawn_linked_sup_fail_up() { // child fails; parent fails let po = comm::port::<()>(); let _ch = comm::chan(po); - let builder = task::builder(); - task::parent(builder); // Unidirectional "parenting" shouldn't override bidirectional linked. - do task::run(builder) { fail; } + // We have to cheat with opts - the interface doesn't support them because + // they don't make sense (redundant with task().supervised()). + let b0 = task(); + let b1 = task_builder({ + opts: { linked: true, parented: true with b0.opts }, + can_not_copy: none, + with *b0 + }); + do b1.spawn { fail; } comm::recv(po); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails - let builder = task::builder(); - task::parent(builder); - do task::run(builder) { loop { task::yield(); } } + // We have to cheat with opts - the interface doesn't support them because + // they don't make sense (redundant with task().supervised()). + let b0 = task(); + let b1 = task_builder({ + opts: { linked: true, parented: true with b0.opts }, + can_not_copy: none, + with *b0 + }); + do b1.spawn { loop { task::yield(); } } fail; // *both* mechanisms would be wrong if this didn't kill the child... } #[test] #[should_fail] #[ignore(cfg(windows))] @@ -1225,13 +1376,19 @@ fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails let po = comm::port::<()>(); let _ch = comm::chan(po); // Default options are to spawn linked & unsupervised. - do task::spawn { fail; } + do spawn { fail; } comm::recv(po); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails // Default options are to spawn linked & unsupervised. - do task::spawn { loop { task::yield(); } } + do spawn { loop { task::yield(); } } + fail; +} +#[test] #[should_fail] #[ignore(cfg(windows))] +fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails + // Make sure the above test is the same as this one. + do task().linked().spawn { loop { task::yield(); } } fail; } @@ -1240,14 +1397,8 @@ fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails #[test] #[should_fail] // #[ignore(cfg(windows))] #[ignore] // FIXME (#1868) (bblum) make this work fn test_spawn_unlinked_sup_propagate_grandchild() { - let builder = task::builder(); - task::unsupervise(builder); - task::parent(builder); - do task::run(builder) { - let builder = task::builder(); - task::unsupervise(builder); - task::parent(builder); - do task::run(builder) { + do spawn_supervised { + do spawn_supervised { loop { task::yield(); } } } @@ -1290,8 +1441,7 @@ fn test_spawn_raw_notify() { fn test_run_basic() { let po = comm::port(); let ch = comm::chan(po); - let buildr = builder(); - do run(buildr) { + do task().spawn { comm::send(ch, ()); } comm::recv(po); @@ -1301,30 +1451,29 @@ fn test_run_basic() { fn test_add_wrapper() { let po = comm::port(); let ch = comm::chan(po); - let buildr = builder(); - do add_wrapper(buildr) |body| { + let b0 = task(); + let b1 = do b0.add_wrapper |body| { fn~() { body(); comm::send(ch, ()); } - } - do run(buildr) { } + }; + do b1.spawn { } comm::recv(po); } #[test] #[ignore(cfg(windows))] fn test_future_result() { - let buildr = builder(); - let result = future_result(buildr); - do run(buildr) { } - assert future::get(result) == success; + let mut result = none; + do task().future_result(|-r| { result = some(r); }).spawn { } + assert future::get(option::unwrap(result)) == success; - let buildr = builder(); - let result = future_result(buildr); - unsupervise(buildr); - do run(buildr) { fail } - assert future::get(result) == failure; + result = none; + do task().future_result(|-r| { result = some(r); }).unlinked().spawn { + fail; + } + assert future::get(option::unwrap(result)) == failure; } #[test] @@ -1525,20 +1674,18 @@ fn test_avoid_copying_the_body_spawn_listener() { } #[test] -fn test_avoid_copying_the_body_run() { +fn test_avoid_copying_the_body_task_spawn() { do avoid_copying_the_body |f| { - let buildr = builder(); - do run(buildr) { + do task().spawn { f(); } } } #[test] -fn test_avoid_copying_the_body_run_listener() { +fn test_avoid_copying_the_body_spawn_listener() { do avoid_copying_the_body |f| { - let buildr = builder(); - run_listener(buildr, fn~(move f, _po: comm::port) { + task().spawn_listener(fn~(move f, _po: comm::port) { f(); }); } @@ -1565,11 +1712,9 @@ fn test_avoid_copying_the_body_future_task() { } #[test] -fn test_avoid_copying_the_body_unsupervise() { +fn test_avoid_copying_the_body_unlinked() { do avoid_copying_the_body |f| { - let buildr = builder(); - unsupervise(buildr); - do run(buildr) { + do spawn_unlinked { f(); } } @@ -1577,12 +1722,9 @@ fn test_avoid_copying_the_body_unsupervise() { #[test] fn test_osmain() { - let buildr = builder(); - set_sched_mode(buildr, osmain); - let po = comm::port(); let ch = comm::chan(po); - do run(buildr) { + do task().sched_mode(osmain).spawn { comm::send(ch, ()); } comm::recv(po);