diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 94c99361ef84..794f2d3890b4 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -2,6 +2,7 @@ import unsafe::{forget, reinterpret_cast, transmute}; import either::{either, left, right}; +import option::unwrap; enum state { empty, @@ -428,3 +429,69 @@ fn spawn_service_recv( client } + +// Streams - Make pipes a little easier in general. + +proto! streamp { + open:send { + data(T) -> open + } +} + +type chan = { mut endp: option> }; +type port = { mut endp: option> }; + +fn stream() -> (chan, port) { + let (c, s) = streamp::init(); + + #macro[ + [#move[x], + unsafe { let y <- *ptr::addr_of(x); y }] + ]; + + ({ mut endp: some(c) }, { mut endp: some(s) }) +} + +impl chan for chan { + fn send(+x: T) { + let mut endp = none; + endp <-> self.endp; + self.endp = some( + streamp::client::data(unwrap(endp), x)) + } +} + +impl port for port { + fn recv() -> T { + let mut endp = none; + endp <-> self.endp; + let streamp::data(x, endp) = pipes::recv(unwrap(endp)); + self.endp = some(endp); + x + } + + fn try_recv() -> option { + let mut endp = none; + endp <-> self.endp; + alt pipes::try_recv(unwrap(endp)) { + some(streamp::data(x, endp)) { + self.endp = some(#move(endp)); + some(#move(x)) + } + none { none } + } + } + + pure fn peek() -> bool unchecked { + let mut endp = none; + endp <-> self.endp; + let peek = alt endp { + some(endp) { + pipes::peek(endp) + } + none { fail "peeking empty stream" } + }; + self.endp <-> endp; + peek + } +} diff --git a/src/test/bench/shootout-k-nucleotide-pipes.rs b/src/test/bench/shootout-k-nucleotide-pipes.rs index 488af7a9b23a..3704bb3d7806 100644 --- a/src/test/bench/shootout-k-nucleotide-pipes.rs +++ b/src/test/bench/shootout-k-nucleotide-pipes.rs @@ -9,46 +9,7 @@ import std::map; import std::map::hashmap; import std::sort; -import stream::{stream, chan, port}; - -// After a snapshot, this should move into core, or std. -mod stream { - import option::unwrap; - - proto! streamp { - open:send { - data(T) -> open - } - } - - type chan = { mut endp: option> }; - type port = { mut endp: option> }; - - fn stream() -> (chan, port) { - let (c, s) = streamp::init(); - ({ mut endp: some(c) }, { mut endp: some(s) }) - } - - impl chan for chan { - fn send(+x: T) { - let mut endp = none; - endp <-> self.endp; - self.endp = some( - streamp::client::data(unwrap(endp), x)) - } - } - - impl port for port { - fn recv() -> T { - let mut endp = none; - endp <-> self.endp; - let streamp::data(x, endp) = unwrap( - pipes::try_recv(unwrap(endp))); - self.endp = some(endp); - x - } - } -} +import pipes::{stream, port, chan}; // given a map, print a sorted version of it fn sort_and_fmt(mm: hashmap<~[u8], uint>, total: uint) -> str { @@ -127,8 +88,8 @@ fn windows_with_carry(bb: ~[const u8], nn: uint, ret vec::slice(bb, len - (nn - 1u), len); } -fn make_sequence_processor(sz: uint, from_parent: stream::port<~[u8]>, - to_parent: stream::chan) { +fn make_sequence_processor(sz: uint, from_parent: pipes::port<~[u8]>, + to_parent: pipes::chan) { let freqs: hashmap<~[u8], uint> = map::bytes_hash(); let mut carry: ~[u8] = ~[]; @@ -190,7 +151,7 @@ fn main(args: ~[str]) { vec::push(from_child, from_child_); - let (to_child, from_parent) = stream::stream(); + let (to_child, from_parent) = pipes::stream(); do task::spawn_with(from_parent) |from_parent| { make_sequence_processor(sz, from_parent, to_parent_);