Merge remote-tracking branch 'brson/futures'

This commit is contained in:
Brian Anderson 2012-10-24 20:29:01 -07:00
commit b2d5acd6bc
24 changed files with 66 additions and 122 deletions

View file

@ -181,7 +181,6 @@ pub mod task {
pub mod spawn;
pub mod rt;
}
pub mod future;
pub mod pipes;
// Runtime and language-primitive support

View file

@ -581,16 +581,20 @@ pub mod tests {
for uint::range(0, num_tasks) |_i| {
let total = total.clone();
futures.push(future::spawn(|move total| {
let (chan, port) = pipes::stream();
futures.push(move port);
do task::spawn |move total, move chan| {
for uint::range(0, count) |_i| {
do total.with |count| {
**count += 1;
}
}
}));
chan.send(());
}
};
for futures.each |f| { f.get() }
for futures.each |f| { f.recv() }
do total.with |total| {
assert **total == num_tasks * count
@ -642,7 +646,7 @@ pub mod tests {
// Have to get rid of our reference before blocking.
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
let res = option::swap_unwrap(&mut res);
future::get(&res);
res.recv();
}
#[test] #[should_fail] #[ignore(cfg(windows))]
@ -657,7 +661,7 @@ pub mod tests {
}
assert unwrap_exclusive(move x) == ~~"hello";
let res = option::swap_unwrap(&mut res);
future::get(&res);
res.recv();
}
#[test] #[ignore(cfg(windows))]

View file

@ -32,6 +32,7 @@ use cmp::Eq;
use result::Result;
use pipes::{stream, Chan, Port};
use local_data_priv::{local_get, local_set};
use util::replace;
use rt::task_id;
use rt::rust_task;
@ -72,25 +73,6 @@ impl TaskResult : Eq {
pure fn ne(other: &TaskResult) -> bool { !self.eq(other) }
}
/// A message type for notifying of task lifecycle events
pub enum Notification {
/// Sent when a task exits with the task handle and result
Exit(Task, TaskResult)
}
impl Notification : cmp::Eq {
pure fn eq(other: &Notification) -> bool {
match self {
Exit(e0a, e1a) => {
match (*other) {
Exit(e0b, e1b) => e0a == e0b && e1a == e1b
}
}
}
}
pure fn ne(other: &Notification) -> bool { !self.eq(other) }
}
/// Scheduler modes
pub enum SchedMode {
/// All tasks run in the same OS thread
@ -200,7 +182,7 @@ pub type SchedOpts = {
pub type TaskOpts = {
linked: bool,
supervised: bool,
mut notify_chan: Option<Chan<Notification>>,
mut notify_chan: Option<Chan<TaskResult>>,
sched: Option<SchedOpts>,
};
@ -246,11 +228,7 @@ priv impl TaskBuilder {
fail ~"Cannot copy a task_builder"; // Fake move mode on self
}
self.consumed = true;
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
@ -271,11 +249,7 @@ impl TaskBuilder {
* the other will not be killed.
*/
fn unlinked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: false,
@ -293,11 +267,7 @@ impl TaskBuilder {
* the child.
*/
fn supervised() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: false,
@ -314,11 +284,7 @@ impl TaskBuilder {
* other will be killed.
*/
fn linked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: true,
@ -348,7 +314,7 @@ impl TaskBuilder {
* # Failure
* Fails if a future_result was already set for this task.
*/
fn future_result(blk: fn(v: future::Future<TaskResult>)) -> TaskBuilder {
fn future_result(blk: fn(v: Port<TaskResult>)) -> TaskBuilder {
// FIXME (#3725): Once linked failure and notification are
// handled in the library, I can imagine implementing this by just
// registering an arbitrary number of task::on_exit handlers and
@ -359,13 +325,9 @@ impl TaskBuilder {
}
// Construct the future and give it to the caller.
let (notify_pipe_ch, notify_pipe_po) = stream::<Notification>();
let (notify_pipe_ch, notify_pipe_po) = stream::<TaskResult>();
blk(do future::from_fn |move notify_pipe_po| {
match notify_pipe_po.recv() {
Exit(_, result) => result
}
});
blk(move notify_pipe_po);
// Reconfigure self to use a notify channel.
TaskBuilder({
@ -381,11 +343,7 @@ impl TaskBuilder {
}
/// Configure a custom scheduler mode for the task.
fn sched_mode(mode: SchedMode) -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
@ -412,11 +370,7 @@ impl TaskBuilder {
*/
fn add_wrapper(wrapper: fn@(v: fn~()) -> fn~()) -> TaskBuilder {
let prev_gen_body = self.gen_body;
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
@ -447,13 +401,7 @@ impl TaskBuilder {
* must be greater than zero.
*/
fn spawn(f: fn~()) {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
let swapped_notify_chan =
option::swap_unwrap(&mut self.opts.notify_chan);
Some(move swapped_notify_chan)
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
let x = self.consume();
let opts = {
linked: x.opts.linked,
@ -532,7 +480,7 @@ impl TaskBuilder {
do fr_task_builder.spawn |move f| {
comm::send(ch, f());
}
match future::get(&option::unwrap(move result)) {
match option::unwrap(move result).recv() {
Success => result::Ok(comm::recv(po)),
Failure => result::Err(())
}
@ -949,14 +897,14 @@ fn test_add_wrapper() {
fn test_future_result() {
let mut result = None;
do task().future_result(|+r| { result = Some(move r); }).spawn { }
assert future::get(&option::unwrap(move result)) == Success;
assert option::unwrap(move result).recv() == Success;
result = None;
do task().future_result(|+r|
{ result = Some(move r); }).unlinked().spawn {
fail;
}
assert future::get(&option::unwrap(move result)) == Failure;
assert option::unwrap(move result).recv() == Failure;
}
#[test] #[should_fail] #[ignore(cfg(windows))]

View file

@ -320,15 +320,15 @@ fn TCB(me: *rust_task, tasks: TaskGroupArc, ancestors: AncestorList,
}
struct AutoNotify {
notify_chan: Chan<Notification>,
notify_chan: Chan<TaskResult>,
mut failed: bool,
drop {
let result = if self.failed { Failure } else { Success };
self.notify_chan.send(Exit(get_task(), result));
self.notify_chan.send(result);
}
}
fn AutoNotify(chan: Chan<Notification>) -> AutoNotify {
fn AutoNotify(chan: Chan<TaskResult>) -> AutoNotify {
AutoNotify {
notify_chan: move chan,
failed: true // Un-set above when taskgroup successfully made.
@ -532,7 +532,7 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
// (4) ...and runs the provided body function.
fn make_child_wrapper(child: *rust_task, child_arc: TaskGroupArc,
ancestors: AncestorList, is_main: bool,
notify_chan: Option<Chan<Notification>>,
notify_chan: Option<Chan<TaskResult>>,
f: fn~()) -> fn~() {
let child_data = ~mut Some((move child_arc, move ancestors));
return fn~(move notify_chan, move child_data, move f) {
@ -660,25 +660,21 @@ fn test_spawn_raw_unsupervise() {
#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_success() {
let (task_ch, task_po) = pipes::stream();
let (notify_ch, notify_po) = pipes::stream();
let opts = {
notify_chan: Some(move notify_ch),
.. default_task_opts()
};
do spawn_raw(move opts) |move task_ch| {
task_ch.send(get_task());
do spawn_raw(move opts) {
}
let task_ = task_po.recv();
assert notify_po.recv() == Exit(task_, Success);
assert notify_po.recv() == Success;
}
#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_failure() {
// New bindings for these
let (task_ch, task_po) = pipes::stream();
let (notify_ch, notify_po) = pipes::stream();
let opts = {
@ -686,10 +682,8 @@ fn test_spawn_raw_notify_failure() {
notify_chan: Some(move notify_ch),
.. default_task_opts()
};
do spawn_raw(move opts) |move task_ch| {
task_ch.send(get_task());
do spawn_raw(move opts) {
fail;
}
let task_ = task_po.recv();
assert notify_po.recv() == Exit(task_, Failure);
assert notify_po.recv() == Failure;
}

View file

@ -651,7 +651,7 @@ mod tests {
}
// Wait for children to pass their asserts
for vec::each(children) |r| { future::get(r); }
for vec::each(children) |r| { r.recv(); }
// Wait for writer to finish
p.recv();

View file

@ -17,7 +17,7 @@
*/
use either::Either;
use pipes::recv;
use pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
use cast::copy_lifetime;
#[doc = "The future type"]
@ -67,7 +67,7 @@ pub fn from_value<A>(val: A) -> Future<A> {
Future {state: Forced(~(move val))}
}
pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
pub fn from_port<A:Send>(port: PortOne<A>) ->
Future<A> {
/*!
* Create a future from a port
@ -82,7 +82,7 @@ pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
port_ <-> *port;
let port = option::unwrap(move port_);
match recv(move port) {
future_pipe::completed(move data) => move data
oneshot::send(move data) => move data
}
}
}
@ -107,9 +107,15 @@ pub fn spawn<A:Send>(blk: fn~() -> A) -> Future<A> {
* value of the future.
*/
from_port(pipes::spawn_service_recv(future_pipe::init, |move blk, ch| {
future_pipe::server::completed(move ch, blk());
}))
let (chan, port) = oneshot::init();
let chan = ~mut Some(move chan);
do task::spawn |move blk, move chan| {
let chan = option::swap_unwrap(&mut *chan);
send_one(move chan, blk());
}
return from_port(move port);
}
pub fn get_ref<A>(future: &r/Future<A>) -> &r/A {
@ -162,12 +168,6 @@ pub fn with<A,B>(future: &Future<A>, blk: fn((&A)) -> B) -> B {
blk(get_ref(future))
}
proto! future_pipe (
waiting:recv<T:Send> {
completed(T) -> !
}
)
#[allow(non_implicitly_copyable_typarams)]
pub mod test {
#[test]
@ -178,8 +178,8 @@ pub mod test {
#[test]
pub fn test_from_port() {
let (po, ch) = future_pipe::init();
future_pipe::server::completed(move ch, ~"whale");
let (ch, po) = oneshot::init();
send_one(move ch, ~"whale");
let f = from_port(move po);
assert get(&f) == ~"whale";
}

View file

@ -53,6 +53,7 @@ pub mod cell;
pub mod sync;
pub mod arc;
pub mod comm;
pub mod future;
// Collections

View file

@ -391,7 +391,7 @@ fn run_test(test: TestDesc, monitor_ch: comm::Chan<MonitorMsg>) {
task::task().unlinked().future_result(|+r| {
result_future = Some(move r);
}).spawn(move testfn);
let task_result = future::get(&option::unwrap(move result_future));
let task_result = option::unwrap(move result_future).recv();
let test_result = calc_result(&test, task_result == task::Success);
comm::send(monitor_ch, (copy test, test_result));
};

View file

@ -1,5 +1,6 @@
use doc::ItemUtils;
use io::ReaderUtil;
use std::future;
export WriteInstr;
export Writer;

View file

@ -74,7 +74,7 @@ fn run(args: &[~str]) {
}
for vec::each(worker_results) |r| {
future::get(r);
r.recv();
}
//error!("sending stop message");

View file

@ -71,7 +71,7 @@ fn run(args: &[~str]) {
}
for vec::each(worker_results) |r| {
future::get(r);
r.recv();
}
//error!("sending stop message");

View file

@ -7,11 +7,10 @@
// xfail-pretty
use future::future;
extern mod std;
use std::time;
use std::arc;
use std::future;
// A poor man's pipe.
type pipe = arc::MutexARC<~[uint]>;

View file

@ -8,10 +8,9 @@
// xfail-pretty
use future::future;
extern mod std;
use std::time;
use std::future;
use pipes::recv;

View file

@ -7,11 +7,10 @@
// xfail-pretty
use future::future;
extern mod std;
use std::time;
use std::arc;
use std::future;
// A poor man's pipe.
type pipe = arc::RWARC<~[uint]>;

View file

@ -5,10 +5,10 @@
// message path.
use comm::*;
use future::future;
extern mod std;
use std::time;
use std::future;
fn thread_ring(i: uint,
count: uint,

View file

@ -45,7 +45,7 @@ fn run(args: ~[~str]) {
};
}
for vec::each(worker_results) |r| {
future::get(r);
r.recv();
}
comm::send(to_child, stop);
let result = comm::recv(from_child);

View file

@ -78,7 +78,7 @@ fn stress(num_tasks: int) {
stress_task(i);
}
}
for results.each |r| { future::get(r); }
for results.each |r| { r.recv(); }
}
fn main() {

View file

@ -33,7 +33,7 @@ fn spawn_supervised_blocking(myname: &str, +f: fn~()) {
let mut res = None;
task::task().future_result(|+r| res = Some(move r)).supervised().spawn(move f);
error!("%s group waiting", myname);
let x = future::get(&option::unwrap(move res));
let x = option::unwrap(move res).recv();
assert x == task::Success;
}

View file

@ -19,7 +19,7 @@ fn test00() {
}
// Try joining tasks that have already finished.
future::get(&option::unwrap(move result));
option::unwrap(move result).recv();
debug!("Joined task.");
}

View file

@ -53,7 +53,7 @@ fn test00() {
}
// Join spawned tasks...
for results.each |r| { future::get(r); }
for results.each |r| { r.recv(); }
debug!("Completed: Final number is: ");
log(error, sum);

View file

@ -30,7 +30,7 @@ fn test00() {
i += 1;
}
future::get(&option::unwrap(move result));
option::unwrap(move result).recv();
assert (sum == number_of_messages * (number_of_messages - 1) / 2);
}

View file

@ -51,7 +51,7 @@ fn test00() {
while i < number_of_messages { sum += recv(po); i = i + 1; }
}
for results.each |r| { future::get(r); }
for results.each |r| { r.recv(); }
debug!("Completed: Final number is: ");
assert (sum ==
@ -134,7 +134,7 @@ fn test06() {
}
for results.each |r| { future::get(r); }
for results.each |r| { r.recv(); }
}

View file

@ -10,7 +10,7 @@ fn main() {
error!("2");
yield();
error!("3");
future::get(&option::unwrap(move result));
option::unwrap(move result).recv();
}
fn child() {

View file

@ -7,7 +7,7 @@ fn main() {
task::task().future_result(|+r| { result = Some(move r); }).spawn(child);
error!("1");
yield();
future::get(&option::unwrap(move result));
option::unwrap(move result).recv();
}
fn child() { error!("2"); }