std: Convert uv_global_loop to use pipes

This commit is contained in:
Brian Anderson 2013-01-19 23:38:17 -08:00
parent fb9299346a
commit b9608fe423
14 changed files with 172 additions and 147 deletions

View file

@ -782,7 +782,6 @@ mod test {
let (finish_port, finish_chan) = pipes::stream();
let addr = ip::v4::parse_addr("127.0.0.1");
let iotask = uv::global_loop::get();
let begin_connect_chan = Cell(move begin_connect_chan);
let accept_chan = Cell(move accept_chan);
@ -790,6 +789,7 @@ mod test {
// The server task
do task::spawn |copy addr, move begin_connect_chan,
move accept_chan| {
let iotask = &uv::global_loop::get();
let begin_connect_chan = begin_connect_chan.take();
let accept_chan = accept_chan.take();
let listen_res = do tcp::listen(
@ -821,6 +821,7 @@ mod test {
begin_connect_port.recv();
debug!("connecting");
let iotask = &uv::global_loop::get();
let connect_result = tcp::connect(copy addr, port, iotask);
assert connect_result.is_ok();
let sock = result::unwrap(move connect_result);

View file

@ -114,7 +114,7 @@ enum IpGetAddrErr {
* a vector of `ip_addr` results, in the case of success, or an error
* object in the case of failure
*/
pub fn get_addr(node: &str, iotask: iotask)
pub fn get_addr(node: &str, iotask: &iotask)
-> result::Result<~[IpAddr], IpGetAddrErr> {
do oldcomm::listen |output_ch| {
do str::as_buf(node) |node_ptr, len| unsafe {
@ -413,7 +413,7 @@ mod test {
#[ignore(reason = "valgrind says it's leaky")]
fn test_ip_get_addr() {
let localhost_name = ~"localhost";
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let ga_result = get_addr(localhost_name, iotask);
if result::is_err(&ga_result) {
fail ~"got err result from net::ip::get_addr();"
@ -439,7 +439,7 @@ mod test {
#[ignore(reason = "valgrind says it's leaky")]
fn test_ip_get_addr_bad_input() {
let localhost_name = ~"sjkl234m,./sdf";
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let ga_result = get_addr(localhost_name, iotask);
assert result::is_err(&ga_result);
}

View file

@ -142,7 +142,7 @@ pub enum TcpConnectErrData {
* `net::tcp::tcp_connect_err_data` instance will be returned
*/
pub fn connect(input_ip: ip::IpAddr, port: uint,
iotask: IoTask)
iotask: &IoTask)
-> result::Result<TcpSocket, TcpConnectErrData> unsafe {
let result_po = oldcomm::Port::<ConnAttempt>();
let closed_signal_po = oldcomm::Port::<()>();
@ -164,7 +164,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
ip::Ipv4(_) => { false }
ip::Ipv6(_) => { true }
},
iotask: iotask
iotask: iotask.clone()
};
let socket_data_ptr = ptr::addr_of(&(*socket_data));
log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
@ -496,17 +496,17 @@ pub fn accept(new_conn: TcpNewConnection)
let server_data_ptr = uv::ll::get_data_for_uv_handle(
server_handle_ptr) as *TcpListenFcData;
let reader_po = oldcomm::Port();
let iotask = (*server_data_ptr).iotask;
let iotask = &(*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let client_socket_data = @{
let client_socket_data: @TcpSocketData = @{
reader_po: reader_po,
reader_ch: oldcomm::Chan(&reader_po),
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
ipv6: (*server_data_ptr).ipv6,
iotask : iotask
iotask : iotask.clone()
};
let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
let client_stream_handle_ptr =
@ -588,10 +588,10 @@ pub fn accept(new_conn: TcpNewConnection)
* of listen exiting because of an error
*/
pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
new_connect_cb: fn~(TcpNewConnection,
oldcomm::Chan<Option<TcpErrData>>))
iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
new_connect_cb: fn~(TcpNewConnection,
oldcomm::Chan<Option<TcpErrData>>))
-> result::Result<(), TcpListenErrData> unsafe {
do listen_common(move host_ip, port, backlog, iotask,
move on_establish_cb)
@ -606,7 +606,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
}
fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: IoTask,
iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
on_connect_cb: fn~(*uv::ll::uv_tcp_t))
-> result::Result<(), TcpListenErrData> unsafe {
@ -615,12 +615,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
let kill_ch = oldcomm::Chan(&kill_po);
let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(&server_stream);
let server_data = {
let server_data: TcpListenFcData = {
server_stream_ptr: server_stream_ptr,
stream_closed_ch: oldcomm::Chan(&stream_closed_po),
kill_ch: kill_ch,
on_connect_cb: move on_connect_cb,
iotask: iotask,
iotask: iotask.clone(),
ipv6: match &host_ip {
&ip::Ipv4(_) => { false }
&ip::Ipv6(_) => { true }
@ -895,7 +895,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
};
let close_data_ptr = ptr::addr_of(&close_data);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
stream_handle_ptr, loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
@ -916,7 +916,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
use timer;
log(debug, ~"starting tcp::read");
let iotask = (*socket_data).iotask;
let iotask = &(*socket_data).iotask;
let rs_result = read_start_common_impl(socket_data);
if result::is_err(&rs_result) {
let err_data = result::get_err(&rs_result);
@ -956,7 +956,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = oldcomm::Port::<Option<TcpErrData>>();
let stop_ch = oldcomm::Chan(&stop_po);
do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
log(debug, ~"in interact cb for tcp::read_stop");
match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
0i32 => {
@ -984,7 +984,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
let start_ch = oldcomm::Chan(&start_po);
log(debug, ~"in tcp::read_start before interact loop");
do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
on_alloc_cb,
@ -1024,7 +1024,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
result_ch: oldcomm::Chan(&result_po)
};
let write_data_ptr = ptr::addr_of(&write_data);
do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe {
log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
match uv::ll::write(write_req_ptr,
stream_handle_ptr,
@ -1369,7 +1369,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_and_client() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8888u;
let expected_req = ~"ping";
@ -1381,6 +1381,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1389,7 +1390,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1415,7 +1416,7 @@ pub mod test {
assert str::contains(actual_resp, expected_resp);
}
pub fn impl_gl_tcp_ipv4_get_peer_addr() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8887u;
let expected_resp = ~"pong";
@ -1426,6 +1427,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1434,7 +1436,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1445,10 +1447,11 @@ pub mod test {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let iotask = uv::global_loop::get();
let connect_result = connect(move server_ip_addr, server_port,
iotask);
&iotask);
let sock = result::unwrap(move connect_result);
debug!("testing peer address");
// This is what we are actually testing!
assert net::ip::format_addr(&sock.get_peer_addr()) ==
~"127.0.0.1";
@ -1457,12 +1460,14 @@ pub mod test {
// Fulfill the protocol the test server expects
let resp_bytes = str::to_bytes(~"ping");
tcp_write_single(&sock, resp_bytes);
debug!("message sent");
let read_result = sock.read(0u);
client_ch.send(str::from_bytes(read_result.get()));
debug!("result read");
};
}
pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8889u;
let expected_req = ~"ping";
@ -1482,7 +1487,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_address_in_use() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8890u;
let expected_req = ~"ping";
@ -1494,6 +1499,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1502,7 +1508,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1533,7 +1539,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_access_denied() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 80u;
// this one should fail..
@ -1553,7 +1559,7 @@ pub mod test {
}
pub fn impl_gl_tcp_ipv4_server_client_reader_writer() {
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8891u;
let expected_req = ~"ping";
@ -1565,6 +1571,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1573,7 +1580,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
iotask)
&iotask_clone)
};
server_result_ch.send(actual_req);
};
@ -1604,7 +1611,7 @@ pub mod test {
pub fn impl_tcp_socket_impl_reader_handles_eof() {
use core::io::{Reader,ReaderUtil};
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 10041u;
let expected_req = ~"GET /";
@ -1616,6 +1623,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
@ -1624,7 +1632,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
@ -1664,7 +1672,7 @@ pub mod test {
fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
server_ch: oldcomm::Chan<~str>,
cont_ch: oldcomm::Chan<()>,
iotask: IoTask) -> ~str {
iotask: &IoTask) -> ~str {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128,
iotask,
@ -1751,7 +1759,7 @@ pub mod test {
}
fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
iotask: IoTask) -> TcpListenErrData {
iotask: &IoTask) -> TcpListenErrData {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128,
iotask,
@ -1775,7 +1783,7 @@ pub mod test {
fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
client_ch: oldcomm::Chan<~str>,
iotask: IoTask) -> result::Result<~str,
iotask: &IoTask) -> result::Result<~str,
TcpConnectErrData> {
let server_ip_addr = ip::v4::parse_addr(server_ip);

View file

@ -39,7 +39,7 @@ use core;
* * ch - a channel of type T to send a `val` on
* * val - a value of type T to send over the provided `ch`
*/
pub fn delayed_send<T: Owned>(iotask: IoTask,
pub fn delayed_send<T: Owned>(iotask: &IoTask,
msecs: uint,
ch: oldcomm::Chan<T>,
val: T) {
@ -90,7 +90,7 @@ pub fn delayed_send<T: Owned>(iotask: IoTask,
* * `iotask` - a `uv::iotask` that the tcp request will run on
* * msecs - an amount of time, in milliseconds, for the current task to block
*/
pub fn sleep(iotask: IoTask, msecs: uint) {
pub fn sleep(iotask: &IoTask, msecs: uint) {
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
delayed_send(iotask, msecs, exit_ch, ());
@ -117,7 +117,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) {
* on the provided port in the allotted timeout period, then the result will
* be a `some(T)`. If not, then `none` will be returned.
*/
pub fn recv_timeout<T: Copy Owned>(iotask: IoTask,
pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask,
msecs: uint,
wait_po: oldcomm::Port<T>)
-> Option<T> {
@ -177,13 +177,13 @@ mod test {
#[test]
fn test_gl_timer_simple_sleep_test() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
sleep(hl_loop, 1u);
}
#[test]
fn test_gl_timer_sleep_stress1() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
for iter::repeat(50u) {
sleep(hl_loop, 1u);
}
@ -193,7 +193,7 @@ mod test {
fn test_gl_timer_sleep_stress2() {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let repeat = 20u;
let spec = {
@ -208,11 +208,12 @@ mod test {
for spec.each |spec| {
let (times, maxms) = *spec;
let hl_loop_clone = hl_loop.clone();
do task::spawn {
use rand::*;
let rng = Rng();
for iter::repeat(times) {
sleep(hl_loop, rng.next() as uint % maxms);
sleep(&hl_loop_clone, rng.next() as uint % maxms);
}
oldcomm::send(ch, ());
}
@ -271,12 +272,12 @@ mod test {
let expected = rand::Rng().gen_str(16u);
let test_po = oldcomm::Port::<~str>();
let test_ch = oldcomm::Chan(&test_po);
let hl_loop_clone = hl_loop.clone();
do task::spawn() {
delayed_send(hl_loop, 50u, test_ch, expected);
delayed_send(&hl_loop_clone, 50u, test_ch, expected);
};
match recv_timeout(hl_loop, 1u, test_po) {
match recv_timeout(&hl_loop, 1u, test_po) {
None => successes += 1,
_ => failures += 1
};

View file

@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask};
use core::either::{Left, Right};
use core::libc;
use core::oldcomm::{Port, Chan, select2, listen};
use core::private::{chan_from_global_ptr, weaken_task};
use core::pipes::{Port, Chan, SharedChan, select2i};
use core::private::global::{global_data_clone_create,
global_data_clone};
use core::private::weak_task::weaken_task;
use core::str;
use core::task::TaskBuilder;
use core::task::{task, SingleThreaded, spawn};
use core::task;
use core::vec;
extern mod rustrt {
unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
}
use core::clone::Clone;
use core::option::{Some, None};
/**
* Race-free helper to get access to a global task where a libuv
@ -49,64 +49,58 @@ pub fn get() -> IoTask {
#[doc(hidden)]
fn get_monitor_task_gl() -> IoTask unsafe {
let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
debug!("ENTERING global_loop::get() loop chan: %?",
monitor_loop_chan_ptr);
debug!("before priv::chan_from_global_ptr");
type MonChan = Chan<IoTask>;
let monitor_ch =
do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr,
|| {
task::task().sched_mode
(task::SingleThreaded)
.unlinked()
}) |msg_po| unsafe {
debug!("global monitor task starting");
struct GlobalIoTask(IoTask);
// As a weak task the runtime will notify us when to exit
do weaken_task() |weak_exit_po| {
debug!("global monitor task is now weak");
let hl_loop = spawn_loop();
loop {
debug!("in outer_loop...");
match select2(weak_exit_po, msg_po) {
Left(weak_exit) => {
// all normal tasks have ended, tell the
// libuv loop to tear_down, then exit
debug!("weak_exit_po recv'd msg: %?", weak_exit);
iotask::exit(hl_loop);
break;
}
Right(fetch_ch) => {
debug!("hl_loop req recv'd: %?", fetch_ch);
fetch_ch.send(hl_loop);
}
impl GlobalIoTask: Clone {
fn clone(&self) -> GlobalIoTask {
GlobalIoTask((**self).clone())
}
}
fn key(_: GlobalIoTask) { }
match global_data_clone(key) {
Some(GlobalIoTask(iotask)) => iotask,
None => {
let iotask: IoTask = spawn_loop();
let mut installed = false;
let final_iotask = do global_data_clone_create(key) {
installed = true;
~GlobalIoTask(iotask.clone())
};
if installed {
do task().unlinked().spawn() unsafe {
debug!("global monitor task starting");
// As a weak task the runtime will notify us when to exit
do weaken_task |weak_exit_po| {
debug!("global monitor task is now weak");
weak_exit_po.recv();
iotask::exit(&iotask);
debug!("global monitor task is leaving weakend state");
};
debug!("global monitor task exiting");
}
} else {
iotask::exit(&iotask);
}
debug!("global monitor task is leaving weakend state");
};
debug!("global monitor task exiting");
};
// once we have a chan to the monitor loop, we ask it for
// the libuv loop's async handle
do listen |fetch_ch| {
monitor_ch.send(fetch_ch);
fetch_ch.recv()
match final_iotask {
GlobalIoTask(iotask) => iotask
}
}
}
}
fn spawn_loop() -> IoTask {
let builder = do task::task().add_wrapper |task_body| {
let builder = do task().add_wrapper |task_body| {
fn~(move task_body) {
// The I/O loop task also needs to be weak so it doesn't keep
// the runtime alive
unsafe {
do weaken_task |weak_exit_po| {
debug!("global libuv task is now weak %?", weak_exit_po);
do weaken_task |_| {
debug!("global libuv task is now weak");
task_body();
// We don't wait for the exit message on weak_exit_po
@ -118,6 +112,7 @@ fn spawn_loop() -> IoTask {
}
}
};
let builder = builder.unlinked();
spawn_iotask(move builder)
}
@ -147,7 +142,7 @@ mod test {
_status: libc::c_int) unsafe {
log(debug, ~"in simple timer cb");
ll::timer_stop(timer_ptr);
let hl_loop = get_gl();
let hl_loop = &get_gl();
do iotask::interact(hl_loop) |_loop_ptr| unsafe {
log(debug, ~"closing timer");
ll::close(timer_ptr, simple_timer_close_cb);
@ -157,7 +152,7 @@ mod test {
log(debug, ~"exiting simple timer cb");
}
fn impl_uv_hl_simple_timer(iotask: IoTask) unsafe {
fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe {
let exit_po = oldcomm::Port::<bool>();
let exit_ch = oldcomm::Chan(&exit_po);
let exit_ch_ptr = ptr::addr_of(&exit_ch);
@ -190,10 +185,11 @@ mod test {
#[test]
fn test_gl_uv_global_loop_high_level_global_timer() unsafe {
let hl_loop = get_gl();
let hl_loop = &get_gl();
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
task::spawn_sched(task::ManualThreads(1u), || {
let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
oldcomm::send(exit_ch, ());
});
@ -206,12 +202,12 @@ mod test {
#[test]
#[ignore]
fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe {
let hl_loop = get_gl();
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let cycles = 5000u;
for iter::repeat(cycles) {
task::spawn_sched(task::ManualThreads(1u), || {
let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
oldcomm::send(exit_ch, ());
});

View file

@ -20,7 +20,7 @@ use ll = uv_ll;
use core::libc::c_void;
use core::libc;
use core::oldcomm::{Port, Chan, listen};
use core::pipes::{stream, Port, Chan, SharedChan};
use core::prelude::*;
use core::ptr::addr_of;
use core::task::TaskBuilder;
@ -30,22 +30,30 @@ use core::task;
pub enum IoTask {
IoTask_({
async_handle: *ll::uv_async_t,
op_chan: Chan<IoTaskMsg>
op_chan: SharedChan<IoTaskMsg>
})
}
impl IoTask: Clone {
fn clone(&self) -> IoTask {
IoTask_({
async_handle: self.async_handle,
op_chan: self.op_chan.clone()
})
}
}
pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
do listen |iotask_ch| {
let (iotask_port, iotask_chan) = stream();
do task.sched_mode(task::SingleThreaded).spawn {
debug!("entering libuv task");
run_loop(iotask_ch);
debug!("libuv task exiting");
};
do task.sched_mode(task::SingleThreaded).spawn {
debug!("entering libuv task");
run_loop(&iotask_chan);
debug!("libuv task exiting");
};
iotask_ch.recv()
}
iotask_port.recv()
}
@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
* module. It is not safe to send the `loop_ptr` param to this callback out
* via ports/chans.
*/
pub unsafe fn interact(iotask: IoTask,
pub unsafe fn interact(iotask: &IoTask,
cb: fn~(*c_void)) {
send_msg(iotask, Interaction(move cb));
}
@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask,
* async handle and do a sanity check to make sure that all other handles are
* closed, causing a failure otherwise.
*/
pub fn exit(iotask: IoTask) unsafe {
pub fn exit(iotask: &IoTask) unsafe {
send_msg(iotask, TeardownLoop);
}
@ -96,8 +104,9 @@ enum IoTaskMsg {
}
/// Run the loop and begin handling messages
fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
fn run_loop(iotask_ch: &Chan<IoTask>) unsafe {
debug!("creating loop");
let loop_ptr = ll::loop_new();
// set up the special async handle we'll use to allow multi-task
@ -108,10 +117,12 @@ fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
// associate the async handle with the loop
ll::async_init(loop_ptr, async_handle, wake_up_cb);
let (msg_po, msg_ch) = stream::<IoTaskMsg>();
// initialize our loop data and store it in the loop
let data: IoTaskLoopData = {
async_handle: async_handle,
msg_po: Port()
msg_po: msg_po
};
ll::set_data_for_uv_handle(async_handle, addr_of(&data));
@ -119,7 +130,7 @@ fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
// while we dwell in the I/O loop
let iotask = IoTask_({
async_handle: async_handle,
op_chan: data.msg_po.chan()
op_chan: SharedChan(msg_ch)
});
iotask_ch.send(iotask);
@ -136,7 +147,7 @@ type IoTaskLoopData = {
msg_po: Port<IoTaskMsg>
};
fn send_msg(iotask: IoTask,
fn send_msg(iotask: &IoTask,
msg: IoTaskMsg) unsafe {
iotask.op_chan.send(move msg);
ll::async_send(iotask.async_handle);
@ -151,7 +162,7 @@ extern fn wake_up_cb(async_handle: *ll::uv_async_t,
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
let msg_po = (*data).msg_po;
let msg_po = &(*data).msg_po;
while msg_po.peek() {
match msg_po.recv() {
@ -203,34 +214,37 @@ mod test {
iotask: IoTask,
exit_ch: oldcomm::Chan<()>
};
fn impl_uv_iotask_async(iotask: IoTask) unsafe {
fn impl_uv_iotask_async(iotask: &IoTask) unsafe {
let async_handle = ll::async_t();
let ah_ptr = ptr::addr_of(&async_handle);
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let ah_data = {
iotask: iotask,
iotask: iotask.clone(),
exit_ch: exit_ch
};
let ah_data_ptr = ptr::addr_of(&ah_data);
let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data);
debug!("about to interact");
do interact(iotask) |loop_ptr| unsafe {
debug!("interacting");
ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
ll::async_send(ah_ptr);
};
debug!("waiting for async close");
oldcomm::recv(exit_po);
}
// this fn documents the bear minimum neccesary to roll your own
// high_level_loop
unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
let iotask_port = oldcomm::Port::<IoTask>();
let iotask_ch = oldcomm::Chan(&iotask_port);
let (iotask_port, iotask_ch) = stream::<IoTask>();
do task::spawn_sched(task::ManualThreads(1u)) {
run_loop(iotask_ch);
debug!("about to run a test loop");
run_loop(&iotask_ch);
exit_ch.send(());
};
return oldcomm::recv(iotask_port);
return iotask_port.recv();
}
extern fn lifetime_handle_close(handle: *libc::c_void) unsafe {
@ -247,7 +261,9 @@ mod test {
fn test_uv_iotask_async() unsafe {
let exit_po = oldcomm::Port::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let iotask = spawn_test_loop(exit_ch);
let iotask = &spawn_test_loop(exit_ch);
debug!("spawned iotask");
// using this handle to manage the lifetime of the high_level_loop,
// as it will exit the first time one of the impl_uv_hl_async() is
@ -258,12 +274,16 @@ mod test {
let work_exit_po = oldcomm::Port::<()>();
let work_exit_ch = oldcomm::Chan(&work_exit_po);
for iter::repeat(7u) {
let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
impl_uv_iotask_async(iotask);
debug!("async");
impl_uv_iotask_async(&iotask_clone);
debug!("done async");
oldcomm::send(work_exit_ch, ());
};
};
for iter::repeat(7u) {
debug!("waiting");
oldcomm::recv(work_exit_po);
};
log(debug, ~"sending teardown_loop msg..");