Auto merge of #3609 - tiif:feat/socketpair, r=RalfJung

Add socketpair shim

Fixes #3442
Design proposal: https://hackmd.io/@tiif/Skhc1t0-C
This commit is contained in:
bors 2024-06-10 15:56:20 +00:00
commit 0d66db93b7
6 changed files with 366 additions and 15 deletions

View file

@ -1,15 +1,38 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::io::{Error, ErrorKind, Read};
use std::rc::{Rc, Weak};
use crate::shims::unix::*;
use crate::*;
use crate::{concurrency::VClock, *};
use self::fd::FileDescriptor;
/// The maximum capacity of the socketpair buffer in bytes.
/// This number is arbitrary as the value can always
/// be configured in the real system.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
/// Pair of connected sockets.
///
/// We currently don't allow sending any data through this pair, so this can be just a dummy.
#[derive(Debug)]
struct SocketPair;
struct SocketPair {
// By making the write link weak, a `write` can detect when all readers are
// gone, and trigger EPIPE as appropriate.
writebuf: Weak<RefCell<Buffer>>,
readbuf: Rc<RefCell<Buffer>>,
is_nonblock: bool,
}
/// Byte queue carrying data in one direction of a socketpair, shared between
/// the end that writes into it and the end that reads from it.
#[derive(Debug)]
struct Buffer {
/// Bytes that have been written and not yet read.
buf: VecDeque<u8>,
/// Vector clock that `write` joins its release clock into and that `read`
/// acquires, so a reader synchronizes with the writes to this buffer.
clock: VClock,
/// Indicates if there is at least one active writer to this buffer.
/// If all writers of this buffer are dropped, buf_has_writer becomes false and we
/// indicate EOF instead of blocking.
buf_has_writer: bool,
}
impl FileDescription for SocketPair {
fn name(&self) -> &'static str {
@ -20,17 +43,102 @@ impl FileDescription for SocketPair {
self: Box<Self>,
_communicate_allowed: bool,
) -> InterpResult<'tcx, io::Result<()>> {
// This is used to signal socketfd of other side that there is no writer to its readbuf.
// If the upgrade fails, there is no need to update as all read ends have been dropped.
if let Some(writebuf) = self.writebuf.upgrade() {
writebuf.borrow_mut().buf_has_writer = false;
};
Ok(Ok(()))
}
/// Reads up to `bytes.len()` bytes from this end's receive buffer into `bytes`.
///
/// Returns the number of bytes copied. `0` is returned for a zero-length request
/// and for end-of-file (empty buffer with no writer left). An empty buffer that
/// still has a writer yields `WouldBlock` on a non-blocking socket and is an
/// unsupported operation on a blocking one.
fn read<'tcx>(
    &mut self,
    _communicate_allowed: bool,
    bytes: &mut [u8],
    ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, io::Result<usize>> {
    let mut incoming = self.readbuf.borrow_mut();

    // A zero-sized read always succeeds.
    if bytes.is_empty() {
        return Ok(Ok(0));
    }

    if incoming.buf.is_empty() {
        // No data and nobody left to produce any: report end-of-file as a
        // successful read of 0 bytes.
        if !incoming.buf_has_writer {
            return Ok(Ok(0));
        }
        // A writer still exists but nothing is buffered. For sockets, read(2)
        // (https://linux.die.net/man/2/read) may report EAGAIN or EWOULDBLOCK here;
        // POSIX.1-2001 allows either. There is no ErrorKind for EAGAIN, so
        // WouldBlock is used.
        if self.is_nonblock {
            return Ok(Err(Error::from(ErrorKind::WouldBlock)));
        }
        // FIXME: blocking is currently not supported
        throw_unsup_format!("socketpair read: blocking isn't supported yet");
    }

    // Synchronize with all previous writes to this buffer.
    // FIXME: this over-synchronizes; a more precise approach would be to
    // only sync with the writes whose data we will read.
    ecx.acquire_clock(&incoming.clock);

    // `VecDeque`'s `Read` impl copies as much as fits into `bytes`, which gives
    // us full and partial reads for free.
    let bytes_read = incoming.buf.read(bytes).unwrap();
    Ok(Ok(bytes_read))
}
/// Appends as many of `bytes` as fit into the peer's receive buffer.
///
/// Returns the number of bytes accepted, which is less than `bytes.len()` when
/// the buffer has less free space than requested. Fails with `BrokenPipe` when
/// the peer's buffer no longer exists. A completely full buffer yields
/// `WouldBlock` on a non-blocking socket and is an unsupported operation on a
/// blocking one.
fn write<'tcx>(
    &mut self,
    _communicate_allowed: bool,
    bytes: &[u8],
    ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, io::Result<usize>> {
    // Always succeed on write size 0.
    // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
    if bytes.is_empty() {
        return Ok(Ok(0));
    }

    // A dead weak link means every read end of this buffer has been closed.
    let Some(peer_buf) = self.writebuf.upgrade() else {
        return Ok(Err(Error::from(ErrorKind::BrokenPipe)));
    };
    let mut outgoing = peer_buf.borrow_mut();

    let free_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.checked_sub(outgoing.buf.len()).unwrap();
    if free_space == 0 {
        // The buffer is full: a non-blocking socket reports that, a blocking
        // one would have to wait.
        if self.is_nonblock {
            return Ok(Err(Error::from(ErrorKind::WouldBlock)));
        }
        throw_unsup_format!("socketpair write: blocking isn't supported yet");
    }

    // Record our clock in the buffer so that a later `read` can synchronize with us.
    if let Some(clock) = &ecx.release_clock() {
        outgoing.clock.join(clock);
    }

    // Accept everything if it fits, otherwise only the part that does.
    let accepted = bytes.len().min(free_space);
    outgoing.buf.extend(&bytes[..accepted]);
    Ok(Ok(accepted))
}
}
// Empty impl: it relies entirely on the default method bodies of `EvalContextExt`,
// and only serves to make those methods available on the interpreter context.
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// Creates two connected sockets and their associated FDs such that writing
/// to one will produce data that can be read from the other. Only
/// `AF_UNIX`/`AF_LOCAL` stream sockets with protocol 0 are supported; any
/// other domain, type flag or protocol is reported as unsupported.
///
/// For more information on the arguments see the socketpair manpage:
/// <https://linux.die.net/man/2/socketpair>
fn socketpair(
@ -42,17 +150,80 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
) -> InterpResult<'tcx, Scalar> {
let this = self.eval_context_mut();
let _domain = this.read_scalar(domain)?.to_i32()?;
let _type_ = this.read_scalar(type_)?.to_i32()?;
let _protocol = this.read_scalar(protocol)?.to_i32()?;
let domain = this.read_scalar(domain)?.to_i32()?;
let mut type_ = this.read_scalar(type_)?.to_i32()?;
let protocol = this.read_scalar(protocol)?.to_i32()?;
let sv = this.deref_pointer(sv)?;
// FIXME: fail on unsupported inputs
let mut is_sock_nonblock = false;
// Parse and remove the type flags that we support. If type != 0 after removing,
// unsupported flags are used.
if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") {
type_ &= !(this.eval_libc_i32("SOCK_STREAM"));
}
// SOCK_NONBLOCK only exists on Linux.
if this.tcx.sess.target.os == "linux" {
if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") {
is_sock_nonblock = true;
type_ &= !(this.eval_libc_i32("SOCK_NONBLOCK"));
}
if type_ & this.eval_libc_i32("SOCK_CLOEXEC") == this.eval_libc_i32("SOCK_CLOEXEC") {
type_ &= !(this.eval_libc_i32("SOCK_CLOEXEC"));
}
}
// Fail on unsupported input.
// AF_UNIX and AF_LOCAL are synonyms, so we accept both in case
// their values differ.
if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
throw_unsup_format!(
"socketpair: Unsupported domain {:#x} is used, only AF_UNIX \
and AF_LOCAL are allowed",
domain
);
} else if type_ != 0 {
throw_unsup_format!(
"socketpair: Unsupported type {:#x} is used, only SOCK_STREAM, \
SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
type_
);
} else if protocol != 0 {
throw_unsup_format!(
"socketpair: Unsupported socket protocol {protocol} is used, \
only 0 is allowed",
);
}
let buffer1 = Rc::new(RefCell::new(Buffer {
buf: VecDeque::new(),
clock: VClock::default(),
buf_has_writer: true,
}));
let buffer2 = Rc::new(RefCell::new(Buffer {
buf: VecDeque::new(),
clock: VClock::default(),
buf_has_writer: true,
}));
let socketpair_0 = SocketPair {
writebuf: Rc::downgrade(&buffer1),
readbuf: Rc::clone(&buffer2),
is_nonblock: is_sock_nonblock,
};
let socketpair_1 = SocketPair {
writebuf: Rc::downgrade(&buffer2),
readbuf: Rc::clone(&buffer1),
is_nonblock: is_sock_nonblock,
};
let fds = &mut this.machine.fds;
let sv0 = fds.insert_fd(FileDescriptor::new(SocketPair));
let sv0 = fds.insert_fd(FileDescriptor::new(socketpair_0));
let sv0 = Scalar::try_from_int(sv0, sv.layout.size).unwrap();
let sv1 = fds.insert_fd(FileDescriptor::new(SocketPair));
let sv1 = fds.insert_fd(FileDescriptor::new(socketpair_1));
let sv1 = Scalar::try_from_int(sv1, sv.layout.size).unwrap();
this.write_scalar(sv0, &sv)?;

