Merge pull request #4112 from RalfJung/socket-cleanup
Socket read/write cleanup
This commit is contained in:
commit
5ccf10edf1
4 changed files with 169 additions and 116 deletions
|
|
@ -62,11 +62,10 @@ impl FileDescription for Event {
|
|||
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
|
||||
}
|
||||
|
||||
// eventfd read at the size of u64.
|
||||
// Turn the pointer into a place at the right type.
|
||||
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
|
||||
|
||||
let weak_eventfd = self_ref.downgrade();
|
||||
eventfd_read(buf_place, dest, weak_eventfd, ecx)
|
||||
eventfd_read(buf_place, dest, self_ref, ecx)
|
||||
}
|
||||
|
||||
/// A write call adds the 8-byte integer value supplied in
|
||||
|
|
@ -97,18 +96,10 @@ impl FileDescription for Event {
|
|||
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
|
||||
}
|
||||
|
||||
// Read the user-supplied value from the pointer.
|
||||
// Turn the pointer into a place at the right type.
|
||||
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
|
||||
let num = ecx.read_scalar(&buf_place)?.to_u64()?;
|
||||
|
||||
// u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
|
||||
if num == u64::MAX {
|
||||
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
|
||||
}
|
||||
// If the addition does not let the counter to exceed the maximum value, update the counter.
|
||||
// Else, block.
|
||||
let weak_eventfd = self_ref.downgrade();
|
||||
eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
|
||||
eventfd_write(buf_place, dest, self_ref, ecx)
|
||||
}
|
||||
|
||||
fn as_unix(&self) -> &dyn UnixFileDescription {
|
||||
|
|
@ -193,20 +184,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
|
|||
/// Block thread if the value addition will exceed u64::MAX -1,
|
||||
/// else just add the user-supplied value to current counter.
|
||||
fn eventfd_write<'tcx>(
|
||||
num: u64,
|
||||
buf_place: MPlaceTy<'tcx>,
|
||||
dest: &MPlaceTy<'tcx>,
|
||||
weak_eventfd: WeakFileDescriptionRef,
|
||||
eventfd_ref: &FileDescriptionRef,
|
||||
ecx: &mut MiriInterpCx<'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
|
||||
throw_unsup_format!("eventfd FD got closed while blocking.")
|
||||
};
|
||||
|
||||
// Since we pass the weak file description ref, it is guaranteed to be
|
||||
// an eventfd file description.
|
||||
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
|
||||
|
||||
// Figure out which value we should add.
|
||||
let num = ecx.read_scalar(&buf_place)?.to_u64()?;
|
||||
// u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
|
||||
if num == u64::MAX {
|
||||
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
|
||||
}
|
||||
|
||||
match eventfd.counter.get().checked_add(num) {
|
||||
Some(new_count @ 0..=MAX_COUNTER) => {
|
||||
// Future `read` calls will synchronize with this write, so update the FD clock.
|
||||
|
|
@ -219,7 +212,7 @@ fn eventfd_write<'tcx>(
|
|||
|
||||
// The state changed; we check and update the status of all supported event
|
||||
// types for current file description.
|
||||
ecx.check_and_update_readiness(&eventfd_ref)?;
|
||||
ecx.check_and_update_readiness(eventfd_ref)?;
|
||||
|
||||
// Unblock *all* threads previously blocked on `read`.
|
||||
// We need to take out the blocked thread ids and unblock them together,
|
||||
|
|
@ -244,6 +237,7 @@ fn eventfd_write<'tcx>(
|
|||
|
||||
eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());
|
||||
|
||||
let weak_eventfd = eventfd_ref.downgrade();
|
||||
ecx.block_thread(
|
||||
BlockReason::Eventfd,
|
||||
None,
|
||||
|
|
@ -255,8 +249,10 @@ fn eventfd_write<'tcx>(
|
|||
weak_eventfd: WeakFileDescriptionRef,
|
||||
}
|
||||
@unblock = |this| {
|
||||
// When we get unblocked, try again.
|
||||
eventfd_write(num, buf_place, &dest, weak_eventfd, this)
|
||||
// When we get unblocked, try again. We know the ref is still valid,
|
||||
// otherwise there couldn't be a `write` that unblocks us.
|
||||
let eventfd_ref = weak_eventfd.upgrade().unwrap();
|
||||
eventfd_write(buf_place, &dest, &eventfd_ref, this)
|
||||
}
|
||||
),
|
||||
);
|
||||
|
|
@ -270,13 +266,9 @@ fn eventfd_write<'tcx>(
|
|||
fn eventfd_read<'tcx>(
|
||||
buf_place: MPlaceTy<'tcx>,
|
||||
dest: &MPlaceTy<'tcx>,
|
||||
weak_eventfd: WeakFileDescriptionRef,
|
||||
eventfd_ref: &FileDescriptionRef,
|
||||
ecx: &mut MiriInterpCx<'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
|
||||
throw_unsup_format!("eventfd FD got closed while blocking.")
|
||||
};
|
||||
|
||||
// Since we pass the weak file description ref to the callback function, it is guaranteed to be
|
||||
// an eventfd file description.
|
||||
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
|
||||
|
|
@ -293,6 +285,7 @@ fn eventfd_read<'tcx>(
|
|||
|
||||
eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());
|
||||
|
||||
let weak_eventfd = eventfd_ref.downgrade();
|
||||
ecx.block_thread(
|
||||
BlockReason::Eventfd,
|
||||
None,
|
||||
|
|
@ -303,8 +296,10 @@ fn eventfd_read<'tcx>(
|
|||
weak_eventfd: WeakFileDescriptionRef,
|
||||
}
|
||||
@unblock = |this| {
|
||||
// When we get unblocked, try again.
|
||||
eventfd_read(buf_place, &dest, weak_eventfd, this)
|
||||
// When we get unblocked, try again. We know the ref is still valid,
|
||||
// otherwise there couldn't be a `write` that unblocks us.
|
||||
let eventfd_ref = weak_eventfd.upgrade().unwrap();
|
||||
eventfd_read(buf_place, &dest, &eventfd_ref, this)
|
||||
}
|
||||
),
|
||||
);
|
||||
|
|
@ -317,7 +312,7 @@ fn eventfd_read<'tcx>(
|
|||
|
||||
// The state changed; we check and update the status of all supported event
|
||||
// types for current file description.
|
||||
ecx.check_and_update_readiness(&eventfd_ref)?;
|
||||
ecx.check_and_update_readiness(eventfd_ref)?;
|
||||
|
||||
// Unblock *all* threads previously blocked on `write`.
|
||||
// We need to take out the blocked thread ids and unblock them together,
|
||||
|
|
|
|||
|
|
@ -96,26 +96,7 @@ impl FileDescription for AnonSocket {
|
|||
dest: &MPlaceTy<'tcx>,
|
||||
ecx: &mut MiriInterpCx<'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
// Always succeed on read size 0.
|
||||
if len == 0 {
|
||||
return ecx.return_read_success(ptr, &[], 0, dest);
|
||||
}
|
||||
|
||||
let Some(readbuf) = &self.readbuf else {
|
||||
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
|
||||
// corresponding ErrorKind variant.
|
||||
throw_unsup_format!("reading from the write end of a pipe");
|
||||
};
|
||||
|
||||
if readbuf.borrow().buf.is_empty() && self.is_nonblock {
|
||||
// Non-blocking socketpair with writer and empty buffer.
|
||||
// https://linux.die.net/man/2/read
|
||||
// EAGAIN or EWOULDBLOCK can be returned for socket,
|
||||
// POSIX.1-2001 allows either error to be returned for this case.
|
||||
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
|
||||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
|
||||
}
|
||||
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
|
||||
anonsocket_read(self_ref, len, ptr, dest, ecx)
|
||||
}
|
||||
|
||||
fn write<'tcx>(
|
||||
|
|
@ -127,31 +108,7 @@ impl FileDescription for AnonSocket {
|
|||
dest: &MPlaceTy<'tcx>,
|
||||
ecx: &mut MiriInterpCx<'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
// Always succeed on write size 0.
|
||||
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
|
||||
if len == 0 {
|
||||
return ecx.return_write_success(0, dest);
|
||||
}
|
||||
|
||||
// We are writing to our peer's readbuf.
|
||||
let Some(peer_fd) = self.peer_fd().upgrade() else {
|
||||
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
|
||||
// closed.
|
||||
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
|
||||
};
|
||||
|
||||
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
|
||||
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
|
||||
// corresponding ErrorKind variant.
|
||||
throw_unsup_format!("writing to the reading end of a pipe");
|
||||
};
|
||||
let available_space =
|
||||
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
|
||||
if available_space == 0 && self.is_nonblock {
|
||||
// Non-blocking socketpair with a full buffer.
|
||||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
|
||||
}
|
||||
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
|
||||
anonsocket_write(self_ref, ptr, len, dest, ecx)
|
||||
}
|
||||
|
||||
fn as_unix(&self) -> &dyn UnixFileDescription {
|
||||
|
|
@ -161,50 +118,65 @@ impl FileDescription for AnonSocket {
|
|||
|
||||
/// Write to AnonSocket based on the space available and return the written byte size.
|
||||
fn anonsocket_write<'tcx>(
|
||||
weak_self_ref: WeakFileDescriptionRef,
|
||||
self_ref: &FileDescriptionRef,
|
||||
ptr: Pointer,
|
||||
len: usize,
|
||||
dest: MPlaceTy<'tcx>,
|
||||
dest: &MPlaceTy<'tcx>,
|
||||
ecx: &mut MiriInterpCx<'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
let Some(self_ref) = weak_self_ref.upgrade() else {
|
||||
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
|
||||
throw_unsup_format!("This will be a deadlock error in future")
|
||||
};
|
||||
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
|
||||
|
||||
// Always succeed on write size 0.
|
||||
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
|
||||
if len == 0 {
|
||||
return ecx.return_write_success(0, dest);
|
||||
}
|
||||
|
||||
// We are writing to our peer's readbuf.
|
||||
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
|
||||
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
|
||||
// closed.
|
||||
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
|
||||
// closed. It is an error to write even if there would be space.
|
||||
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
|
||||
};
|
||||
|
||||
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
|
||||
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
|
||||
// corresponding ErrorKind variant.
|
||||
throw_unsup_format!("writing to the reading end of a pipe")
|
||||
// Writing to the read end of a pipe.
|
||||
return ecx.set_last_error_and_return(IoError::LibcError("EBADF"), dest);
|
||||
};
|
||||
|
||||
// Let's see if we can write.
|
||||
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
|
||||
|
||||
if available_space == 0 {
|
||||
// Blocking socketpair with a full buffer.
|
||||
let dest = dest.clone();
|
||||
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
|
||||
ecx.block_thread(
|
||||
BlockReason::UnnamedSocket,
|
||||
None,
|
||||
callback!(
|
||||
@capture<'tcx> {
|
||||
weak_self_ref: WeakFileDescriptionRef,
|
||||
ptr: Pointer,
|
||||
len: usize,
|
||||
dest: MPlaceTy<'tcx>,
|
||||
}
|
||||
@unblock = |this| {
|
||||
anonsocket_write(weak_self_ref, ptr, len, dest, this)
|
||||
}
|
||||
),
|
||||
);
|
||||
if self_anonsocket.is_nonblock {
|
||||
// Non-blocking socketpair with a full buffer.
|
||||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
|
||||
} else {
|
||||
// Blocking socketpair with a full buffer.
|
||||
// Block the current thread; only keep a weak ref for this.
|
||||
let weak_self_ref = self_ref.downgrade();
|
||||
let dest = dest.clone();
|
||||
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
|
||||
ecx.block_thread(
|
||||
BlockReason::UnnamedSocket,
|
||||
None,
|
||||
callback!(
|
||||
@capture<'tcx> {
|
||||
weak_self_ref: WeakFileDescriptionRef,
|
||||
ptr: Pointer,
|
||||
len: usize,
|
||||
dest: MPlaceTy<'tcx>,
|
||||
}
|
||||
@unblock = |this| {
|
||||
// If we got unblocked, then our peer successfully upgraded its weak
|
||||
// ref to us. That means we can also upgrade our weak ref.
|
||||
let self_ref = weak_self_ref.upgrade().unwrap();
|
||||
anonsocket_write(&self_ref, ptr, len, &dest, this)
|
||||
}
|
||||
),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// There is space to write!
|
||||
let mut writebuf = writebuf.borrow_mut();
|
||||
// Remember this clock so `read` can synchronize with us.
|
||||
ecx.release_clock(|clock| {
|
||||
|
|
@ -229,25 +201,26 @@ fn anonsocket_write<'tcx>(
|
|||
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
|
||||
}
|
||||
|
||||
return ecx.return_write_success(actual_write_size, &dest);
|
||||
return ecx.return_write_success(actual_write_size, dest);
|
||||
}
|
||||
interp_ok(())
|
||||
}
|
||||
|
||||
/// Read from AnonSocket and return the number of bytes read.
|
||||
fn anonsocket_read<'tcx>(
|
||||
weak_self_ref: WeakFileDescriptionRef,
|
||||
self_ref: &FileDescriptionRef,
|
||||
len: usize,
|
||||
ptr: Pointer,
|
||||
dest: MPlaceTy<'tcx>,
|
||||
dest: &MPlaceTy<'tcx>,
|
||||
ecx: &mut MiriInterpCx<'tcx>,
|
||||
) -> InterpResult<'tcx> {
|
||||
let Some(self_ref) = weak_self_ref.upgrade() else {
|
||||
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
|
||||
throw_unsup_format!("This will be a deadlock error in future")
|
||||
};
|
||||
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
|
||||
|
||||
// Always succeed on read size 0.
|
||||
if len == 0 {
|
||||
return ecx.return_read_success(ptr, &[], 0, dest);
|
||||
}
|
||||
|
||||
let Some(readbuf) = &self_anonsocket.readbuf else {
|
||||
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
|
||||
// corresponding ErrorKind variant.
|
||||
|
|
@ -258,10 +231,19 @@ fn anonsocket_read<'tcx>(
|
|||
if self_anonsocket.peer_fd().upgrade().is_none() {
|
||||
// Socketpair with no peer and empty buffer.
|
||||
// 0 bytes successfully read indicates end-of-file.
|
||||
return ecx.return_read_success(ptr, &[], 0, &dest);
|
||||
return ecx.return_read_success(ptr, &[], 0, dest);
|
||||
} else if self_anonsocket.is_nonblock {
|
||||
// Non-blocking socketpair with writer and empty buffer.
|
||||
// https://linux.die.net/man/2/read
|
||||
// EAGAIN or EWOULDBLOCK can be returned for socket,
|
||||
// POSIX.1-2001 allows either error to be returned for this case.
|
||||
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
|
||||
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
|
||||
} else {
|
||||
// Blocking socketpair with writer and empty buffer.
|
||||
let weak_self_ref = weak_self_ref.clone();
|
||||
// Block the current thread; only keep a weak ref for this.
|
||||
let weak_self_ref = self_ref.downgrade();
|
||||
let dest = dest.clone();
|
||||
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
|
||||
ecx.block_thread(
|
||||
BlockReason::UnnamedSocket,
|
||||
|
|
@ -274,12 +256,16 @@ fn anonsocket_read<'tcx>(
|
|||
dest: MPlaceTy<'tcx>,
|
||||
}
|
||||
@unblock = |this| {
|
||||
anonsocket_read(weak_self_ref, len, ptr, dest, this)
|
||||
// If we got unblocked, then our peer successfully upgraded its weak
|
||||
// ref to us. That means we can also upgrade our weak ref.
|
||||
let self_ref = weak_self_ref.upgrade().unwrap();
|
||||
anonsocket_read(&self_ref, len, ptr, &dest, this)
|
||||
}
|
||||
),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// There's data to be read!
|
||||
let mut bytes = vec![0; len];
|
||||
let mut readbuf = readbuf.borrow_mut();
|
||||
// Synchronize with all previous writes to this buffer.
|
||||
|
|
@ -313,7 +299,7 @@ fn anonsocket_read<'tcx>(
|
|||
}
|
||||
};
|
||||
|
||||
return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
|
||||
return ecx.return_read_success(ptr, &bytes, actual_read_size, dest);
|
||||
}
|
||||
interp_ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
//! This is a regression test for <https://github.com/rust-lang/miri/issues/3947>: we had some
|
||||
//! faulty logic around `release_clock` that led to this code not reporting a data race.
|
||||
//~^^ERROR: deadlock
|
||||
//@ignore-target: windows # no libc socketpair on Windows
|
||||
//@compile-flags: -Zmiri-preemption-rate=0 -Zmiri-address-reuse-rate=0
|
||||
//@error-in-other-file: deadlock
|
||||
use std::thread;
|
||||
|
||||
fn main() {
|
||||
let mut fds = [-1, -1];
|
||||
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
||||
assert_eq!(res, 0);
|
||||
|
||||
let thread1 = thread::spawn(move || {
|
||||
let mut buf: [u8; 1] = [0; 1];
|
||||
let _res: i32 = unsafe {
|
||||
libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) //~ERROR: deadlock
|
||||
.try_into()
|
||||
.unwrap()
|
||||
};
|
||||
});
|
||||
let thread2 = thread::spawn(move || {
|
||||
// Close the FD that the other thread is blocked on.
|
||||
unsafe { libc::close(fds[1]) };
|
||||
});
|
||||
|
||||
// Run the other threads.
|
||||
thread::yield_now();
|
||||
|
||||
// When they are both done, continue here.
|
||||
let data = "a".as_bytes().as_ptr();
|
||||
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1) };
|
||||
assert_eq!(res, -1);
|
||||
|
||||
thread1.join().unwrap();
|
||||
thread2.join().unwrap();
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
error: deadlock: the evaluated program deadlocked
|
||||
--> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
|
||||
|
|
||||
LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) };
|
||||
| ^ the evaluated program deadlocked
|
||||
|
|
||||
= note: BACKTRACE:
|
||||
= note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
|
||||
= note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
|
||||
= note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
|
||||
note: inside `main`
|
||||
--> tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC
|
||||
|
|
||||
LL | thread1.join().unwrap();
|
||||
| ^^^^^^^^^^^^^^
|
||||
|
||||
error: deadlock: the evaluated program deadlocked
|
||||
--> tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC
|
||||
|
|
||||
LL | libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
|
||||
| ^ the evaluated program deadlocked
|
||||
|
|
||||
= note: BACKTRACE on thread `unnamed-ID`:
|
||||
= note: inside closure at tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC
|
||||
|
||||
error: deadlock: the evaluated program deadlocked
|
||||
|
|
||||
= note: the evaluated program deadlocked
|
||||
= note: (no span available)
|
||||
= note: BACKTRACE on thread `unnamed-ID`:
|
||||
|
||||
note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
|
||||
|
||||
error: aborting due to 3 previous errors
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue