From 6328f7c199a1697aaee7e5fe2b397c457e6c311a Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 22 Apr 2014 18:38:59 -0700 Subject: [PATCH] std: Add timeouts to unix connect/accept This adds support for connecting to a unix socket with a timeout (a named pipe on windows), and accepting a connection with a timeout. The goal is to bring unix pipes/named sockets back in line with TCP support for timeouts. Similarly to the TCP sockets, all methods are marked #[experimental] due to uncertainty about the type of the timeout argument. This internally involved a good bit of refactoring to share as much code as possible between TCP servers and pipe servers, but the core implementation did not change drastically as part of this commit. cc #13523 --- src/liblibc/lib.rs | 2 +- src/libnative/io/c_win32.rs | 2 + src/libnative/io/mod.rs | 6 +- src/libnative/io/net.rs | 128 +----------- src/libnative/io/pipe_unix.rs | 59 +++--- src/libnative/io/pipe_win32.rs | 56 ++++- src/libnative/io/util.rs | 136 ++++++++++++ src/librustuv/net.rs | 364 ++++++++++++++++++--------------- src/librustuv/pipe.rs | 59 +++--- src/librustuv/uvio.rs | 5 +- src/libstd/io/net/unix.rs | 91 ++++++++- src/libstd/rt/rtio.rs | 4 +- 12 files changed, 547 insertions(+), 365 deletions(-) create mode 100644 src/libnative/io/util.rs diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index 98613f885cd4..bebf95a4a3ba 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED}; #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES}; #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING}; -#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED}; +#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0}; #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET}; #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf}; #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES}; diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index dbbb39b3b7b5..6c84424e97a0 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -59,4 +59,6 @@ extern "system" { optname: libc::c_int, optval: *mut libc::c_char, optlen: *mut libc::c_int) -> libc::c_int; + + pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; } diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 19cb5c5f1d4f..944766e8fd07 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -44,6 +44,7 @@ pub use self::process::Process; pub mod addrinfo; pub mod net; pub mod process; +mod util; #[cfg(unix)] #[path = "file_unix.rs"] @@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory { fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> { pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send) } - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> { - pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send) + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> IoResult<~RtioPipe:Send> { + pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 93ec23e32ad4..cc41da846b2b 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -13,13 +13,12 @@ use std::cast; use std::io::net::ip; use std::io; use std::mem; -use std::os; -use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; use super::{IoResult, retry, keep_going}; use super::c; +use super::util; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -118,8 +117,8 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, } } -fn getsockopt(fd: sock_t, opt: libc::c_int, - val: libc::c_int) -> IoResult { +pub fn getsockopt(fd: sock_t, opt: libc::c_int, + val: libc::c_int) -> IoResult { unsafe { let mut slot: T = mem::init(); let mut len = mem::size_of::() as libc::socklen_t; @@ -145,21 +144,6 @@ fn last_error() -> io::IoError { super::last_error() } -fn ms_to_timeval(ms: u64) -> libc::timeval { - libc::timeval { - tv_sec: (ms / 1000) as libc::time_t, - tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, - } -} - -fn timeout(desc: &'static str) -> io::IoError { - io::IoError { - kind: io::TimedOut, - desc: desc, - detail: None, - } -} - #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -270,7 +254,7 @@ impl TcpStream { let addrp = &addr as *_ as *libc::sockaddr; match timeout { Some(timeout) => { - try!(TcpStream::connect_timeout(fd, addrp, len, timeout)); + try!(util::connect_timeout(fd, addrp, len, timeout)); Ok(ret) }, None => { @@ -282,84 +266,6 @@ impl TcpStream { } } - // See http://developerweb.net/viewtopic.php?id=3196 for where this is - // derived from. - fn connect_timeout(fd: sock_t, - addrp: *libc::sockaddr, - len: libc::socklen_t, - timeout_ms: u64) -> IoResult<()> { - #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; - #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; - #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; - #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; - - // Make sure the call to connect() doesn't block - try!(set_nonblocking(fd, true)); - - let ret = match unsafe { libc::connect(fd, addrp, len) } { - // If the connection is in progress, then we need to wait for it to - // finish (with a timeout). The current strategy for doing this is - // to use select() with a timeout. - -1 if os::errno() as int == INPROGRESS as int || - os::errno() as int == WOULDBLOCK as int => { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, fd); - match await(fd, &mut set, timeout_ms) { - 0 => Err(timeout("connection timed out")), - -1 => Err(last_error()), - _ => { - let err: libc::c_int = try!( - getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); - if err == 0 { - Ok(()) - } else { - Err(io::IoError::from_errno(err as uint, true)) - } - } - } - } - - -1 => Err(last_error()), - _ => Ok(()), - }; - - // be sure to turn blocking I/O back on - try!(set_nonblocking(fd, false)); - return ret; - - #[cfg(unix)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) - } - #[cfg(windows)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let mut set = nb as libc::c_ulong; - if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { - Err(last_error()) - } else { - Ok(()) - } - } - - #[cfg(unix)] - fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let start = ::io::timer::now(); - retry(|| unsafe { - // Recalculate the timeout each iteration (it is generally - // undefined what the value of the 'tv' is after select - // returns EINTR). - let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); - c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv) - }) - } - #[cfg(windows)] - fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let tv = ms_to_timeval(timeout); - unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } - } - } - pub fn fd(&self) -> sock_t { // This unsafety is fine because it's just a read-only arc unsafe { (*self.inner.get()).fd } @@ -533,7 +439,7 @@ impl TcpAcceptor { pub fn native_accept(&mut self) -> IoResult { if self.deadline != 0 { - try!(self.accept_deadline()); + try!(util::accept_deadline(self.fd(), self.deadline)); } unsafe { let mut storage: libc::sockaddr_storage = mem::init(); @@ -550,25 +456,6 @@ impl TcpAcceptor { } } } - - fn accept_deadline(&mut self) -> IoResult<()> { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, self.fd()); - - match retry(|| { - // If we're past the deadline, then pass a 0 timeout to select() so - // we can poll the status of the socket. - let now = ::io::timer::now(); - let ms = if self.deadline > now {0} else {self.deadline - now}; - let tv = ms_to_timeval(ms); - let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1}; - unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } - }) { - -1 => Err(last_error()), - 0 => Err(timeout("accept timed out")), - _ => return Ok(()), - } - } } impl rtio::RtioSocket for TcpAcceptor { @@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn set_timeout(&mut self, timeout: Option) { - self.deadline = match timeout { - None => 0, - Some(t) => ::io::timer::now() + t, - }; + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 5d13a6b5fc5c..190cae05d434 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,16 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use std::c_str::CString; use std::cast; +use std::intrinsics; use std::io; -use libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; use super::{IoResult, retry, keep_going}; +use super::util; use super::file::fd_t; fn unix_socket(ty: libc::c_int) -> IoResult { @@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint return Ok((storage, len)); } -fn sockaddr_to_unix(storage: &libc::sockaddr_storage, - len: uint) -> IoResult { - match storage.ss_family as libc::c_int { - libc::AF_UNIX => { - assert!(len as uint <= mem::size_of::()); - let storage: &libc::sockaddr_un = unsafe { - cast::transmute(storage) - }; - unsafe { - Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) - } - } - _ => Err(io::standard_error(io::InvalidInput)) - } -} - struct Inner { fd: fd_t, } @@ -76,16 +61,24 @@ impl Drop for Inner { fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } } -fn connect(addr: &CString, ty: libc::c_int) -> IoResult { +fn connect(addr: &CString, ty: libc::c_int, + timeout: Option) -> IoResult { let (addr, len) = try!(addr_to_sockaddr_un(addr)); let inner = Inner { fd: try!(unix_socket(ty)) }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| unsafe { - libc::connect(inner.fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => Err(super::last_error()), - _ => Ok(inner) + let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + + match timeout { + None => { + match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } + } + Some(timeout_ms) => { + try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms)); + Ok(inner) + } } } @@ -110,8 +103,9 @@ pub struct UnixStream { } impl UnixStream { - pub fn connect(addr: &CString) -> IoResult { - connect(addr, libc::SOCK_STREAM).map(|inner| { + pub fn connect(addr: &CString, + timeout: Option) -> IoResult { + connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { UnixStream { inner: UnsafeArc::new(inner) } }) } @@ -176,7 +170,7 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self }) + _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) } } } @@ -189,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, + deadline: u64, } impl UnixAcceptor { fn fd(&self) -> fd_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult { + if self.deadline != 0 { + try!(util::accept_deadline(self.fd(), self.deadline)); + } let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; let storagep = &mut storage as *mut libc::sockaddr_storage; let size = mem::size_of::(); @@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } impl Drop for UnixListener { diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index 84b3d887c049..a4f09ded0ac1 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -93,6 +93,8 @@ use std::sync::arc::UnsafeArc; use std::intrinsics; use super::IoResult; +use super::c; +use super::util; struct Event(libc::HANDLE); @@ -210,8 +212,9 @@ impl UnixStream { None } - pub fn connect(addr: &CString) -> IoResult { + pub fn connect(addr: &CString, timeout: Option) -> IoResult { as_utf16_p(addr.as_str().unwrap(), |p| { + let start = ::io::timer::now(); loop { match UnixStream::try_connect(p) { Some(handle) => { @@ -246,11 +249,26 @@ impl UnixStream { return Err(super::last_error()) } - // An example I found on microsoft's website used 20 seconds, - // libuv uses 30 seconds, hence we make the obvious choice of - // waiting for 25 seconds. - if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { - return Err(super::last_error()) + match timeout { + Some(timeout) => { + let now = ::io::timer::now(); + let timed_out = (now - start) >= timeout || unsafe { + let ms = (timeout - (now - start)) as libc::DWORD; + libc::WaitNamedPipeW(p, ms) == 0 + }; + if timed_out { + return Err(util::timeout("connect timed out")) + } + } + + // An example I found on microsoft's website used 20 + // seconds, libuv uses 30 seconds, hence we make the + // obvious choice of waiting for 25 seconds. + None => { + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } } } }) @@ -372,6 +390,7 @@ impl UnixListener { Ok(UnixAcceptor { listener: self, event: try!(Event::new(true, false)), + deadline: 0, }) } } @@ -391,6 +410,7 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, event: Event, + deadline: u64, } impl UnixAcceptor { @@ -438,7 +458,28 @@ impl UnixAcceptor { overlapped.hEvent = self.event.handle(); if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + // If we've got a timeout, use WaitForSingleObject in tandem + // with CancelIo to figure out if we should indeed get the + // result. + if self.deadline != 0 { + let now = ::io::timer::now(); + let timeout = self.deadline < now || unsafe { + let ms = (self.deadline - now) as libc::DWORD; + let r = libc::WaitForSingleObject(overlapped.hEvent, + ms); + r != libc::WAIT_OBJECT_0 + }; + if timeout { + unsafe { let _ = c::CancelIo(handle); } + return Err(util::timeout("accept timed out")) + } + } + + // This will block until the overlapped I/O is completed. The + // timeout was previously handled, so this will either block in + // the normal case or succeed very quickly in the timeout case. let ret = unsafe { let mut transfer = 0; libc::GetOverlappedResult(handle, @@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); + } } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs new file mode 100644 index 000000000000..0aaac8f8ad81 --- /dev/null +++ b/src/libnative/io/util.rs @@ -0,0 +1,136 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc; +use std::io::IoResult; +use std::io; +use std::mem; +use std::ptr; + +use super::c; +use super::net; +use super::{retry, last_error}; + +pub fn timeout(desc: &'static str) -> io::IoError { + io::IoError { + kind: io::TimedOut, + desc: desc, + detail: None, + } +} + +pub fn ms_to_timeval(ms: u64) -> libc::timeval { + libc::timeval { + tv_sec: (ms / 1000) as libc::time_t, + tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, + } +} + +// See http://developerweb.net/viewtopic.php?id=3196 for where this is +// derived from. +pub fn connect_timeout(fd: net::sock_t, + addrp: *libc::sockaddr, + len: libc::socklen_t, + timeout_ms: u64) -> IoResult<()> { + use std::os; + #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; + #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; + #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; + #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout_ms) { + 0 => Err(timeout("connection timed out")), + -1 => Err(last_error()), + _ => { + let err: libc::c_int = try!( + net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(io::IoError::from_errno(err as uint, true)) + } + } + } + } + + -1 => Err(last_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let set = nb as libc::c_int; + super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + } + + #[cfg(windows)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let mut set = nb as libc::c_ulong; + if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { + Err(last_error()) + } else { + Ok(()) + } + } + + #[cfg(unix)] + fn await(fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let start = ::io::timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); + c::select(fd + 1, ptr::null(), set as *mut _ as *_, + ptr::null(), &tv) + }) + } + #[cfg(windows)] + fn await(_fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let tv = ms_to_timeval(timeout); + unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } + } +} + +pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + + match retry(|| { + // If we're past the deadline, then pass a 0 timeout to select() so + // we can poll the status of the socket. + let now = ::io::timer::now(); + let ms = if deadline < now {0} else {deadline - now}; + let tv = ms_to_timeval(ms); + let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; + unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } + }) { + -1 => Err(last_error()), + 0 => Err(timeout("accept timed out")), + _ => return Ok(()), + } +} diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 27a069119398..470a343b84ed 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::cast; -use std::io::IoError; +use std::io::{IoError, IoResult}; use std::io::net::ip; use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; @@ -145,6 +145,190 @@ fn socket_name(sk: SocketNameKind, n => Err(uv_error_to_io_error(UvError(n))) } } +//////////////////////////////////////////////////////////////////////////////// +// Helpers for handling timeouts, shared for pipes/tcp +//////////////////////////////////////////////////////////////////////////////// + +pub struct ConnectCtx { + pub status: c_int, + pub task: Option, + pub timer: Option<~TimerWatcher>, +} + +pub struct AcceptTimeout { + timer: Option, + timeout_tx: Option>, + timeout_rx: Option>, +} + +impl ConnectCtx { + pub fn connect( + mut self, obj: T, timeout: Option, io: &mut UvIoFactory, + f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int + ) -> Result { + let mut req = Request::new(uvll::UV_CONNECT); + let r = f(&req, &obj, connect_cb); + return match r { + 0 => { + req.defuse(); // uv callback now owns this request + match timeout { + Some(t) => { + let mut timer = TimerWatcher::new(io); + timer.start(timer_cb, t, 0); + self.timer = Some(timer); + } + None => {} + } + wait_until_woken_after(&mut self.task, &io.loop_, || { + let data = &self as *_; + match self.timer { + Some(ref mut timer) => unsafe { timer.set_data(data) }, + None => {} + } + req.set_data(data); + }); + // Make sure an erroneously fired callback doesn't have access + // to the context any more. + req.set_data(0 as *int); + + // If we failed because of a timeout, drop the TcpWatcher as + // soon as possible because it's data is now set to null and we + // want to cancel the callback ASAP. + match self.status { + 0 => Ok(obj), + n => { drop(obj); Err(UvError(n)) } + } + } + n => Err(UvError(n)) + }; + + extern fn timer_cb(handle: *uvll::uv_timer_t) { + // Don't close the corresponding tcp request, just wake up the task + // and let RAII take care of the pending watcher. + let cx: &mut ConnectCtx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) + }; + cx.status = uvll::ECANCELED; + wakeup(&mut cx.task); + } + + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + // This callback can be invoked with ECANCELED if the watcher is + // closed by the timeout callback. In that case we just want to free + // the request and be along our merry way. + let req = Request::wrap(req); + if status == uvll::ECANCELED { return } + + // Apparently on windows when the handle is closed this callback may + // not be invoked with ECANCELED but rather another error code. + // Either ways, if the data is null, then our timeout has expired + // and there's nothing we can do. + let data = unsafe { uvll::get_data_for_req(req.handle) }; + if data.is_null() { return } + + let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; + cx.status = status; + match cx.timer { + Some(ref mut t) => t.stop(), + None => {} + } + // Note that the timer callback doesn't cancel the connect request + // (that's the job of uv_close()), so it's possible for this + // callback to get triggered after the timeout callback fires, but + // before the task wakes up. In that case, we did indeed + // successfully connect, but we don't need to wake someone up. We + // updated the status above (correctly so), and the task will pick + // up on this when it wakes up. + if cx.task.is_some() { + wakeup(&mut cx.task); + } + } + } +} + +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } + } + + pub fn accept(&mut self, c: &Receiver>) -> IoResult { + match self.timeout_rx { + None => c.recv(), + Some(ref rx) => { + use std::comm::Select; + + // Poll the incoming channel first (don't rely on the order of + // select just yet). If someone's pending then we should return + // them immediately. + match c.try_recv() { + Ok(data) => return data, + Err(..) => {} + } + + // Use select to figure out which channel gets ready first. We + // do some custom handling of select to ensure that we never + // actually drain the timeout channel (we'll keep seeing the + // timeout message in the future). + let s = Select::new(); + let mut timeout = s.handle(rx); + let mut data = s.handle(c); + unsafe { + timeout.add(); + data.add(); + } + if s.wait() == timeout.id() { + Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } else { + c.recv() + } + } + } + } + + pub fn clear(&mut self) { + // Clear any previous timeout by dropping the timer and transmission + // channels + drop((self.timer.take(), + self.timeout_tx.take(), + self.timeout_rx.take())) + } + + pub fn set_timeout + HomingIO>( + &mut self, ms: u64, t: &mut T + ) { + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let _m = t.fire_homing_missile(); + let loop_ = Loop::wrap(unsafe { + uvll::get_loop_for_uv_handle(t.uv_handle()) + }); + let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + unsafe { + timer.set_data(self as *mut _ as *AcceptTimeout); + } + self.timer = Some(timer); + } + + // Once we've got a timer, stop any previous timeout, reset it for the + // current one, and install some new channels to send/receive data on + let timer = self.timer.get_mut_ref(); + timer.stop(); + timer.start(timer_cb, ms, 0); + let (tx, rx) = channel(); + self.timeout_tx = Some(tx); + self.timeout_rx = Some(rx); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let acceptor: &mut AcceptTimeout = unsafe { + &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) + }; + // This send can never fail because if this timer is active then the + // receiving channel is guaranteed to be alive + acceptor.timeout_tx.get_ref().send(()); + } + } +} //////////////////////////////////////////////////////////////////////////////// /// TCP implementation @@ -174,9 +358,7 @@ pub struct TcpListener { pub struct TcpAcceptor { listener: ~TcpListener, - timer: Option, - timeout_tx: Option>, - timeout_rx: Option>, + timeout: AcceptTimeout, } // TCP watchers (clients/streams) @@ -205,97 +387,13 @@ impl TcpWatcher { pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr, timeout: Option) -> Result { - struct Ctx { - status: c_int, - task: Option, - timer: Option<~TimerWatcher>, - } - let tcp = TcpWatcher::new(io); + let cx = ConnectCtx { status: -1, task: None, timer: None }; let (addr, _len) = addr_to_sockaddr(address); - let mut req = Request::new(uvll::UV_CONNECT); - let result = unsafe { - let addr_p = &addr as *libc::sockaddr_storage; - uvll::uv_tcp_connect(req.handle, tcp.handle, - addr_p as *libc::sockaddr, - connect_cb) - }; - return match result { - 0 => { - req.defuse(); // uv callback now owns this request - let mut cx = Ctx { status: -1, task: None, timer: None }; - match timeout { - Some(t) => { - let mut timer = TimerWatcher::new(io); - timer.start(timer_cb, t, 0); - cx.timer = Some(timer); - } - None => {} - } - wait_until_woken_after(&mut cx.task, &io.loop_, || { - let data = &cx as *_; - match cx.timer { - Some(ref mut timer) => unsafe { timer.set_data(data) }, - None => {} - } - req.set_data(data); - }); - // Make sure an erroneously fired callback doesn't have access - // to the context any more. - req.set_data(0 as *int); - - // If we failed because of a timeout, drop the TcpWatcher as - // soon as possible because it's data is now set to null and we - // want to cancel the callback ASAP. - match cx.status { - 0 => Ok(tcp), - n => { drop(tcp); Err(UvError(n)) } - } - } - n => Err(UvError(n)) - }; - - extern fn timer_cb(handle: *uvll::uv_timer_t) { - // Don't close the corresponding tcp request, just wake up the task - // and let RAII take care of the pending watcher. - let cx: &mut Ctx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) - }; - cx.status = uvll::ECANCELED; - wakeup(&mut cx.task); - } - - extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { - // This callback can be invoked with ECANCELED if the watcher is - // closed by the timeout callback. In that case we just want to free - // the request and be along our merry way. - let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - - // Apparently on windows when the handle is closed this callback may - // not be invoked with ECANCELED but rather another error code. - // Either ways, if the data is null, then our timeout has expired - // and there's nothing we can do. - let data = unsafe { uvll::get_data_for_req(req.handle) }; - if data.is_null() { return } - - let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) }; - cx.status = status; - match cx.timer { - Some(ref mut t) => t.stop(), - None => {} - } - // Note that the timer callback doesn't cancel the connect request - // (that's the job of uv_close()), so it's possible for this - // callback to get triggered after the timeout callback fires, but - // before the task wakes up. In that case, we did indeed - // successfully connect, but we don't need to wake someone up. We - // updated the status above (correctly so), and the task will pick - // up on this when it wakes up. - if cx.task.is_some() { - wakeup(&mut cx.task); - } - } + let addr_p = &addr as *_ as *libc::sockaddr; + cx.connect(tcp, timeout, io, |req, tcp, cb| { + unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) } + }) } } @@ -463,9 +561,7 @@ impl rtio::RtioTcpListener for TcpListener { // create the acceptor object from ourselves let mut acceptor = ~TcpAcceptor { listener: self, - timer: None, - timeout_tx: None, - timeout_rx: None, + timeout: AcceptTimeout::new(), }; let _m = acceptor.fire_homing_missile(); @@ -516,37 +612,7 @@ impl rtio::RtioSocket for TcpAcceptor { impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> { - match self.timeout_rx { - None => self.listener.incoming.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match self.listener.incoming.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(&self.listener.incoming); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - self.listener.incoming.recv() - } - } - } + self.timeout.accept(&self.listener.incoming) } fn accept_simultaneously(&mut self) -> Result<(), IoError> { @@ -564,47 +630,9 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { } fn set_timeout(&mut self, ms: Option) { - // First, if the timeout is none, clear any previous timeout by dropping - // the timer and transmission channels - let ms = match ms { - None => { - return drop((self.timer.take(), - self.timeout_tx.take(), - self.timeout_rx.take())) - } - Some(ms) => ms, - }; - - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let _m = self.fire_homing_missile(); - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(self.listener.handle) - }); - let mut timer = TimerWatcher::new_home(&loop_, self.home().clone()); - unsafe { - timer.set_data(self as *mut _ as *TcpAcceptor); - } - self.timer = Some(timer); - } - - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *uvll::uv_timer_t) { - let acceptor: &mut TcpAcceptor = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); + match ms { + None => self.timeout.clear(), + Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), } } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 6ee684ff9bdc..7277be1616b7 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -12,14 +12,13 @@ use std::c_str::CString; use std::io::IoError; use libc; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; -use std::rt::task::BlockedTask; use access::Access; use homing::{HomingIO, HomeHandle}; +use net; use rc::Refcount; use stream::StreamWatcher; -use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, - wait_until_woken_after, wakeup}; +use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; use uvio::UvIoFactory; use uvll; @@ -43,6 +42,7 @@ pub struct PipeListener { pub struct PipeAcceptor { listener: ~PipeListener, + timeout: net::AcceptTimeout, } // PipeWatcher implementation and traits @@ -84,36 +84,18 @@ impl PipeWatcher { } } - pub fn connect(io: &mut UvIoFactory, name: &CString) + pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option) -> Result { - struct Ctx { task: Option, result: libc::c_int, } - let mut cx = Ctx { task: None, result: 0 }; - let mut req = Request::new(uvll::UV_CONNECT); let pipe = PipeWatcher::new(io, false); - - wait_until_woken_after(&mut cx.task, &io.loop_, || { + let cx = net::ConnectCtx { status: -1, task: None, timer: None }; + cx.connect(pipe, timeout, io, |req, pipe, cb| { unsafe { - uvll::uv_pipe_connect(req.handle, - pipe.handle(), - name.with_ref(|p| p), - connect_cb) + uvll::uv_pipe_connect(req.handle, pipe.handle(), + name.with_ref(|p| p), cb) } - req.set_data(&cx); - req.defuse(); // uv callback now owns this request - }); - return match cx.result { - 0 => Ok(pipe), - n => Err(UvError(n)) - }; - - extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {; - let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); - let cx: &mut Ctx = unsafe { req.get_data() }; - cx.result = status; - wakeup(&mut cx.task); - } + 0 + }) } pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle } @@ -199,7 +181,10 @@ impl PipeListener { impl RtioUnixListener for PipeListener { fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> { // create the acceptor object from ourselves - let mut acceptor = ~PipeAcceptor { listener: self }; + let mut acceptor = ~PipeAcceptor { + listener: self, + timeout: net::AcceptTimeout::new(), + }; let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable @@ -247,7 +232,14 @@ impl Drop for PipeListener { impl RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> { - self.listener.incoming.recv() + self.timeout.accept(&self.listener.incoming) + } + + fn set_timeout(&mut self, timeout_ms: Option) { + match timeout_ms { + None => self.timeout.clear(), + Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), + } } } @@ -265,7 +257,8 @@ mod tests { #[test] fn connect_err() { - match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) { + match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(), + None) { Ok(..) => fail!(), Err(..) => {} } @@ -312,7 +305,7 @@ mod tests { assert!(client.write([2]).is_ok()); }); rx.recv(); - let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap(); + let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); assert!(c.write([1]).is_ok()); let mut buf = [0]; assert!(c.read(buf).unwrap() == 1); @@ -332,7 +325,7 @@ mod tests { drop(p.accept().unwrap()); }); rx.recv(); - let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap(); + let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); fail!() } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 3127a01d70e4..81d7ac6601e2 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -291,8 +291,9 @@ impl IoFactory for UvIoFactory { } } - fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> { - match PipeWatcher::connect(self, path) { + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> Result<~rtio::RtioPipe:Send, IoError> { + match PipeWatcher::connect(self, path, timeout) { Ok(p) => Ok(~p as ~rtio::RtioPipe:Send), Err(e) => Err(uv_error_to_io_error(e)), } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index bf5681770202..b75b797e9744 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -61,7 +61,31 @@ impl UnixStream { /// ``` pub fn connect(path: &P) -> IoResult { LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str()).map(UnixStream::new) + io.unix_connect(&path.to_c_str(), None).map(UnixStream::new) + }) + } + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Example + /// + /// ```rust + /// # #![allow(unused_must_use)] + /// use std::io::net::unix::UnixStream; + /// + /// let server = Path::new("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write([1, 2, 3]); + /// ``` + #[experimental = "the timeout argument is likely to change types"] + pub fn connect_timeout(path: &P, + timeout_ms: u64) -> IoResult { + LocalIo::maybe_raise(|io| { + let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms)); + s.map(UnixStream::new) }) } } @@ -128,6 +152,25 @@ pub struct UnixAcceptor { obj: ~RtioUnixAcceptor:Send, } +impl UnixAcceptor { + /// Sets a timeout for this acceptor, after which accept() will no longer + /// block indefinitely. + /// + /// The argument specified is the amount of time, in milliseconds, into the + /// future after which all invocations of accept() will not block (and any + /// pending invocation will return). A value of `None` will clear any + /// existing timeout. + /// + /// When using this method, it is likely necessary to reset the timeout as + /// appropriate, the timeout specified is specific to this object, not + /// specific to the next request. + #[experimental = "the name and arguments to this function are likely \ + to change"] + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.obj.set_timeout(timeout_ms) + } +} + impl Acceptor for UnixAcceptor { fn accept(&mut self) -> IoResult { self.obj.accept().map(UnixStream::new) @@ -135,6 +178,7 @@ impl Acceptor for UnixAcceptor { } #[cfg(test)] +#[allow(experimental)] mod tests { use prelude::*; use super::*; @@ -371,4 +415,49 @@ mod tests { drop(l.listen().unwrap()); assert!(!path.exists()); } #[cfg(not(windows))]) + + iotest!(fn accept_timeout() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + + a.set_timeout(Some(10)); + + // Make sure we time out once and future invocations also time out + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + + // Also make sure that even though the timeout is expired that we will + // continue to receive any pending connections. + let l = UnixStream::connect(&addr).unwrap(); + for i in range(0, 1001) { + match a.accept() { + Ok(..) => break, + Err(ref e) if e.kind == TimedOut => {} + Err(e) => fail!("error: {}", e), + } + if i == 1000 { fail!("should have a pending connection") } + } + drop(l); + + // Unset the timeout and make sure that this always blocks. + a.set_timeout(None); + let addr2 = addr.clone(); + spawn(proc() { + drop(UnixStream::connect(&addr2)); + }); + a.accept().unwrap(); + }) + + iotest!(fn connect_timeout_error() { + let addr = next_test_unix(); + assert!(UnixStream::connect_timeout(&addr, 100).is_err()); + }) + + iotest!(fn connect_timeout_success() { + let addr = next_test_unix(); + let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + assert!(UnixStream::connect_timeout(&addr, 100).is_ok()); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 5dd148346695..f3c7fdaf7105 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -152,7 +152,8 @@ pub trait IoFactory { fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>; fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send>; - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>; + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> IoResult<~RtioPipe:Send>; fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]>; @@ -274,6 +275,7 @@ pub trait RtioUnixListener { pub trait RtioUnixAcceptor { fn accept(&mut self) -> IoResult<~RtioPipe:Send>; + fn set_timeout(&mut self, timeout: Option); } pub trait RtioTTY {