diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index 9bd8af6419e0..290293cf086c 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -22,38 +22,40 @@ use std::cell::UnsafeCell; use homing::HomingMissile; -pub struct Access { - inner: Arc>, +pub struct Access { + inner: Arc>>, } -pub struct Guard<'a> { - access: &'a mut Access, +pub struct Guard<'a, T> { + access: &'a mut Access, missile: Option, } -struct Inner { +struct Inner { queue: Vec<(BlockedTask, uint)>, held: bool, closed: bool, + data: T, } -impl Access { - pub fn new() -> Access { +impl Access { + pub fn new(data: T) -> Access { Access { inner: Arc::new(UnsafeCell::new(Inner { queue: vec![], held: false, closed: false, + data: data, })) } } pub fn grant<'a>(&'a mut self, token: uint, - missile: HomingMissile) -> Guard<'a> { + missile: HomingMissile) -> Guard<'a, T> { // This unsafety is actually OK because the homing missile argument // guarantees that we're on the same event loop as all the other objects // attempting to get access granted. - let inner: &mut Inner = unsafe { &mut *self.inner.get() }; + let inner = unsafe { &mut *self.inner.get() }; if inner.held { let t: Box = Local::take(); @@ -69,6 +71,15 @@ impl Access { Guard { access: self, missile: Some(missile) } } + pub fn unsafe_get(&self) -> *mut T { + unsafe { &mut (*self.inner.get()).data as *mut _ } + } + + // Safe version which requires proof that you are on the home scheduler. + pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T { + unsafe { &mut *self.unsafe_get() } + } + pub fn close(&self, _missile: &HomingMissile) { // This unsafety is OK because with a homing missile we're guaranteed to // be the only task looking at the `closed` flag (and are therefore @@ -82,21 +93,27 @@ impl Access { // is only safe to invoke while on the home event loop, and there is no // guarantee that this i being invoked on the home event loop. pub unsafe fn dequeue(&mut self, token: uint) -> Option { - let inner: &mut Inner = &mut *self.inner.get(); + let inner = &mut *self.inner.get(); match inner.queue.iter().position(|&(_, t)| t == token) { Some(i) => Some(inner.queue.remove(i).unwrap().val0()), None => None, } } + + /// Test whether this access is closed, using a homing missile to prove + /// that it's safe + pub fn is_closed(&self, _missile: &HomingMissile) -> bool { + unsafe { (*self.inner.get()).closed } + } } -impl Clone for Access { - fn clone(&self) -> Access { +impl Clone for Access { + fn clone(&self) -> Access { Access { inner: self.inner.clone() } } } -impl<'a> Guard<'a> { +impl<'a, T: Send> Guard<'a, T> { pub fn is_closed(&self) -> bool { // See above for why this unsafety is ok, it just applies to the read // instead of the write. @@ -104,13 +121,27 @@ impl<'a> Guard<'a> { } } +impl<'a, T: Send> Deref for Guard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { + // A guard represents exclusive access to a piece of data, so it's safe + // to hand out shared and mutable references + unsafe { &(*self.access.inner.get()).data } + } +} + +impl<'a, T: Send> DerefMut for Guard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { + unsafe { &mut (*self.access.inner.get()).data } + } +} + #[unsafe_destructor] -impl<'a> Drop for Guard<'a> { +impl<'a, T> Drop for Guard<'a, T> { fn drop(&mut self) { // This guard's homing missile is still armed, so we're guaranteed to be // on the same I/O event loop, so this unsafety should be ok. assert!(self.missile.is_some()); - let inner: &mut Inner = unsafe { + let inner: &mut Inner = unsafe { mem::transmute(self.access.inner.get()) }; @@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> { } } -impl Drop for Inner { +#[unsafe_destructor] +impl Drop for Inner { fn drop(&mut self) { assert!(!self.held); assert_eq!(self.queue.len(), 0); diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 0da8d0d2108e..b13598402470 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -22,7 +22,7 @@ use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, wait_until_woken_after, wakeup}; -use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx}; +use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout}; use uvio::UvIoFactory; use uvll; @@ -158,20 +158,20 @@ pub struct TcpWatcher { // stream object, so we use these access guards in order to arbitrate among // multiple concurrent reads and writes. Note that libuv *can* read and // write simultaneously, it just can't read and read simultaneously. - read_access: AccessTimeout, - write_access: AccessTimeout, + read_access: AccessTimeout<()>, + write_access: AccessTimeout<()>, } pub struct TcpListener { home: HomeHandle, - handle: *mut uvll::uv_pipe_t, - outgoing: Sender, IoError>>, - incoming: Receiver, IoError>>, + handle: *mut uvll::uv_tcp_t, } pub struct TcpAcceptor { - listener: Box, - timeout: AcceptTimeout, + home: HomeHandle, + handle: *mut uvll::uv_tcp_t, + access: AcceptTimeout>, + refcount: Refcount, } // TCP watchers (clients/streams) @@ -192,8 +192,8 @@ impl TcpWatcher { handle: handle, stream: StreamWatcher::new(handle, true), refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), } } @@ -291,7 +291,7 @@ impl rtio::RtioTcpStream for TcpWatcher { let task = { let m = self.fire_homing_missile(); self.read_access.access.close(&m); - self.stream.cancel_read(uvll::EOF as libc::ssize_t) + self.stream.cancel_read(uvll::EOF as libc::ssize_t) }; let _ = task.map(|t| t.reawaken()); Ok(()) @@ -354,12 +354,9 @@ impl TcpListener { assert_eq!(unsafe { uvll::uv_tcp_init(io.uv_loop(), handle) }, 0); - let (tx, rx) = channel(); let l = box TcpListener { home: io.make_handle(), handle: handle, - outgoing: tx, - incoming: rx, }; let mut storage = unsafe { mem::zeroed() }; let _len = addr_to_sockaddr(address, &mut storage); @@ -392,15 +389,19 @@ impl rtio::RtioSocket for TcpListener { impl rtio::RtioTcpListener for TcpListener { fn listen(self: Box) -> Result, IoError> { - // create the acceptor object from ourselves - let mut acceptor = box TcpAcceptor { - listener: self, - timeout: AcceptTimeout::new(), - }; + let _m = self.fire_homing_missile(); + + // create the acceptor object from ourselves + let acceptor = (box TcpAcceptor { + handle: self.handle, + home: self.home.clone(), + access: AcceptTimeout::new(), + refcount: Refcount::new(), + }).install(); + self.handle = 0 as *mut _; - let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable - match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } { + match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } { 0 => Ok(acceptor as Box), n => Err(uv_error_to_io_error(UvError(n))), } @@ -409,7 +410,7 @@ impl rtio::RtioTcpListener for TcpListener { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { assert!(status != uvll::ECANCELED); - let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; + let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -421,11 +422,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - tcp.outgoing.send(msg); + + // If we're running then we have exclusive access, so the unsafe_get() is ok + unsafe { tcp.access.push(msg); } } impl Drop for TcpListener { fn drop(&mut self) { + if self.handle.is_null() { return } + let _m = self.fire_homing_missile(); self.close(); } @@ -434,40 +439,68 @@ impl Drop for TcpListener { // TCP acceptors (bound servers) impl HomingIO for TcpAcceptor { - fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl rtio::RtioSocket for TcpAcceptor { fn socket_name(&mut self) -> Result { let _m = self.fire_homing_missile(); - socket_name(Tcp, self.listener.handle) + socket_name(Tcp, self.handle) } } +impl UvHandle for TcpAcceptor { + fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle } +} + impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result, IoError> { - self.timeout.accept(&self.listener.incoming) + let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.accept(m, &loop_) } fn accept_simultaneously(&mut self) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { - uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1) + uvll::uv_tcp_simultaneous_accepts(self.handle, 1) }) } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { - uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0) + uvll::uv_tcp_simultaneous_accepts(self.handle, 0) }) } fn set_timeout(&mut self, ms: Option) { let _m = self.fire_homing_missile(); - match ms { - None => self.timeout.clear(), - Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), + let loop_ = self.uv_loop(); + self.access.set_timeout(ms, &loop_, &self.home); + } + + fn clone(&self) -> Box { + box TcpAcceptor { + refcount: self.refcount.clone(), + home: self.home.clone(), + handle: self.handle, + access: self.access.clone(), + } as Box + } + + fn close_accept(&mut self) -> Result<(), IoError> { + let m = self.fire_homing_missile(); + self.access.close(m); + Ok(()) + } +} + +impl Drop for TcpAcceptor { + fn drop(&mut self) { + let _m = self.fire_homing_missile(); + if self.refcount.decrement() { + self.close(); } } } @@ -482,8 +515,8 @@ pub struct UdpWatcher { // See above for what these fields are refcount: Refcount, - read_access: AccessTimeout, - write_access: AccessTimeout, + read_access: AccessTimeout<()>, + write_access: AccessTimeout<()>, blocked_sender: Option, } @@ -507,8 +540,8 @@ impl UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, home: io.make_handle(), refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), blocked_sender: None, }; assert_eq!(unsafe { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index f0a57546ed43..aa89e5e5f034 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -31,20 +31,20 @@ pub struct PipeWatcher { refcount: Refcount, // see comments in TcpWatcher for why these exist - write_access: AccessTimeout, - read_access: AccessTimeout, + write_access: AccessTimeout<()>, + read_access: AccessTimeout<()>, } pub struct PipeListener { home: HomeHandle, pipe: *mut uvll::uv_pipe_t, - outgoing: Sender>>, - incoming: Receiver>>, } pub struct PipeAcceptor { - listener: Box, - timeout: AcceptTimeout, + home: HomeHandle, + handle: *mut uvll::uv_pipe_t, + access: AcceptTimeout>, + refcount: Refcount, } // PipeWatcher implementation and traits @@ -71,8 +71,8 @@ impl PipeWatcher { home: home, defused: false, refcount: Refcount::new(), - read_access: AccessTimeout::new(), - write_access: AccessTimeout::new(), + read_access: AccessTimeout::new(()), + write_access: AccessTimeout::new(()), } } @@ -233,12 +233,9 @@ impl PipeListener { // If successful, unwrap the PipeWatcher because we control how // we close the pipe differently. We can't rely on // StreamWatcher's default close method. - let (tx, rx) = channel(); let p = box PipeListener { home: io.make_handle(), pipe: pipe.unwrap(), - incoming: rx, - outgoing: tx, }; Ok(p.install()) } @@ -250,15 +247,19 @@ impl PipeListener { impl rtio::RtioUnixListener for PipeListener { fn listen(self: Box) -> IoResult> { - // create the acceptor object from ourselves - let mut acceptor = box PipeAcceptor { - listener: self, - timeout: AcceptTimeout::new(), - }; + let _m = self.fire_homing_missile(); + + // create the acceptor object from ourselves + let acceptor = (box PipeAcceptor { + handle: self.pipe, + home: self.home.clone(), + access: AcceptTimeout::new(), + refcount: Refcount::new(), + }).install(); + self.pipe = 0 as *mut _; - let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable - match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } { + match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } { 0 => Ok(acceptor as Box), n => Err(uv_error_to_io_error(UvError(n))), } @@ -276,7 +277,7 @@ impl UvHandle for PipeListener { extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { assert!(status != uvll::ECANCELED); - let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; + let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -288,11 +289,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) { } n => Err(uv_error_to_io_error(UvError(n))) }; - pipe.outgoing.send(msg); + + // If we're running then we have exclusive access, so the unsafe_get() is ok + unsafe { pipe.access.push(msg); } } impl Drop for PipeListener { fn drop(&mut self) { + if self.pipe.is_null() { return } + let _m = self.fire_homing_missile(); self.close(); } @@ -302,19 +307,48 @@ impl Drop for PipeListener { impl rtio::RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> IoResult> { - self.timeout.accept(&self.listener.incoming) + let m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.accept(m, &loop_) } - fn set_timeout(&mut self, timeout_ms: Option) { - match timeout_ms { - None => self.timeout.clear(), - Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), - } + fn set_timeout(&mut self, ms: Option) { + let _m = self.fire_homing_missile(); + let loop_ = self.uv_loop(); + self.access.set_timeout(ms, &loop_, &self.home); + } + + fn clone(&self) -> Box { + box PipeAcceptor { + refcount: self.refcount.clone(), + home: self.home.clone(), + handle: self.handle, + access: self.access.clone(), + } as Box + } + + fn close_accept(&mut self) -> IoResult<()> { + let m = self.fire_homing_missile(); + self.access.close(m); + Ok(()) } } impl HomingIO for PipeAcceptor { - fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } +} + +impl UvHandle for PipeAcceptor { + fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle } +} + +impl Drop for PipeAcceptor { + fn drop(&mut self) { + let _m = self.fire_homing_missile(); + if self.refcount.decrement() { + self.close(); + } + } } #[cfg(test)] diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs index 1caaf5e0fc75..32d739524167 100644 --- a/src/librustuv/timeout.rs +++ b/src/librustuv/timeout.rs @@ -14,7 +14,7 @@ use std::rt::task::BlockedTask; use std::rt::rtio::IoResult; use access; -use homing::{HomeHandle, HomingMissile, HomingIO}; +use homing::{HomeHandle, HomingMissile}; use timer::TimerWatcher; use uvll; use uvio::UvIoFactory; @@ -22,15 +22,15 @@ use {Loop, UvError, uv_error_to_io_error, Request, wakeup}; use {UvHandle, wait_until_woken_after}; /// Management of a timeout when gaining access to a portion of a duplex stream. -pub struct AccessTimeout { +pub struct AccessTimeout { state: TimeoutState, timer: Option>, - pub access: access::Access, + pub access: access::Access, } -pub struct Guard<'a> { +pub struct Guard<'a, T> { state: &'a mut TimeoutState, - pub access: access::Guard<'a>, + pub access: access::Guard<'a, T>, pub can_timeout: bool, } @@ -49,17 +49,18 @@ enum ClientState { } struct TimerContext { - timeout: *mut AccessTimeout, - callback: fn(uint) -> Option, - payload: uint, + timeout: *mut AccessTimeout<()>, + callback: fn(*mut AccessTimeout<()>, &TimerContext), + user_unblock: fn(uint) -> Option, + user_payload: uint, } -impl AccessTimeout { - pub fn new() -> AccessTimeout { +impl AccessTimeout { + pub fn new(data: T) -> AccessTimeout { AccessTimeout { state: NoTimeout, timer: None, - access: access::Access::new(), + access: access::Access::new(data), } } @@ -68,7 +69,7 @@ impl AccessTimeout { /// On success, Ok(Guard) is returned and access has been granted to the /// stream. If a timeout occurs, then Err is returned with an appropriate /// error. - pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { + pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult> { // First, flag that we're attempting to acquire access. This will allow // us to cancel the pending grant if we timeout out while waiting for a // grant. @@ -94,6 +95,13 @@ impl AccessTimeout { }) } + pub fn timed_out(&self) -> bool { + match self.state { + TimedOut => true, + _ => false, + } + } + /// Sets the pending timeout to the value specified. /// /// The home/loop variables are used to construct a timer if one has not @@ -120,9 +128,10 @@ impl AccessTimeout { if self.timer.is_none() { let mut timer = box TimerWatcher::new_home(loop_, home.clone()); let mut cx = box TimerContext { - timeout: self as *mut _, - callback: cb, - payload: data, + timeout: self as *mut _ as *mut AccessTimeout<()>, + callback: real_cb::, + user_unblock: cb, + user_payload: data, }; unsafe { timer.set_data(&mut *cx); @@ -135,8 +144,8 @@ impl AccessTimeout { unsafe { let cx = uvll::get_data_for_uv_handle(timer.handle); let cx = cx as *mut TimerContext; - (*cx).callback = cb; - (*cx).payload = data; + (*cx).user_unblock = cb; + (*cx).user_payload = data; } timer.stop(); timer.start(timer_cb, ms, 0); @@ -146,7 +155,12 @@ impl AccessTimeout { let cx: &TimerContext = unsafe { &*(uvll::get_data_for_uv_handle(timer) as *const TimerContext) }; - let me = unsafe { &mut *cx.timeout }; + (cx.callback)(cx.timeout, cx); + } + + fn real_cb(timeout: *mut AccessTimeout<()>, cx: &TimerContext) { + let timeout = timeout as *mut AccessTimeout; + let me = unsafe { &mut *timeout }; match mem::replace(&mut me.state, TimedOut) { TimedOut | NoTimeout => unreachable!(), @@ -158,7 +172,7 @@ impl AccessTimeout { } } TimeoutPending(RequestPending) => { - match (cx.callback)(cx.payload) { + match (cx.user_unblock)(cx.user_payload) { Some(task) => task.reawaken(), None => unreachable!(), } @@ -168,8 +182,8 @@ impl AccessTimeout { } } -impl Clone for AccessTimeout { - fn clone(&self) -> AccessTimeout { +impl Clone for AccessTimeout { + fn clone(&self) -> AccessTimeout { AccessTimeout { access: self.access.clone(), state: NoTimeout, @@ -179,7 +193,7 @@ impl Clone for AccessTimeout { } #[unsafe_destructor] -impl<'a> Drop for Guard<'a> { +impl<'a, T> Drop for Guard<'a, T> { fn drop(&mut self) { match *self.state { TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) => @@ -193,7 +207,8 @@ impl<'a> Drop for Guard<'a> { } } -impl Drop for AccessTimeout { +#[unsafe_destructor] +impl Drop for AccessTimeout { fn drop(&mut self) { match self.timer { Some(ref timer) => unsafe { @@ -215,12 +230,6 @@ pub struct ConnectCtx { pub timer: Option>, } -pub struct AcceptTimeout { - timer: Option, - timeout_tx: Option>, - timeout_rx: Option>, -} - impl ConnectCtx { pub fn connect( mut self, obj: T, timeout: Option, io: &mut UvIoFactory, @@ -306,88 +315,97 @@ impl ConnectCtx { } } -impl AcceptTimeout { - pub fn new() -> AcceptTimeout { - AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } +pub struct AcceptTimeout { + access: AccessTimeout>, +} + +struct AcceptorState { + blocked_acceptor: Option, + pending: Vec>, +} + +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { + access: AccessTimeout::new(AcceptorState { + blocked_acceptor: None, + pending: Vec::new(), + }) + } } - pub fn accept(&mut self, c: &Receiver>) -> IoResult { - match self.timeout_rx { - None => c.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match c.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(c); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - c.recv() - } + pub fn accept(&mut self, + missile: HomingMissile, + loop_: &Loop) -> IoResult { + // If we've timed out but we're not closed yet, poll the state of the + // queue to see if we can peel off a connection. + if self.access.timed_out() && !self.access.access.is_closed(&missile) { + let tmp = self.access.access.get_mut(&missile); + return match tmp.pending.remove(0) { + Some(msg) => msg, + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) } } - } - pub fn clear(&mut self) { - match self.timeout_rx { - Some(ref t) => { let _ = t.try_recv(); } + // Now that we're not polling, attempt to gain access and then peel off + // a connection. If we have no pending connections, then we need to go + // to sleep and wait for one. + // + // Note that if we're woken up for a pending connection then we're + // guaranteed that the check above will not steal our connection due to + // the single-threaded nature of the event loop. + let mut guard = try!(self.access.grant(missile)); + if guard.access.is_closed() { + return Err(uv_error_to_io_error(UvError(uvll::EOF))) + } + + match guard.access.pending.remove(0) { + Some(msg) => return msg, None => {} } - match self.timer { - Some(ref mut t) => t.stop(), - None => {} + + wait_until_woken_after(&mut guard.access.blocked_acceptor, loop_, || {}); + + match guard.access.pending.remove(0) { + _ if guard.access.is_closed() => { + Err(uv_error_to_io_error(UvError(uvll::EOF))) + } + Some(msg) => msg, + None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) } } - pub fn set_timeout + HomingIO>( - &mut self, ms: u64, t: &mut T - ) { - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(t.uv_handle()) - }); - let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + pub unsafe fn push(&mut self, t: IoResult) { + let state = self.access.access.unsafe_get(); + (*state).pending.push(t); + let _ = (*state).blocked_acceptor.take().map(|t| t.reawaken()); + } + + pub fn set_timeout(&mut self, + ms: Option, + loop_: &Loop, + home: &HomeHandle) { + self.access.set_timeout(ms, home, loop_, cancel_accept::, + self as *mut _ as uint); + + fn cancel_accept(me: uint) -> Option { unsafe { - timer.set_data(self as *mut _); + let me: &mut AcceptTimeout = mem::transmute(me); + (*me.access.access.unsafe_get()).blocked_acceptor.take() } - self.timer = Some(timer); } + } - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *mut uvll::uv_timer_t) { - let acceptor: &mut AcceptTimeout = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); - } + pub fn close(&mut self, m: HomingMissile) { + self.access.access.close(&m); + let task = self.access.access.get_mut(&m).blocked_acceptor.take(); + drop(m); + let _ = task.map(|t| t.reawaken()); + } +} + +impl Clone for AcceptTimeout { + fn clone(&self) -> AcceptTimeout { + AcceptTimeout { access: self.access.clone() } } }