diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 3d115463844c..429651616a76 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -1,4 +1,47 @@ -// Runtime support for pipes. +/*! Runtime support for message passing with protocol enforcement. + + +Pipes consist of two endpoints. One endpoint can send messages and +the other can receive messages. The set of legal messages and which +directions they can flow at any given point are determined by a +protocol. Below is an example protocol. + +~~~ +proto! pingpong { + ping: send { + ping -> pong + } + pong: recv { + pong -> ping + } +} +~~~ + +The `proto!` syntax extension will convert this into a module called +`pingpong`, which includes a set of types and functions that can be +used to write programs that follow the pingpong protocol. + +*/ + +/* IMPLEMENTATION NOTES + +The initial design for this feature is available at: + +https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts + +Much of the design in that document is still accurate. There are +several components for the pipe implementation. First of all is the +syntax extension. To see how that works, it is best see comments in +libsyntax/ext/pipes.rs. + +This module includes two related pieces of the runtime +implementation. There is support for unbounded and bounded +protocols. The main difference between the two is the type of the +buffer that is carried along in the endpoint data structures. + +FIXME (#3072) - This is still incomplete + +*/ import unsafe::{forget, reinterpret_cast, transmute}; import either::{either, left, right}; @@ -11,7 +54,7 @@ export send_packet_buffered, recv_packet_buffered; export packet, mk_packet, entangle_buffer, has_buffer, buffer_header; // export these so we can find them in the buffer_resource -// destructor. This is probably another metadata bug. +// destructor. This is probably a symptom of #3005. export atomic_add_acq, atomic_sub_rel; // User-level things @@ -20,16 +63,14 @@ export select, select2, selecti, select2i, selectable; export spawn_service, spawn_service_recv; export stream, port, chan, shared_chan, port_set, channel; +#[doc(hidden)] const SPIN_COUNT: uint = 0; macro_rules! move_it { { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } } -// This is to help make sure we only move out of enums in safe -// places. Once there is unary move, it can be removed. -fn move_it(-x: T) -> T { x } - +#[doc(hidden)] enum state { empty, full, @@ -37,7 +78,7 @@ enum state { terminated } -class buffer_header { +struct buffer_header { // Tracks whether this buffer needs to be freed. We can probably // get away with restricting it to 0 or 1, if we're careful. let mut ref_count: int; @@ -49,12 +90,13 @@ class buffer_header { } // This is for protocols to associate extra data to thread around. +#[doc(hidden)] type buffer = { header: buffer_header, data: T, }; -class packet_header { +struct packet_header { let mut state: state; let mut blocked_task: option<*rust_task>; @@ -95,6 +137,7 @@ class packet_header { } } +#[doc(hidden)] type packet = { header: packet_header, mut payload: option, @@ -133,6 +176,7 @@ fn unibuffer() -> ~buffer> { b } +#[doc(hidden)] fn packet() -> *packet { let b = unibuffer(); let p = ptr::addr_of(b.data); @@ -141,6 +185,7 @@ fn packet() -> *packet { p } +#[doc(hidden)] fn entangle_buffer( -buffer: ~buffer, init: fn(*libc::c_void, x: &T) -> *packet) @@ -163,18 +208,22 @@ extern mod rusti { // If I call the rusti versions directly from a polymorphic function, // I get link errors. This is a bug that needs investigated more. +#[doc(hidden)] fn atomic_xchng_rel(&dst: int, src: int) -> int { rusti::atomic_xchng_rel(dst, src) } +#[doc(hidden)] fn atomic_add_acq(&dst: int, src: int) -> int { rusti::atomic_add_acq(dst, src) } +#[doc(hidden)] fn atomic_sub_rel(&dst: int, src: int) -> int { rusti::atomic_sub_rel(dst, src) } +#[doc(hidden)] type rust_task = libc::c_void; extern mod rustrt { @@ -188,6 +237,7 @@ extern mod rustrt { pure fn task_signal_event(target: *rust_task, event: *libc::c_void); } +#[doc(hidden)] fn wait_event(this: *rust_task) -> *libc::c_void { let mut event = ptr::null(); @@ -198,6 +248,7 @@ fn wait_event(this: *rust_task) -> *libc::c_void { event } +#[doc(hidden)] fn swap_state_acq(&dst: state, src: state) -> state { unsafe { reinterpret_cast(rusti::atomic_xchng_acq( @@ -206,6 +257,7 @@ fn swap_state_acq(&dst: state, src: state) -> state { } } +#[doc(hidden)] fn swap_state_rel(&dst: state, src: state) -> state { unsafe { reinterpret_cast(rusti::atomic_xchng_rel( @@ -214,11 +266,12 @@ fn swap_state_rel(&dst: state, src: state) -> state { } } +#[doc(hidden)] unsafe fn get_buffer(p: *packet_header) -> ~buffer { transmute((*p).buf_header()) } -class buffer_resource { +struct buffer_resource { let buffer: ~buffer; new(+b: ~buffer) { //let p = ptr::addr_of(*b); @@ -244,6 +297,7 @@ class buffer_resource { } } +#[doc(hidden)] fn send(-p: send_packet_buffered, -payload: T) { let header = p.header(); @@ -281,10 +335,21 @@ fn send(-p: send_packet_buffered, } } +/** Receives a message from a pipe. + +Fails if the sender closes the connection. + +*/ fn recv(-p: recv_packet_buffered) -> T { option::unwrap(try_recv(p)) } +/** Attempts to receive a message from a pipe. + +Returns `none` if the sender has closed the connection without sending +a message, or `some(T)` if a message was received. + +*/ fn try_recv(-p: recv_packet_buffered) -> option { @@ -351,6 +416,7 @@ impl peek for recv_packet_buffered { } } +#[doc(hidden)] fn sender_terminate(p: *packet) { let p = unsafe { &*p }; alt swap_state_rel(p.header.state, terminated) { @@ -377,6 +443,7 @@ fn sender_terminate(p: *packet) { } } +#[doc(hidden)] fn receiver_terminate(p: *packet) { let p = unsafe { &*p }; alt swap_state_rel(p.header.state, terminated) { @@ -394,8 +461,16 @@ fn receiver_terminate(p: *packet) { } } -#[doc = "Returns when one of the packet headers reports data is -available."] +/** Returns when one of the packet headers reports data is available. + +This function is primarily intended for building higher level waiting +functions, such as `select`, `select2`, etc. + +It takes a vector slice of packet_headers and returns an index into +that vector. The index points to an endpoint that has either been +closed by the sender or has a message waiting to be received. + +*/ fn wait_many(pkts: &[*packet_header]) -> uint { let this = rustrt::rust_get_task(); @@ -447,6 +522,34 @@ fn wait_many(pkts: &[*packet_header]) -> uint { ready_packet } +/** Receives a message from one of two endpoints. + +The return value is `left` if the first endpoint received something, +or `right` if the second endpoint receives something. In each case, +the result includes the other endpoint as well so it can be used +again. Below is an example of using `select2`. + +~~~ +match select2(a, b) { + left((none, b)) { + // endpoint a was closed. + } + right((a, none)) { + // endpoint b was closed. + } + left((some(_), b)) { + // endpoint a received a message + } + right(a, some(_)) { + // endpoint b received a message. + } +} +~~~ + +Sometimes messages will be available on both endpoints at once. In +this case, `select2` may return either `left` or `right`. + +*/ fn select2( +a: recv_packet_buffered, +b: recv_packet_buffered) @@ -500,13 +603,16 @@ fn select(+endpoints: ~[recv_packet_buffered]) (ready, result, remaining) } +/// The sending end of a pipe. It can be used to send exactly one +/// message. type send_packet = send_packet_buffered>; +#[doc(hidden)] fn send_packet(p: *packet) -> send_packet { send_packet_buffered(p) } -class send_packet_buffered { +struct send_packet_buffered { let mut p: option<*packet>; let mut buffer: option>; new(p: *packet) { @@ -560,13 +666,16 @@ class send_packet_buffered { } } +/// Represents the receive end of a pipe. It can receive exactly one +/// message. type recv_packet = recv_packet_buffered>; +#[doc(hidden)] fn recv_packet(p: *packet) -> recv_packet { recv_packet_buffered(p) } -class recv_packet_buffered : selectable { +struct recv_packet_buffered : selectable { let mut p: option<*packet>; let mut buffer: option>; new(p: *packet) { @@ -620,6 +729,7 @@ class recv_packet_buffered : selectable { } } +#[doc(hidden)] fn entangle() -> (send_packet, recv_packet) { let p = packet(); (send_packet(p), recv_packet(p)) @@ -686,12 +796,14 @@ trait recv { pure fn peek() -> bool; } +#[doc(hidden)] type chan_ = { mut endp: option> }; enum chan { chan_(chan_) } +#[doc(hidden)] type port_ = { mut endp: option> }; enum port { @@ -725,7 +837,7 @@ impl port of recv for port { fn try_recv() -> option { let mut endp = none; endp <-> self.endp; - alt move_it(pipes::try_recv(unwrap(endp))) { + alt move pipes::try_recv(unwrap(endp)) { some(streamp::data(x, endp)) { self.endp = some(move_it!{endp}); some(move_it!{x}) @@ -749,7 +861,7 @@ impl port of recv for port { } // Treat a whole bunch of ports as one. -class port_set : recv { +struct port_set : recv { let mut ports: ~[pipes::port]; new() { self.ports = ~[]; } @@ -770,7 +882,7 @@ class port_set : recv { let i = wait_many(self.ports.map(|p| p.header())); // dereferencing an unsafe pointer nonsense to appease the // borrowchecker. - alt move_it(unsafe {(*ptr::addr_of(self.ports[i])).try_recv()}) { + alt move unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} { some(m) { result = some(move_it!{m}); } diff --git a/src/libsyntax/ext/pipes.rs b/src/libsyntax/ext/pipes.rs index 5a1462cea309..08562c4490ff 100644 --- a/src/libsyntax/ext/pipes.rs +++ b/src/libsyntax/ext/pipes.rs @@ -1,3 +1,37 @@ +/*! Implementation of proto! extension. + +This is frequently called the pipe compiler. It handles code such as... + +~~~ +proto! pingpong { + ping: send { + ping -> pong + } + pong: recv { + pong -> ping + } +} +~~~ + +There are several components: + + * The parser (libsyntax/ext/pipes/parse_proto.rs) + * Responsible for building an AST from a protocol specification. + + * The checker (libsyntax/ext/pipes/check.rs) + * Basic correctness checking for protocols (i.e. no undefined states, etc.) + + * The analyzer (libsyntax/ext/pipes/liveness.rs) + * Determines whether the protocol is bounded or unbounded. + + * The compiler (libsynatx/ext/pipes/pipec.rs) + * Generates a Rust AST from the protocol AST and the results of analysis. + +There is more documentation in each of the files referenced above. + +FIXME (#3072) - This is still incomplete. + +*/ import codemap::span; import ext::base::ext_ctxt;