Added a UdpWatcher and UdpSendRequest with associated callbacks

This commit is contained in:
Eric Reed 2013-06-14 12:04:11 -07:00
parent 03fe59aefa
commit a7f92c92ed
2 changed files with 184 additions and 5 deletions

View file

@ -54,7 +54,7 @@ use rt::io::IoError;
#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::FsRequest;
pub use self::net::{StreamWatcher, TcpWatcher};
pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
@ -128,6 +128,8 @@ pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, Ipv4, uint, Option<UvError>);
pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@ -139,7 +141,9 @@ struct WatcherData {
alloc_cb: Option<AllocCallback>,
idle_cb: Option<IdleCallback>,
timer_cb: Option<TimerCallback>,
async_cb: Option<AsyncCallback>
async_cb: Option<AsyncCallback>,
udp_recv_cb: Option<UdpReceiveCallback>,
udp_send_cb: Option<UdpSendCallback>
}
pub trait WatcherInterop {
@ -169,7 +173,9 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
alloc_cb: None,
idle_cb: None,
timer_cb: None,
async_cb: None
async_cb: None,
udp_recv_cb: None,
udp_send_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
@ -309,6 +315,9 @@ pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError>
/// The uv buffer type
pub type Buf = uvll::uv_buf_t;
/// The uv IPv4 type
pub type Ipv4 = uvll::sockaddr_in;
/// Borrow a slice to a Buf
pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
let data = vec::raw::to_ptr(v);

View file

@ -9,10 +9,10 @@
// except according to those terms.
use prelude::*;
use libc::{size_t, ssize_t, c_int, c_void};
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use rt::uv::uvll;
use rt::uv::uvll::*;
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
status_to_maybe_uv_error};
use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
@ -254,6 +254,142 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
}
}
pub struct UdpWatcher(*uvll::uv_udp_t);
impl Watcher for UdpWatcher { }
pub impl UdpWatcher {
fn new(loop_: &mut Loop) -> UdpWatcher {
unsafe {
let handle = malloc_handle(UV_UDP);
assert!(handle.is_not_null());
assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::udp_bind(self.native_handle(), addr, 0u32)
};
if result == 0 {
Ok(())
} else {
Err(last_uv_error(self))
}
}
}
_ => fail!() // TODO ipv6
}
}
fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
{
let data = self.get_watcher_data();
data.alloc_cb = Some(alloc);
data.udp_recv_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, recv_cb); }
extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let data = udp_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
/* TODO the socket address should actually be a pointer to either a sockaddr_in or sockaddr_in6.
In libuv, the udp_recv callback takes a struct *sockaddr */
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
address: *uvll::sockaddr_in, flags: c_uint) {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let data = udp_watcher.get_watcher_data();
let cb = data.udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(handle, nread as c_int);
unsafe { (*cb)(udp_watcher, nread as int, buf, *address, flags as uint, status) };
}
}
fn recv_stop(&mut self) {
let handle = self.native_handle();
unsafe { uvll::udp_recv_stop(handle); }
}
fn send(&mut self, buf: Buf, address: IpAddr, cb: UdpSendCallback) {
{
let data = self.get_watcher_data();
assert!(data.udp_send_cb.is_none());
data.udp_send_cb = Some(cb);
}
let req = UdpSendRequest::new();
let bufs = [buf];
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
unsafe {
assert!(0 == uvll::udp_send(req.native_handle(),
self.native_handle(),
bufs, addr, send_cb));
}
}
}
_ => fail!() // TODO ipv6
}
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
let mut udp_watcher = send_request.handle();
send_request.delete();
let cb = {
let data = udp_watcher.get_watcher_data();
let cb = data.udp_send_cb.swap_unwrap();
cb
};
let status = status_to_maybe_uv_error(udp_watcher.native_handle(), status);
cb(udp_watcher, status);
}
}
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_udp_t) {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
{
let data = udp_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
udp_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
}
}
impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
UdpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_udp_t {
match self { &UdpWatcher(ptr) => ptr }
}
}
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
@ -327,6 +463,40 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
}
}
pub struct UdpSendRequest(*uvll::uv_udp_send_t);
impl Request for UdpSendRequest { }
pub impl UdpSendRequest {
fn new() -> UdpSendRequest {
let send_handle = unsafe {
malloc_req(UV_UDP_SEND)
};
assert!(send_handle.is_not_null());
let send_handle = send_handle as *uvll::uv_udp_send_t;
UdpSendRequest(send_handle)
}
fn handle(&self) -> UdpWatcher {
unsafe {
let udp_handle = uvll::get_udp_handle_from_send_req(self.native_handle());
NativeHandle::from_native_handle(udp_handle)
}
}
fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
UdpSendRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_udp_send_t {
match self { &UdpSendRequest(ptr) => ptr }
}
}
#[cfg(test)]
mod test {