From ece38b3c7e16be1bedb45e552a127fe75bdb726a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 1 Jun 2013 14:03:38 -0700 Subject: [PATCH] core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, lock-free queue --- src/libstd/rt/comm.rs | 71 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 4772a8596bfb..ef2091f789c0 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -471,6 +471,44 @@ impl Clone for SharedPort { } } +// XXX: Need better name +type MegaPipe = (SharedPort, SharedChan); + +pub fn megapipe() -> MegaPipe { + let (port, chan) = stream(); + (SharedPort::new(port), SharedChan::new(chan)) +} + +impl GenericChan for MegaPipe { + fn send(&self, val: T) { + match *self { + (_, ref c) => c.send(val) + } + } +} + +impl GenericSmartChan for MegaPipe { + fn try_send(&self, val: T) -> bool { + match *self { + (_, ref c) => c.try_send(val) + } + } +} + +impl GenericPort for MegaPipe { + fn recv(&self) -> T { + match *self { + (ref p, _) => p.recv() + } + } + + fn try_recv(&self) -> Option { + match *self { + (ref p, _) => p.try_recv() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -834,5 +872,38 @@ mod test { assert!(recvd == send_total); } } + + #[test] + fn megapipe_stress() { + use rand; + use rand::RngUtil; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let pipe = megapipe(); + let total = stress_factor() + 10; + let mut rng = rand::rng(); + for total.times { + let msgs = rng.gen_uint_range(0, 10); + let pipe_clone = pipe.clone(); + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for msgs.times { + pipe_clone.send(()); + } + for msgs.times { + pipe_clone.recv(); + } + } + + end_chan_clone.send(()); + } + + for total.times { + end_port.recv(); + } + } + } }