From b2c6d6fd3ff303c2e32a3ac0175810581c65b751 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sun, 27 Apr 2014 15:45:16 -0700 Subject: [PATCH] rustuv: Implement timeouts for unix networking This commit implements the set{,_read,_write}_timeout() methods for the libuv-based networking I/O objects. The implementation details are commented thoroughly throughout the implementation. --- src/librustuv/access.rs | 24 ++- src/librustuv/lib.rs | 1 + src/librustuv/net.rs | 383 +++++++++++++++---------------------- src/librustuv/pipe.rs | 64 +++++-- src/librustuv/stream.rs | 118 +++++++++--- src/librustuv/timeout.rs | 394 +++++++++++++++++++++++++++++++++++++++ src/librustuv/timer.rs | 2 +- src/librustuv/tty.rs | 2 +- 8 files changed, 711 insertions(+), 277 deletions(-) create mode 100644 src/librustuv/timeout.rs diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index f96fa1e5be6e..433073b43c44 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -31,7 +31,7 @@ pub struct Guard<'a> { } struct Inner { - queue: Vec, + queue: Vec<(BlockedTask, uint)>, held: bool, closed: bool, } @@ -47,16 +47,17 @@ impl Access { } } - pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> { + pub fn grant<'a>(&'a mut self, token: uint, + missile: HomingMissile) -> Guard<'a> { // This unsafety is actually OK because the homing missile argument // guarantees that we're on the same event loop as all the other objects // attempting to get access granted. - let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) }; + let inner: &mut Inner = unsafe { &mut *self.inner.get() }; if inner.held { let t: Box = Local::take(); t.deschedule(1, |task| { - inner.queue.push(task); + inner.queue.push((task, token)); Ok(()) }); assert!(inner.held); @@ -75,6 +76,17 @@ impl Access { // necessary synchronization to be running on this thread. unsafe { (*self.inner.get()).closed = true; } } + + // Dequeue a blocked task with a specified token. This is unsafe because it + // is only safe to invoke while on the home event loop, and there is no + // guarantee that this i being invoked on the home event loop. + pub unsafe fn dequeue(&mut self, token: uint) -> Option { + let inner: &mut Inner = &mut *self.inner.get(); + match inner.queue.iter().position(|&(_, t)| t == token) { + Some(i) => Some(inner.queue.remove(i).unwrap().val0()), + None => None, + } + } } impl Clone for Access { @@ -111,9 +123,9 @@ impl<'a> Drop for Guard<'a> { // scheduled on this scheduler. Because we might be woken up on some // other scheduler, we drop our homing missile before we reawaken // the task. - Some(task) => { + Some((task, _)) => { drop(self.missile.take()); - let _ = task.wake().map(|t| t.reawaken()); + task.reawaken(); } None => { inner.held = false; } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 84d4b6b4702c..968029a6edc8 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -84,6 +84,7 @@ fn start(argc: int, argv: **u8) -> int { mod macros; mod access; +mod timeout; mod homing; mod queue; mod rc; diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 0ddf50921fdd..84220cd7a308 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -12,21 +12,20 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; use std::cast; use std::io; -use std::io::{IoError, IoResult}; +use std::io::IoError; use std::io::net::ip; use std::mem; use std::ptr; use std::rt::rtio; use std::rt::task::BlockedTask; -use access::Access; use homing::{HomingIO, HomeHandle}; use rc::Refcount; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, wait_until_woken_after, wakeup}; -use timer::TimerWatcher; +use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx}; use uvio::UvIoFactory; use uvll; @@ -146,190 +145,6 @@ fn socket_name(sk: SocketNameKind, n => Err(uv_error_to_io_error(UvError(n))) } } -//////////////////////////////////////////////////////////////////////////////// -// Helpers for handling timeouts, shared for pipes/tcp -//////////////////////////////////////////////////////////////////////////////// - -pub struct ConnectCtx { - pub status: c_int, - pub task: Option, - pub timer: Option>, -} - -pub struct AcceptTimeout { - timer: Option, - timeout_tx: Option>, - timeout_rx: Option>, -} - -impl ConnectCtx { - pub fn connect( - mut self, obj: T, timeout: Option, io: &mut UvIoFactory, - f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int - ) -> Result { - let mut req = Request::new(uvll::UV_CONNECT); - let r = f(&req, &obj, connect_cb); - return match r { - 0 => { - req.defuse(); // uv callback now owns this request - match timeout { - Some(t) => { - let mut timer = TimerWatcher::new(io); - timer.start(timer_cb, t, 0); - self.timer = Some(timer); - } - None => {} - } - wait_until_woken_after(&mut self.task, &io.loop_, || { - let data = &self as *_; - match self.timer { - Some(ref mut timer) => unsafe { timer.set_data(data) }, - None => {} - } - req.set_data(data); - }); - // Make sure an erroneously fired callback doesn't have access - // to the context any more. - req.set_data(0 as *int); - - // If we failed because of a timeout, drop the TcpWatcher as - // soon as possible because it's data is now set to null and we - // want to cancel the callback ASAP. - match self.status { - 0 => Ok(obj), - n => { drop(obj); Err(UvError(n)) } - } - } - n => Err(UvError(n)) - }; - - extern fn timer_cb(handle: *uvll::uv_timer_t) { - // Don't close the corresponding tcp request, just wake up the task - // and let RAII take care of the pending watcher. - let cx: &mut ConnectCtx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) - }; - cx.status = uvll::ECANCELED; - wakeup(&mut cx.task); - } - - extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { - // This callback can be invoked with ECANCELED if the watcher is - // closed by the timeout callback. In that case we just want to free - // the request and be along our merry way. - let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - - // Apparently on windows when the handle is closed this callback may - // not be invoked with ECANCELED but rather another error code. - // Either ways, if the data is null, then our timeout has expired - // and there's nothing we can do. - let data = unsafe { uvll::get_data_for_req(req.handle) }; - if data.is_null() { return } - - let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; - cx.status = status; - match cx.timer { - Some(ref mut t) => t.stop(), - None => {} - } - // Note that the timer callback doesn't cancel the connect request - // (that's the job of uv_close()), so it's possible for this - // callback to get triggered after the timeout callback fires, but - // before the task wakes up. In that case, we did indeed - // successfully connect, but we don't need to wake someone up. We - // updated the status above (correctly so), and the task will pick - // up on this when it wakes up. - if cx.task.is_some() { - wakeup(&mut cx.task); - } - } - } -} - -impl AcceptTimeout { - pub fn new() -> AcceptTimeout { - AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } - } - - pub fn accept(&mut self, c: &Receiver>) -> IoResult { - match self.timeout_rx { - None => c.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match c.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(c); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - c.recv() - } - } - } - } - - pub fn clear(&mut self) { - // Clear any previous timeout by dropping the timer and transmission - // channels - drop((self.timer.take(), - self.timeout_tx.take(), - self.timeout_rx.take())) - } - - pub fn set_timeout + HomingIO>( - &mut self, ms: u64, t: &mut T - ) { - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let _m = t.fire_homing_missile(); - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(t.uv_handle()) - }); - let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); - unsafe { - timer.set_data(self as *mut _ as *AcceptTimeout); - } - self.timer = Some(timer); - } - - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *uvll::uv_timer_t) { - let acceptor: &mut AcceptTimeout = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); - } - } -} //////////////////////////////////////////////////////////////////////////////// /// TCP implementation @@ -345,8 +160,8 @@ pub struct TcpWatcher { // stream object, so we use these access guards in order to arbitrate among // multiple concurrent reads and writes. Note that libuv *can* read and // write simultaneously, it just can't read and read simultaneously. - read_access: Access, - write_access: Access, + read_access: AccessTimeout, + write_access: AccessTimeout, } pub struct TcpListener { @@ -380,8 +195,8 @@ impl TcpWatcher { handle: handle, stream: StreamWatcher::new(handle), refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), + read_access: AccessTimeout::new(), + write_access: AccessTimeout::new(), } } @@ -412,10 +227,10 @@ impl rtio::RtioSocket for TcpWatcher { impl rtio::RtioTcpStream for TcpWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { let m = self.fire_homing_missile(); - let access = self.read_access.grant(m); + let guard = try!(self.read_access.grant(m)); // see comments in close_read about this check - if access.is_closed() { + if guard.access.is_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -424,8 +239,8 @@ impl rtio::RtioTcpStream for TcpWatcher { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let m = self.fire_homing_missile(); - let _g = self.write_access.grant(m); - self.stream.write(buf).map_err(uv_error_to_io_error) + let guard = try!(self.write_access.grant(m)); + self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error) } fn peer_name(&mut self) -> Result { @@ -468,16 +283,19 @@ impl rtio::RtioTcpStream for TcpWatcher { stream: StreamWatcher::new(self.handle), home: self.home.clone(), refcount: self.refcount.clone(), - write_access: self.write_access.clone(), read_access: self.read_access.clone(), + write_access: self.write_access.clone(), } as Box } fn close_read(&mut self) -> Result<(), IoError> { // see comments in PipeWatcher::close_read - let m = self.fire_homing_missile(); - self.read_access.close(&m); - self.stream.cancel_read(m); + let task = { + let m = self.fire_homing_missile(); + self.read_access.access.close(&m); + self.stream.cancel_read(uvll::EOF as libc::ssize_t) + }; + let _ = task.map(|t| t.reawaken()); Ok(()) } @@ -485,6 +303,35 @@ impl rtio::RtioTcpStream for TcpWatcher { let _m = self.fire_homing_missile(); shutdown(self.handle, &self.uv_loop()) } + + fn set_timeout(&mut self, timeout: Option) { + self.set_read_timeout(timeout); + self.set_write_timeout(timeout); + } + + fn set_read_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read, + &self.stream as *_ as uint); + + fn cancel_read(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_read(uvll::ECANCELED as ssize_t) + } + } + + fn set_write_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write, + &self.stream as *_ as uint); + + fn cancel_write(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_write() + } + } } impl UvHandle for TcpWatcher { @@ -618,6 +465,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { } fn set_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); match ms { None => self.timeout.clear(), Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), @@ -635,8 +483,22 @@ pub struct UdpWatcher { // See above for what these fields are refcount: Refcount, - read_access: Access, - write_access: Access, + read_access: AccessTimeout, + write_access: AccessTimeout, + + blocked_sender: Option, +} + +struct UdpRecvCtx { + task: Option, + buf: Option, + result: Option<(ssize_t, Option)>, +} + +struct UdpSendCtx { + result: c_int, + data: Option>, + udp: *mut UdpWatcher, } impl UdpWatcher { @@ -646,8 +508,9 @@ impl UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), + read_access: AccessTimeout::new(), + write_access: AccessTimeout::new(), + blocked_sender: None, }; assert_eq!(unsafe { uvll::uv_udp_init(io.uv_loop(), udp.handle) @@ -683,20 +546,15 @@ impl rtio::RtioUdpSocket for UdpWatcher { fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, ip::SocketAddr), IoError> { - struct Ctx { - task: Option, - buf: Option, - result: Option<(ssize_t, Option)>, - } let loop_ = self.uv_loop(); let m = self.fire_homing_missile(); - let _g = self.read_access.grant(m); + let _guard = try!(self.read_access.grant(m)); return match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) } { 0 => { - let mut cx = Ctx { + let mut cx = UdpRecvCtx { task: None, buf: Some(slice_to_uv_buf(buf)), result: None, @@ -718,7 +576,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { _suggested_size: size_t, buf: *mut Buf) { unsafe { - let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx); + let cx = uvll::get_data_for_uv_handle(handle); + let cx = &mut *(cx as *mut UdpRecvCtx); *buf = cx.buf.take().expect("recv alloc_cb called more than once") } } @@ -727,7 +586,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { addr: *libc::sockaddr, _flags: c_uint) { assert!(nread != uvll::ECANCELED as ssize_t); let cx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) + &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx) }; // When there's no data to read the recv callback can be a no-op. @@ -751,42 +610,68 @@ impl rtio::RtioUdpSocket for UdpWatcher { } fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> { - struct Ctx { task: Option, result: c_int } - let m = self.fire_homing_missile(); let loop_ = self.uv_loop(); - let _g = self.write_access.grant(m); + let guard = try!(self.write_access.grant(m)); let mut req = Request::new(uvll::UV_UDP_SEND); - let buf = slice_to_uv_buf(buf); let (addr, _len) = addr_to_sockaddr(dst); - let result = unsafe { - let addr_p = &addr as *libc::sockaddr_storage; - uvll::uv_udp_send(req.handle, self.handle, [buf], - addr_p as *libc::sockaddr, send_cb) + let addr_p = &addr as *_ as *libc::sockaddr; + + // see comments in StreamWatcher::write for why we may allocate a buffer + // here. + let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None}; + let uv_buf = if guard.can_timeout { + slice_to_uv_buf(data.get_ref().as_slice()) + } else { + slice_to_uv_buf(buf) }; - return match result { + return match unsafe { + uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb) + } { 0 => { req.defuse(); // uv callback now owns this request - let mut cx = Ctx { task: None, result: 0 }; - wait_until_woken_after(&mut cx.task, &loop_, || { + let mut cx = UdpSendCtx { + result: uvll::ECANCELED, data: data, udp: self as *mut _ + }; + wait_until_woken_after(&mut self.blocked_sender, &loop_, || { req.set_data(&cx); }); - match cx.result { - 0 => Ok(()), - n => Err(uv_error_to_io_error(UvError(n))) + + if cx.result != uvll::ECANCELED { + return match cx.result { + 0 => Ok(()), + n => Err(uv_error_to_io_error(UvError(n))) + } } + let new_cx = ~UdpSendCtx { + result: 0, + udp: 0 as *mut UdpWatcher, + data: cx.data.take(), + }; + unsafe { + req.set_data(&*new_cx); + cast::forget(new_cx); + } + Err(uv_error_to_io_error(UvError(cx.result))) } n => Err(uv_error_to_io_error(UvError(n))) }; + // This function is the same as stream::write_cb, but adapted for udp + // instead of streams. extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); - let cx: &mut Ctx = unsafe { req.get_data() }; + let cx: &mut UdpSendCtx = unsafe { req.get_data() }; cx.result = status; - wakeup(&mut cx.task); + + if cx.udp as uint != 0 { + let udp: &mut UdpWatcher = unsafe { &mut *cx.udp }; + wakeup(&mut udp.blocked_sender); + } else { + let _cx: ~UdpSendCtx = unsafe { cast::transmute(cx) }; + } } } @@ -866,8 +751,48 @@ impl rtio::RtioUdpSocket for UdpWatcher { refcount: self.refcount.clone(), write_access: self.write_access.clone(), read_access: self.read_access.clone(), + blocked_sender: None, } as Box } + + fn set_timeout(&mut self, timeout: Option) { + self.set_read_timeout(timeout); + self.set_write_timeout(timeout); + } + + fn set_read_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read, + self.handle as uint); + + fn cancel_read(stream: uint) -> Option { + // This method is quite similar to StreamWatcher::cancel_read, see + // there for more information + let handle = stream as *uvll::uv_udp_t; + assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0); + let data = unsafe { + let data = uvll::get_data_for_uv_handle(handle); + if data.is_null() { return None } + uvll::set_data_for_uv_handle(handle, 0 as *int); + &mut *(data as *mut UdpRecvCtx) + }; + data.result = Some((uvll::ECANCELED as ssize_t, None)); + data.task.take() + } + } + + fn set_write_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write, + self as *mut _ as uint); + + fn cancel_write(stream: uint) -> Option { + let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) }; + stream.blocked_sender.take() + } + } } impl Drop for UdpWatcher { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 7fec4051761d..76bf92bd5557 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -10,16 +10,18 @@ use libc; use std::c_str::CString; +use std::cast; use std::io::IoError; use std::io; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; +use std::rt::task::BlockedTask; -use access::Access; use homing::{HomingIO, HomeHandle}; use net; use rc::Refcount; use stream::StreamWatcher; use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; +use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout}; use uvio::UvIoFactory; use uvll; @@ -30,8 +32,8 @@ pub struct PipeWatcher { refcount: Refcount, // see comments in TcpWatcher for why these exist - write_access: Access, - read_access: Access, + write_access: AccessTimeout, + read_access: AccessTimeout, } pub struct PipeListener { @@ -43,7 +45,7 @@ pub struct PipeListener { pub struct PipeAcceptor { listener: Box, - timeout: net::AcceptTimeout, + timeout: AcceptTimeout, } // PipeWatcher implementation and traits @@ -70,8 +72,8 @@ impl PipeWatcher { home: home, defused: false, refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), + read_access: AccessTimeout::new(), + write_access: AccessTimeout::new(), } } @@ -89,7 +91,7 @@ impl PipeWatcher { -> Result { let pipe = PipeWatcher::new(io, false); - let cx = net::ConnectCtx { status: -1, task: None, timer: None }; + let cx = ConnectCtx { status: -1, task: None, timer: None }; cx.connect(pipe, timeout, io, |req, pipe, cb| { unsafe { uvll::uv_pipe_connect(req.handle, pipe.handle(), @@ -112,10 +114,10 @@ impl PipeWatcher { impl RtioPipe for PipeWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { let m = self.fire_homing_missile(); - let access = self.read_access.grant(m); + let guard = try!(self.read_access.grant(m)); // see comments in close_read about this check - if access.is_closed() { + if guard.access.is_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -124,8 +126,8 @@ impl RtioPipe for PipeWatcher { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let m = self.fire_homing_missile(); - let _g = self.write_access.grant(m); - self.stream.write(buf).map_err(uv_error_to_io_error) + let guard = try!(self.write_access.grant(m)); + self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error) } fn clone(&self) -> Box { @@ -157,9 +159,12 @@ impl RtioPipe for PipeWatcher { // ordering is crucial because we could in theory be rescheduled during // the uv_read_stop which means that another read invocation could leak // in before we set the flag. - let m = self.fire_homing_missile(); - self.read_access.close(&m); - self.stream.cancel_read(m); + let task = { + let m = self.fire_homing_missile(); + self.read_access.access.close(&m); + self.stream.cancel_read(uvll::EOF as libc::ssize_t) + }; + let _ = task.map(|t| t.reawaken()); Ok(()) } @@ -167,6 +172,35 @@ impl RtioPipe for PipeWatcher { let _m = self.fire_homing_missile(); net::shutdown(self.stream.handle, &self.uv_loop()) } + + fn set_timeout(&mut self, timeout: Option) { + self.set_read_timeout(timeout); + self.set_write_timeout(timeout); + } + + fn set_read_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read, + &self.stream as *_ as uint); + + fn cancel_read(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_read(uvll::ECANCELED as libc::ssize_t) + } + } + + fn set_write_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write, + &self.stream as *_ as uint); + + fn cancel_write(stream: uint) -> Option { + let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) }; + stream.cancel_write() + } + } } impl HomingIO for PipeWatcher { @@ -219,7 +253,7 @@ impl RtioUnixListener for PipeListener { // create the acceptor object from ourselves let mut acceptor = box PipeAcceptor { listener: self, - timeout: net::AcceptTimeout::new(), + timeout: AcceptTimeout::new(), }; let _m = acceptor.fire_homing_missile(); diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index a1b606709d87..0b0673725838 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -14,7 +14,6 @@ use std::ptr; use std::rt::task::BlockedTask; use Loop; -use homing::HomingMissile; use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, ForbidUnwind, wakeup}; use uvll; @@ -31,6 +30,8 @@ pub struct StreamWatcher { // structure, but currently we don't have mappings for all the structures // defined in libuv, so we're foced to malloc this. last_write_req: Option, + + blocked_writer: Option, } struct ReadContext { @@ -41,7 +42,8 @@ struct ReadContext { struct WriteContext { result: c_int, - task: Option, + stream: *mut StreamWatcher, + data: Option>, } impl StreamWatcher { @@ -62,6 +64,7 @@ impl StreamWatcher { StreamWatcher { handle: stream, last_write_req: None, + blocked_writer: None, } } @@ -74,7 +77,7 @@ impl StreamWatcher { buf: Some(slice_to_uv_buf(buf)), // if the read is canceled, we'll see eof, otherwise this will get // overwritten - result: uvll::EOF as ssize_t, + result: 0, task: None, }; // When reading a TTY stream on windows, libuv will invoke alloc_cb @@ -104,27 +107,22 @@ impl StreamWatcher { return ret; } - pub fn cancel_read(&mut self, m: HomingMissile) { + pub fn cancel_read(&mut self, reason: ssize_t) -> Option { // When we invoke uv_read_stop, it cancels the read and alloc // callbacks. We need to manually wake up a pending task (if one was - // present). Note that we wake up the task *outside* the homing missile - // to ensure that we don't switch schedulers when we're not supposed to. + // present). assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0); let data = unsafe { let data = uvll::get_data_for_uv_handle(self.handle); - if data.is_null() { return } + if data.is_null() { return None } uvll::set_data_for_uv_handle(self.handle, 0 as *int); &mut *(data as *mut ReadContext) }; - let task = data.task.take(); - drop(m); - match task { - Some(task) => { let _ = task.wake().map(|t| t.reawaken()); } - None => {} - } + data.result = reason; + data.task.take() } - pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> { + pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> { // The ownership of the write request is dubious if this function // unwinds. I believe that if the write_cb fails to re-schedule the task // then the write request will be leaked. @@ -137,30 +135,94 @@ impl StreamWatcher { }; req.set_data(ptr::null::<()>()); + // And here's where timeouts get a little interesting. Currently, libuv + // does not support canceling an in-flight write request. Consequently, + // when a write timeout expires, there's not much we can do other than + // detach the sleeping task from the write request itself. Semantically, + // this means that the write request will complete asynchronously, but + // the calling task will return error (because the write timed out). + // + // There is special wording in the documentation of set_write_timeout() + // indicating that this is a plausible failure scenario, and this + // function is why that wording exists. + // + // Implementation-wise, we must be careful when passing a buffer down to + // libuv. Most of this implementation avoids allocations becuase of the + // blocking guarantee (all stack local variables are valid for the + // entire read/write request). If our write request can be timed out, + // however, we must heap allocate the data and pass that to the libuv + // functions instead. The reason for this is that if we time out and + // return, there's no guarantee that `buf` is a valid buffer any more. + // + // To do this, the write context has an optionally owned vector of + // bytes. + let data = if may_timeout {Some(Vec::from_slice(buf))} else {None}; + let uv_buf = if may_timeout { + slice_to_uv_buf(data.get_ref().as_slice()) + } else { + slice_to_uv_buf(buf) + }; + // Send off the request, but be careful to not block until we're sure // that the write reqeust is queued. If the reqeust couldn't be queued, // then we should return immediately with an error. match unsafe { - uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)], - write_cb) + uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb) } { 0 => { - let mut wcx = WriteContext { result: 0, task: None, }; + let mut wcx = WriteContext { + result: uvll::ECANCELED, + stream: self as *mut _, + data: data, + }; req.defuse(); // uv callback now owns this request let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) }; - wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || { + wait_until_woken_after(&mut self.blocked_writer, + &Loop::wrap(loop_), || { req.set_data(&wcx); }); - self.last_write_req = Some(Request::wrap(req.handle)); - match wcx.result { - 0 => Ok(()), - n => Err(UvError(n)), + + if wcx.result != uvll::ECANCELED { + self.last_write_req = Some(Request::wrap(req.handle)); + return match wcx.result { + 0 => Ok(()), + n => Err(UvError(n)), + } } + + // This is the second case where canceling an in-flight write + // gets interesting. If we've been canceled (no one reset our + // result), then someone still needs to free the request, and + // someone still needs to free the allocate buffer. + // + // To take care of this, we swap out the stack-allocated write + // context for a heap-allocated context, transferring ownership + // of everything to the write_cb. Libuv guarantees that this + // callback will be invoked at some point, and the callback will + // be responsible for deallocating these resources. + // + // Note that we don't cache this write request back in the + // stream watcher because we no longer have ownership of it, and + // we never will. + let new_wcx = ~WriteContext { + result: 0, + stream: 0 as *mut StreamWatcher, + data: wcx.data.take(), + }; + unsafe { + req.set_data(&*new_wcx); + cast::forget(new_wcx); + } + Err(UvError(wcx.result)) } n => Err(UvError(n)), } } + + pub fn cancel_write(&mut self) -> Option { + self.blocked_writer.take() + } } // This allocation callback expects to be invoked once and only once. It will @@ -198,12 +260,18 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) { // away the error code as a result. extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { let mut req = Request::wrap(req); - assert!(status != uvll::ECANCELED); // Remember to not free the request because it is re-used between writes on // the same stream. let wcx: &mut WriteContext = unsafe { req.get_data() }; wcx.result = status; - req.defuse(); - wakeup(&mut wcx.task); + // If the stream is present, we haven't timed out, otherwise we acquire + // ownership of everything and then deallocate it all at once. + if wcx.stream as uint != 0 { + req.defuse(); + let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream }; + wakeup(&mut stream.blocked_writer); + } else { + let _wcx: ~WriteContext = unsafe { cast::transmute(wcx) }; + } } diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs new file mode 100644 index 000000000000..47c9d9335fe5 --- /dev/null +++ b/src/librustuv/timeout.rs @@ -0,0 +1,394 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::c_int; +use std::cast; +use std::io::IoResult; +use std::mem; +use std::rt::task::BlockedTask; + +use access; +use homing::{HomeHandle, HomingMissile, HomingIO}; +use timer::TimerWatcher; +use uvll; +use uvio::UvIoFactory; +use {Loop, UvError, uv_error_to_io_error, Request, wakeup}; +use {UvHandle, wait_until_woken_after}; + +/// Managment of a timeout when gaining access to a portion of a duplex stream. +pub struct AccessTimeout { + state: TimeoutState, + timer: Option<~TimerWatcher>, + pub access: access::Access, +} + +pub struct Guard<'a> { + state: &'a mut TimeoutState, + pub access: access::Guard<'a>, + pub can_timeout: bool, +} + +#[deriving(Eq)] +enum TimeoutState { + NoTimeout, + TimeoutPending(ClientState), + TimedOut, +} + +#[deriving(Eq)] +enum ClientState { + NoWaiter, + AccessPending, + RequestPending, +} + +struct TimerContext { + timeout: *mut AccessTimeout, + callback: fn(uint) -> Option, + payload: uint, +} + +impl AccessTimeout { + pub fn new() -> AccessTimeout { + AccessTimeout { + state: NoTimeout, + timer: None, + access: access::Access::new(), + } + } + + /// Grants access to half of a duplex stream, timing out if necessary. + /// + /// On success, Ok(Guard) is returned and access has been granted to the + /// stream. If a timeout occurs, then Err is returned with an appropriate + /// error. + pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { + // First, flag that we're attempting to acquire access. This will allow + // us to cancel the pending grant if we timeout out while waiting for a + // grant. + match self.state { + NoTimeout => {}, + TimeoutPending(ref mut client) => *client = AccessPending, + TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } + let access = self.access.grant(self as *mut _ as uint, m); + + // After acquiring the grant, we need to flag ourselves as having a + // pending request so the timeout knows to cancel the request. + let can_timeout = match self.state { + NoTimeout => false, + TimeoutPending(ref mut client) => { *client = RequestPending; true } + TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + }; + + Ok(Guard { + access: access, + state: &mut self.state, + can_timeout: can_timeout + }) + } + + /// Sets the pending timeout to the value specified. + /// + /// The home/loop variables are used to construct a timer if one has not + /// been previously constructed. + /// + /// The callback will be invoked if the timeout elapses, and the data of + /// the time will be set to `data`. + pub fn set_timeout(&mut self, ms: Option, + home: &HomeHandle, + loop_: &Loop, + cb: fn(uint) -> Option, + data: uint) { + self.state = NoTimeout; + let ms = match ms { + Some(ms) => ms, + None => return match self.timer { + Some(ref mut t) => t.stop(), + None => {} + } + }; + + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let mut timer = ~TimerWatcher::new_home(loop_, home.clone()); + let cx = ~TimerContext { + timeout: self as *mut _, + callback: cb, + payload: data, + }; + unsafe { + timer.set_data(&*cx); + cast::forget(cx); + } + self.timer = Some(timer); + } + + let timer = self.timer.get_mut_ref(); + unsafe { + let cx = uvll::get_data_for_uv_handle(timer.handle); + let cx = cx as *mut TimerContext; + (*cx).callback = cb; + (*cx).payload = data; + } + timer.stop(); + timer.start(timer_cb, ms, 0); + self.state = TimeoutPending(NoWaiter); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let cx: &TimerContext = unsafe { + &*(uvll::get_data_for_uv_handle(timer) as *TimerContext) + }; + let me = unsafe { &mut *cx.timeout }; + + match mem::replace(&mut me.state, TimedOut) { + TimedOut | NoTimeout => unreachable!(), + TimeoutPending(NoWaiter) => {} + TimeoutPending(AccessPending) => { + match unsafe { me.access.dequeue(me as *mut _ as uint) } { + Some(task) => task.reawaken(), + None => unreachable!(), + } + } + TimeoutPending(RequestPending) => { + match (cx.callback)(cx.payload) { + Some(task) => task.reawaken(), + None => unreachable!(), + } + } + } + } + } +} + +impl Clone for AccessTimeout { + fn clone(&self) -> AccessTimeout { + AccessTimeout { + access: self.access.clone(), + state: NoTimeout, + timer: None, + } + } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + match *self.state { + TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) => + unreachable!(), + + NoTimeout | TimedOut => {} + TimeoutPending(RequestPending) => { + *self.state = TimeoutPending(NoWaiter); + } + } + } +} + +impl Drop for AccessTimeout { + fn drop(&mut self) { + match self.timer { + Some(ref timer) => unsafe { + let data = uvll::get_data_for_uv_handle(timer.handle); + let _data: ~TimerContext = cast::transmute(data); + }, + None => {} + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Connect timeouts +//////////////////////////////////////////////////////////////////////////////// + +pub struct ConnectCtx { + pub status: c_int, + pub task: Option, + pub timer: Option<~TimerWatcher>, +} + +pub struct AcceptTimeout { + timer: Option, + timeout_tx: Option>, + timeout_rx: Option>, +} + +impl ConnectCtx { + pub fn connect( + mut self, obj: T, timeout: Option, io: &mut UvIoFactory, + f: |&Request, &T, uvll::uv_connect_cb| -> c_int + ) -> Result { + let mut req = Request::new(uvll::UV_CONNECT); + let r = f(&req, &obj, connect_cb); + return match r { + 0 => { + req.defuse(); // uv callback now owns this request + match timeout { + Some(t) => { + let mut timer = TimerWatcher::new(io); + timer.start(timer_cb, t, 0); + self.timer = Some(timer); + } + None => {} + } + wait_until_woken_after(&mut self.task, &io.loop_, || { + let data = &self as *_; + match self.timer { + Some(ref mut timer) => unsafe { timer.set_data(data) }, + None => {} + } + req.set_data(data); + }); + // Make sure an erroneously fired callback doesn't have access + // to the context any more. + req.set_data(0 as *int); + + // If we failed because of a timeout, drop the TcpWatcher as + // soon as possible because it's data is now set to null and we + // want to cancel the callback ASAP. + match self.status { + 0 => Ok(obj), + n => { drop(obj); Err(UvError(n)) } + } + } + n => Err(UvError(n)) + }; + + extern fn timer_cb(handle: *uvll::uv_timer_t) { + // Don't close the corresponding tcp request, just wake up the task + // and let RAII take care of the pending watcher. + let cx: &mut ConnectCtx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) + }; + cx.status = uvll::ECANCELED; + wakeup(&mut cx.task); + } + + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + // This callback can be invoked with ECANCELED if the watcher is + // closed by the timeout callback. In that case we just want to free + // the request and be along our merry way. + let req = Request::wrap(req); + if status == uvll::ECANCELED { return } + + // Apparently on windows when the handle is closed this callback may + // not be invoked with ECANCELED but rather another error code. + // Either ways, if the data is null, then our timeout has expired + // and there's nothing we can do. + let data = unsafe { uvll::get_data_for_req(req.handle) }; + if data.is_null() { return } + + let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; + cx.status = status; + match cx.timer { + Some(ref mut t) => t.stop(), + None => {} + } + // Note that the timer callback doesn't cancel the connect request + // (that's the job of uv_close()), so it's possible for this + // callback to get triggered after the timeout callback fires, but + // before the task wakes up. In that case, we did indeed + // successfully connect, but we don't need to wake someone up. We + // updated the status above (correctly so), and the task will pick + // up on this when it wakes up. + if cx.task.is_some() { + wakeup(&mut cx.task); + } + } + } +} + +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } + } + + pub fn accept(&mut self, c: &Receiver>) -> IoResult { + match self.timeout_rx { + None => c.recv(), + Some(ref rx) => { + use std::comm::Select; + + // Poll the incoming channel first (don't rely on the order of + // select just yet). If someone's pending then we should return + // them immediately. + match c.try_recv() { + Ok(data) => return data, + Err(..) => {} + } + + // Use select to figure out which channel gets ready first. We + // do some custom handling of select to ensure that we never + // actually drain the timeout channel (we'll keep seeing the + // timeout message in the future). + let s = Select::new(); + let mut timeout = s.handle(rx); + let mut data = s.handle(c); + unsafe { + timeout.add(); + data.add(); + } + if s.wait() == timeout.id() { + Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } else { + c.recv() + } + } + } + } + + pub fn clear(&mut self) { + match self.timeout_rx { + Some(ref t) => { let _ = t.try_recv(); } + None => {} + } + match self.timer { + Some(ref mut t) => t.stop(), + None => {} + } + } + + pub fn set_timeout + HomingIO>( + &mut self, ms: u64, t: &mut T + ) { + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let loop_ = Loop::wrap(unsafe { + uvll::get_loop_for_uv_handle(t.uv_handle()) + }); + let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + unsafe { + timer.set_data(self as *mut _ as *AcceptTimeout); + } + self.timer = Some(timer); + } + + // Once we've got a timer, stop any previous timeout, reset it for the + // current one, and install some new channels to send/receive data on + let timer = self.timer.get_mut_ref(); + timer.stop(); + timer.start(timer_cb, ms, 0); + let (tx, rx) = channel(); + self.timeout_tx = Some(tx); + self.timeout_rx = Some(rx); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let acceptor: &mut AcceptTimeout = unsafe { + &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) + }; + // This send can never fail because if this timer is active then the + // receiving channel is guaranteed to be alive + acceptor.timeout_tx.get_ref().send(()); + } + } +} diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 216eb6001305..525539f8b36f 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -18,7 +18,7 @@ use uvio::UvIoFactory; use uvll; pub struct TimerWatcher { - handle: *uvll::uv_timer_t, + pub handle: *uvll::uv_timer_t, home: HomeHandle, action: Option, blocker: Option, diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs index 4f3e12b6974d..f70c3b4c1bd7 100644 --- a/src/librustuv/tty.rs +++ b/src/librustuv/tty.rs @@ -87,7 +87,7 @@ impl RtioTTY for TtyWatcher { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let _m = self.fire_homing_missile(); - self.stream.write(buf).map_err(uv_error_to_io_error) + self.stream.write(buf, false).map_err(uv_error_to_io_error) } fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {