diff --git a/src/libstd/par.rs b/src/libstd/par.rs new file mode 100644 index 000000000000..5160e50fd9af --- /dev/null +++ b/src/libstd/par.rs @@ -0,0 +1,103 @@ +import comm::port; +import comm::chan; +import comm::send; +import comm::recv; +import task::spawn; + +export future; +export map; +export alli; + +iface future { + fn get() -> T; +} + +type future_ = { + mut slot : option, + port : port, +}; + +impl of future for future_ { + fn get() -> T { + alt(self.slot) { + some(x) { x } + none { + let x = recv(self.port); + self.slot = some(x); + x + } + } + } +} + + +#[doc="Executes a bit of code asynchronously. + +Returns a handle that can be used to retrieve the result at your +leisure."] +fn future(thunk : fn~() -> T) -> future { + let p = port(); + let c = chan(p); + + spawn() {|| + send(c, thunk()); + } + + {mut slot: none::, port : p} as future:: +} + +#[doc="The maximum number of tasks this module will spawn for a single + operationg."] +const max_tasks : uint = 32u; + +#[doc="The minimum number of elements each task will process."] +const min_granularity : uint = 1024u; + +#[doc="An internal helper to map a function over a large vector and + return the intermediate results. + +This is used to build most of the other parallel vector functions, +like map or alli."] +fn map_slices(xs: [A], f: fn~(uint, [A]) -> B) -> [B] { + let len = xs.len(); + if len < min_granularity { + // This is a small vector, fall back on the normal map. + [f(0u, xs)] + } + else { + let num_tasks = uint::min(max_tasks, len / min_granularity); + + let items_per_task = len / num_tasks; + + let mut futures = []; + let mut base = 0u; + while base < len { + let slice = vec::slice(xs, base, + uint::min(len, base + items_per_task)); + futures += [future() {|copy base| + f(base, slice) + }]; + base += items_per_task; + } + + futures.map() {|ys| + ys.get() + } + } +} + +#[doc="A parallel version of map."] +fn map(xs: [A], f: fn~(A) -> B) -> [B] { + vec::concat(map_slices(xs) {|_base, slice| + map(slice, f) + }) +} + +#[doc="Returns true if the function holds for all elements in the vector."] +fn alli(xs: [A], f: fn~(uint, A) -> bool) -> bool { + vec::all(map_slices(xs) {|base, slice| + slice.alli() {|i, x| + f(i + base, x) + } + }) {|x| x } +} diff --git a/src/libstd/std.rc b/src/libstd/std.rc index 40a03ae25c2b..f25ceb87c3ca 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -19,6 +19,7 @@ export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap; export rope, arena; export ebml, dbg, getopts, json, rand, sha1, term, time, prettyprint; export test, tempfile, serialization; +export par; // General io and system-services modules @@ -58,6 +59,7 @@ mod getopts; mod json; mod sha1; mod md4; +mod par; mod tempfile; mod term; mod time; diff --git a/src/test/bench/graph500-bfs.rs b/src/test/bench/graph500-bfs.rs index 3295ab2f07c9..d145da651b16 100644 --- a/src/test/bench/graph500-bfs.rs +++ b/src/test/bench/graph500-bfs.rs @@ -10,6 +10,7 @@ import std::map; import std::map::hashmap; import std::deque; import std::deque::t; +//import std::par; import io::writer_util; import comm::*; import int::abs; @@ -221,13 +222,11 @@ fn validate(edges: [(node_id, node_id)], log(info, "Verifying tree and graph edges..."); - let status = tree.alli() {|u, v| + let status = par::alli(tree) {|u, v| if v == -1 || u as int == root { true } else { - log(info, #fmt("Checking for %? or %?", - (u, v), (v, u))); edges.contains((u as int, v)) || edges.contains((v, u as int)) } }; @@ -269,9 +268,118 @@ fn main() { stop - start)); let start = time::precise_time_s(); - assert(validate(graph, edges, root, bfs_tree)); + assert(validate(edges, root, bfs_tree)); let stop = time::precise_time_s(); io::stdout().write_line(#fmt("Validation completed in %? seconds.", stop - start)); } + + +// par stuff ///////////////////////////////////////////////////////// + +mod par { +import comm::port; +import comm::chan; +import comm::send; +import comm::recv; +import task::spawn; + +iface future { + fn get() -> T; +} + +type future_ = { + mut slot : option, + port : port, +}; + +impl of future for future_ { + fn get() -> T { + get(self) + } +} + +fn get(f: future_) -> T { + alt(f.slot) { + some(x) { x } + none { + let x = recv(f.port); + f.slot = some(x); + x + } + } +} + + +#[doc="Executes a bit of code asynchronously. + +Returns a handle that can be used to retrieve the result at your +leisure."] +fn future(thunk : fn~() -> T) -> future { + let p = port(); + let c = chan(p); + + spawn() {|| + send(c, thunk()); + } + + {mut slot: none::, port : p} as future:: +} + +#[doc="The maximum number of tasks this module will spawn for a single + operationg."] +const max_tasks : uint = 32u; + +#[doc="The minimum number of elements each task will process."] +const min_granularity : uint = 1024u; + +#[doc="An internal helper to map a function over a large vector and + return the intermediate results. + +This is used to build most of the other parallel vector functions, +like map or alli."] +fn map_slices(xs: [A], f: fn~(uint, [A]) -> B) -> [B] { + let len = xs.len(); + if len < min_granularity { + // This is a small vector, fall back on the normal map. + [f(0u, xs)] + } + else { + let num_tasks = uint::min(max_tasks, len / min_granularity); + + let items_per_task = len / num_tasks; + + let mut futures = []; + let mut base = 0u; + while base < len { + let slice = vec::slice(xs, base, + uint::min(len, base + items_per_task)); + futures += [future() {|copy base| + f(base, slice) + }]; + base += items_per_task; + } + + futures.map() {|ys| + ys.get() + } + } +} + +#[doc="A parallel version of map."] +fn map(xs: [A], f: fn~(A) -> B) -> [B] { + vec::concat(map_slices(xs) {|_base, slice| + map(slice, f) + }) +} + +#[doc="Returns true if the function holds for all elements in the vector."] +fn alli(xs: [A], f: fn~(uint, A) -> bool) -> bool { + vec::all(map_slices(xs) {|base, slice| + slice.alli() {|i, x| + f(i + base, x) + } + }) {|x| x } +} +} \ No newline at end of file