View file

@ -0,0 +1,12 @@
//@ignore-target-windows: no libc socketpair on Windows
// This is temporarily here because blocking on fd is not supported yet.
// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair
// Creates a blocking socketpair and reads from it before anything was written.
// The expected outcome is the "blocking isn't supported" error annotated below.
fn main() {
let mut fds = [-1, -1];
// The return value is deliberately discarded.
let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
// Nothing has been written and the socket was created without SOCK_NONBLOCK,
// so the read below would have to block because the buffer is empty.
let mut buf: [u8; 3] = [0; 3];
let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; //~ERROR: blocking isn't supported
}

View file

@ -0,0 +1,14 @@
error: unsupported operation: socketpair read: blocking isn't supported yet
--> $DIR/socketpair_read_blocking.rs:LL:CC
|
LL | let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair read: blocking isn't supported yet
|
= help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support
= note: BACKTRACE:
= note: inside `main` at $DIR/socketpair_read_blocking.rs:LL:CC
note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
error: aborting due to 1 previous error

View file

@ -0,0 +1,16 @@
//@ignore-target-windows: no libc socketpair on Windows
// This is temporarily here because blocking on fd is not supported yet.
// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair
// Fills the buffer of a blocking socketpair and then writes once more.
// The expected outcome is the "blocking isn't supported" error annotated below.
fn main() {
let mut fds = [-1, -1];
// The return value is deliberately discarded.
let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
// Write size == buffer capacity: 212992 bytes is the size the error below
// relies on to leave no free space.
// Used up all the space in the buffer.
let arr1: [u8; 212992] = [1; 212992];
let _ = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) };
let data = "abc".as_bytes().as_ptr();
// The write below will be blocked as the buffer is full.
let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; //~ERROR: blocking isn't supported
// Per the annotation above, the write is where the error is expected, so the
// read below is presumably never reached.
let mut buf: [u8; 3] = [0; 3];
let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
}

