From a0cd55a1d7436dc9532ddf5cdad7d1f7e8f108f3 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 20 May 2013 18:23:56 -0700 Subject: [PATCH] core::rt: Add RemoteCallback trait and uv implementation This is used for signalling the event loop from other threads. --- src/libcore/rt/rtio.rs | 11 +++++ src/libcore/rt/uv/async.rs | 6 +-- src/libcore/rt/uv/uvio.rs | 86 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs index 4b5eda22ff5d..fa657555f3aa 100644 --- a/src/libcore/rt/rtio.rs +++ b/src/libcore/rt/rtio.rs @@ -18,6 +18,7 @@ use rt::uv::uvio; // XXX: ~object doesn't work currently so these are some placeholder // types to use instead pub type EventLoopObject = uvio::UvEventLoop; +pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; @@ -26,10 +27,20 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn callback_ms(&mut self, ms: u64, ~fn()); + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; /// The asynchronous I/O services. Not all event loops may provide one fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; } +pub trait RemoteCallback { + /// Trigger the remote callback. Note that the number of times the callback + /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire', + /// the callback will be called at least once, but multiple callbacks may be coalesced + /// and callbacks may be called more often requested. Destruction also triggers the + /// callback. + fn fire(&mut self); +} + pub trait IoFactory { fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; diff --git a/src/libcore/rt/uv/async.rs b/src/libcore/rt/uv/async.rs index 0d032f512d38..6ed06cc10b78 100644 --- a/src/libcore/rt/uv/async.rs +++ b/src/libcore/rt/uv/async.rs @@ -20,7 +20,7 @@ pub struct AsyncWatcher(*uvll::uv_async_t); impl Watcher for AsyncWatcher { } impl AsyncWatcher { - fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { unsafe { let handle = uvll::malloc_handle(UV_ASYNC); assert!(handle.is_not_null()); @@ -41,14 +41,14 @@ impl AsyncWatcher { } } - fn send(&mut self) { + pub fn send(&mut self) { unsafe { let handle = self.native_handle(); uvll::async_send(handle); } } - fn close(self, cb: NullCallback) { + pub fn close(self, cb: NullCallback) { let mut this = self; let data = this.get_watcher_data(); assert!(data.close_cb.is_none()); diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index cacd67314eba..cf1bd568d028 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -12,6 +12,7 @@ use option::*; use result::*; use ops::Drop; use cell::{Cell, empty_cell}; +use cast; use cast::transmute; use clone::Clone; use rt::io::IoError; @@ -23,6 +24,8 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; +use unstable::sync::{UnsafeAtomicRcBox, AtomicInt}; +use unstable::intrinsics; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -82,6 +85,10 @@ impl EventLoop for UvEventLoop { } } + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + } + fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { Some(&mut self.uvio) } @@ -101,6 +108,85 @@ fn test_callback_run_once() { } } +pub struct UvRemoteCallback { + // The uv async handle for triggering the callback + async: AsyncWatcher, + // An atomic flag to tell the callback to exit, + // set from the dtor. + exit_flag: UnsafeAtomicRcBox +} + +impl UvRemoteCallback { + pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback { + let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0)); + let exit_flag_clone = exit_flag.clone(); + let async = do AsyncWatcher::new(loop_) |watcher, status| { + assert!(status.is_none()); + f(); + let exit_flag_ptr = exit_flag_clone.get(); + unsafe { + if (*exit_flag_ptr).load() == 1 { + watcher.close(||()); + } + } + }; + UvRemoteCallback { + async: async, + exit_flag: exit_flag + } + } +} + +impl RemoteCallback for UvRemoteCallback { + fn fire(&mut self) { self.async.send() } +} + +impl Drop for UvRemoteCallback { + fn finalize(&self) { + unsafe { + let mut this: &mut UvRemoteCallback = cast::transmute_mut(self); + let exit_flag_ptr = this.exit_flag.get(); + (*exit_flag_ptr).store(1); + this.async.send(); + } + } +} + +#[cfg(test)] +mod test_remote { + use super::*; + use cell; + use cell::Cell; + use rt::test::*; + use rt::thread::Thread; + use rt::tube::Tube; + use rt::rtio::EventLoop; + use rt::local::Local; + use rt::sched::Scheduler; + + #[test] + fn test_uv_remote() { + do run_in_newsched_task { + let mut tube = Tube::new(); + let tube_clone = tube.clone(); + let remote_cell = cell::empty_cell(); + do Local::borrow::() |sched| { + let tube_clone = tube_clone.clone(); + let tube_clone_cell = Cell(tube_clone); + let remote = do sched.event_loop.remote_callback { + tube_clone_cell.take().send(1); + }; + remote_cell.put_back(remote); + } + let _thread = do Thread::start { + remote_cell.take().fire(); + }; + + assert!(tube.recv() == 1); + } + } +} + pub struct UvIoFactory(Loop); pub impl UvIoFactory {