bring socket logic back together and fix logic bug

This commit is contained in:
Ralf Jung 2024-12-27 11:37:11 +01:00
parent 3623dfd42b
commit 16cffc7275

View file

@ -96,26 +96,7 @@ impl FileDescription for AnonSocket {
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
// Always succeed on read size 0.
if len == 0 {
return ecx.return_read_success(ptr, &[], 0, dest);
}
let Some(readbuf) = &self.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};
if readbuf.borrow().buf.is_empty() && self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
anonsocket_read(self_ref, len, ptr, dest, ecx)
}
fn write<'tcx>(
@ -127,31 +108,7 @@ impl FileDescription for AnonSocket {
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
// Always succeed on write size 0.
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
if len == 0 {
return ecx.return_write_success(0, dest);
}
// We are writing to our peer's readbuf.
let Some(peer_fd) = self.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
};
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe");
};
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 && self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
anonsocket_write(self_ref, ptr, len, dest, ecx)
}
fn as_unix(&self) -> &dyn UnixFileDescription {
@ -161,50 +118,66 @@ impl FileDescription for AnonSocket {
/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
weak_self_ref: WeakFileDescriptionRef,
self_ref: &FileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
// Always succeed on write size 0.
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
if len == 0 {
return ecx.return_write_success(0, dest);
}
// We are writing to our peer's readbuf.
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
// closed. It is an error to write even if there would be space.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
};
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
// Writing to the read end of a pipe.
return ecx.set_last_error_and_return(IoError::LibcError("EBADF"), dest);
};
// Let's see if we can write.
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 {
// Blocking socketpair with a full buffer.
let dest = dest.clone();
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_write(weak_self_ref, ptr, len, dest, this)
}
),
);
if self_anonsocket.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with a full buffer.
// Block the current thread; only keep a weak ref for this.
let weak_self_ref = self_ref.downgrade();
let dest = dest.clone();
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
anonsocket_write(&self_ref, ptr, len, &dest, this)
}
),
);
}
} else {
// There is space to write!
let mut writebuf = writebuf.borrow_mut();
// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
@ -229,25 +202,26 @@ fn anonsocket_write<'tcx>(
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
}
return ecx.return_write_success(actual_write_size, &dest);
return ecx.return_write_success(actual_write_size, dest);
}
interp_ok(())
}
/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
weak_self_ref: WeakFileDescriptionRef,
self_ref: &FileDescriptionRef,
len: usize,
ptr: Pointer,
dest: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
// Always succeed on read size 0.
if len == 0 {
return ecx.return_read_success(ptr, &[], 0, dest);
}
let Some(readbuf) = &self_anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
@ -258,10 +232,19 @@ fn anonsocket_read<'tcx>(
if self_anonsocket.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &[], 0, &dest);
return ecx.return_read_success(ptr, &[], 0, dest);
} else if self_anonsocket.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with writer and empty buffer.
let weak_self_ref = weak_self_ref.clone();
// Block the current thread; only keep a weak ref for this.
let weak_self_ref = self_ref.downgrade();
let dest = dest.clone();
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
@ -274,12 +257,17 @@ fn anonsocket_read<'tcx>(
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_read(weak_self_ref, len, ptr, dest, this)
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
anonsocket_read(&self_ref, len, ptr, &dest, this)
}
),
);
}
} else {
// There's data to be read!
let mut bytes = vec![0; len];
let mut readbuf = readbuf.borrow_mut();
// Synchronize with all previous writes to this buffer.
@ -313,7 +301,7 @@ fn anonsocket_read<'tcx>(
}
};
return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
return ecx.return_read_success(ptr, &bytes, actual_read_size, dest);
}
interp_ok(())
}