diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs index 8c1500353ae6..d9230e08dc76 100644 --- a/src/libcore/private/global.rs +++ b/src/libcore/private/global.rs @@ -114,6 +114,21 @@ unsafe fn global_data_modify_( } } +pub unsafe fn global_data_clone( + key: GlobalDataKey) -> Option { + let mut maybe_clone: Option = None; + do global_data_modify(key) |current| { + match ¤t { + &Some(~ref value) => { + maybe_clone = Some(value.clone()); + } + &None => () + } + current + } + return maybe_clone; +} + // GlobalState is a map from keys to unique pointers and a // destructor. Keys are pointers derived from the type of the // global value. There is a single GlobalState instance per runtime. diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs index 0607055db5c0..cc788dfee22f 100644 --- a/src/libstd/flatpipes.rs +++ b/src/libstd/flatpipes.rs @@ -782,7 +782,6 @@ mod test { let (finish_port, finish_chan) = pipes::stream(); let addr = ip::v4::parse_addr("127.0.0.1"); - let iotask = uv::global_loop::get(); let begin_connect_chan = Cell(move begin_connect_chan); let accept_chan = Cell(move accept_chan); @@ -790,6 +789,7 @@ mod test { // The server task do task::spawn |copy addr, move begin_connect_chan, move accept_chan| { + let iotask = &uv::global_loop::get(); let begin_connect_chan = begin_connect_chan.take(); let accept_chan = accept_chan.take(); let listen_res = do tcp::listen( @@ -821,6 +821,7 @@ mod test { begin_connect_port.recv(); debug!("connecting"); + let iotask = &uv::global_loop::get(); let connect_result = tcp::connect(copy addr, port, iotask); assert connect_result.is_ok(); let sock = result::unwrap(move connect_result); diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index fad583a668b3..080c5514ac8b 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -114,7 +114,7 @@ enum IpGetAddrErr { * a vector of `ip_addr` results, in the case of success, or an error * object in the case of failure */ -pub fn get_addr(node: &str, iotask: iotask) +pub fn get_addr(node: &str, iotask: &iotask) -> result::Result<~[IpAddr], IpGetAddrErr> { do oldcomm::listen |output_ch| { do str::as_buf(node) |node_ptr, len| unsafe { @@ -413,7 +413,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr() { let localhost_name = ~"localhost"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); if result::is_err(&ga_result) { fail ~"got err result from net::ip::get_addr();" @@ -439,7 +439,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr_bad_input() { let localhost_name = ~"sjkl234m,./sdf"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); assert result::is_err(&ga_result); } diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 847962c1773a..75c7a7cbfb9f 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -142,7 +142,7 @@ pub enum TcpConnectErrData { * `net::tcp::tcp_connect_err_data` instance will be returned */ pub fn connect(input_ip: ip::IpAddr, port: uint, - iotask: IoTask) + iotask: &IoTask) -> result::Result unsafe { let result_po = oldcomm::Port::(); let closed_signal_po = oldcomm::Port::<()>(); @@ -164,7 +164,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, ip::Ipv4(_) => { false } ip::Ipv6(_) => { true } }, - iotask: iotask + iotask: iotask.clone() }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); @@ -496,17 +496,17 @@ pub fn accept(new_conn: TcpNewConnection) let server_data_ptr = uv::ll::get_data_for_uv_handle( server_handle_ptr) as *TcpListenFcData; let reader_po = oldcomm::Port(); - let iotask = (*server_data_ptr).iotask; + let iotask = &(*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); - let client_socket_data = @{ + let client_socket_data: @TcpSocketData = @{ reader_po: reader_po, reader_ch: oldcomm::Chan(&reader_po), stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), ipv6: (*server_data_ptr).ipv6, - iotask : iotask + iotask : iotask.clone() }; let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data)); let client_stream_handle_ptr = @@ -588,10 +588,10 @@ pub fn accept(new_conn: TcpNewConnection) * of listen exiting because of an error */ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, - on_establish_cb: fn~(oldcomm::Chan>), - new_connect_cb: fn~(TcpNewConnection, - oldcomm::Chan>)) + iotask: &IoTask, + on_establish_cb: fn~(oldcomm::Chan>), + new_connect_cb: fn~(TcpNewConnection, + oldcomm::Chan>)) -> result::Result<(), TcpListenErrData> unsafe { do listen_common(move host_ip, port, backlog, iotask, move on_establish_cb) @@ -606,7 +606,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, } fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, + iotask: &IoTask, on_establish_cb: fn~(oldcomm::Chan>), on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> unsafe { @@ -615,12 +615,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let kill_ch = oldcomm::Chan(&kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); - let server_data = { + let server_data: TcpListenFcData = { server_stream_ptr: server_stream_ptr, stream_closed_ch: oldcomm::Chan(&stream_closed_po), kill_ch: kill_ch, on_connect_cb: move on_connect_cb, - iotask: iotask, + iotask: iotask.clone(), ipv6: match &host_ip { &ip::Ipv4(_) => { false } &ip::Ipv6(_) => { true } @@ -895,7 +895,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe { }; let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", stream_handle_ptr, loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -916,7 +916,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) use timer; log(debug, ~"starting tcp::read"); - let iotask = (*socket_data).iotask; + let iotask = &(*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { let err_data = result::get_err(&rs_result); @@ -956,7 +956,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stop_po = oldcomm::Port::>(); let stop_ch = oldcomm::Chan(&stop_po); - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, ~"in interact cb for tcp::read_stop"); match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 => { @@ -984,7 +984,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let start_po = oldcomm::Port::>(); let start_ch = oldcomm::Chan(&start_po); log(debug, ~"in tcp::read_start before interact loop"); - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr)); match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, on_alloc_cb, @@ -1024,7 +1024,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, result_ch: oldcomm::Chan(&result_po) }; let write_data_ptr = ptr::addr_of(&write_data); - do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe { log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); match uv::ll::write(write_req_ptr, stream_handle_ptr, @@ -1369,7 +1369,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_and_client() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8888u; let expected_req = ~"ping"; @@ -1381,6 +1381,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1389,7 +1390,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1415,7 +1416,7 @@ pub mod test { assert str::contains(actual_resp, expected_resp); } pub fn impl_gl_tcp_ipv4_get_peer_addr() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8887u; let expected_resp = ~"pong"; @@ -1426,6 +1427,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1434,7 +1436,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1445,10 +1447,11 @@ pub mod test { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); let connect_result = connect(move server_ip_addr, server_port, - iotask); + &iotask); let sock = result::unwrap(move connect_result); + debug!("testing peer address"); // This is what we are actually testing! assert net::ip::format_addr(&sock.get_peer_addr()) == ~"127.0.0.1"; @@ -1457,12 +1460,14 @@ pub mod test { // Fulfill the protocol the test server expects let resp_bytes = str::to_bytes(~"ping"); tcp_write_single(&sock, resp_bytes); + debug!("message sent"); let read_result = sock.read(0u); client_ch.send(str::from_bytes(read_result.get())); + debug!("result read"); }; } pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8889u; let expected_req = ~"ping"; @@ -1482,7 +1487,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_address_in_use() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8890u; let expected_req = ~"ping"; @@ -1494,6 +1499,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1502,7 +1508,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1533,7 +1539,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_access_denied() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 80u; // this one should fail.. @@ -1553,7 +1559,7 @@ pub mod test { } pub fn impl_gl_tcp_ipv4_server_client_reader_writer() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8891u; let expected_req = ~"ping"; @@ -1565,6 +1571,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1573,7 +1580,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - iotask) + &iotask_clone) }; server_result_ch.send(actual_req); }; @@ -1604,7 +1611,7 @@ pub mod test { pub fn impl_tcp_socket_impl_reader_handles_eof() { use core::io::{Reader,ReaderUtil}; - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 10041u; let expected_req = ~"GET /"; @@ -1616,6 +1623,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1624,7 +1632,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1664,7 +1672,7 @@ pub mod test { fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, server_ch: oldcomm::Chan<~str>, cont_ch: oldcomm::Chan<()>, - iotask: IoTask) -> ~str { + iotask: &IoTask) -> ~str { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1751,7 +1759,7 @@ pub mod test { } fn run_tcp_test_server_fail(server_ip: &str, server_port: uint, - iotask: IoTask) -> TcpListenErrData { + iotask: &IoTask) -> TcpListenErrData { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1775,7 +1783,7 @@ pub mod test { fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, client_ch: oldcomm::Chan<~str>, - iotask: IoTask) -> result::Result<~str, + iotask: &IoTask) -> result::Result<~str, TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index 18c623c2bd8c..0f0aa2a011ea 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -39,7 +39,7 @@ use core; * * ch - a channel of type T to send a `val` on * * val - a value of type T to send over the provided `ch` */ -pub fn delayed_send(iotask: IoTask, +pub fn delayed_send(iotask: &IoTask, msecs: uint, ch: oldcomm::Chan, val: T) { @@ -90,7 +90,7 @@ pub fn delayed_send(iotask: IoTask, * * `iotask` - a `uv::iotask` that the tcp request will run on * * msecs - an amount of time, in milliseconds, for the current task to block */ -pub fn sleep(iotask: IoTask, msecs: uint) { +pub fn sleep(iotask: &IoTask, msecs: uint) { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); delayed_send(iotask, msecs, exit_ch, ()); @@ -117,7 +117,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) { * on the provided port in the allotted timeout period, then the result will * be a `some(T)`. If not, then `none` will be returned. */ -pub fn recv_timeout(iotask: IoTask, +pub fn recv_timeout(iotask: &IoTask, msecs: uint, wait_po: oldcomm::Port) -> Option { @@ -177,13 +177,13 @@ mod test { #[test] fn test_gl_timer_simple_sleep_test() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); sleep(hl_loop, 1u); } #[test] fn test_gl_timer_sleep_stress1() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); for iter::repeat(50u) { sleep(hl_loop, 1u); } @@ -193,7 +193,7 @@ mod test { fn test_gl_timer_sleep_stress2() { let po = oldcomm::Port(); let ch = oldcomm::Chan(&po); - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let repeat = 20u; let spec = { @@ -208,11 +208,12 @@ mod test { for spec.each |spec| { let (times, maxms) = *spec; + let hl_loop_clone = hl_loop.clone(); do task::spawn { use rand::*; let rng = Rng(); for iter::repeat(times) { - sleep(hl_loop, rng.next() as uint % maxms); + sleep(&hl_loop_clone, rng.next() as uint % maxms); } oldcomm::send(ch, ()); } @@ -271,12 +272,12 @@ mod test { let expected = rand::Rng().gen_str(16u); let test_po = oldcomm::Port::<~str>(); let test_ch = oldcomm::Chan(&test_po); - + let hl_loop_clone = hl_loop.clone(); do task::spawn() { - delayed_send(hl_loop, 50u, test_ch, expected); + delayed_send(&hl_loop_clone, 50u, test_ch, expected); }; - match recv_timeout(hl_loop, 1u, test_po) { + match recv_timeout(&hl_loop, 1u, test_po) { None => successes += 1, _ => failures += 1 }; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 276cb9cab643..097e923225af 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask}; use core::either::{Left, Right}; use core::libc; -use core::oldcomm::{Port, Chan, select2, listen}; -use core::private::{chan_from_global_ptr, weaken_task}; +use core::pipes::{Port, Chan, SharedChan, select2i}; +use core::private::global::{global_data_clone_create, + global_data_clone}; +use core::private::weak_task::weaken_task; use core::str; -use core::task::TaskBuilder; +use core::task::{task, SingleThreaded, spawn}; use core::task; use core::vec; - -extern mod rustrt { - unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; -} +use core::clone::Clone; +use core::option::{Some, None}; /** * Race-free helper to get access to a global task where a libuv @@ -49,64 +49,58 @@ pub fn get() -> IoTask { #[doc(hidden)] fn get_monitor_task_gl() -> IoTask unsafe { - let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); - - debug!("ENTERING global_loop::get() loop chan: %?", - monitor_loop_chan_ptr); - - debug!("before priv::chan_from_global_ptr"); type MonChan = Chan; - let monitor_ch = - do chan_from_global_ptr::(monitor_loop_chan_ptr, - || { - task::task().sched_mode - (task::SingleThreaded) - .unlinked() - }) |msg_po| unsafe { - debug!("global monitor task starting"); + struct GlobalIoTask(IoTask); - // As a weak task the runtime will notify us when to exit - do weaken_task() |weak_exit_po| { - debug!("global monitor task is now weak"); - let hl_loop = spawn_loop(); - loop { - debug!("in outer_loop..."); - match select2(weak_exit_po, msg_po) { - Left(weak_exit) => { - // all normal tasks have ended, tell the - // libuv loop to tear_down, then exit - debug!("weak_exit_po recv'd msg: %?", weak_exit); - iotask::exit(hl_loop); - break; - } - Right(fetch_ch) => { - debug!("hl_loop req recv'd: %?", fetch_ch); - fetch_ch.send(hl_loop); - } + impl GlobalIoTask: Clone { + fn clone(&self) -> GlobalIoTask { + GlobalIoTask((**self).clone()) + } + } + + fn key(_: GlobalIoTask) { } + + match global_data_clone(key) { + Some(GlobalIoTask(iotask)) => iotask, + None => { + let iotask: IoTask = spawn_loop(); + let mut installed = false; + let final_iotask = do global_data_clone_create(key) { + installed = true; + ~GlobalIoTask(iotask.clone()) + }; + if installed { + do task().unlinked().spawn() unsafe { + debug!("global monitor task starting"); + // As a weak task the runtime will notify us when to exit + do weaken_task |weak_exit_po| { + debug!("global monitor task is now weak"); + weak_exit_po.recv(); + iotask::exit(&iotask); + debug!("global monitor task is leaving weakend state"); + }; + debug!("global monitor task exiting"); } + } else { + iotask::exit(&iotask); } - debug!("global monitor task is leaving weakend state"); - }; - debug!("global monitor task exiting"); - }; - // once we have a chan to the monitor loop, we ask it for - // the libuv loop's async handle - do listen |fetch_ch| { - monitor_ch.send(fetch_ch); - fetch_ch.recv() + match final_iotask { + GlobalIoTask(iotask) => iotask + } + } } } fn spawn_loop() -> IoTask { - let builder = do task::task().add_wrapper |task_body| { + let builder = do task().add_wrapper |task_body| { fn~(move task_body) { // The I/O loop task also needs to be weak so it doesn't keep // the runtime alive unsafe { - do weaken_task |weak_exit_po| { - debug!("global libuv task is now weak %?", weak_exit_po); + do weaken_task |_| { + debug!("global libuv task is now weak"); task_body(); // We don't wait for the exit message on weak_exit_po @@ -118,6 +112,7 @@ fn spawn_loop() -> IoTask { } } }; + let builder = builder.unlinked(); spawn_iotask(move builder) } @@ -147,7 +142,7 @@ mod test { _status: libc::c_int) unsafe { log(debug, ~"in simple timer cb"); ll::timer_stop(timer_ptr); - let hl_loop = get_gl(); + let hl_loop = &get_gl(); do iotask::interact(hl_loop) |_loop_ptr| unsafe { log(debug, ~"closing timer"); ll::close(timer_ptr, simple_timer_close_cb); @@ -157,7 +152,7 @@ mod test { log(debug, ~"exiting simple timer cb"); } - fn impl_uv_hl_simple_timer(iotask: IoTask) unsafe { + fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe { let exit_po = oldcomm::Port::(); let exit_ch = oldcomm::Chan(&exit_po); let exit_ch_ptr = ptr::addr_of(&exit_ch); @@ -190,10 +185,11 @@ mod test { #[test] fn test_gl_uv_global_loop_high_level_global_timer() unsafe { - let hl_loop = get_gl(); + let hl_loop = &get_gl(); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); oldcomm::send(exit_ch, ()); }); @@ -206,12 +202,12 @@ mod test { #[test] #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe { - let hl_loop = get_gl(); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); let cycles = 5000u; for iter::repeat(cycles) { task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); oldcomm::send(exit_ch, ()); }); diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index 409d73c2539b..c50a19cc5c17 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -20,7 +20,7 @@ use ll = uv_ll; use core::libc::c_void; use core::libc; -use core::oldcomm::{Port, Chan, listen}; +use core::pipes::{stream, Port, Chan, SharedChan}; use core::prelude::*; use core::ptr::addr_of; use core::task::TaskBuilder; @@ -30,22 +30,30 @@ use core::task; pub enum IoTask { IoTask_({ async_handle: *ll::uv_async_t, - op_chan: Chan + op_chan: SharedChan }) } +impl IoTask: Clone { + fn clone(&self) -> IoTask { + IoTask_({ + async_handle: self.async_handle, + op_chan: self.op_chan.clone() + }) + } +} + pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { - do listen |iotask_ch| { + let (iotask_port, iotask_chan) = stream(); - do task.sched_mode(task::SingleThreaded).spawn { - debug!("entering libuv task"); - run_loop(iotask_ch); - debug!("libuv task exiting"); - }; + do task.sched_mode(task::SingleThreaded).spawn { + debug!("entering libuv task"); + run_loop(&iotask_chan); + debug!("libuv task exiting"); + }; - iotask_ch.recv() - } + iotask_port.recv() } @@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { * module. It is not safe to send the `loop_ptr` param to this callback out * via ports/chans. */ -pub unsafe fn interact(iotask: IoTask, +pub unsafe fn interact(iotask: &IoTask, cb: fn~(*c_void)) { send_msg(iotask, Interaction(move cb)); } @@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask, * async handle and do a sanity check to make sure that all other handles are * closed, causing a failure otherwise. */ -pub fn exit(iotask: IoTask) unsafe { +pub fn exit(iotask: &IoTask) unsafe { send_msg(iotask, TeardownLoop); } @@ -96,8 +104,9 @@ enum IoTaskMsg { } /// Run the loop and begin handling messages -fn run_loop(iotask_ch: Chan) unsafe { +fn run_loop(iotask_ch: &Chan) unsafe { + debug!("creating loop"); let loop_ptr = ll::loop_new(); // set up the special async handle we'll use to allow multi-task @@ -108,10 +117,12 @@ fn run_loop(iotask_ch: Chan) unsafe { // associate the async handle with the loop ll::async_init(loop_ptr, async_handle, wake_up_cb); + let (msg_po, msg_ch) = stream::(); + // initialize our loop data and store it in the loop let data: IoTaskLoopData = { async_handle: async_handle, - msg_po: Port() + msg_po: msg_po }; ll::set_data_for_uv_handle(async_handle, addr_of(&data)); @@ -119,7 +130,7 @@ fn run_loop(iotask_ch: Chan) unsafe { // while we dwell in the I/O loop let iotask = IoTask_({ async_handle: async_handle, - op_chan: data.msg_po.chan() + op_chan: SharedChan(msg_ch) }); iotask_ch.send(iotask); @@ -136,7 +147,7 @@ type IoTaskLoopData = { msg_po: Port }; -fn send_msg(iotask: IoTask, +fn send_msg(iotask: &IoTask, msg: IoTaskMsg) unsafe { iotask.op_chan.send(move msg); ll::async_send(iotask.async_handle); @@ -151,7 +162,7 @@ extern fn wake_up_cb(async_handle: *ll::uv_async_t, let loop_ptr = ll::get_loop_for_uv_handle(async_handle); let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData; - let msg_po = (*data).msg_po; + let msg_po = &(*data).msg_po; while msg_po.peek() { match msg_po.recv() { @@ -203,34 +214,37 @@ mod test { iotask: IoTask, exit_ch: oldcomm::Chan<()> }; - fn impl_uv_iotask_async(iotask: IoTask) unsafe { + fn impl_uv_iotask_async(iotask: &IoTask) unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(&async_handle); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); let ah_data = { - iotask: iotask, + iotask: iotask.clone(), exit_ch: exit_ch }; - let ah_data_ptr = ptr::addr_of(&ah_data); + let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data); + debug!("about to interact"); do interact(iotask) |loop_ptr| unsafe { + debug!("interacting"); ll::async_init(loop_ptr, ah_ptr, async_handle_cb); ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); }; + debug!("waiting for async close"); oldcomm::recv(exit_po); } // this fn documents the bear minimum neccesary to roll your own // high_level_loop unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { - let iotask_port = oldcomm::Port::(); - let iotask_ch = oldcomm::Chan(&iotask_port); + let (iotask_port, iotask_ch) = stream::(); do task::spawn_sched(task::ManualThreads(1u)) { - run_loop(iotask_ch); + debug!("about to run a test loop"); + run_loop(&iotask_ch); exit_ch.send(()); }; - return oldcomm::recv(iotask_port); + return iotask_port.recv(); } extern fn lifetime_handle_close(handle: *libc::c_void) unsafe { @@ -247,7 +261,9 @@ mod test { fn test_uv_iotask_async() unsafe { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let iotask = spawn_test_loop(exit_ch); + let iotask = &spawn_test_loop(exit_ch); + + debug!("spawned iotask"); // using this handle to manage the lifetime of the high_level_loop, // as it will exit the first time one of the impl_uv_hl_async() is @@ -258,12 +274,16 @@ mod test { let work_exit_po = oldcomm::Port::<()>(); let work_exit_ch = oldcomm::Chan(&work_exit_po); for iter::repeat(7u) { + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - impl_uv_iotask_async(iotask); + debug!("async"); + impl_uv_iotask_async(&iotask_clone); + debug!("done async"); oldcomm::send(work_exit_ch, ()); }; }; for iter::repeat(7u) { + debug!("waiting"); oldcomm::recv(work_exit_po); }; log(debug, ~"sending teardown_loop msg.."); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 7e3428788411..8ca49ea6a57c 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -34,7 +34,6 @@ rust_kernel::rust_kernel(rust_env *env) : sched_reaper(this), osmain_driver(NULL), non_weak_tasks(0), - global_loop_chan(0), at_exit_runner(NULL), at_exit_started(false), env(env), diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 477e59d1b3e6..8ba0405b86e0 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -129,9 +129,6 @@ class rust_kernel { void end_weak_tasks(); void begin_shutdown(); - // Used to communicate with the process-side, global libuv loop - uintptr_t global_loop_chan; - lock_and_signal at_exit_lock; spawn_fn at_exit_runner; bool at_exit_started; @@ -190,8 +187,6 @@ public: bool send_to_port(rust_port_id chan, void *sptr); - uintptr_t* get_global_loop() { return &global_loop_chan; } - void register_exit_function(spawn_fn runner, fn_env_pair *f); }; diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 53d8177bcf82..2dc70088628f 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) { return ntohs(src->sin6_port); } -extern "C" uintptr_t* -rust_uv_get_kernel_global_chan_ptr() { - uintptr_t* result = rust_get_current_task()->kernel->get_global_loop(); - rust_task* task = rust_get_current_task(); - LOG(task, stdlib, "global loop: %lu", (unsigned long int)result); - LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result); - return result; -} - extern "C" void* rust_uv_current_kernel_malloc(size_t size) { return current_kernel_malloc(size, "rust_uv_current_kernel_malloc"); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index dd84e5ff6e7c..8e8ce9ee509e 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -158,7 +158,6 @@ rust_uv_get_data_for_req rust_uv_set_data_for_req rust_uv_get_base_from_buf rust_uv_get_len_from_buf -rust_uv_get_kernel_global_chan_ptr rust_uv_current_kernel_malloc rust_uv_current_kernel_free rust_uv_getaddrinfo diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index c2d4be04191b..10b13d8757fa 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -27,7 +27,7 @@ proto! oneshot ( ) fn main() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); pipes::spawn_service(oneshot::init, |p| { match try_recv(move p) { diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs index e71d0c4931dc..e138f2562aae 100644 --- a/src/test/run-pass/pipe-select.rs +++ b/src/test/run-pass/pipe-select.rs @@ -35,7 +35,7 @@ fn main() { use oneshot::client::*; use stream::client::*; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let c = pipes::spawn_service(stream::init, |p| { error!("waiting for pipes"); diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs index 4a6e7b4ce36a..ae7e4e7fb0ca 100644 --- a/src/test/run-pass/pipe-sleep.rs +++ b/src/test/run-pass/pipe-sleep.rs @@ -27,7 +27,7 @@ fn main() { let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); }); - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); sleep(iotask, 500); signal(move c);