rust/src/librustuv/pipe.rs
Alex Crichton 56080c4767 Implement clone() for TCP/UDP/Unix sockets
This is part of the overall strategy I would like to take when approaching
issue #11165. The only two I/O objects that reasonably want to be "split" are
the network stream objects. Everything else can be "split" by just creating
another version.

The initial idea I had was to literally split the object into a reader and a
writer half, but that would just introduce lots of clutter with extra interfaces
that were a little unnecessary, or it would return a ~Reader and a ~Writer which
means you couldn't access things like the remote peer name or local socket name.

The solution I found to be nicer was to just clone the stream itself. The clone
is just a clone of the handle, nothing fancy going on at the kernel level.
Conceptually I found this very easy to wrap my head around (everything else
supports clone()), and it solved the "split" problem at the same time.

The cloning support is pretty specific per platform/lib combination:

* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones
              of a handle, this implies that we're allowing simultaneous writes
              and reads to happen. It turns out that libuv doesn't support two
              simultaneous reads or writes of the same object. It does support
              *one* read and *one* write at the same time, however. Some extra
              infrastructure was added to just block concurrent writers/readers
              until the previous read/write operation was completed.

I've added tests to the tcp/unix modules to make sure that this functionality is
supported everywhere.
2014-02-05 11:43:49 -08:00

339 lines
10 KiB
Rust

// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::c_str::CString;
use std::io::IoError;
use std::libc;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
use rc::Refcount;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
wait_until_woken_after, wakeup};
use uvio::UvIoFactory;
use uvll;
// A uv pipe handle wrapped as a stream, plus the bookkeeping that lets
// several clones of the watcher share that one handle.
pub struct PipeWatcher {
// Stream wrapper constructed around the raw uv pipe handle.
stream: StreamWatcher,
// Home handle handed out by this type's `HomingIO` implementation.
home: HomeHandle,
// Set to true by `unwrap()`; while true, `drop` leaves the uv handle open.
priv defused: bool,
// Cloned into every `clone()` of this watcher; `drop` only closes the handle
// when `decrement()` returns true.
priv refcount: Refcount,
// see comments in TcpWatcher for why these exist
// (`write()` and `read()` each take a grant from the matching access before
// touching the stream.)
priv write_access: Access,
priv read_access: Access,
}
// A bound uv pipe that hands accepted connections to its acceptor through a
// channel.
pub struct PipeListener {
// Home handle handed out by this type's `HomingIO` implementation.
home: HomeHandle,
// Raw uv pipe handle; `bind()` takes it out of a `PipeWatcher` via `unwrap()`.
pipe: *uvll::uv_pipe_t,
// Sending half: `listen_cb` pushes each accepted pipe (or error) here.
priv outgoing: Chan<Result<~RtioPipe, IoError>>,
// Receiving half: `PipeAcceptor::accept` pulls results from here.
priv incoming: Port<Result<~RtioPipe, IoError>>,
}
// Acceptor produced by `PipeListener::listen()`; it owns the listener it was
// created from.
pub struct PipeAcceptor {
// The listener that was consumed by `listen()`.
listener: ~PipeListener,
}
// PipeWatcher implementation and traits
impl PipeWatcher {
// Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
// get bound to some other source (this is normally a helper method paired
// with another call).
pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
let home = io.make_handle();
PipeWatcher::new_home(&io.loop_, home, ipc)
}
pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
let handle = unsafe {
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
assert!(!handle.is_null());
let ipc = ipc as libc::c_int;
assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
handle
};
PipeWatcher {
stream: StreamWatcher::new(handle),
home: home,
defused: false,
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
}
}
pub fn open(io: &mut UvIoFactory, file: libc::c_int)
-> Result<PipeWatcher, UvError>
{
let pipe = PipeWatcher::new(io, false);
match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
0 => Ok(pipe),
n => Err(UvError(n))
}
}
pub fn connect(io: &mut UvIoFactory, name: &CString)
-> Result<PipeWatcher, UvError>
{
struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
let mut cx = Ctx { task: None, result: 0 };
let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(io, false);
wait_until_woken_after(&mut cx.task, || {
unsafe {
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
name.with_ref(|p| p),
connect_cb)
}
req.set_data(&cx);
req.defuse(); // uv callback now owns this request
});
return match cx.result {
0 => Ok(pipe),
n => Err(UvError(n))
};
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
wakeup(&mut cx.task);
}
}
pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
// Unwraps the underlying uv pipe. This cancels destruction of the pipe and
// allows the pipe to get moved elsewhere
fn unwrap(mut self) -> *uvll::uv_pipe_t {
self.defused = true;
return self.stream.handle;
}
}
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
}
fn clone(&self) -> ~RtioPipe {
~PipeWatcher {
stream: StreamWatcher::new(self.stream.handle),
defused: false,
home: self.home.clone(),
refcount: self.refcount.clone(),
read_access: self.read_access.clone(),
write_access: self.write_access.clone(),
} as ~RtioPipe
}
}
impl HomingIO for PipeWatcher {
    // Gives mutable access to this watcher's home handle.
    fn home<'h>(&'h mut self) -> &'h mut HomeHandle {
        &mut self.home
    }
}
impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
    // Exposes the raw uv pipe handle stored in the stream wrapper.
    fn uv_handle(&self) -> *uvll::uv_pipe_t {
        self.stream.handle
    }
}
impl Drop for PipeWatcher {
    // Closes the handle unless the watcher was defused or the refcount says
    // the handle should stay open.
    fn drop(&mut self) {
        let _missile = self.fire_homing_missile();
        // A defused watcher no longer owns its handle; the refcount is left
        // untouched in that case.
        if self.defused {
            return;
        }
        if self.refcount.decrement() {
            self.close();
        }
    }
}
// PipeListener implementation and traits
impl PipeListener {
    // Binds a fresh non-ipc pipe to `name` and wraps it in a listener.
    //
    // A non-zero status from `uv_pipe_bind` is returned as `UvError`. On
    // success the listener gets its own home handle and a new channel pair,
    // and the boxed listener is passed through `install()` before being
    // returned.
    pub fn bind(io: &mut UvIoFactory, name: &CString)
        -> Result<~PipeListener, UvError>
    {
        let pipe = PipeWatcher::new(io, false);
        let status = unsafe {
            uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
        };
        if status != 0 {
            return Err(UvError(status));
        }

        // If successful, unwrap the PipeWatcher because we control how
        // we close the pipe differently. We can't rely on
        // StreamWatcher's default close method.
        let (port, chan) = Chan::new();
        let listener = ~PipeListener {
            home: io.make_handle(),
            pipe: pipe.unwrap(),
            incoming: port,
            outgoing: chan,
        };
        Ok(listener.install())
    }
}
impl RtioUnixListener for PipeListener {
    // Consumes the listener, wraps it in a `PipeAcceptor` and starts listening
    // on the pipe with `listen_cb` as the connection callback. A non-zero
    // status from `uv_listen` is converted to an `IoError`.
    fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> {
        // create the acceptor object from ourselves
        let mut acceptor = ~PipeAcceptor { listener: self };
        let _missile = acceptor.fire_homing_missile();

        // FIXME: the 128 backlog should be configurable
        let status = unsafe {
            uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb)
        };
        if status == 0 {
            Ok(acceptor as ~RtioUnixAcceptor)
        } else {
            Err(uv_error_to_io_error(UvError(status)))
        }
    }
}
impl HomingIO for PipeListener {
    // Gives mutable access to this listener's home handle.
    fn home<'h>(&'h mut self) -> &'h mut HomeHandle {
        &mut self.home
    }
}
impl UvHandle<uvll::uv_pipe_t> for PipeListener {
    // Exposes the raw uv pipe handle owned by the listener.
    fn uv_handle(&self) -> *uvll::uv_pipe_t {
        self.pipe
    }
}
// Connection callback registered by `listen()`. Recovers the `PipeListener`
// from the server handle and sends one message down its outgoing channel:
// the accepted client pipe when `status` is zero, otherwise the status
// converted to an `IoError`.
extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
    assert!(status != uvll::ECANCELED);

    let listener: &mut PipeListener = unsafe {
        UvHandle::from_uv_handle(&server)
    };
    let outcome = if status == 0 {
        let raw_loop = unsafe { uvll::get_loop_for_uv_handle(server) };
        let loop_ = Loop::wrap(raw_loop);
        // The client is created with a clone of the listener's home handle.
        let client_home = listener.home().clone();
        let client = PipeWatcher::new_home(&loop_, client_home, false);
        let accepted = unsafe { uvll::uv_accept(server, client.handle()) };
        assert_eq!(accepted, 0);
        Ok(~client as ~RtioPipe)
    } else {
        Err(uv_error_to_io_error(UvError(status)))
    };
    listener.outgoing.send(outcome);
}
impl Drop for PipeListener {
    // Fires a homing missile and then closes the listener's handle
    // unconditionally.
    fn drop(&mut self) {
        let _missile = self.fire_homing_missile();
        self.close();
    }
}
// PipeAcceptor implementation and traits
impl RtioUnixAcceptor for PipeAcceptor {
    // Receives the next result that `listen_cb` sent on the listener's
    // channel: either an accepted pipe or an `IoError`.
    fn accept(&mut self) -> Result<~RtioPipe, IoError> {
        let next = self.listener.incoming.recv();
        next
    }
}
impl HomingIO for PipeAcceptor {
    // The acceptor has no home of its own; it hands out the listener's.
    fn home<'h>(&'h mut self) -> &'h mut HomeHandle {
        &mut self.listener.home
    }
}
#[cfg(test)]
mod tests {
    use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
    use std::io::test::next_test_unix;