View file

@ -0,0 +1,14 @@
error: unsupported operation: socketpair write: blocking isn't supported yet
--> $DIR/socketpair_write_blocking.rs:LL:CC
|
LL | let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair write: blocking isn't supported yet
|
= help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support
= note: BACKTRACE:
= note: inside `main` at $DIR/socketpair_write_blocking.rs:LL:CC
note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
error: aborting due to 1 previous error

View file

@ -0,0 +1,124 @@
//@ignore-target-windows: No libc socketpair on Windows
// test_race depends on a deterministic schedule.
//@compile-flags: -Zmiri-preemption-rate=0
use std::thread;
// Runs the socketpair scenarios one after another: single-threaded I/O,
// cross-thread I/O, and the data-race synchronization check.
fn main() {
test_socketpair();
test_socketpair_threaded();
test_race();
}
/// Single-threaded round trips over a socketpair in both directions, covering
/// a read that exactly drains the buffer and a read that asks for more than is
/// available.
fn test_socketpair() {
    /// Writes `payload` to `tx`, reads from `rx` into a 5-byte buffer, and checks
    /// that both calls transferred exactly the payload.
    fn round_trip(tx: libc::c_int, rx: libc::c_int, payload: &[u8]) {
        let expected = i32::try_from(payload.len()).unwrap();

        let written: i32 = unsafe {
            libc::write(tx, payload.as_ptr() as *const libc::c_void, payload.len())
                .try_into()
                .unwrap()
        };
        assert_eq!(written, expected);

        let mut received: [u8; 5] = [0; 5];
        let got: i32 = unsafe {
            libc::read(rx, received.as_mut_ptr().cast(), received.len() as libc::size_t)
                .try_into()
                .unwrap()
        };
        assert_eq!(got, expected);
        assert_eq!(&received[..payload.len()], payload);
    }

    let mut fds = [-1, -1];
    let rc = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
    assert_eq!(rc, 0);

    // fds[0] -> fds[1]: read size == data available in buffer.
    round_trip(fds[0], fds[1], "abcde".as_bytes());
    // fds[0] -> fds[1]: read size > data available in buffer.
    round_trip(fds[0], fds[1], "abc".as_bytes());

    // Same two cases in the other direction, fds[1] -> fds[0].
    round_trip(fds[1], fds[0], "12345".as_bytes());
    round_trip(fds[1], fds[0], "123".as_bytes());
}
/// Checks that socketpair data crosses thread boundaries: first a spawned
/// thread reads what the main thread wrote, then the main thread reads what a
/// spawned thread wrote.
fn test_socketpair_threaded() {
    let mut fds = [-1, -1];
    let rc = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
    assert_eq!(rc, 0);

    // Main thread writes, a spawned thread reads.
    let outgoing = "abcde".as_bytes();
    let written: i32 = unsafe {
        libc::write(fds[0], outgoing.as_ptr() as *const libc::c_void, 5).try_into().unwrap()
    };
    assert_eq!(written, 5);

    let reader = thread::spawn(move || {
        let mut received: [u8; 5] = [0; 5];
        let count: i64 = unsafe {
            libc::read(fds[1], received.as_mut_ptr().cast(), received.len() as libc::size_t)
                .try_into()
                .unwrap()
        };
        assert_eq!(count, 5);
        assert_eq!(received, "abcde".as_bytes());
    });
    reader.join().unwrap();

    // Roles swapped: a spawned thread writes, the main thread reads.
    let writer = thread::spawn(move || {
        let outgoing = "12345".as_bytes();
        let count: i64 = unsafe {
            libc::write(fds[0], outgoing.as_ptr() as *const libc::c_void, 5).try_into().unwrap()
        };
        assert_eq!(count, 5);
    });
    writer.join().unwrap();

    let mut received: [u8; 5] = [0; 5];
    let got: i32 = unsafe {
        libc::read(fds[1], received.as_mut_ptr().cast(), received.len() as libc::size_t)
            .try_into()
            .unwrap()
    };
    assert_eq!(got, 5);
    assert_eq!(received, "12345".as_bytes());
}
// Checks that passing data through a socketpair also orders the surrounding
// memory accesses: the main thread stores to `VAL` before its `write`, and the
// spawned thread loads `VAL` only after its `read` returned that data. The test
// is expected to pass without a data race being reported on `VAL`.
fn test_race() {
// Plain (non-atomic) shared variable accessed by both threads.
static mut VAL: u8 = 0;
let mut fds = [-1, -1];
let mut res =
unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
assert_eq!(res, 0);
let thread1 = thread::spawn(move || {
let mut buf: [u8; 1] = [0; 1];
// write() from the main thread will occur before the read() here
// because preemption is disabled and the main thread yields after write().
let res: i32 = unsafe {
libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
.try_into()
.unwrap()
};
assert_eq!(res, 1);
assert_eq!(buf, "a".as_bytes());
// Load of `VAL` after the read; it must observe the store made before the write.
unsafe { assert_eq!(VAL, 1) };
});
// Store to `VAL` before the write that the other thread will read.
unsafe { VAL = 1 };
let data = "a".as_bytes().as_ptr();
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1).try_into().unwrap() };
assert_eq!(res, 1);
// Hand control to the spawned thread so its read runs after the write above.
thread::yield_now();
thread1.join().unwrap();
}