core::rt: Add MegaPipe, an unbounded, multiple producer/consumer, lock-free queue

This commit is contained in:
Brian Anderson 2013-06-01 14:03:38 -07:00
parent 51d257fd9a
commit ece38b3c7e

View file

@ -471,6 +471,44 @@ impl<T> Clone for SharedPort<T> {
}
}
// XXX: Need better name
type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
pub fn megapipe<T: Owned>() -> MegaPipe<T> {
let (port, chan) = stream();
(SharedPort::new(port), SharedChan::new(chan))
}
impl<T: Owned> GenericChan<T> for MegaPipe<T> {
fn send(&self, val: T) {
match *self {
(_, ref c) => c.send(val)
}
}
}
impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> {
fn try_send(&self, val: T) -> bool {
match *self {
(_, ref c) => c.try_send(val)
}
}
}
impl<T: Owned> GenericPort<T> for MegaPipe<T> {
fn recv(&self) -> T {
match *self {
(ref p, _) => p.recv()
}
}
fn try_recv(&self) -> Option<T> {
match *self {
(ref p, _) => p.try_recv()
}
}
}
#[cfg(test)]
mod test {
use super::*;
@ -834,5 +872,38 @@ mod test {
assert!(recvd == send_total);
}
}
#[test]
fn megapipe_stress() {
use rand;
use rand::RngUtil;
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let pipe = megapipe();
let total = stress_factor() + 10;
let mut rng = rand::rng();
for total.times {
let msgs = rng.gen_uint_range(0, 10);
let pipe_clone = pipe.clone();
let end_chan_clone = end_chan.clone();
do spawntask_random {
for msgs.times {
pipe_clone.send(());
}
for msgs.times {
pipe_clone.recv();
}
}
end_chan_clone.send(());
}
for total.times {
end_port.recv();
}
}
}
}