From ba10819800aa70d702633d18ecc58ebfc74be8d0 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Thu, 12 Jul 2012 15:57:12 -0700 Subject: [PATCH] Added a message send that uses shared chans. They are slower than port selectors, but scale better. --- src/test/bench/msgsend-pipes-shared.rs | 208 +++++++++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 src/test/bench/msgsend-pipes-shared.rs diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs new file mode 100644 index 000000000000..ea4834609537 --- /dev/null +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -0,0 +1,208 @@ +// A port of the simplistic benchmark from +// +// http://github.com/PaulKeeble/ScalaVErlangAgents +// +// I *think* it's the same, more or less. + +// This version uses pipes with a shared send endpoint. It should have +// different scalability characteristics compared to the select +// version. + +use std; +import io::writer; +import io::writer_util; + +import arc::methods; +import pipes::{port, chan}; + +macro_rules! move { + { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } +} + +enum request { + get_count, + bytes(uint), + stop +} + +fn server(requests: port, responses: pipes::chan) { + let mut count = 0u; + let mut done = false; + while !done { + alt requests.try_recv() { + some(get_count) { responses.send(copy count); } + some(bytes(b)) { + //#error("server: received %? bytes", b); + count += b; + } + none { done = true; } + _ { } + } + } + responses.send(count); + //#error("server exiting"); +} + +fn run(args: &[str]) { + let (to_parent, from_child) = pipes::stream(); + let (to_child, from_parent) = pipes::stream(); + + let to_child = shared_chan(to_child); + + let size = option::get(uint::from_str(args[1])); + let workers = option::get(uint::from_str(args[2])); + let num_bytes = 100; + let start = std::time::precise_time_s(); + let mut worker_results = ~[]; + for uint::range(0u, workers) |i| { + let builder = task::builder(); + vec::push(worker_results, task::future_result(builder)); + let to_child = to_child.clone(); + do task::run(builder) { + for uint::range(0u, size / workers) |_i| { + //#error("worker %?: sending %? bytes", i, num_bytes); + to_child.send(bytes(num_bytes)); + } + //#error("worker %? exiting", i); + }; + } + do task::spawn { + server(from_parent, to_parent); + } + + vec::iter(worker_results, |r| { future::get(r); } ); + //#error("sending stop message"); + to_child.send(stop); + move!{to_child}; + let result = from_child.recv(); + let end = std::time::precise_time_s(); + let elapsed = end - start; + io::stdout().write_str(#fmt("Count is %?\n", result)); + io::stdout().write_str(#fmt("Test took %? seconds\n", elapsed)); + let thruput = ((size / workers * workers) as float) / (elapsed as float); + io::stdout().write_str(#fmt("Throughput=%f per sec\n", thruput)); + assert result == num_bytes * size; +} + +fn main(args: ~[str]) { + let args = if os::getenv("RUST_BENCH").is_some() { + ~["", "1000000", "10000"] + } else if args.len() <= 1u { + ~["", "10000", "4"] + } else { + copy args + }; + + #debug("%?", args); + run(args); +} + +// Treat a whole bunch of ports as one. +class box { + let mut contents: option; + new(+x: T) { self.contents = some(x); } + + fn swap(f: fn(+T) -> T) { + let mut tmp = none; + self.contents <-> tmp; + self.contents = some(f(option::unwrap(tmp))); + } + + fn unwrap() -> T { + let mut tmp = none; + self.contents <-> tmp; + option::unwrap(tmp) + } +} + +class port_set { + let mut ports: ~[pipes::port]; + + new() { self.ports = ~[]; } + + fn add(+port: pipes::port) { + vec::push(self.ports, port) + } + + fn try_recv() -> option { + let mut result = none; + while result == none && self.ports.len() > 0 { + let i = pipes::wait_many(self.ports.map(|p| p.header())); + // dereferencing an unsafe pointer nonsense to appease the + // borrowchecker. + alt unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} { + some(m) { + result = some(move!{m}); + } + none { + // Remove this port. + let mut ports = ~[]; + self.ports <-> ports; + vec::consume(ports, + |j, x| if i != j { vec::push(self.ports, x) }); + } + } + } +/* + while !done { + do self.ports.swap |ports| { + if ports.len() > 0 { + let old_len = ports.len(); + let (_, m, ports) = pipes::select(ports); + alt m { + some(pipes::streamp::data(x, next)) { + result = some(move!{x}); + done = true; + assert ports.len() == old_len - 1; + vec::append_one(ports, move!{next}) + } + none { + //#error("pipe closed"); + assert ports.len() == old_len - 1; + ports + } + } + } + else { + //#error("no more pipes"); + done = true; + ~[] + } + } + } +*/ + result + } + + fn recv() -> T { + option::unwrap(self.try_recv()) + } +} + +impl private_methods/& for pipes::port { + pure fn header() -> *pipes::packet_header unchecked { + alt self.endp { + some(endp) { + endp.header() + } + none { fail "peeking empty stream" } + } + } +} + +type shared_chan = arc::exclusive>; + +impl chan for shared_chan { + fn send(+x: T) { + let mut xx = some(x); + do self.with |_c, chan| { + let mut x = none; + x <-> xx; + chan.send(option::unwrap(x)) + } + } +} + +fn shared_chan(+c: pipes::chan) -> shared_chan { + arc::exclusive(c) +} \ No newline at end of file