    use super::{PipeWatcher, PipeListener};
    use super::super::local_loop;

    // Connecting to this path must report an error.
    #[test]
    fn connect_err() {
        let target = "path/to/nowhere".to_c_str();
        match PipeWatcher::connect(local_loop(), &target) {
            Err(..) => {}
            Ok(..) => fail!(),
        }
    }

    // Binding to this path must fail, and the error must be named EACCES.
    #[test]
    fn bind_err() {
        let target = "path/to/nowhere".to_c_str();
        match PipeListener::bind(local_loop(), &target) {
            Err(e) => assert_eq!(e.name(), ~"EACCES"),
            Ok(..) => fail!(),
        }
    }

    // Binding to a fresh test path must succeed.
    #[test]
    fn bind() {
        let fresh = next_test_unix().to_c_str();
        match PipeListener::bind(local_loop(), &fresh) {
            Err(..) => fail!(),
            Ok(..) => {}
        }
    }

    // Fails on purpose while a bound listener is still alive.
    #[test] #[should_fail]
    fn bind_fail() {
        let fresh = next_test_unix().to_c_str();
        let _listener = PipeListener::bind(local_loop(), &fresh).unwrap();
        fail!();
    }

    // Full round trip: the spawned task accepts one client, expects the byte 1
    // and answers with the byte 2; the main task does the opposite.
    #[test]
    fn connect() {
        let client_path = next_test_unix();
        let server_path = client_path.clone();
        let (ready_port, ready_chan) = Chan::new();

        spawn(proc() {
            let listener = PipeListener::bind(local_loop(),
                                              &server_path.to_c_str()).unwrap();
            let mut acceptor = listener.listen().unwrap();
            // Tell the main task that it may connect now.
            ready_chan.send(());
            let mut client = acceptor.accept().unwrap();
            let mut buf = [0];
            assert_eq!(client.read(buf).unwrap(), 1);
            assert_eq!(buf[0], 1);
            assert!(client.write([2]).is_ok());
        });

        ready_port.recv();
        let mut conn = PipeWatcher::connect(local_loop(),
                                            &client_path.to_c_str()).unwrap();
        assert!(conn.write([1]).is_ok());
        let mut buf = [0];
        assert_eq!(conn.read(buf).unwrap(), 1);
        assert_eq!(buf[0], 2);
    }

    // Fails on purpose while a connected client is still alive; the spawned
    // task drops the accepted pipe right away.
    #[test] #[should_fail]
    fn connect_fail() {
        let client_path = next_test_unix();
        let server_path = client_path.clone();
        let (ready_port, ready_chan) = Chan::new();

        spawn(proc() {
            let listener = PipeListener::bind(local_loop(),
                                              &server_path.to_c_str()).unwrap();
            let mut acceptor = listener.listen().unwrap();
            ready_chan.send(());
            drop(acceptor.accept().unwrap());
        });

        ready_port.recv();
        let _conn = PipeWatcher::connect(local_loop(),
                                         &client_path.to_c_str()).unwrap();
        fail!()
    }
}