Auto merge of #14888 - lunacookies:multi-qos, r=Veykril

Prioritize threads affected by user typing

To this end I’ve introduced a new custom thread pool type which can spawn threads using each QoS class. This way we can run latency-sensitive requests under one QoS class and everything else under another QoS class. The implementation is very similar to that of the `threadpool` crate (which is currently used by rust-analyzer) but with unused functionality stripped out.

I’ll have to rebase on master once #14859 is merged but I think everything else is alright :D
This commit is contained in:
bors 2023-05-31 10:23:19 +00:00
commit 526507fe22
16 changed files with 507 additions and 350 deletions

11
Cargo.lock generated
View file

@ -1518,7 +1518,6 @@ dependencies = [
"syntax",
"test-utils",
"thiserror",
"threadpool",
"tikv-jemallocator",
"toolchain",
"tracing",
@ -1712,6 +1711,7 @@ version = "0.0.0"
dependencies = [
"always-assert",
"backtrace",
"crossbeam-channel",
"jod-thread",
"libc",
"miow",
@ -1823,15 +1823,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.5.0"

View file

@ -90,7 +90,7 @@ impl FlycheckHandle {
) -> FlycheckHandle {
let actor = FlycheckActor::new(id, sender, config, workspace_root);
let (sender, receiver) = unbounded::<StateChange>();
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
.name("Flycheck".to_owned())
.spawn(move || actor.run(receiver))
.expect("failed to spawn thread");
@ -409,7 +409,7 @@ impl CargoHandle {
let (sender, receiver) = unbounded();
let actor = CargoActor::new(sender, stdout, stderr);
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
.name("CargoHandle".to_owned())
.spawn(move || actor.run())
.expect("failed to spawn thread");

View file

@ -81,7 +81,7 @@ pub(crate) fn parallel_prime_caches(
let worker = prime_caches_worker.clone();
let db = db.snapshot();
stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
.allow_leak(true)
.spawn(move || Cancelled::catch(|| worker(db)))
.expect("failed to spawn thread");

View file

@ -31,7 +31,6 @@ oorandom = "11.1.3"
rustc-hash = "1.1.0"
serde_json = { workspace = true, features = ["preserve_order"] }
serde.workspace = true
threadpool = "1.8.1"
rayon = "1.6.1"
num_cpus = "1.15.0"
mimalloc = { version = "0.1.30", default-features = false, optional = true }

View file

@ -79,13 +79,15 @@ fn try_main(flags: flags::RustAnalyzer) -> Result<()> {
return Ok(());
}
// rust-analyzers “main thread” is actually a secondary thread
// with an increased stack size at the User Initiated QoS class.
// We use this QoS class because any delay in the main loop
// rust-analyzers “main thread” is actually
// a secondary latency-sensitive thread with an increased stack size.
// We use this thread intent because any delay in the main loop
// will make actions like hitting enter in the editor slow.
// rust-analyzer does not block the editors render loop,
// so we dont use User Interactive.
with_extra_thread("LspServer", stdx::thread::QoSClass::UserInitiated, run_server)?;
with_extra_thread(
"LspServer",
stdx::thread::ThreadIntent::LatencySensitive,
run_server,
)?;
}
flags::RustAnalyzerCmd::Parse(cmd) => cmd.run()?,
flags::RustAnalyzerCmd::Symbols(cmd) => cmd.run()?,
@ -143,10 +145,10 @@ const STACK_SIZE: usize = 1024 * 1024 * 8;
/// space.
fn with_extra_thread(
thread_name: impl Into<String>,
qos_class: stdx::thread::QoSClass,
thread_intent: stdx::thread::ThreadIntent,
f: impl FnOnce() -> Result<()> + Send + 'static,
) -> Result<()> {
let handle = stdx::thread::Builder::new(qos_class)
let handle = stdx::thread::Builder::new(thread_intent)
.name(thread_name.into())
.stack_size(STACK_SIZE)
.spawn(f)?;

View file

@ -4,6 +4,7 @@ use std::{fmt, panic, thread};
use ide::Cancelled;
use lsp_server::ExtractError;
use serde::{de::DeserializeOwned, Serialize};
use stdx::thread::ThreadIntent;
use crate::{
global_state::{GlobalState, GlobalStateSnapshot},
@ -87,7 +88,8 @@ impl<'a> RequestDispatcher<'a> {
self
}
/// Dispatches the request onto thread pool
/// Dispatches a non-latency-sensitive request onto the thread pool
/// without retrying it if it panics.
pub(crate) fn on_no_retry<R>(
&mut self,
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
@ -102,7 +104,7 @@ impl<'a> RequestDispatcher<'a> {
None => return self,
};
self.global_state.task_pool.handle.spawn({
self.global_state.task_pool.handle.spawn(ThreadIntent::Worker, {
let world = self.global_state.snapshot();
move || {
let result = panic::catch_unwind(move || {
@ -123,11 +125,49 @@ impl<'a> RequestDispatcher<'a> {
self
}
/// Dispatches the request onto thread pool
/// Dispatches a non-latency-sensitive request onto the thread pool.
pub(crate) fn on<R>(
&mut self,
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
) -> &mut Self
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug,
R::Result: Serialize,
{
self.on_with_thread_intent::<R>(ThreadIntent::Worker, f)
}
/// Dispatches a latency-sensitive request onto the thread pool.
pub(crate) fn on_latency_sensitive<R>(
&mut self,
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
) -> &mut Self
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug,
R::Result: Serialize,
{
self.on_with_thread_intent::<R>(ThreadIntent::LatencySensitive, f)
}
pub(crate) fn finish(&mut self) {
if let Some(req) = self.req.take() {
tracing::error!("unknown request: {:?}", req);
let response = lsp_server::Response::new_err(
req.id,
lsp_server::ErrorCode::MethodNotFound as i32,
"unknown request".to_string(),
);
self.global_state.respond(response);
}
}
fn on_with_thread_intent<R>(
&mut self,
intent: ThreadIntent,
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
) -> &mut Self
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug,
@ -138,7 +178,7 @@ impl<'a> RequestDispatcher<'a> {
None => return self,
};
self.global_state.task_pool.handle.spawn({
self.global_state.task_pool.handle.spawn(intent, {
let world = self.global_state.snapshot();
move || {
let result = panic::catch_unwind(move || {
@ -155,18 +195,6 @@ impl<'a> RequestDispatcher<'a> {
self
}
pub(crate) fn finish(&mut self) {
if let Some(req) = self.req.take() {
tracing::error!("unknown request: {:?}", req);
let response = lsp_server::Response::new_err(
req.id,
lsp_server::ErrorCode::MethodNotFound as i32,
"unknown request".to_string(),
);
self.global_state.respond(response);
}
}
fn parse<R>(&mut self) -> Option<(lsp_server::Request, R::Params, String)>
where
R: lsp_types::request::Request,

View file

@ -291,7 +291,7 @@ fn run_flycheck(state: &mut GlobalState, vfs_path: VfsPath) -> bool {
}
Ok(())
};
state.task_pool.handle.spawn_with_sender(move |_| {
state.task_pool.handle.spawn_with_sender(stdx::thread::ThreadIntent::Worker, move |_| {
if let Err(e) = std::panic::catch_unwind(task) {
tracing::error!("flycheck task panicked: {e:?}")
}

View file

@ -397,7 +397,7 @@ impl GlobalState {
tracing::debug!(%cause, "will prime caches");
let num_worker_threads = self.config.prime_caches_num_threads();
self.task_pool.handle.spawn_with_sender({
self.task_pool.handle.spawn_with_sender(stdx::thread::ThreadIntent::Worker, {
let analysis = self.snapshot().analysis;
move |sender| {
sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
@ -678,7 +678,32 @@ impl GlobalState {
.on_sync::<lsp_types::request::SelectionRangeRequest>(handlers::handle_selection_range)
.on_sync::<lsp_ext::MatchingBrace>(handlers::handle_matching_brace)
.on_sync::<lsp_ext::OnTypeFormatting>(handlers::handle_on_type_formatting)
// All other request handlers:
// We cant run latency-sensitive request handlers which do semantic
// analysis on the main thread because that would block other
// requests. Instead, we run these request handlers on higher priority
// threads in the threadpool.
.on_latency_sensitive::<lsp_types::request::Completion>(handlers::handle_completion)
.on_latency_sensitive::<lsp_types::request::ResolveCompletionItem>(
handlers::handle_completion_resolve,
)
.on_latency_sensitive::<lsp_types::request::SemanticTokensFullRequest>(
handlers::handle_semantic_tokens_full,
)
.on_latency_sensitive::<lsp_types::request::SemanticTokensFullDeltaRequest>(
handlers::handle_semantic_tokens_full_delta,
)
.on_latency_sensitive::<lsp_types::request::SemanticTokensRangeRequest>(
handlers::handle_semantic_tokens_range,
)
// Formatting is not caused by the user typing,
// but it does qualify as latency-sensitive
// because a delay before formatting is applied
// can be confusing for the user.
.on_latency_sensitive::<lsp_types::request::Formatting>(handlers::handle_formatting)
.on_latency_sensitive::<lsp_types::request::RangeFormatting>(
handlers::handle_range_formatting,
)
// All other request handlers
.on::<lsp_ext::FetchDependencyList>(handlers::fetch_dependency_list)
.on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)
.on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)
@ -706,8 +731,6 @@ impl GlobalState {
.on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)
.on_no_retry::<lsp_types::request::InlayHintRequest>(handlers::handle_inlay_hints)
.on::<lsp_types::request::InlayHintResolveRequest>(handlers::handle_inlay_hints_resolve)
.on::<lsp_types::request::Completion>(handlers::handle_completion)
.on::<lsp_types::request::ResolveCompletionItem>(handlers::handle_completion_resolve)
.on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)
.on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)
.on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)
@ -715,8 +738,6 @@ impl GlobalState {
.on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)
.on::<lsp_types::request::Rename>(handlers::handle_rename)
.on::<lsp_types::request::References>(handlers::handle_references)
.on::<lsp_types::request::Formatting>(handlers::handle_formatting)
.on::<lsp_types::request::RangeFormatting>(handlers::handle_range_formatting)
.on::<lsp_types::request::DocumentHighlightRequest>(handlers::handle_document_highlight)
.on::<lsp_types::request::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)
.on::<lsp_types::request::CallHierarchyIncomingCalls>(
@ -725,15 +746,6 @@ impl GlobalState {
.on::<lsp_types::request::CallHierarchyOutgoingCalls>(
handlers::handle_call_hierarchy_outgoing,
)
.on::<lsp_types::request::SemanticTokensFullRequest>(
handlers::handle_semantic_tokens_full,
)
.on::<lsp_types::request::SemanticTokensFullDeltaRequest>(
handlers::handle_semantic_tokens_full_delta,
)
.on::<lsp_types::request::SemanticTokensRangeRequest>(
handlers::handle_semantic_tokens_range,
)
.on::<lsp_types::request::WillRenameFiles>(handlers::handle_will_rename_files)
.on::<lsp_ext::Ssr>(handlers::handle_ssr)
.finish();
@ -781,7 +793,10 @@ impl GlobalState {
tracing::trace!("updating notifications for {:?}", subscriptions);
let snapshot = self.snapshot();
self.task_pool.handle.spawn(move || {
// Diagnostics are triggered by the user typing
// so we run them on a latency sensitive thread.
self.task_pool.handle.spawn(stdx::thread::ThreadIntent::LatencySensitive, move || {
let _p = profile::span("publish_diagnostics");
let diagnostics = subscriptions
.into_iter()

View file

@ -27,7 +27,7 @@ use ide_db::{
use itertools::Itertools;
use proc_macro_api::{MacroDylib, ProcMacroServer};
use project_model::{PackageRoot, ProjectWorkspace, WorkspaceBuildScripts};
use stdx::format_to;
use stdx::{format_to, thread::ThreadIntent};
use syntax::SmolStr;
use triomphe::Arc;
use vfs::{file_set::FileSetConfig, AbsPath, AbsPathBuf, ChangeKind};
@ -185,7 +185,7 @@ impl GlobalState {
pub(crate) fn fetch_workspaces(&mut self, cause: Cause) {
tracing::info!(%cause, "will fetch workspaces");
self.task_pool.handle.spawn_with_sender({
self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, {
let linked_projects = self.config.linked_projects();
let detached_files = self.config.detached_files().to_vec();
let cargo_config = self.config.cargo();
@ -260,7 +260,7 @@ impl GlobalState {
tracing::info!(%cause, "will fetch build data");
let workspaces = Arc::clone(&self.workspaces);
let config = self.config.cargo();
self.task_pool.handle.spawn_with_sender(move |sender| {
self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| {
sender.send(Task::FetchBuildData(BuildDataProgress::Begin)).unwrap();
let progress = {
@ -280,7 +280,7 @@ impl GlobalState {
let dummy_replacements = self.config.dummy_replacements().clone();
let proc_macro_clients = self.proc_macro_clients.clone();
self.task_pool.handle.spawn_with_sender(move |sender| {
self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| {
sender.send(Task::LoadProcMacros(ProcMacroProgress::Begin)).unwrap();
let dummy_replacements = &dummy_replacements;

View file

@ -1,76 +1,42 @@
//! A thin wrapper around `ThreadPool` to make sure that we join all things
//! properly.
use std::sync::{Arc, Barrier};
//! A thin wrapper around [`stdx::thread::Pool`] which threads a sender through spawned jobs.
//! It is used in [`crate::global_state::GlobalState`] throughout the main loop.
use crossbeam_channel::Sender;
use stdx::thread::{Pool, ThreadIntent};
pub(crate) struct TaskPool<T> {
sender: Sender<T>,
inner: threadpool::ThreadPool,
pool: Pool,
}
impl<T> TaskPool<T> {
pub(crate) fn new_with_threads(sender: Sender<T>, threads: usize) -> TaskPool<T> {
const STACK_SIZE: usize = 8 * 1024 * 1024;
let inner = threadpool::Builder::new()
.thread_name("Worker".into())
.thread_stack_size(STACK_SIZE)
.num_threads(threads)
.build();
// Set QoS of all threads in threadpool.
let barrier = Arc::new(Barrier::new(threads + 1));
for _ in 0..threads {
let barrier = barrier.clone();
inner.execute(move || {
stdx::thread::set_current_thread_qos_class(stdx::thread::QoSClass::Utility);
barrier.wait();
});
}
barrier.wait();
TaskPool { sender, inner }
TaskPool { sender, pool: Pool::new(threads) }
}
pub(crate) fn spawn<F>(&mut self, task: F)
pub(crate) fn spawn<F>(&mut self, intent: ThreadIntent, task: F)
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
self.inner.execute({
self.pool.spawn(intent, {
let sender = self.sender.clone();
move || {
if stdx::thread::IS_QOS_AVAILABLE {
debug_assert_eq!(
stdx::thread::get_current_thread_qos_class(),
Some(stdx::thread::QoSClass::Utility)
);
}
sender.send(task()).unwrap()
}
move || sender.send(task()).unwrap()
})
}
pub(crate) fn spawn_with_sender<F>(&mut self, task: F)
pub(crate) fn spawn_with_sender<F>(&mut self, intent: ThreadIntent, task: F)
where
F: FnOnce(Sender<T>) + Send + 'static,
T: Send + 'static,
{
self.inner.execute({
self.pool.spawn(intent, {
let sender = self.sender.clone();
move || task(sender)
})
}
pub(crate) fn len(&self) -> usize {
self.inner.queued_count()
}
}
impl<T> Drop for TaskPool<T> {
fn drop(&mut self) {
self.inner.join()
self.pool.len()
}
}

View file

@ -165,7 +165,7 @@ impl Server {
fn new(dir: TestDir, config: Config) -> Server {
let (connection, client) = Connection::memory();
let _thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
let _thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
.name("test server".to_string())
.spawn(move || main_loop(config, connection).unwrap())
.expect("failed to spawn a thread");

View file

@ -16,6 +16,7 @@ libc = "0.2.135"
backtrace = { version = "0.3.65", optional = true }
always-assert = { version = "0.1.2", features = ["log"] }
jod-thread = "0.1.2"
crossbeam-channel = "0.5.5"
# Think twice before adding anything here
[target.'cfg(windows)'.dependencies]

View file

@ -1,36 +1,46 @@
//! A utility module for working with threads that automatically joins threads upon drop
//! and provides functionality for interfacing with operating system quality of service (QoS) APIs.
//! and abstracts over operating system quality of service (QoS) APIs
//! through the concept of a “thread intent”.
//!
//! The intent of a thread is frozen at thread creation time,
//! i.e. there is no API to change the intent of a thread once it has been spawned.
//!
//! As a system, rust-analyzer should have the property that
//! old manual scheduling APIs are replaced entirely by QoS.
//! To maintain this invariant, we panic when it is clear that
//! old scheduling APIs have been used.
//!
//! Moreover, we also want to ensure that every thread has a QoS set explicitly
//! Moreover, we also want to ensure that every thread has an intent set explicitly
//! to force a decision about its importance to the system.
//! Thus, [`QoSClass`] has no default value
//! and every entry point to creating a thread requires a [`QoSClass`] upfront.
//! Thus, [`ThreadIntent`] has no default value
//! and every entry point to creating a thread requires a [`ThreadIntent`] upfront.
use std::fmt;
pub fn spawn<F, T>(qos_class: QoSClass, f: F) -> JoinHandle<T>
mod intent;
mod pool;
pub use intent::ThreadIntent;
pub use pool::Pool;
pub fn spawn<F, T>(intent: ThreadIntent, f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new(qos_class).spawn(f).expect("failed to spawn thread")
Builder::new(intent).spawn(f).expect("failed to spawn thread")
}
pub struct Builder {
qos_class: QoSClass,
intent: ThreadIntent,
inner: jod_thread::Builder,
allow_leak: bool,
}
impl Builder {
pub fn new(qos_class: QoSClass) -> Builder {
Builder { qos_class, inner: jod_thread::Builder::new(), allow_leak: false }
pub fn new(intent: ThreadIntent) -> Builder {
Builder { intent, inner: jod_thread::Builder::new(), allow_leak: false }
}
pub fn name(self, name: String) -> Builder {
@ -52,7 +62,7 @@ impl Builder {
T: Send + 'static,
{
let inner_handle = self.inner.spawn(move || {
set_current_thread_qos_class(self.qos_class);
self.intent.apply_to_current_thread();
f()
})?;
@ -90,237 +100,3 @@ impl<T> fmt::Debug for JoinHandle<T> {
f.pad("JoinHandle { .. }")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
// Please maintain order from least to most priority for the derived `Ord` impl.
pub enum QoSClass {
// Documentation adapted from https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/include/sys/qos.h#L55
//
/// TLDR: invisible maintenance tasks
///
/// Contract:
///
/// * **You do not care about how long it takes for work to finish.**
/// * **You do not care about work being deferred temporarily.**
/// (e.g. if the devices battery is in a critical state)
///
/// Examples:
///
/// * in a video editor:
/// creating periodic backups of project files
/// * in a browser:
/// cleaning up cached sites which have not been accessed in a long time
/// * in a collaborative word processor:
/// creating a searchable index of all documents
///
/// Use this QoS class for background tasks
/// which the user did not initiate themselves
/// and which are invisible to the user.
/// It is expected that this work will take significant time to complete:
/// minutes or even hours.
///
/// This QoS class provides the most energy and thermally-efficient execution possible.
/// All other work is prioritized over background tasks.
Background,
/// TLDR: tasks that dont block using your app
///
/// Contract:
///
/// * **Your app remains useful even as the task is executing.**
///
/// Examples:
///
/// * in a video editor:
/// exporting a video to disk
/// the user can still work on the timeline
/// * in a browser:
/// automatically extracting a downloaded zip file
/// the user can still switch tabs
/// * in a collaborative word processor:
/// downloading images embedded in a document
/// the user can still make edits
///
/// Use this QoS class for tasks which
/// may or may not be initiated by the user,
/// but whose result is visible.
/// It is expected that this work will take a few seconds to a few minutes.
/// Typically your app will include a progress bar
/// for tasks using this class.
///
/// This QoS class provides a balance between
/// performance, responsiveness and efficiency.
Utility,
/// TLDR: tasks that block using your app
///
/// Contract:
///
/// * **You need this work to complete
/// before the user can keep interacting with your app.**
/// * **Your work will not take more than a few seconds to complete.**
///
/// Examples:
///
/// * in a video editor:
/// opening a saved project
/// * in a browser:
/// loading a list of the users bookmarks and top sites
/// when a new tab is created
/// * in a collaborative word processor:
/// running a search on the documents content
///
/// Use this QoS class for tasks which were initiated by the user
/// and block the usage of your app while they are in progress.
/// It is expected that this work will take a few seconds or less to complete;
/// not long enough to cause the user to switch to something else.
/// Your app will likely indicate progress on these tasks
/// through the display of placeholder content or modals.
///
/// This QoS class is not energy-efficient.
/// Rather, it provides responsiveness
/// by prioritizing work above other tasks on the system
/// except for critical user-interactive work.
UserInitiated,
/// TLDR: render loops and nothing else
///
/// Contract:
///
/// * **You absolutely need this work to complete immediately
/// or your app will appear to freeze.**
/// * **Your work will always complete virtually instantaneously.**
///
/// Examples:
///
/// * the main thread in a GUI application
/// * the update & render loop in a game
/// * a secondary thread which progresses an animation
///
/// Use this QoS class for any work which, if delayed,
/// will make your user interface unresponsive.
/// It is expected that this work will be virtually instantaneous.
///
/// This QoS class is not energy-efficient.
/// Specifying this class is a request to run with
/// nearly all available system CPU and I/O bandwidth even under contention.
UserInteractive,
}
pub const IS_QOS_AVAILABLE: bool = imp::IS_QOS_AVAILABLE;
pub fn set_current_thread_qos_class(class: QoSClass) {
imp::set_current_thread_qos_class(class)
}
pub fn get_current_thread_qos_class() -> Option<QoSClass> {
imp::get_current_thread_qos_class()
}
// All Apple platforms use XNU as their kernel
// and thus have the concept of QoS.
#[cfg(target_vendor = "apple")]
mod imp {
use super::QoSClass;
pub(super) const IS_QOS_AVAILABLE: bool = true;
pub(super) fn set_current_thread_qos_class(class: QoSClass) {
let c = match class {
QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE,
QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED,
QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY,
QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND,
};
let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) };
if code == 0 {
return;
}
let errno = unsafe { *libc::__error() };
match errno {
libc::EPERM => {
// This thread has been excluded from the QoS system
// due to a previous call to a function such as `pthread_setschedparam`
// which is incompatible with QoS.
//
// Panic instead of returning an error
// to maintain the invariant that we only use QoS APIs.
panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})")
}
libc::EINVAL => {
// This is returned if we pass something other than a qos_class_t
// to `pthread_set_qos_class_self_np`.
//
// This is impossible, so again panic.
unreachable!(
"invalid qos_class_t value was passed to pthread_set_qos_class_self_np"
)
}
_ => {
// `pthread_set_qos_class_self_np`s documentation
// does not mention any other errors.
unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}")
}
}
}
pub(super) fn get_current_thread_qos_class() -> Option<QoSClass> {
let current_thread = unsafe { libc::pthread_self() };
let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED;
let code = unsafe {
libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut())
};
if code != 0 {
// `pthread_get_qos_class_np`s documentation states that
// an error value is placed into errno if the return code is not zero.
// However, it never states what errors are possible.
// Inspecting the source[0] shows that, as of this writing, it always returns zero.
//
// Whatever errors the function could report in future are likely to be
// ones which we cannot handle anyway
//
// 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177
let errno = unsafe { *libc::__error() };
unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})");
}
match qos_class_raw {
libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive),
libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated),
libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set
libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility),
libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background),
libc::qos_class_t::QOS_CLASS_UNSPECIFIED => {
// Using manual scheduling APIs causes threads to “opt out” of QoS.
// At this point they become incompatible with QoS,
// and as such have the “unspecified” QoS class.
//
// Panic instead of returning an error
// to maintain the invariant that we only use QoS APIs.
panic!("tried to get QoS of thread which has opted out of QoS")
}
}
}
}
// FIXME: Windows has QoS APIs, we should use them!
#[cfg(not(target_vendor = "apple"))]
mod imp {
use super::QoSClass;
pub(super) const IS_QOS_AVAILABLE: bool = false;
pub(super) fn set_current_thread_qos_class(_: QoSClass) {}
pub(super) fn get_current_thread_qos_class() -> Option<QoSClass> {
None
}
}

View file

@ -0,0 +1,287 @@
//! An opaque façade around platform-specific QoS APIs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
// Please maintain order from least to most priority for the derived `Ord` impl.
pub enum ThreadIntent {
/// Any thread which does work that isnt in the critical path of the user typing
/// (e.g. processing Go To Definition).
Worker,
/// Any thread which does work caused by the user typing
/// (e.g. processing syntax highlighting).
LatencySensitive,
}
impl ThreadIntent {
// These APIs must remain private;
// we only want consumers to set thread intent
// either during thread creation or using our pool impl.
pub(super) fn apply_to_current_thread(self) {
let class = thread_intent_to_qos_class(self);
set_current_thread_qos_class(class);
}
pub(super) fn assert_is_used_on_current_thread(self) {
if IS_QOS_AVAILABLE {
let class = thread_intent_to_qos_class(self);
assert_eq!(get_current_thread_qos_class(), Some(class));
}
}
}
use imp::QoSClass;
const IS_QOS_AVAILABLE: bool = imp::IS_QOS_AVAILABLE;
fn set_current_thread_qos_class(class: QoSClass) {
imp::set_current_thread_qos_class(class)
}
fn get_current_thread_qos_class() -> Option<QoSClass> {
imp::get_current_thread_qos_class()
}
fn thread_intent_to_qos_class(intent: ThreadIntent) -> QoSClass {
imp::thread_intent_to_qos_class(intent)
}
// All Apple platforms use XNU as their kernel
// and thus have the concept of QoS.
#[cfg(target_vendor = "apple")]
mod imp {
use super::ThreadIntent;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
// Please maintain order from least to most priority for the derived `Ord` impl.
pub(super) enum QoSClass {
// Documentation adapted from https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/include/sys/qos.h#L55
//
/// TLDR: invisible maintenance tasks
///
/// Contract:
///
/// * **You do not care about how long it takes for work to finish.**
/// * **You do not care about work being deferred temporarily.**
/// (e.g. if the devices battery is in a critical state)
///
/// Examples:
///
/// * in a video editor:
/// creating periodic backups of project files
/// * in a browser:
/// cleaning up cached sites which have not been accessed in a long time
/// * in a collaborative word processor:
/// creating a searchable index of all documents
///
/// Use this QoS class for background tasks
/// which the user did not initiate themselves
/// and which are invisible to the user.
/// It is expected that this work will take significant time to complete:
/// minutes or even hours.
///
/// This QoS class provides the most energy and thermally-efficient execution possible.
/// All other work is prioritized over background tasks.
Background,
/// TLDR: tasks that dont block using your app
///
/// Contract:
///
/// * **Your app remains useful even as the task is executing.**
///
/// Examples:
///
/// * in a video editor:
/// exporting a video to disk
/// the user can still work on the timeline
/// * in a browser:
/// automatically extracting a downloaded zip file
/// the user can still switch tabs
/// * in a collaborative word processor:
/// downloading images embedded in a document
/// the user can still make edits
///
/// Use this QoS class for tasks which
/// may or may not be initiated by the user,
/// but whose result is visible.
/// It is expected that this work will take a few seconds to a few minutes.
/// Typically your app will include a progress bar
/// for tasks using this class.
///
/// This QoS class provides a balance between
/// performance, responsiveness and efficiency.
Utility,
/// TLDR: tasks that block using your app
///
/// Contract:
///
/// * **You need this work to complete
/// before the user can keep interacting with your app.**
/// * **Your work will not take more than a few seconds to complete.**
///
/// Examples:
///
/// * in a video editor:
/// opening a saved project
/// * in a browser:
/// loading a list of the users bookmarks and top sites
/// when a new tab is created
/// * in a collaborative word processor:
/// running a search on the documents content
///
/// Use this QoS class for tasks which were initiated by the user
/// and block the usage of your app while they are in progress.
/// It is expected that this work will take a few seconds or less to complete;
/// not long enough to cause the user to switch to something else.
/// Your app will likely indicate progress on these tasks
/// through the display of placeholder content or modals.
///
/// This QoS class is not energy-efficient.
/// Rather, it provides responsiveness
/// by prioritizing work above other tasks on the system
/// except for critical user-interactive work.
UserInitiated,
/// TLDR: render loops and nothing else
///
/// Contract:
///
/// * **You absolutely need this work to complete immediately
/// or your app will appear to freeze.**
/// * **Your work will always complete virtually instantaneously.**
///
/// Examples:
///
/// * the main thread in a GUI application
/// * the update & render loop in a game
/// * a secondary thread which progresses an animation
///
/// Use this QoS class for any work which, if delayed,
/// will make your user interface unresponsive.
/// It is expected that this work will be virtually instantaneous.
///
/// This QoS class is not energy-efficient.
/// Specifying this class is a request to run with
/// nearly all available system CPU and I/O bandwidth even under contention.
UserInteractive,
}
pub(super) const IS_QOS_AVAILABLE: bool = true;
pub(super) fn set_current_thread_qos_class(class: QoSClass) {
let c = match class {
QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE,
QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED,
QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY,
QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND,
};
let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) };
if code == 0 {
return;
}
let errno = unsafe { *libc::__error() };
match errno {
libc::EPERM => {
// This thread has been excluded from the QoS system
// due to a previous call to a function such as `pthread_setschedparam`
// which is incompatible with QoS.
//
// Panic instead of returning an error
// to maintain the invariant that we only use QoS APIs.
panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})")
}
libc::EINVAL => {
// This is returned if we pass something other than a qos_class_t
// to `pthread_set_qos_class_self_np`.
//
// This is impossible, so again panic.
unreachable!(
"invalid qos_class_t value was passed to pthread_set_qos_class_self_np"
)
}
_ => {
// `pthread_set_qos_class_self_np`s documentation
// does not mention any other errors.
unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}")
}
}
}
pub(super) fn get_current_thread_qos_class() -> Option<QoSClass> {
let current_thread = unsafe { libc::pthread_self() };
let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED;
let code = unsafe {
libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut())
};
if code != 0 {
// `pthread_get_qos_class_np`s documentation states that
// an error value is placed into errno if the return code is not zero.
// However, it never states what errors are possible.
// Inspecting the source[0] shows that, as of this writing, it always returns zero.
//
// Whatever errors the function could report in future are likely to be
// ones which we cannot handle anyway
//
// 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177
let errno = unsafe { *libc::__error() };
unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})");
}
match qos_class_raw {
libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive),
libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated),
libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set
libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility),
libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background),
libc::qos_class_t::QOS_CLASS_UNSPECIFIED => {
// Using manual scheduling APIs causes threads to “opt out” of QoS.
// At this point they become incompatible with QoS,
// and as such have the “unspecified” QoS class.
//
// Panic instead of returning an error
// to maintain the invariant that we only use QoS APIs.
panic!("tried to get QoS of thread which has opted out of QoS")
}
}
}
pub(super) fn thread_intent_to_qos_class(intent: ThreadIntent) -> QoSClass {
match intent {
ThreadIntent::Worker => QoSClass::Utility,
ThreadIntent::LatencySensitive => QoSClass::UserInitiated,
}
}
}
// FIXME: Windows has QoS APIs, we should use them!
#[cfg(not(target_vendor = "apple"))]
mod imp {
use super::ThreadIntent;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(super) enum QoSClass {
Default,
}
pub(super) const IS_QOS_AVAILABLE: bool = false;
pub(super) fn set_current_thread_qos_class(_: QoSClass) {}
pub(super) fn get_current_thread_qos_class() -> Option<QoSClass> {
None
}
pub(super) fn thread_intent_to_qos_class(_: ThreadIntent) -> QoSClass {
QoSClass::Default
}
}

View file

@ -0,0 +1,92 @@
//! [`Pool`] implements a basic custom thread pool
//! inspired by the [`threadpool` crate](http://docs.rs/threadpool).
//! When you spawn a task you specify a thread intent
//! so the pool can schedule it to run on a thread with that intent.
//! rust-analyzer uses this to prioritize work based on latency requirements.
//!
//! The thread pool is implemented entirely using
//! the threading utilities in [`crate::thread`].
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use crossbeam_channel::{Receiver, Sender};
use super::{Builder, JoinHandle, ThreadIntent};
pub struct Pool {
// `_handles` is never read: the field is present
// only for its `Drop` impl.
// The worker threads exit once the channel closes;
// make sure to keep `job_sender` above `handles`
// so that the channel is actually closed
// before we join the worker threads!
job_sender: Sender<Job>,
_handles: Vec<JoinHandle>,
extant_tasks: Arc<AtomicUsize>,
}
struct Job {
requested_intent: ThreadIntent,
f: Box<dyn FnOnce() + Send + 'static>,
}
impl Pool {
pub fn new(threads: usize) -> Pool {
const STACK_SIZE: usize = 8 * 1024 * 1024;
const INITIAL_INTENT: ThreadIntent = ThreadIntent::Worker;
let (job_sender, job_receiver) = crossbeam_channel::unbounded();
let extant_tasks = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::with_capacity(threads);
for _ in 0..threads {
let handle = Builder::new(INITIAL_INTENT)
.stack_size(STACK_SIZE)
.name("Worker".into())
.spawn({
let extant_tasks = Arc::clone(&extant_tasks);
let job_receiver: Receiver<Job> = job_receiver.clone();
move || {
let mut current_intent = INITIAL_INTENT;
for job in job_receiver {
if job.requested_intent != current_intent {
job.requested_intent.apply_to_current_thread();
current_intent = job.requested_intent;
}
extant_tasks.fetch_add(1, Ordering::SeqCst);
(job.f)();
extant_tasks.fetch_sub(1, Ordering::SeqCst);
}
}
})
.expect("failed to spawn thread");
handles.push(handle);
}
Pool { _handles: handles, extant_tasks, job_sender }
}
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: FnOnce() + Send + 'static,
{
let f = Box::new(move || {
if cfg!(debug_assertions) {
intent.assert_is_used_on_current_thread();
}
f()
});
let job = Job { requested_intent: intent, f };
self.job_sender.send(job).unwrap();
}
pub fn len(&self) -> usize {
self.extant_tasks.load(Ordering::SeqCst)
}
}

View file

@ -34,7 +34,7 @@ impl loader::Handle for NotifyHandle {
fn spawn(sender: loader::Sender) -> NotifyHandle {
let actor = NotifyActor::new(sender);
let (sender, receiver) = unbounded::<Message>();
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker)
.name("VfsLoader".to_owned())
.spawn(move || actor.run(receiver))
.expect("failed to spawn thread");