Implement named pipes for windows, touch up unix

* Implementation of pipe_win32 filled out for libnative
* Reorganize pipes to be clone-able
* Fix a few file descriptor leaks on error
* Factor out some common code into shared functions
* Make use of the if_ok!() macro for less indentation

Closes #11201
This commit is contained in:
Alex Crichton 2014-02-07 10:37:58 -08:00
parent 94b2d9dc4d
commit a526aa139e
8 changed files with 780 additions and 288 deletions

View file

@ -14,5 +14,5 @@ pub mod addrinfo;
pub mod tcp;
pub mod udp;
pub mod ip;
#[cfg(unix)]
// FIXME(#12093) - this should not be called unix
pub mod unix;

View file

@ -134,7 +134,7 @@ mod tests {
use io::*;
use io::test::*;
fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
pub fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
let path1 = next_test_unix();
let path2 = path1.clone();
let (port, chan) = Chan::new();
@ -149,25 +149,32 @@ mod tests {
server(acceptor.accept().unwrap());
}
#[test]
fn bind_error() {
match UnixListener::bind(&("path/to/nowhere")) {
iotest!(fn bind_error() {
let path = "path/to/nowhere";
match UnixListener::bind(&path) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.kind, PermissionDenied),
Err(e) => {
assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
e.kind == InvalidInput);
}
}
}
})
#[test]
fn connect_error() {
match UnixStream::connect(&("path/to/nowhere")) {
iotest!(fn connect_error() {
let path = if cfg!(windows) {
r"\\.\pipe\this_should_not_exist_ever"
} else {
"path/to/nowhere"
};
match UnixStream::connect(&path) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.kind,
if cfg!(windows) {OtherIoError} else {FileNotFound})
Err(e) => {
assert!(e.kind == FileNotFound || e.kind == OtherIoError);
}
}
}
})
#[test]
fn smoke() {
iotest!(fn smoke() {
smalltest(proc(mut server) {
let mut buf = [0];
server.read(buf).unwrap();
@ -175,10 +182,9 @@ mod tests {
}, proc(mut client) {
client.write([99]).unwrap();
})
}
})
#[test]
fn read_eof() {
iotest!(fn read_eof() {
smalltest(proc(mut server) {
let mut buf = [0];
assert!(server.read(buf).is_err());
@ -186,17 +192,18 @@ mod tests {
}, proc(_client) {
// drop the client
})
}
})
#[test]
fn write_begone() {
iotest!(fn write_begone() {
smalltest(proc(mut server) {
let buf = [0];
loop {
match server.write(buf) {
Ok(..) => {}
Err(e) => {
assert!(e.kind == BrokenPipe || e.kind == NotConnected,
assert!(e.kind == BrokenPipe ||
e.kind == NotConnected ||
e.kind == ConnectionReset,
"unknown error {:?}", e);
break;
}
@ -205,10 +212,9 @@ mod tests {
}, proc(_client) {
// drop the client
})
}
})
#[test]
fn accept_lots() {
iotest!(fn accept_lots() {
let times = 10;
let path1 = next_test_unix();
let path2 = path1.clone();
@ -218,38 +224,49 @@ mod tests {
port.recv();
for _ in range(0, times) {
let mut stream = UnixStream::connect(&path2);
stream.write([100]).unwrap();
match stream.write([100]) {
Ok(..) => {}
Err(e) => fail!("failed write: {}", e)
}
}
});
let mut acceptor = UnixListener::bind(&path1).listen();
let mut acceptor = match UnixListener::bind(&path1).listen() {
Ok(a) => a,
Err(e) => fail!("failed listen: {}", e),
};
chan.send(());
for _ in range(0, times) {
let mut client = acceptor.accept();
let mut buf = [0];
client.read(buf).unwrap();
match client.read(buf) {
Ok(..) => {}
Err(e) => fail!("failed read/accept: {}", e),
}
assert_eq!(buf[0], 100);
}
}
})
#[test]
fn path_exists() {
#[cfg(unix)]
iotest!(fn path_exists() {
let path = next_test_unix();
let _acceptor = UnixListener::bind(&path).listen();
assert!(path.exists());
}
})
#[test]
fn unix_clone_smoke() {
iotest!(fn unix_clone_smoke() {
let addr = next_test_unix();
let mut acceptor = UnixListener::bind(&addr).listen();
spawn(proc() {
let mut s = UnixStream::connect(&addr);
let mut buf = [0, 0];
debug!("client reading");
assert_eq!(s.read(buf), Ok(1));
assert_eq!(buf[0], 1);
debug!("client writing");
s.write([2]).unwrap();
debug!("client dropping");
});
let mut s1 = acceptor.accept().unwrap();
@ -260,17 +277,20 @@ mod tests {
spawn(proc() {
let mut s2 = s2;
p1.recv();
debug!("writer writing");
s2.write([1]).unwrap();
debug!("writer done");
c2.send(());
});
c1.send(());
let mut buf = [0, 0];
debug!("reader reading");
assert_eq!(s1.read(buf), Ok(1));
debug!("reader done");
p2.recv();
}
})
#[test]
fn unix_clone_two_read() {
iotest!(fn unix_clone_two_read() {
let addr = next_test_unix();
let mut acceptor = UnixListener::bind(&addr).listen();
let (p, c) = Chan::new();
@ -300,10 +320,9 @@ mod tests {
c.send(());
p.recv();
}
})
#[test]
fn unix_clone_two_write() {
iotest!(fn unix_clone_two_write() {
let addr = next_test_unix();
let mut acceptor = UnixListener::bind(&addr).listen();
@ -326,5 +345,5 @@ mod tests {
s1.write([2]).unwrap();
p.recv();
}
})
}

View file

@ -269,7 +269,6 @@ pub mod types {
pub mod bsd44 {
use libc::types::os::arch::c95::{c_char, c_int, c_uint};
pub static sun_len:uint = 108;
pub type socklen_t = u32;
pub type sa_family_t = u16;
pub type in_port_t = u16;
@ -641,7 +640,6 @@ pub mod types {
pub mod bsd44 {
use libc::types::os::arch::c95::{c_char, c_int, c_uint};
pub static sun_len:uint = 104;
pub type socklen_t = u32;
pub type sa_family_t = u8;
pub type in_port_t = u16;
@ -844,7 +842,6 @@ pub mod types {
pub mod bsd44 {
use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t};
pub static sun_len:uint = 108;
pub type SOCKET = c_uint;
pub type socklen_t = c_int;
pub type sa_family_t = u16;
@ -1213,7 +1210,6 @@ pub mod types {
pub mod bsd44 {
use libc::types::os::arch::c95::{c_char, c_int, c_uint};
pub static sun_len:uint = 104;
pub type socklen_t = c_int;
pub type sa_family_t = u8;
pub type in_port_t = u16;
@ -1627,11 +1623,19 @@ pub mod consts {
pub static O_NOINHERIT: c_int = 128;
pub static ERROR_SUCCESS : c_int = 0;
pub static ERROR_FILE_NOT_FOUND: c_int = 2;
pub static ERROR_ACCESS_DENIED: c_int = 5;
pub static ERROR_INVALID_HANDLE : c_int = 6;
pub static ERROR_BROKEN_PIPE: c_int = 109;
pub static ERROR_DISK_FULL : c_int = 112;
pub static ERROR_INSUFFICIENT_BUFFER : c_int = 122;
pub static ERROR_INVALID_NAME : c_int = 123;
pub static ERROR_ALREADY_EXISTS : c_int = 183;
pub static ERROR_PIPE_BUSY: c_int = 231;
pub static ERROR_NO_DATA: c_int = 232;
pub static ERROR_INVALID_ADDRESS : c_int = 487;
pub static ERROR_PIPE_CONNECTED: c_int = 535;
pub static ERROR_IO_PENDING: c_int = 997;
pub static ERROR_FILE_INVALID : c_int = 1006;
pub static INVALID_HANDLE_VALUE : c_int = -1;
@ -1770,6 +1774,7 @@ pub mod consts {
pub static FILE_FLAG_SESSION_AWARE: DWORD = 0x00800000;
pub static FILE_FLAG_SEQUENTIAL_SCAN: DWORD = 0x08000000;
pub static FILE_FLAG_WRITE_THROUGH: DWORD = 0x80000000;
pub static FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000;
pub static FILE_NAME_NORMALIZED: DWORD = 0x0;
pub static FILE_NAME_OPENED: DWORD = 0x8;
@ -1783,6 +1788,8 @@ pub mod consts {
pub static GENERIC_WRITE: DWORD = 0x40000000;
pub static GENERIC_EXECUTE: DWORD = 0x20000000;
pub static GENERIC_ALL: DWORD = 0x10000000;
pub static FILE_WRITE_ATTRIBUTES: DWORD = 0x00000100;
pub static FILE_READ_ATTRIBUTES: DWORD = 0x00000080;
pub static FILE_BEGIN: DWORD = 0;
pub static FILE_CURRENT: DWORD = 1;
@ -1794,6 +1801,19 @@ pub mod consts {
pub static DETACHED_PROCESS: DWORD = 0x00000008;
pub static CREATE_NEW_PROCESS_GROUP: DWORD = 0x00000200;
pub static PIPE_ACCESS_DUPLEX: DWORD = 0x00000003;
pub static PIPE_ACCESS_INBOUND: DWORD = 0x00000001;
pub static PIPE_ACCESS_OUTBOUND: DWORD = 0x00000002;
pub static PIPE_TYPE_BYTE: DWORD = 0x00000000;
pub static PIPE_TYPE_MESSAGE: DWORD = 0x00000004;
pub static PIPE_READMODE_BYTE: DWORD = 0x00000000;
pub static PIPE_READMODE_MESSAGE: DWORD = 0x00000002;
pub static PIPE_WAIT: DWORD = 0x00000000;
pub static PIPE_NOWAIT: DWORD = 0x00000001;
pub static PIPE_ACCEPT_REMOTE_CLIENTS: DWORD = 0x00000000;
pub static PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008;
pub static PIPE_UNLIMITED_INSTANCES: DWORD = 255;
}
pub mod sysconf {
}
@ -2784,6 +2804,7 @@ pub mod consts {
pub static AF_INET: c_int = 2;
pub static AF_INET6: c_int = 28;
pub static AF_UNIX: c_int = 1;
pub static SOCK_STREAM: c_int = 1;
pub static SOCK_DGRAM: c_int = 2;
pub static IPPROTO_TCP: c_int = 6;
@ -4177,6 +4198,34 @@ pub mod funcs {
lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL;
pub fn GetCurrentProcessId() -> DWORD;
pub fn CreateNamedPipeW(
lpName: LPCWSTR,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
lpSecurityAttributes: LPSECURITY_ATTRIBUTES
) -> HANDLE;
pub fn ConnectNamedPipe(hNamedPipe: HANDLE,
lpOverlapped: LPOVERLAPPED) -> BOOL;
pub fn WaitNamedPipeW(lpNamedPipeName: LPCWSTR,
nTimeOut: DWORD) -> BOOL;
pub fn SetNamedPipeHandleState(hNamedPipe: HANDLE,
lpMode: LPDWORD,
lpMaxCollectionCount: LPDWORD,
lpCollectDataTimeout: LPDWORD)
-> BOOL;
pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCWSTR) -> HANDLE;
pub fn GetOverlappedResult(hFile: HANDLE,
lpOverlapped: LPOVERLAPPED,
lpNumberOfBytesTransferred: LPDWORD,
bWait: BOOL) -> BOOL;
pub fn DisconnectNamedPipe(hNamedPipe: HANDLE) -> BOOL;
}
}

View file

@ -260,11 +260,6 @@ pub trait RtioPipe {
fn clone(&self) -> ~RtioPipe;
}
pub trait RtioDatagramPipe : RtioPipe {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, CString), IoError>;
fn sendto(&mut self, buf: &[u8], dst: &CString) -> Result<(), IoError>;
}
pub trait RtioUnixListener {
fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>;
}