diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 9c795bb926a2..fbb4ee0a1557 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -21,8 +21,9 @@ native mod rustrt { Race-free helper to get access to a global task where a libuv loop is running. -Use `uv::hl::interact`, `uv::hl::ref_handle` and `uv::hl::unref_handle` to -do operations against the global loop that this function returns. +Use `uv::hl::interact`, `uv::hl::ref`, `uv::hl::unref` and +uv `uv::hl::unref_and_close` to do operations against the global +loop that this function returns. # Return @@ -33,6 +34,7 @@ fn get() -> hl::high_level_loop { ret get_monitor_task_gl(); } +// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime. #[doc(hidden)] fn get_monitor_task_gl() -> hl::high_level_loop { let monitor_loop_chan = @@ -51,6 +53,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop { }); } +// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime. #[doc(hidden)] fn get_single_task_gl() -> hl::high_level_loop { let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); @@ -323,7 +326,7 @@ mod test { hl::interact(hl_loop) {|loop_ptr| log(debug, "closing timer"); //ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb); - hl::unref_handle(hl_loop, timer_ptr, simple_timer_close_cb); + hl::unref_and_close(hl_loop, timer_ptr, simple_timer_close_cb); log(debug, "about to deref exit_ch_ptr"); log(debug, "after msg sent on deref'd exit_ch"); }; @@ -342,7 +345,7 @@ mod test { log(debug, "user code inside interact loop!!!"); let init_status = ll::timer_init(loop_ptr, timer_ptr); if(init_status == 0i32) { - hl::ref_handle(hl_loop, timer_ptr); + hl::ref(hl_loop, timer_ptr); ll::set_data_for_uv_handle( timer_ptr as *libc::c_void, exit_ch_ptr as *libc::c_void); diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_hl.rs index 48d377532089..8787b650b251 100644 --- a/src/libstd/uv_hl.rs +++ b/src/libstd/uv_hl.rs @@ -7,7 +7,7 @@ libuv functionality. "]; export high_level_loop, hl_loop_ext, high_level_msg; -export run_high_level_loop, interact, ref_handle, unref_handle; +export run_high_level_loop, interact, ref, unref, unref_and_close; import ll = uv_ll; @@ -23,6 +23,10 @@ the C uv loop to process any pending callbacks by the C uv loop "] enum high_level_loop { + simple_task_loop({ + async_handle: *ll::uv_async_t, + op_chan: comm::chan + }), single_task_loop({ async_handle: **ll::uv_async_t, op_chan: comm::chan @@ -52,6 +56,9 @@ impl hl_loop_ext for high_level_loop { monitor_task_loop({op_chan}) { ret op_chan; } + simple_task_loop({async_handle, op_chan}) { + ret op_chan; + } } } } @@ -61,8 +68,8 @@ Represents the range of interactions with a `high_level_loop` "] enum high_level_msg { interaction (fn~(*libc::c_void)), - auto_ref_handle (*libc::c_void), - auto_unref_handle (*libc::c_void, *u8), + ref_handle (*libc::c_void), + manual_unref_handle (*libc::c_void, option<*u8>), tear_down } @@ -96,7 +103,7 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb); // initialize our loop data and store it in the loop - let data: global_loop_data = default_gl_data({ + let data: hl_loop_data = default_gl_data({ async_handle: async_handle, mut active: true, before_msg_drain: before_msg_drain, @@ -159,22 +166,25 @@ resource safe_handle_container(handle_fields: safe_handle_fields) { #[doc=" Needs to be encapsulated within `safe_handle` "] -fn ref_handle(hl_loop: high_level_loop, handle: *T) unsafe { - send_high_level_msg(hl_loop, auto_ref_handle(handle as *libc::c_void)); +fn ref(hl_loop: high_level_loop, handle: *T) unsafe { + send_high_level_msg(hl_loop, ref_handle(handle as *libc::c_void)); } #[doc=" Needs to be encapsulated within `safe_handle` "] -fn unref_handle(hl_loop: high_level_loop, handle: *T, - user_close_cb: *u8) unsafe { - send_high_level_msg(hl_loop, auto_unref_handle(handle as *libc::c_void, - user_close_cb)); +fn unref(hl_loop: high_level_loop, handle: *T) unsafe { + send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void, + none)); +} +fn unref_and_close(hl_loop: high_level_loop, handle: *T, cb: *u8) unsafe { + send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void, + some(cb))); } // INTERNAL API // data that lives for the lifetime of the high-evel oo -enum global_loop_data { +enum hl_loop_data { default_gl_data({ async_handle: *ll::uv_async_t, mut active: bool, @@ -203,6 +213,10 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop, log(debug,"GLOBAL ASYNC handle == 0"); } } + simple_task_loop({async_handle, op_chan}) { + log(debug,"simple async handle != 0, waking up loop.."); + ll::async_send((async_handle)); + } _ {} } } @@ -218,7 +232,7 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?", async_handle, status)); let loop_ptr = ll::get_loop_for_uv_handle(async_handle); - let data = ll::get_data_for_uv_handle(async_handle) as *global_loop_data; + let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data; // we check to see if the loop is "active" (the loop is set to // active = false the first time we realize we need to 'tear down', // set subsequent calls to the global async handle may be triggered @@ -245,11 +259,11 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, cb(loop_ptr); log(debug,"after calling cb"); } - auto_ref_handle(handle) { + ref_handle(handle) { high_level_ref(data, handle); } - auto_unref_handle(handle, user_close_cb) { - high_level_unref(data, handle, false, user_close_cb); + manual_unref_handle(handle, user_close_cb) { + high_level_unref(data, handle, true, user_close_cb); } tear_down { log(debug,"incoming hl_msg: got tear_down"); @@ -258,6 +272,9 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, continue = comm::peek(msg_po); } } + else { + log(debug, "in hl wake_cb, no pending messages"); + } } log(debug, #fmt("after on_wake, continue? %?", continue)); if !do_msg_drain { @@ -269,13 +286,13 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe { log(debug, #fmt("tear_down_close_cb called, closing handle at %?", handle)); - let data = ll::get_data_for_uv_handle(handle) as *global_loop_data; - if vec::len((*data).refd_handles) > 0 { + let data = ll::get_data_for_uv_handle(handle) as *hl_loop_data; + if vec::len((*data).refd_handles) > 0u { fail "Didn't unref all high-level handles"; } } -fn high_level_tear_down(data: *global_loop_data) unsafe { +fn high_level_tear_down(data: *hl_loop_data) unsafe { log(debug, "high_level_tear_down() called, close async_handle"); // call user-suppled before_tear_down cb let async_handle = (*data).async_handle; @@ -283,36 +300,66 @@ fn high_level_tear_down(data: *global_loop_data) unsafe { ll::close(async_handle as *libc::c_void, tear_down_close_cb); } -unsafe fn high_level_ref(data: *global_loop_data, handle: *libc::c_void) { - log(debug,"incoming hl_msg: got auto_ref_handle"); +unsafe fn high_level_ref(data: *hl_loop_data, handle: *libc::c_void) { + log(debug,"incoming hl_msg: got ..ref_handle"); let mut refd_handles = (*data).refd_handles; + let mut unrefd_handles = (*data).unrefd_handles; let handle_already_refd = refd_handles.contains(handle); if handle_already_refd { fail "attempt to do a high-level ref an already ref'd handle"; } + let handle_already_unrefd = unrefd_handles.contains(handle); + // if we are ref'ing a handle (by ptr) that was already unref'd, + // probably + if handle_already_unrefd { + let last_idx = vec::len(unrefd_handles) - 1u; + let handle_idx = vec::position_elem(unrefd_handles, handle); + alt handle_idx { + none { + fail "trying to remove handle that isn't in unrefd_handles"; + } + some(idx) { + unrefd_handles[idx] <-> unrefd_handles[last_idx]; + vec::pop(unrefd_handles); + } + } + (*data).unrefd_handles = unrefd_handles; + } refd_handles += [handle]; (*data).refd_handles = refd_handles; } -unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void, - manual_unref: bool, user_close_cb: *u8) { +unsafe fn high_level_unref(data: *hl_loop_data, handle: *libc::c_void, + manual_unref: bool, user_close_cb: option<*u8>) { log(debug,"incoming hl_msg: got auto_unref_handle"); let mut refd_handles = (*data).refd_handles; let mut unrefd_handles = (*data).unrefd_handles; + log(debug, #fmt("refs: %?, unrefs %? handle %?", vec::len(refd_handles), + vec::len(unrefd_handles), handle)); let handle_already_refd = refd_handles.contains(handle); if !handle_already_refd { fail "attempting to high-level unref an untracked handle"; } let double_unref = unrefd_handles.contains(handle); if double_unref { + log(debug, "double unref encountered"); if manual_unref { // will allow a user to manual unref, but only signal // a fail when a double-unref is caused by a user fail "attempting to high-level unref an unrefd handle"; } + else { + log(debug, "not failing..."); + } } else { - ll::close(handle, user_close_cb); + log(debug, "attempting to unref handle"); + alt user_close_cb { + some(cb) { + ll::close(handle, cb); + } + none { } + } let last_idx = vec::len(refd_handles) - 1u; let handle_idx = vec::position_elem(refd_handles, handle); alt handle_idx { @@ -337,3 +384,128 @@ unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void, } } +#[cfg(test)] +mod test { + crust fn async_close_cb(handle: *ll::uv_async_t) unsafe { + log(debug, #fmt("async_close_cb handle %?", handle)); + let exit_ch = (*(ll::get_data_for_uv_handle(handle) + as *ah_data)).exit_ch; + comm::send(exit_ch, ()); + } + crust fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) + unsafe { + log(debug, #fmt("async_handle_cb handle %? status %?",handle,status)); + let hl_loop = (*(ll::get_data_for_uv_handle(handle) + as *ah_data)).hl_loop; + unref_and_close(hl_loop, handle, async_close_cb); + } + type ah_data = { + hl_loop: high_level_loop, + exit_ch: comm::chan<()> + }; + fn impl_uv_hl_async(hl_loop: high_level_loop) unsafe { + let async_handle = ll::async_t(); + let ah_ptr = ptr::addr_of(async_handle); + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); + let ah_data = { + hl_loop: hl_loop, + exit_ch: exit_ch + }; + let ah_data_ptr = ptr::addr_of(ah_data); + interact(hl_loop) {|loop_ptr| + ref(hl_loop, ah_ptr); + ll::async_init(loop_ptr, ah_ptr, async_handle_cb); + ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); + ll::async_send(ah_ptr); + }; + comm::recv(exit_po); + } + + // this fn documents the bear minimum neccesary to roll your own + // high_level_loop + unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> high_level_loop { + let hl_loop_port = comm::port::(); + let hl_loop_ch = comm::chan(hl_loop_port); + task::spawn_sched(task::manual_threads(1u)) {|| + let loop_ptr = ll::loop_new(); + let msg_po = comm::port::(); + let msg_ch = comm::chan(msg_po); + run_high_level_loop( + loop_ptr, + msg_po, + // before_run + {|async_handle| + log(debug,#fmt("hltest before_run: async_handle %?", + async_handle)); + // do an async_send with it + ll::async_send(async_handle); + comm::send(hl_loop_ch, simple_task_loop({ + async_handle: async_handle, + op_chan: msg_ch + })); + }, + // before_msg_drain + {|async_handle| + log(debug,#fmt("hltest before_msg_drain: async_handle %?", + async_handle)); + true + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("hl test_loop b4_tear_down: async %?", + async_handle)); + }); + ll::loop_delete(loop_ptr); + comm::send(exit_ch, ()); + }; + ret comm::recv(hl_loop_port); + } + + crust fn lifetime_handle_close(handle: *libc::c_void) unsafe { + log(debug, #fmt("lifetime_handle_close ptr %?", handle)); + } + + crust fn lifetime_async_callback(handle: *libc::c_void, + status: libc::c_int) { + log(debug, #fmt("lifetime_handle_close ptr %? status %?", + handle, status)); + } + + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_hl_async() unsafe { + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); + let hl_loop = spawn_test_loop(exit_ch); + + // using this handle to manage the lifetime of the high_level_loop, + // as it will exit the first time one of the impl_uv_hl_async() is + // cleaned up with no one ref'd handles on the loop (Which can happen + // under race-condition type situations.. this ensures that the loop + // lives until, at least, all of the impl_uv_hl_async() runs have been + // called, at least. + let lifetime_handle = ll::async_t(); + let lifetime_handle_ptr = ptr::addr_of(lifetime_handle); + interact(hl_loop) {|loop_ptr| + ref(hl_loop, lifetime_handle_ptr); + ll::async_init(loop_ptr, lifetime_handle_ptr, + lifetime_async_callback); + }; + + iter::repeat(7u) {|| + task::spawn_sched(task::manual_threads(1u), {|| + impl_uv_hl_async(hl_loop); + }); + }; + impl_uv_hl_async(hl_loop); + impl_uv_hl_async(hl_loop); + impl_uv_hl_async(hl_loop); + interact(hl_loop) {|loop_ptr| + ll::close(lifetime_handle_ptr, lifetime_handle_close); + unref(hl_loop, lifetime_handle_ptr); + log(debug, "close and unref lifetime handle"); + }; + comm::recv(exit_po); + } +} diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs index 11e709ec6821..0f52664a8155 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libstd/uv_ll.rs @@ -595,8 +595,8 @@ unsafe fn run(loop_handle: *libc::c_void) { rustrt::rust_uv_run(loop_handle); } -unsafe fn close(handle: *libc::c_void, cb: *u8) { - rustrt::rust_uv_close(handle, cb); +unsafe fn close(handle: *T, cb: *u8) { + rustrt::rust_uv_close(handle as *libc::c_void, cb); } unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t)