From 640886ceb662ae212cf3edc7f400142af3aa7bbf Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Thu, 25 Aug 2011 15:36:55 -0700 Subject: [PATCH] Polymorphic MapReduce! --- src/comp/util/ppaux.rs | 1 + .../bench/task-perf-word-count-generic.rs | 328 ++++++++++++++++++ 2 files changed, 329 insertions(+) create mode 100644 src/test/bench/task-perf-word-count-generic.rs diff --git a/src/comp/util/ppaux.rs b/src/comp/util/ppaux.rs index a37e64cfb0a3..d5ad05fa6b01 100644 --- a/src/comp/util/ppaux.rs +++ b/src/comp/util/ppaux.rs @@ -26,6 +26,7 @@ fn mode_str(m: &ty::mode) -> str { mo_val. { "" } mo_alias(false) { "&" } mo_alias(true) { "&mutable " } + mo_move. { "-" } } } diff --git a/src/test/bench/task-perf-word-count-generic.rs b/src/test/bench/task-perf-word-count-generic.rs new file mode 100644 index 000000000000..ce4d2b2e44d6 --- /dev/null +++ b/src/test/bench/task-perf-word-count-generic.rs @@ -0,0 +1,328 @@ +/** + A parallel word-frequency counting program. + + This is meant primarily to demonstrate Rust's MapReduce framework. + + It takes a list of files on the command line and outputs a list of + words along with how many times each word is used. + +*/ + +use std; + +import option = std::option::t; +import std::option::some; +import std::option::none; +import std::str; +import std::treemap; +import std::vec; +import std::io; + +import std::time; +import std::u64; + +import std::task; +import std::task::joinable_task; +import std::comm; +import std::comm::chan; +import std::comm::port; +import std::comm::recv; +import std::comm::send; + +fn map(filename: &[u8], emit: &map_reduce::putter<[u8], int>) { + let f = io::file_reader(str::unsafe_from_bytes(filename)); + + while true { + alt read_word(f) { + some(w) { emit(str::bytes(w), 1); } + none. { break; } + } + } +} + +fn reduce(word: &[u8], get: &map_reduce::getter) { + let count = 0; + + while true { alt get() { some(_) { count += 1; } none. { break } } } +} + +mod map_reduce { + export putter; + export getter; + export mapper; + export reducer; + export map_reduce; + + type putter<~K, ~V> = fn(&K, &V); + + // FIXME: the first K1 parameter should probably be a -, but that + // doesn't parse at the moment. + type mapper<~K1, ~K2, ~V> = fn(&K1, &putter); + + type getter<~V> = fn() -> option; + + type reducer<~K, ~V> = fn(&K, &getter); + + tag ctrl_proto<~K, ~V> { + find_reducer(K, chan>>); + mapper_done; + } + + tag reduce_proto<~V> { emit_val(V); done; ref; release; } + + fn start_mappers<~K1, ~K2, ~V>(map : mapper, + ctrl: chan>, inputs: &[K1]) + -> [joinable_task] { + let tasks = []; + for i in inputs { + let m = map, c = ctrl, ii = i; + tasks += [task::spawn_joinable(bind map_task(m, c, ii))]; + } + ret tasks; + } + + fn map_task<~K1, ~K2, ~V>(map : -mapper, + ctrl: -chan>, input: -K1) { + // log_err "map_task " + input; + let intermediates = treemap::init(); + + fn emit<~K2, ~V>(im: &treemap::treemap>>, + ctrl: &chan>, key: &K2, val: &V) { + let c; + alt treemap::find(im, key) { + some(_c) { + c = _c + } + none. { + let p = port(); + send(ctrl, find_reducer(key, chan(p))); + c = recv(p); + treemap::insert(im, key, c); + send(c, ref); + } + } + send(c, emit_val(val)); + } + + map(input, bind emit(intermediates, ctrl, _, _)); + + fn finish<~K, ~V>(k : &K, v : &chan>) { + send(v, release); + } + treemap::traverse(intermediates, finish); + send(ctrl, mapper_done); + } + + fn reduce_task<~K, ~V>(reduce : -reducer, + key: -K, out: -chan>>) { + let p = port(); + + send(out, chan(p)); + + let ref_count = 0; + let is_done = false; + + fn get<~V>(p: &port>, ref_count: &mutable int, + is_done: &mutable bool) -> option { + while !is_done || ref_count > 0 { + alt recv(p) { + emit_val(v) { + // log_err #fmt("received %d", v); + ret some(v); + } + done. { + // log_err "all done"; + is_done = true; + } + ref. { ref_count += 1; } + release. { ref_count -= 1; } + } + } + ret none; + } + + reduce(key, bind get(p, ref_count, is_done)); + } + + fn map_reduce<~K1, ~K2, ~V>(map : mapper, + reduce : reducer, + inputs: &[K1]) { + let ctrl = port(); + + // This task becomes the master control task. It task::_spawns + // to do the rest. + + let reducers = treemap::init(); + + let tasks = start_mappers(map, chan(ctrl), inputs); + + let num_mappers = vec::len(inputs) as int; + + while num_mappers > 0 { + alt recv(ctrl) { + mapper_done. { + // log_err "received mapper terminated."; + num_mappers -= 1; + } + find_reducer(k, cc) { + let c; + // log_err "finding reducer for " + k; + alt treemap::find(reducers, k) { + some(_c) { + // log_err "reusing existing reducer for " + k; + c = _c; + } + none. { + // log_err "creating new reducer for " + k; + let p = port(); + let r = reduce, kk = k; + tasks += + [task::spawn_joinable(bind reduce_task(r, + kk, chan(p)))]; + c = recv(p); + treemap::insert(reducers, k, c); + } + } + send(cc, c); + } + } + } + + fn finish<~K, ~V>(k : &K, v : &chan>) { + send(v, done); + } + treemap::traverse(reducers, finish); + + for t in tasks { task::join(t); } + } +} + +fn main(argv: [str]) { + if vec::len(argv) < 2u { + let out = io::stdout(); + + out.write_line(#fmt["Usage: %s ...", argv[0]]); + + // TODO: run something just to make sure the code hasn't + // broken yet. This is the unit test mode of this program. + + ret; + } + + let iargs = []; + for a in vec::slice(argv, 1u, vec::len(argv)) { + iargs += [str::bytes(a)]; + } + + // We can get by with 8k stacks, and we'll probably exhaust our + // address space otherwise. + task::set_min_stack(8192u); + + let start = time::precise_time_ns(); + + map_reduce::map_reduce(map, reduce, iargs); + let stop = time::precise_time_ns(); + + let elapsed = stop - start; + elapsed /= 1000000u64; + + log_err "MapReduce completed in " + u64::str(elapsed) + "ms"; +} + +fn read_word(r: io::reader) -> option { + let w = ""; + + while !r.eof() { + let c = r.read_char(); + + + if is_word_char(c) { + w += str::from_char(c); + } else { if w != "" { ret some(w); } } + } + ret none; +} + +fn is_digit(c: char) -> bool { + alt c { + '0' { true } + '1' { true } + '2' { true } + '3' { true } + '4' { true } + '5' { true } + '6' { true } + '7' { true } + '8' { true } + '9' { true } + _ { false } + } +} + +fn is_alpha_lower(c: char) -> bool { + alt c { + 'a' { true } + 'b' { true } + 'c' { true } + 'd' { true } + 'e' { true } + 'f' { true } + 'g' { true } + 'h' { true } + 'i' { true } + 'j' { true } + 'k' { true } + 'l' { true } + 'm' { true } + 'n' { true } + 'o' { true } + 'p' { true } + 'q' { true } + 'r' { true } + 's' { true } + 't' { true } + 'u' { true } + 'v' { true } + 'w' { true } + 'x' { true } + 'y' { true } + 'z' { true } + _ { false } + } +} + +fn is_alpha_upper(c: char) -> bool { + alt c { + 'A' { true } + 'B' { true } + 'C' { true } + 'D' { true } + 'E' { true } + 'F' { true } + 'G' { true } + 'H' { true } + 'I' { true } + 'J' { true } + 'K' { true } + 'L' { true } + 'M' { true } + 'N' { true } + 'O' { true } + 'P' { true } + 'Q' { true } + 'R' { true } + 'S' { true } + 'T' { true } + 'U' { true } + 'V' { true } + 'W' { true } + 'X' { true } + 'Y' { true } + 'Z' { true } + _ { false } + } +} + +fn is_alpha(c: char) -> bool { is_alpha_upper(c) || is_alpha_lower(c) } + +fn is_word_char(c: char) -> bool { is_alpha(c) || is_digit(c) || c == '_' }