diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 116d240308a3..c56b20453e5b 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -474,17 +474,43 @@ pub trait Seek { fn seek(&mut self, pos: i64, style: SeekStyle); } -/// A listener is a value that listens for connections -pub trait Listener { - /// Wait for and accept an incoming connection - /// - /// Returns `None` on timeout. +/// A listener is a value that can consume itself to start listening for connections. +/// Doing so produces some sort of Acceptor. +pub trait Listener> { + /// Spin up the listener and start queueing incoming connections /// /// # Failure /// /// Raises `io_error` condition. If the condition is handled, + /// then `listen` returns `None`. + fn listen(self) -> Option; +} + +/// An acceptor is a value that presents incoming connections +pub trait Acceptor { + /// Wait for and accept an incoming connection + /// + /// # Failure + /// Raise `io_error` condition. If the condition is handled, /// then `accept` returns `None`. - fn accept(&mut self) -> Option; + fn accept(&mut self) -> Option; + + /// Create an iterator over incoming connections + fn incoming<'r>(&'r mut self) -> IncomingIterator<'r, Self> { + IncomingIterator { inc: self } + } +} + +/// An infinite iterator over incoming connection attempts. +/// Calling `next` will block the task until a connection is attempted. +struct IncomingIterator<'self, A> { + priv inc: &'self mut A, +} + +impl<'self, T, A: Acceptor> Iterator for IncomingIterator<'self, A> { + fn next(&mut self) -> Option { + self.inc.accept() + } } /// Common trait for decorator types. diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index ce66cd0de59c..b7cb703eb254 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -11,12 +11,13 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; -use rt::io::{Reader, Writer, Listener}; +use rt::io::{Reader, Writer, Listener, Acceptor}; use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, - RtioSocket, RtioTcpListener, - RtioTcpListenerObject, RtioTcpStream, - RtioTcpStreamObject}; + RtioSocket, + RtioTcpListener, RtioTcpListenerObject, + RtioTcpAcceptor, RtioTcpAcceptorObject, + RtioTcpStream, RtioTcpStreamObject}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); @@ -124,13 +125,27 @@ impl TcpListener { } } -impl Listener for TcpListener { +impl Listener for TcpListener { + fn listen(self) -> Option { + match (**self).listen() { + Ok(acceptor) => Some(TcpAcceptor(acceptor)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +pub struct TcpAcceptor(~RtioTcpAcceptorObject); + +impl Acceptor for TcpAcceptor { fn accept(&mut self) -> Option { match (**self).accept() { Ok(s) => Some(TcpStream::new(s)), Err(ioerr) => { io_error::cond.raise(ioerr); - return None; + None } } } @@ -184,8 +199,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); @@ -204,8 +219,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); @@ -224,8 +239,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -244,8 +259,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -265,8 +280,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -288,8 +303,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); @@ -311,8 +326,8 @@ mod test { let addr = next_test_ip4(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let buf = [0]; loop { let mut stop = false; @@ -341,8 +356,8 @@ mod test { let addr = next_test_ip6(); do spawntask { - let mut listener = TcpListener::bind(addr); - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + let mut stream = acceptor.accept(); let buf = [0]; loop { let mut stop = false; @@ -371,9 +386,8 @@ mod test { let max = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - do max.times { - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); @@ -396,9 +410,8 @@ mod test { let max = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - do max.times { - let mut stream = listener.accept(); + let mut acceptor = TcpListener::bind(addr).listen(); + for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); @@ -421,10 +434,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for i in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); @@ -460,10 +472,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for i in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); @@ -499,10 +510,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for _ in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for stream in acceptor.incoming().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); @@ -537,10 +547,9 @@ mod test { static MAX: int = 10; do spawntask { - let mut listener = TcpListener::bind(addr); - for _ in range(0, MAX) { - let stream = Cell::new(listener.accept()); - rtdebug!("accepted"); + let mut acceptor = TcpListener::bind(addr).listen(); + for stream in acceptor.incoming().take(MAX as uint) { + let stream = Cell::new(stream); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); @@ -573,10 +582,7 @@ mod test { fn socket_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { - let listener = TcpListener::bind(addr); - - assert!(listener.is_some()); - let mut listener = listener.unwrap(); + let mut listener = TcpListener::bind(addr).unwrap(); // Make sure socket_name gives // us the socket we binded to. @@ -592,9 +598,9 @@ mod test { fn peer_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { - let mut listener = TcpListener::bind(addr); + let mut acceptor = TcpListener::bind(addr).listen(); - listener.accept(); + acceptor.accept(); } do spawntask { diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index b85b7dd059d8..1771a963ba78 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -40,6 +40,12 @@ impl UnixListener { } } -impl Listener for UnixListener { +impl Listener for UnixListener { + fn listen(self) -> Option { fail!() } +} + +pub struct UnixAcceptor; + +impl Acceptor for UnixAcceptor { fn accept(&mut self) -> Option { fail!() } } diff --git a/src/libstd/rt/io/option.rs b/src/libstd/rt/io/option.rs index 7dadc653e6cc..098433f299c1 100644 --- a/src/libstd/rt/io/option.rs +++ b/src/libstd/rt/io/option.rs @@ -17,7 +17,7 @@ //! # XXX Seek and Close use option::*; -use super::{Reader, Writer, Listener}; +use super::{Reader, Writer, Listener, Acceptor}; use super::{standard_error, PreviousIoError, io_error, read_error, IoError}; fn prev_io_error() -> IoError { @@ -62,10 +62,22 @@ impl Reader for Option { } } -impl, S> Listener for Option { - fn accept(&mut self) -> Option { +impl, L: Listener> Listener for Option { + fn listen(self) -> Option { + match self { + Some(listener) => listener.listen(), + None => { + io_error::cond.raise(prev_io_error()); + None + } + } + } +} + +impl> Acceptor for Option { + fn accept(&mut self) -> Option { match *self { - Some(ref mut listener) => listener.accept(), + Some(ref mut acceptor) => acceptor.accept(), None => { io_error::cond.raise(prev_io_error()); None diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 1788b7a04e33..6f1b33d1e219 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -26,6 +26,7 @@ pub type EventLoopObject = uvio::UvEventLoop; pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; +pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor; pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; @@ -75,6 +76,10 @@ pub trait IoFactory { } pub trait RtioTcpListener : RtioSocket { + fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>; +} + +pub trait RtioTcpAcceptor : RtioSocket { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index ad9d2a597947..e37dfba0cc19 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -599,9 +599,7 @@ impl IoFactory for UvIoFactory { } pub struct UvTcpListener { - watcher: TcpWatcher, - listening: bool, - incoming_streams: Tube>, + watcher : TcpWatcher, home: SchedHandle, } @@ -611,15 +609,8 @@ impl HomingIO for UvTcpListener { impl UvTcpListener { fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener { - UvTcpListener { - watcher: watcher, - listening: false, - incoming_streams: Tube::new(), - home: home, - } + UvTcpListener { watcher: watcher, home: home } } - - fn watcher(&self) -> TcpWatcher { self.watcher } } impl Drop for UvTcpListener { @@ -628,10 +619,10 @@ impl Drop for UvTcpListener { let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) }; do self_.home_for_io_with_sched |self_, scheduler| { do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher().as_stream().close { + let task = Cell::new(task); + do self_.watcher.as_stream().close { let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task.take()); } } } @@ -641,50 +632,71 @@ impl Drop for UvTcpListener { impl RtioSocket for UvTcpListener { fn socket_name(&mut self) -> Result { do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) + socket_name(Tcp, self_.watcher) } } } impl RtioTcpListener for UvTcpListener { - - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { - do self.home_for_io |self_| { - - if !self_.listening { - self_.listening = true; - - let incoming_streams_cell = Cell::new(self_.incoming_streams.clone()); - - do self_.watcher().listen |mut server, status| { - let stream = match status { + fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { + do self.home_for_io_consume |self_| { + let mut acceptor = ~UvTcpAcceptor::new(self_); + let incoming = Cell::new(acceptor.incoming.clone()); + do acceptor.listener.watcher.listen |mut server, status| { + do incoming.with_mut_ref |incoming| { + let inc = match status { Some(_) => Err(standard_error(OtherIoError)), None => { - let client = TcpWatcher::new(&server.event_loop()); - // XXX: needs to be surfaced in interface - server.accept(client.as_stream()); + let inc = TcpWatcher::new(&server.event_loop()); + // first accept call in the callback guarenteed to succeed + server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: client, home: home }) + Ok(~UvTcpStream { watcher: inc, home: home }) } }; - - let mut incoming_streams = incoming_streams_cell.take(); - incoming_streams.send(stream); - incoming_streams_cell.put_back(incoming_streams); + incoming.send(inc); } - - } - self_.incoming_streams.recv() + }; + Ok(acceptor) } } +} + +pub struct UvTcpAcceptor { + listener: UvTcpListener, + incoming: Tube>, +} + +impl HomingIO for UvTcpAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl UvTcpAcceptor { + fn new(listener: UvTcpListener) -> UvTcpAcceptor { + UvTcpAcceptor { listener: listener, incoming: Tube::new() } + } +} + +impl RtioSocket for UvTcpAcceptor { + fn socket_name(&mut self) -> Result { + do self.home_for_io |self_| { + socket_name(Tcp, self_.listener.watcher) + } + } +} + +impl RtioTcpAcceptor for UvTcpAcceptor { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { + self.incoming.recv() + } fn accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) + uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(self_.listener.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -694,10 +706,10 @@ impl RtioTcpListener for UvTcpListener { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) + uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(self_.listener.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1440,8 +1452,9 @@ fn test_simple_tcp_server_and_client() { do spawntask { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let listener = (*io).tcp_bind(addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -1498,11 +1511,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let server_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; - let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; + let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert_eq!(nread, 8); @@ -1583,8 +1595,9 @@ fn test_read_and_block() { do spawntask { let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; - let mut stream = listener.accept().unwrap(); + let listener = unsafe { (*io).tcp_bind(addr).unwrap() }; + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -1639,8 +1652,9 @@ fn test_read_read_read() { do spawntask { unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut listener = (*io).tcp_bind(addr).unwrap(); - let mut stream = listener.accept().unwrap(); + let listener = (*io).tcp_bind(addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); let buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX {