core: Remove pipes::spawn_service, spawn_service_recv

These are only used in test cases; pipes isn't the right place for them;
they are unnecessary.

Conflicts:
	src/libcore/rt/uv/mod.rs
This commit is contained in:
Brian Anderson 2013-04-01 18:17:16 -07:00
parent b329f2fa82
commit ab08b4fbfd
5 changed files with 102 additions and 55 deletions

View file

@ -83,7 +83,6 @@ bounded and unbounded protocols allows for less code duplication.
*/
use cast::{forget, reinterpret_cast, transmute};
use cell::Cell;
use either::{Either, Left, Right};
use kinds::Owned;
use libc;
@ -902,51 +901,6 @@ pub fn entangle<T>() -> (SendPacket<T>, RecvPacket<T>) {
(SendPacket(p), RecvPacket(p))
}
/** Spawn a task to provide a service.
It takes an initialization function that produces a send and receive
endpoint. The send endpoint is returned to the caller and the receive
endpoint is passed to the new task.
*/
pub fn spawn_service<T:Owned,Tb:Owned>(
init: extern fn() -> (SendPacketBuffered<T, Tb>,
RecvPacketBuffered<T, Tb>),
service: ~fn(v: RecvPacketBuffered<T, Tb>))
-> SendPacketBuffered<T, Tb> {
let (client, server) = init();
// This is some nasty gymnastics required to safely move the pipe
// into a new task.
let server = Cell(server);
do task::spawn {
service(server.take());
}
client
}
/** Like `spawn_service_recv`, but for protocols that start in the
receive state.
*/
pub fn spawn_service_recv<T:Owned,Tb:Owned>(
init: extern fn() -> (RecvPacketBuffered<T, Tb>,
SendPacketBuffered<T, Tb>),
service: ~fn(v: SendPacketBuffered<T, Tb>))
-> RecvPacketBuffered<T, Tb> {
let (client, server) = init();
// This is some nasty gymnastics required to safely move the pipe
// into a new task.
let server = Cell(server);
do task::spawn {
service(server.take())
}
client
}
pub mod rt {
use option::{None, Option, Some};

View file

@ -14,7 +14,8 @@
extern mod std;
use core::pipes::{spawn_service, recv};
use core::cell::Cell;
use core::pipes::*;
use std::time::precise_time_s;
proto! pingpong (
@ -70,6 +71,52 @@ macro_rules! follow (
)
)
/** Spawn a task to provide a service.
It takes an initialization function that produces a send and receive
endpoint. The send endpoint is returned to the caller and the receive
endpoint is passed to the new task.
*/
pub fn spawn_service<T:Owned,Tb:Owned>(
init: extern fn() -> (SendPacketBuffered<T, Tb>,
RecvPacketBuffered<T, Tb>),
service: ~fn(v: RecvPacketBuffered<T, Tb>))
-> SendPacketBuffered<T, Tb> {
let (client, server) = init();
// This is some nasty gymnastics required to safely move the pipe
// into a new task.
let server = Cell(server);
do task::spawn {
service(server.take());
}
client
}
/** Like `spawn_service_recv`, but for protocols that start in the
receive state.
*/
pub fn spawn_service_recv<T:Owned,Tb:Owned>(
init: extern fn() -> (RecvPacketBuffered<T, Tb>,
SendPacketBuffered<T, Tb>),
service: ~fn(v: SendPacketBuffered<T, Tb>))
-> RecvPacketBuffered<T, Tb> {
let (client, server) = init();
// This is some nasty gymnastics required to safely move the pipe
// into a new task.
let server = Cell(server);
do task::spawn {
service(server.take())
}
client
}
fn switch<T:Owned,Tb:Owned,U>(+endp: core::pipes::RecvPacketBuffered<T, Tb>,
f: &fn(+v: Option<T>) -> U) -> U {
f(core::pipes::try_recv(endp))

View file

@ -18,7 +18,7 @@ extern mod std;
use std::timer::sleep;
use std::uv;
use core::pipes;
use core::cell::Cell;
use core::pipes::{try_recv, recv};
proto! oneshot (
@ -30,12 +30,14 @@ proto! oneshot (
pub fn main() {
let iotask = &uv::global_loop::get();
pipes::spawn_service(oneshot::init, |p| {
match try_recv(p) {
let (chan, port) = oneshot::init();
let port = Cell(port);
do spawn {
match try_recv(port.take()) {
Some(*) => { fail!() }
None => { }
}
});
}
sleep(iotask, 100);

View file

@ -17,8 +17,9 @@ extern mod std;
use std::timer::sleep;
use std::uv;
use core::cell::Cell;
use core::pipes;
use core::pipes::{recv, select};
use core::pipes::*;
proto! oneshot (
waiting:send {
@ -32,13 +33,30 @@ proto! stream (
}
)
pub fn spawn_service<T:Owned,Tb:Owned>(
init: extern fn() -> (SendPacketBuffered<T, Tb>,
RecvPacketBuffered<T, Tb>),
service: ~fn(v: RecvPacketBuffered<T, Tb>))
-> SendPacketBuffered<T, Tb> {
let (client, server) = init();
// This is some nasty gymnastics required to safely move the pipe
// into a new task.
let server = Cell(server);
do task::spawn {
service(server.take());
}
client
}
pub fn main() {
use oneshot::client::*;
use stream::client::*;
let iotask = &uv::global_loop::get();
let c = pipes::spawn_service(stream::init, |p| {
let c = spawn_service(stream::init, |p| {
error!("waiting for pipes");
let stream::send(x, p) = recv(p);
error!("got pipes");

View file

@ -13,8 +13,9 @@
extern mod std;
use std::timer::sleep;
use std::uv;
use core::cell::Cell;
use core::pipes;
use core::pipes::recv;
use core::pipes::*;
proto! oneshot (
waiting:send {
@ -22,10 +23,35 @@ proto! oneshot (
}
)
/** Spawn a task to provide a service.
It takes an initialization function that produces a send and receive
endpoint. The send endpoint is returned to the caller and the receive
endpoint is passed to the new task.
*/
pub fn spawn_service<T:Owned,Tb:Owned>(
init: extern fn() -> (SendPacketBuffered<T, Tb>,
RecvPacketBuffered<T, Tb>),
service: ~fn(v: RecvPacketBuffered<T, Tb>))
-> SendPacketBuffered<T, Tb> {
let (client, server) = init();
// This is some nasty gymnastics required to safely move the pipe
// into a new task.
let server = Cell(server);
do task::spawn {
service(server.take());
}
client
}
pub fn main() {
use oneshot::client::*;
let c = pipes::spawn_service(oneshot::init, |p| { recv(p); });
let c = spawn_service(oneshot::init, |p| { recv(p); });
let iotask = &uv::global_loop::get();
sleep(iotask, 500);