epoll: separately track ready_set

This commit is contained in:
Ralf Jung 2025-11-16 11:18:38 +01:00
parent 573515cc4a
commit 415526623c

View file

@ -1,5 +1,5 @@
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::io;
use std::time::Duration;
@ -18,12 +18,13 @@ type EpollEventKey = (FdId, FdNum);
#[derive(Debug, Default)]
struct Epoll {
/// A map of EpollEventInterests registered under this epoll instance. Each entry is
/// differentiated using FdId and file descriptor value. Note that we do not have a separate
/// "ready" list; instead, a boolean flag in this list tracks which subset is ready. This makes
/// `epoll_wait` less efficient, but also requires less bookkeeping.
/// differentiated using FdId and file descriptor value.
interest_list: RefCell<BTreeMap<EpollEventKey, EpollEventInterest>>,
/// The queue of threads blocked on this epoll instance, and how many events they'd like to get.
queue: RefCell<VecDeque<(ThreadId, u32)>>,
/// The subset of interests that is currently considered "ready". Stored separately so we
/// can access it more efficiently.
ready_set: RefCell<BTreeSet<EpollEventKey>>,
/// The queue of threads blocked on this epoll instance.
queue: RefCell<VecDeque<ThreadId>>,
}
impl VisitProvenance for Epoll {
@ -44,8 +45,6 @@ pub struct EpollEventInterest {
relevant_events: u32,
/// The currently active events for this file descriptor.
active_events: u32,
/// Whether this interest is in the "ready" set.
ready: bool,
/// The vector clock for wakeups.
clock: VClock,
/// User-defined data associated with this interest.
@ -176,13 +175,19 @@ impl EpollInterestTable {
if let Some(epolls) = self.0.remove(&id) {
for epoll in epolls.iter().filter_map(|(_id, epoll)| epoll.upgrade()) {
// This is a still-live epoll with interest in this FD. Remove all
// relevant interests.
// relevant interests (including from the ready set).
epoll
.interest_list
.borrow_mut()
.extract_if(range_for_id(id), |_, _| true)
// Consume the iterator.
.for_each(|_| ());
epoll
.ready_set
.borrow_mut()
.extract_if(range_for_id(id), |_| true)
// Consume the iterator.
.for_each(|_| ());
}
}
}
@ -326,7 +331,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
relevant_events: events,
data,
active_events: 0,
ready: false,
clock: VClock::default(),
};
if interest_list.try_insert(epoll_key, new_interest).is_err() {
@ -351,7 +355,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
move |callback| {
// Need to release the RefCell when this closure returns, so we have to move
// it into the closure, so we have to do a re-lookup here.
callback(interest_list.get_mut(&epoll_key).unwrap())
callback(epoll_key, interest_list.get_mut(&epoll_key).unwrap())
},
)?;
@ -359,11 +363,12 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
} else if op == epoll_ctl_del {
let epoll_key = (id, fd);
// Remove epoll_event_interest from interest_list.
// Remove epoll_event_interest from interest_list and ready_set.
if interest_list.remove(&epoll_key).is_none() {
// We did not have interest in this.
return this.set_last_error_and_return_i32(LibcError("ENOENT"));
};
epfd.ready_set.borrow_mut().remove(&epoll_key);
// If this was the last interest in this FD, remove us from the global list
// of who is interested in this FD.
if interest_list.range(range_for_id(id)).next().is_none() {
@ -441,7 +446,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return(LibcError("EBADF"), dest);
};
if timeout == 0 || epfd.interest_list.borrow().values().any(|i| i.ready) {
if timeout == 0 || !epfd.ready_set.borrow().is_empty() {
// If the timeout is 0 or there is a ready event, we can return immediately.
return_ready_list(&epfd, dest, &event, this)?;
} else {
@ -459,9 +464,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
}
};
// Record this thread as blocked.
epfd.queue
.borrow_mut()
.push_back((this.active_thread(), maxevents.try_into().unwrap()));
epfd.queue.borrow_mut().push_back(this.active_thread());
// And block it.
let dest = dest.clone();
// We keep a strong ref to the underlying `Epoll` to make sure it sticks around.
@ -487,7 +490,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// Remove the current active thread_id from the blocked thread_id list.
epfd
.queue.borrow_mut()
.retain(|&(id, _events)| id != this.active_thread());
.retain(|&id| id != this.active_thread());
this.write_int(0, &dest)?;
interp_ok(())
},
@ -526,9 +529,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let active_events = fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this);
for epoll in epolls {
update_readiness(this, &epoll, active_events, force_edge, |callback| {
for (_key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id))
for (&key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id))
{
callback(interest)?;
callback(key, interest)?;
}
interp_ok(())
})?;
@ -550,46 +553,34 @@ fn update_readiness<'tcx>(
active_events: u32,
force_edge: bool,
for_each_interest: impl FnOnce(
&mut dyn FnMut(&mut EpollEventInterest) -> InterpResult<'tcx>,
&mut dyn FnMut(EpollEventKey, &mut EpollEventInterest) -> InterpResult<'tcx>,
) -> InterpResult<'tcx>,
) -> InterpResult<'tcx> {
let mut num_ready = 0u32; // how many events we have ready to deliver
for_each_interest(&mut |interest| {
let mut ready_set = epoll.ready_set.borrow_mut();
for_each_interest(&mut |key, interest| {
// Update the ready events tracked in this interest.
let new_readiness = interest.relevant_events & active_events;
let prev_readiness = std::mem::replace(&mut interest.active_events, new_readiness);
if new_readiness == 0 {
// Un-trigger this, there's nothing left to report here.
interest.ready = false;
ready_set.remove(&key);
} else if force_edge || new_readiness != prev_readiness & new_readiness {
// Either we force an "edge" to be detected, or there's a bit set in `new`
// that was not set in `prev`.
interest.ready = true;
// that was not set in `prev`. In both cases, this is ready now.
ready_set.insert(key);
ecx.release_clock(|clock| {
interest.clock.join(clock);
})?;
num_ready = num_ready.saturating_add(1);
}
interp_ok(())
})?;
// Edge-triggered notifications only wake up as many threads as are needed to deliver
// all the events.
while num_ready > 0
&& let Some((thread_id, events)) = epoll.queue.borrow_mut().pop_front()
// While there are events ready to be delivered, wake up a thread to receive them.
while !ready_set.is_empty()
&& let Some(thread_id) = epoll.queue.borrow_mut().pop_front()
{
drop(ready_set); // release the "lock" so the unblocked thread can have it
ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
// Keep track of how many events we have left to deliver (except if we saturated;
// in that case we just wake up everybody).
if num_ready != u32::MAX {
num_ready = num_ready.saturating_sub(events);
}
}
// Sanity-check: if there are threads left to wake up, then there are no more ready events.
if !epoll.queue.borrow().is_empty() {
assert!(
epoll.interest_list.borrow().values().all(|i| !i.ready),
"there are unconsumed ready events and threads ready to take them"
);
ready_set = epoll.ready_set.borrow_mut();
}
interp_ok(())
@ -604,43 +595,38 @@ fn return_ready_list<'tcx>(
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, i32> {
let mut interest_list = epfd.interest_list.borrow_mut();
let mut ready_set = epfd.ready_set.borrow_mut();
let mut num_of_events: i32 = 0;
let mut array_iter = ecx.project_array_fields(events)?;
let mut interests = interest_list.iter_mut();
while let Some(slot) = array_iter.next(ecx)? {
// Search for next ready event that we are interested in. This is an inefficient linear scan.
// We could make it efficient by tracking the set of triggered events in a BTreeSet or so,
// but since this is very unlikely to be a bottleneck we prefer cleaner code.
for (key, interest) in interests.by_ref() {
// Sanity-check to ensure that the info about this event is up-to-date.
if cfg!(debug_assertions) {
// Ensure this matches the latest readiness of this FD.
// We have to do an FD lookup by ID for this. The FdNum might be already closed.
let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
let current_readiness =
fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
assert_eq!(interest.active_events, current_readiness & interest.relevant_events);
}
// Skip event if it has not been triggered.
if !interest.ready {
continue;
}
// Deliver event to caller.
ecx.write_int_fields_named(
&[("events", interest.active_events.into()), ("u64", interest.data.into())],
&slot.1,
)?;
num_of_events = num_of_events.strict_add(1);
// Synchronize waking thread with the event of interest.
ecx.acquire_clock(&interest.clock)?;
// Mark this interest as no-longer-ready, since it has been delivered (and we only
// support ET).
interest.ready = false;
// Skip out of this loop so that we go to the next slot in the array.
break;
// Sanity-check to ensure that all event info is up-to-date.
if cfg!(debug_assertions) {
for (key, interest) in interest_list.iter() {
// Ensure this matches the latest readiness of this FD.
// We have to do an FD lookup by ID for this. The FdNum might be already closed.
let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
let current_active = fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
assert_eq!(interest.active_events, current_active & interest.relevant_events);
}
}
// While there is a slot to store another event, and an event to store, deliver that event.
while let Some(slot) = array_iter.next(ecx)?
&& let Some(&key) = ready_set.first()
{
let interest = interest_list.get_mut(&key).expect("non-existent event in ready set");
// Deliver event to caller.
ecx.write_int_fields_named(
&[("events", interest.active_events.into()), ("u64", interest.data.into())],
&slot.1,
)?;
num_of_events = num_of_events.strict_add(1);
// Synchronize receiving thread with the event of interest.
ecx.acquire_clock(&interest.clock)?;
// Since currently, all events are edge-triggered, we remove them from the ready set when
// they get delivered.
ready_set.remove(&key);
}
ecx.write_int(num_of_events, dest)?;
interp_ok(num_of_events)
}