diff --git a/Cargo.lock b/Cargo.lock index f9c5417ffb57..322a67383b03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1518,7 +1518,6 @@ dependencies = [ "syntax", "test-utils", "thiserror", - "threadpool", "tikv-jemallocator", "toolchain", "tracing", @@ -1712,6 +1711,7 @@ version = "0.0.0" dependencies = [ "always-assert", "backtrace", + "crossbeam-channel", "jod-thread", "libc", "miow", @@ -1823,15 +1823,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "tikv-jemalloc-ctl" version = "0.5.0" diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs index e40257c58f8a..fbb943ccb99d 100644 --- a/crates/flycheck/src/lib.rs +++ b/crates/flycheck/src/lib.rs @@ -90,7 +90,7 @@ impl FlycheckHandle { ) -> FlycheckHandle { let actor = FlycheckActor::new(id, sender, config, workspace_root); let (sender, receiver) = unbounded::(); - let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) + let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .name("Flycheck".to_owned()) .spawn(move || actor.run(receiver)) .expect("failed to spawn thread"); @@ -409,7 +409,7 @@ impl CargoHandle { let (sender, receiver) = unbounded(); let actor = CargoActor::new(sender, stdout, stderr); - let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) + let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .name("CargoHandle".to_owned()) .spawn(move || actor.run()) .expect("failed to spawn thread"); diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs index f049a225f077..d704d12a05b2 100644 --- a/crates/ide/src/prime_caches.rs +++ b/crates/ide/src/prime_caches.rs @@ -81,7 +81,7 @@ pub(crate) fn parallel_prime_caches( let worker = prime_caches_worker.clone(); let db = db.snapshot(); - stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) + stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .allow_leak(true) .spawn(move || Cancelled::catch(|| worker(db))) .expect("failed to spawn thread"); diff --git a/crates/rust-analyzer/Cargo.toml b/crates/rust-analyzer/Cargo.toml index 3f795340b2f6..97bd92092060 100644 --- a/crates/rust-analyzer/Cargo.toml +++ b/crates/rust-analyzer/Cargo.toml @@ -31,7 +31,6 @@ oorandom = "11.1.3" rustc-hash = "1.1.0" serde_json = { workspace = true, features = ["preserve_order"] } serde.workspace = true -threadpool = "1.8.1" rayon = "1.6.1" num_cpus = "1.15.0" mimalloc = { version = "0.1.30", default-features = false, optional = true } diff --git a/crates/rust-analyzer/src/bin/main.rs b/crates/rust-analyzer/src/bin/main.rs index 3224aeae5645..91911dd18096 100644 --- a/crates/rust-analyzer/src/bin/main.rs +++ b/crates/rust-analyzer/src/bin/main.rs @@ -79,13 +79,15 @@ fn try_main(flags: flags::RustAnalyzer) -> Result<()> { return Ok(()); } - // rust-analyzer’s “main thread” is actually a secondary thread - // with an increased stack size at the User Initiated QoS class. - // We use this QoS class because any delay in the main loop + // rust-analyzer’s “main thread” is actually + // a secondary latency-sensitive thread with an increased stack size. + // We use this thread intent because any delay in the main loop // will make actions like hitting enter in the editor slow. - // rust-analyzer does not block the editor’s render loop, - // so we don’t use User Interactive. - with_extra_thread("LspServer", stdx::thread::QoSClass::UserInitiated, run_server)?; + with_extra_thread( + "LspServer", + stdx::thread::ThreadIntent::LatencySensitive, + run_server, + )?; } flags::RustAnalyzerCmd::Parse(cmd) => cmd.run()?, flags::RustAnalyzerCmd::Symbols(cmd) => cmd.run()?, @@ -143,10 +145,10 @@ const STACK_SIZE: usize = 1024 * 1024 * 8; /// space. fn with_extra_thread( thread_name: impl Into, - qos_class: stdx::thread::QoSClass, + thread_intent: stdx::thread::ThreadIntent, f: impl FnOnce() -> Result<()> + Send + 'static, ) -> Result<()> { - let handle = stdx::thread::Builder::new(qos_class) + let handle = stdx::thread::Builder::new(thread_intent) .name(thread_name.into()) .stack_size(STACK_SIZE) .spawn(f)?; diff --git a/crates/rust-analyzer/src/dispatch.rs b/crates/rust-analyzer/src/dispatch.rs index 313bb2ec8dff..ebe77b8dfe72 100644 --- a/crates/rust-analyzer/src/dispatch.rs +++ b/crates/rust-analyzer/src/dispatch.rs @@ -4,6 +4,7 @@ use std::{fmt, panic, thread}; use ide::Cancelled; use lsp_server::ExtractError; use serde::{de::DeserializeOwned, Serialize}; +use stdx::thread::ThreadIntent; use crate::{ global_state::{GlobalState, GlobalStateSnapshot}, @@ -87,7 +88,8 @@ impl<'a> RequestDispatcher<'a> { self } - /// Dispatches the request onto thread pool + /// Dispatches a non-latency-sensitive request onto the thread pool + /// without retrying it if it panics. pub(crate) fn on_no_retry( &mut self, f: fn(GlobalStateSnapshot, R::Params) -> Result, @@ -102,7 +104,7 @@ impl<'a> RequestDispatcher<'a> { None => return self, }; - self.global_state.task_pool.handle.spawn({ + self.global_state.task_pool.handle.spawn(ThreadIntent::Worker, { let world = self.global_state.snapshot(); move || { let result = panic::catch_unwind(move || { @@ -123,11 +125,49 @@ impl<'a> RequestDispatcher<'a> { self } - /// Dispatches the request onto thread pool + /// Dispatches a non-latency-sensitive request onto the thread pool. pub(crate) fn on( &mut self, f: fn(GlobalStateSnapshot, R::Params) -> Result, ) -> &mut Self + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug, + R::Result: Serialize, + { + self.on_with_thread_intent::(ThreadIntent::Worker, f) + } + + /// Dispatches a latency-sensitive request onto the thread pool. + pub(crate) fn on_latency_sensitive( + &mut self, + f: fn(GlobalStateSnapshot, R::Params) -> Result, + ) -> &mut Self + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug, + R::Result: Serialize, + { + self.on_with_thread_intent::(ThreadIntent::LatencySensitive, f) + } + + pub(crate) fn finish(&mut self) { + if let Some(req) = self.req.take() { + tracing::error!("unknown request: {:?}", req); + let response = lsp_server::Response::new_err( + req.id, + lsp_server::ErrorCode::MethodNotFound as i32, + "unknown request".to_string(), + ); + self.global_state.respond(response); + } + } + + fn on_with_thread_intent( + &mut self, + intent: ThreadIntent, + f: fn(GlobalStateSnapshot, R::Params) -> Result, + ) -> &mut Self where R: lsp_types::request::Request + 'static, R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug, @@ -138,7 +178,7 @@ impl<'a> RequestDispatcher<'a> { None => return self, }; - self.global_state.task_pool.handle.spawn({ + self.global_state.task_pool.handle.spawn(intent, { let world = self.global_state.snapshot(); move || { let result = panic::catch_unwind(move || { @@ -155,18 +195,6 @@ impl<'a> RequestDispatcher<'a> { self } - pub(crate) fn finish(&mut self) { - if let Some(req) = self.req.take() { - tracing::error!("unknown request: {:?}", req); - let response = lsp_server::Response::new_err( - req.id, - lsp_server::ErrorCode::MethodNotFound as i32, - "unknown request".to_string(), - ); - self.global_state.respond(response); - } - } - fn parse(&mut self) -> Option<(lsp_server::Request, R::Params, String)> where R: lsp_types::request::Request, diff --git a/crates/rust-analyzer/src/handlers/notification.rs b/crates/rust-analyzer/src/handlers/notification.rs index 7074ef018a13..09de6900c8fb 100644 --- a/crates/rust-analyzer/src/handlers/notification.rs +++ b/crates/rust-analyzer/src/handlers/notification.rs @@ -291,7 +291,7 @@ fn run_flycheck(state: &mut GlobalState, vfs_path: VfsPath) -> bool { } Ok(()) }; - state.task_pool.handle.spawn_with_sender(move |_| { + state.task_pool.handle.spawn_with_sender(stdx::thread::ThreadIntent::Worker, move |_| { if let Err(e) = std::panic::catch_unwind(task) { tracing::error!("flycheck task panicked: {e:?}") } diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index a28edde2f490..19c49a23000e 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -397,7 +397,7 @@ impl GlobalState { tracing::debug!(%cause, "will prime caches"); let num_worker_threads = self.config.prime_caches_num_threads(); - self.task_pool.handle.spawn_with_sender({ + self.task_pool.handle.spawn_with_sender(stdx::thread::ThreadIntent::Worker, { let analysis = self.snapshot().analysis; move |sender| { sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap(); @@ -678,7 +678,32 @@ impl GlobalState { .on_sync::(handlers::handle_selection_range) .on_sync::(handlers::handle_matching_brace) .on_sync::(handlers::handle_on_type_formatting) - // All other request handlers: + // We can’t run latency-sensitive request handlers which do semantic + // analysis on the main thread because that would block other + // requests. Instead, we run these request handlers on higher priority + // threads in the threadpool. + .on_latency_sensitive::(handlers::handle_completion) + .on_latency_sensitive::( + handlers::handle_completion_resolve, + ) + .on_latency_sensitive::( + handlers::handle_semantic_tokens_full, + ) + .on_latency_sensitive::( + handlers::handle_semantic_tokens_full_delta, + ) + .on_latency_sensitive::( + handlers::handle_semantic_tokens_range, + ) + // Formatting is not caused by the user typing, + // but it does qualify as latency-sensitive + // because a delay before formatting is applied + // can be confusing for the user. + .on_latency_sensitive::(handlers::handle_formatting) + .on_latency_sensitive::( + handlers::handle_range_formatting, + ) + // All other request handlers .on::(handlers::fetch_dependency_list) .on::(handlers::handle_analyzer_status) .on::(handlers::handle_syntax_tree) @@ -706,8 +731,6 @@ impl GlobalState { .on::(handlers::handle_goto_type_definition) .on_no_retry::(handlers::handle_inlay_hints) .on::(handlers::handle_inlay_hints_resolve) - .on::(handlers::handle_completion) - .on::(handlers::handle_completion_resolve) .on::(handlers::handle_code_lens) .on::(handlers::handle_code_lens_resolve) .on::(handlers::handle_folding_range) @@ -715,8 +738,6 @@ impl GlobalState { .on::(handlers::handle_prepare_rename) .on::(handlers::handle_rename) .on::(handlers::handle_references) - .on::(handlers::handle_formatting) - .on::(handlers::handle_range_formatting) .on::(handlers::handle_document_highlight) .on::(handlers::handle_call_hierarchy_prepare) .on::( @@ -725,15 +746,6 @@ impl GlobalState { .on::( handlers::handle_call_hierarchy_outgoing, ) - .on::( - handlers::handle_semantic_tokens_full, - ) - .on::( - handlers::handle_semantic_tokens_full_delta, - ) - .on::( - handlers::handle_semantic_tokens_range, - ) .on::(handlers::handle_will_rename_files) .on::(handlers::handle_ssr) .finish(); @@ -781,7 +793,10 @@ impl GlobalState { tracing::trace!("updating notifications for {:?}", subscriptions); let snapshot = self.snapshot(); - self.task_pool.handle.spawn(move || { + + // Diagnostics are triggered by the user typing + // so we run them on a latency sensitive thread. + self.task_pool.handle.spawn(stdx::thread::ThreadIntent::LatencySensitive, move || { let _p = profile::span("publish_diagnostics"); let diagnostics = subscriptions .into_iter() diff --git a/crates/rust-analyzer/src/reload.rs b/crates/rust-analyzer/src/reload.rs index 4e2948557382..6e8c8ea91a1b 100644 --- a/crates/rust-analyzer/src/reload.rs +++ b/crates/rust-analyzer/src/reload.rs @@ -27,7 +27,7 @@ use ide_db::{ use itertools::Itertools; use proc_macro_api::{MacroDylib, ProcMacroServer}; use project_model::{PackageRoot, ProjectWorkspace, WorkspaceBuildScripts}; -use stdx::format_to; +use stdx::{format_to, thread::ThreadIntent}; use syntax::SmolStr; use triomphe::Arc; use vfs::{file_set::FileSetConfig, AbsPath, AbsPathBuf, ChangeKind}; @@ -185,7 +185,7 @@ impl GlobalState { pub(crate) fn fetch_workspaces(&mut self, cause: Cause) { tracing::info!(%cause, "will fetch workspaces"); - self.task_pool.handle.spawn_with_sender({ + self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, { let linked_projects = self.config.linked_projects(); let detached_files = self.config.detached_files().to_vec(); let cargo_config = self.config.cargo(); @@ -260,7 +260,7 @@ impl GlobalState { tracing::info!(%cause, "will fetch build data"); let workspaces = Arc::clone(&self.workspaces); let config = self.config.cargo(); - self.task_pool.handle.spawn_with_sender(move |sender| { + self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| { sender.send(Task::FetchBuildData(BuildDataProgress::Begin)).unwrap(); let progress = { @@ -280,7 +280,7 @@ impl GlobalState { let dummy_replacements = self.config.dummy_replacements().clone(); let proc_macro_clients = self.proc_macro_clients.clone(); - self.task_pool.handle.spawn_with_sender(move |sender| { + self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| { sender.send(Task::LoadProcMacros(ProcMacroProgress::Begin)).unwrap(); let dummy_replacements = &dummy_replacements; diff --git a/crates/rust-analyzer/src/task_pool.rs b/crates/rust-analyzer/src/task_pool.rs index 0c5a4f305534..a5a10e86914e 100644 --- a/crates/rust-analyzer/src/task_pool.rs +++ b/crates/rust-analyzer/src/task_pool.rs @@ -1,76 +1,42 @@ -//! A thin wrapper around `ThreadPool` to make sure that we join all things -//! properly. -use std::sync::{Arc, Barrier}; +//! A thin wrapper around [`stdx::thread::Pool`] which threads a sender through spawned jobs. +//! It is used in [`crate::global_state::GlobalState`] throughout the main loop. use crossbeam_channel::Sender; +use stdx::thread::{Pool, ThreadIntent}; pub(crate) struct TaskPool { sender: Sender, - inner: threadpool::ThreadPool, + pool: Pool, } impl TaskPool { pub(crate) fn new_with_threads(sender: Sender, threads: usize) -> TaskPool { - const STACK_SIZE: usize = 8 * 1024 * 1024; - - let inner = threadpool::Builder::new() - .thread_name("Worker".into()) - .thread_stack_size(STACK_SIZE) - .num_threads(threads) - .build(); - - // Set QoS of all threads in threadpool. - let barrier = Arc::new(Barrier::new(threads + 1)); - for _ in 0..threads { - let barrier = barrier.clone(); - inner.execute(move || { - stdx::thread::set_current_thread_qos_class(stdx::thread::QoSClass::Utility); - barrier.wait(); - }); - } - barrier.wait(); - - TaskPool { sender, inner } + TaskPool { sender, pool: Pool::new(threads) } } - pub(crate) fn spawn(&mut self, task: F) + pub(crate) fn spawn(&mut self, intent: ThreadIntent, task: F) where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { - self.inner.execute({ + self.pool.spawn(intent, { let sender = self.sender.clone(); - move || { - if stdx::thread::IS_QOS_AVAILABLE { - debug_assert_eq!( - stdx::thread::get_current_thread_qos_class(), - Some(stdx::thread::QoSClass::Utility) - ); - } - - sender.send(task()).unwrap() - } + move || sender.send(task()).unwrap() }) } - pub(crate) fn spawn_with_sender(&mut self, task: F) + pub(crate) fn spawn_with_sender(&mut self, intent: ThreadIntent, task: F) where F: FnOnce(Sender) + Send + 'static, T: Send + 'static, { - self.inner.execute({ + self.pool.spawn(intent, { let sender = self.sender.clone(); move || task(sender) }) } pub(crate) fn len(&self) -> usize { - self.inner.queued_count() - } -} - -impl Drop for TaskPool { - fn drop(&mut self) { - self.inner.join() + self.pool.len() } } diff --git a/crates/rust-analyzer/tests/slow-tests/support.rs b/crates/rust-analyzer/tests/slow-tests/support.rs index 33d7f6576c3c..b2a8041ae9b5 100644 --- a/crates/rust-analyzer/tests/slow-tests/support.rs +++ b/crates/rust-analyzer/tests/slow-tests/support.rs @@ -165,7 +165,7 @@ impl Server { fn new(dir: TestDir, config: Config) -> Server { let (connection, client) = Connection::memory(); - let _thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) + let _thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .name("test server".to_string()) .spawn(move || main_loop(config, connection).unwrap()) .expect("failed to spawn a thread"); diff --git a/crates/stdx/Cargo.toml b/crates/stdx/Cargo.toml index 986e3fcdcfc3..a67f36ae9006 100644 --- a/crates/stdx/Cargo.toml +++ b/crates/stdx/Cargo.toml @@ -16,6 +16,7 @@ libc = "0.2.135" backtrace = { version = "0.3.65", optional = true } always-assert = { version = "0.1.2", features = ["log"] } jod-thread = "0.1.2" +crossbeam-channel = "0.5.5" # Think twice before adding anything here [target.'cfg(windows)'.dependencies] diff --git a/crates/stdx/src/thread.rs b/crates/stdx/src/thread.rs index 5042f001435e..e577eb431371 100644 --- a/crates/stdx/src/thread.rs +++ b/crates/stdx/src/thread.rs @@ -1,36 +1,46 @@ //! A utility module for working with threads that automatically joins threads upon drop -//! and provides functionality for interfacing with operating system quality of service (QoS) APIs. +//! and abstracts over operating system quality of service (QoS) APIs +//! through the concept of a “thread intent”. +//! +//! The intent of a thread is frozen at thread creation time, +//! i.e. there is no API to change the intent of a thread once it has been spawned. //! //! As a system, rust-analyzer should have the property that //! old manual scheduling APIs are replaced entirely by QoS. //! To maintain this invariant, we panic when it is clear that //! old scheduling APIs have been used. //! -//! Moreover, we also want to ensure that every thread has a QoS set explicitly +//! Moreover, we also want to ensure that every thread has an intent set explicitly //! to force a decision about its importance to the system. -//! Thus, [`QoSClass`] has no default value -//! and every entry point to creating a thread requires a [`QoSClass`] upfront. +//! Thus, [`ThreadIntent`] has no default value +//! and every entry point to creating a thread requires a [`ThreadIntent`] upfront. use std::fmt; -pub fn spawn(qos_class: QoSClass, f: F) -> JoinHandle +mod intent; +mod pool; + +pub use intent::ThreadIntent; +pub use pool::Pool; + +pub fn spawn(intent: ThreadIntent, f: F) -> JoinHandle where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, { - Builder::new(qos_class).spawn(f).expect("failed to spawn thread") + Builder::new(intent).spawn(f).expect("failed to spawn thread") } pub struct Builder { - qos_class: QoSClass, + intent: ThreadIntent, inner: jod_thread::Builder, allow_leak: bool, } impl Builder { - pub fn new(qos_class: QoSClass) -> Builder { - Builder { qos_class, inner: jod_thread::Builder::new(), allow_leak: false } + pub fn new(intent: ThreadIntent) -> Builder { + Builder { intent, inner: jod_thread::Builder::new(), allow_leak: false } } pub fn name(self, name: String) -> Builder { @@ -52,7 +62,7 @@ impl Builder { T: Send + 'static, { let inner_handle = self.inner.spawn(move || { - set_current_thread_qos_class(self.qos_class); + self.intent.apply_to_current_thread(); f() })?; @@ -90,237 +100,3 @@ impl fmt::Debug for JoinHandle { f.pad("JoinHandle { .. }") } } - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -// Please maintain order from least to most priority for the derived `Ord` impl. -pub enum QoSClass { - // Documentation adapted from https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/include/sys/qos.h#L55 - // - /// TLDR: invisible maintenance tasks - /// - /// Contract: - /// - /// * **You do not care about how long it takes for work to finish.** - /// * **You do not care about work being deferred temporarily.** - /// (e.g. if the device’s battery is in a critical state) - /// - /// Examples: - /// - /// * in a video editor: - /// creating periodic backups of project files - /// * in a browser: - /// cleaning up cached sites which have not been accessed in a long time - /// * in a collaborative word processor: - /// creating a searchable index of all documents - /// - /// Use this QoS class for background tasks - /// which the user did not initiate themselves - /// and which are invisible to the user. - /// It is expected that this work will take significant time to complete: - /// minutes or even hours. - /// - /// This QoS class provides the most energy and thermally-efficient execution possible. - /// All other work is prioritized over background tasks. - Background, - - /// TLDR: tasks that don’t block using your app - /// - /// Contract: - /// - /// * **Your app remains useful even as the task is executing.** - /// - /// Examples: - /// - /// * in a video editor: - /// exporting a video to disk – - /// the user can still work on the timeline - /// * in a browser: - /// automatically extracting a downloaded zip file – - /// the user can still switch tabs - /// * in a collaborative word processor: - /// downloading images embedded in a document – - /// the user can still make edits - /// - /// Use this QoS class for tasks which - /// may or may not be initiated by the user, - /// but whose result is visible. - /// It is expected that this work will take a few seconds to a few minutes. - /// Typically your app will include a progress bar - /// for tasks using this class. - /// - /// This QoS class provides a balance between - /// performance, responsiveness and efficiency. - Utility, - - /// TLDR: tasks that block using your app - /// - /// Contract: - /// - /// * **You need this work to complete - /// before the user can keep interacting with your app.** - /// * **Your work will not take more than a few seconds to complete.** - /// - /// Examples: - /// - /// * in a video editor: - /// opening a saved project - /// * in a browser: - /// loading a list of the user’s bookmarks and top sites - /// when a new tab is created - /// * in a collaborative word processor: - /// running a search on the document’s content - /// - /// Use this QoS class for tasks which were initiated by the user - /// and block the usage of your app while they are in progress. - /// It is expected that this work will take a few seconds or less to complete; - /// not long enough to cause the user to switch to something else. - /// Your app will likely indicate progress on these tasks - /// through the display of placeholder content or modals. - /// - /// This QoS class is not energy-efficient. - /// Rather, it provides responsiveness - /// by prioritizing work above other tasks on the system - /// except for critical user-interactive work. - UserInitiated, - - /// TLDR: render loops and nothing else - /// - /// Contract: - /// - /// * **You absolutely need this work to complete immediately - /// or your app will appear to freeze.** - /// * **Your work will always complete virtually instantaneously.** - /// - /// Examples: - /// - /// * the main thread in a GUI application - /// * the update & render loop in a game - /// * a secondary thread which progresses an animation - /// - /// Use this QoS class for any work which, if delayed, - /// will make your user interface unresponsive. - /// It is expected that this work will be virtually instantaneous. - /// - /// This QoS class is not energy-efficient. - /// Specifying this class is a request to run with - /// nearly all available system CPU and I/O bandwidth even under contention. - UserInteractive, -} - -pub const IS_QOS_AVAILABLE: bool = imp::IS_QOS_AVAILABLE; - -pub fn set_current_thread_qos_class(class: QoSClass) { - imp::set_current_thread_qos_class(class) -} - -pub fn get_current_thread_qos_class() -> Option { - imp::get_current_thread_qos_class() -} - -// All Apple platforms use XNU as their kernel -// and thus have the concept of QoS. -#[cfg(target_vendor = "apple")] -mod imp { - use super::QoSClass; - - pub(super) const IS_QOS_AVAILABLE: bool = true; - - pub(super) fn set_current_thread_qos_class(class: QoSClass) { - let c = match class { - QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, - QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED, - QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY, - QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND, - }; - - let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) }; - - if code == 0 { - return; - } - - let errno = unsafe { *libc::__error() }; - - match errno { - libc::EPERM => { - // This thread has been excluded from the QoS system - // due to a previous call to a function such as `pthread_setschedparam` - // which is incompatible with QoS. - // - // Panic instead of returning an error - // to maintain the invariant that we only use QoS APIs. - panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})") - } - - libc::EINVAL => { - // This is returned if we pass something other than a qos_class_t - // to `pthread_set_qos_class_self_np`. - // - // This is impossible, so again panic. - unreachable!( - "invalid qos_class_t value was passed to pthread_set_qos_class_self_np" - ) - } - - _ => { - // `pthread_set_qos_class_self_np`’s documentation - // does not mention any other errors. - unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}") - } - } - } - - pub(super) fn get_current_thread_qos_class() -> Option { - let current_thread = unsafe { libc::pthread_self() }; - let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED; - let code = unsafe { - libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut()) - }; - - if code != 0 { - // `pthread_get_qos_class_np`’s documentation states that - // an error value is placed into errno if the return code is not zero. - // However, it never states what errors are possible. - // Inspecting the source[0] shows that, as of this writing, it always returns zero. - // - // Whatever errors the function could report in future are likely to be - // ones which we cannot handle anyway - // - // 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177 - let errno = unsafe { *libc::__error() }; - unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})"); - } - - match qos_class_raw { - libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive), - libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated), - libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set - libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility), - libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background), - - libc::qos_class_t::QOS_CLASS_UNSPECIFIED => { - // Using manual scheduling APIs causes threads to “opt out” of QoS. - // At this point they become incompatible with QoS, - // and as such have the “unspecified” QoS class. - // - // Panic instead of returning an error - // to maintain the invariant that we only use QoS APIs. - panic!("tried to get QoS of thread which has opted out of QoS") - } - } - } -} - -// FIXME: Windows has QoS APIs, we should use them! -#[cfg(not(target_vendor = "apple"))] -mod imp { - use super::QoSClass; - - pub(super) const IS_QOS_AVAILABLE: bool = false; - - pub(super) fn set_current_thread_qos_class(_: QoSClass) {} - - pub(super) fn get_current_thread_qos_class() -> Option { - None - } -} diff --git a/crates/stdx/src/thread/intent.rs b/crates/stdx/src/thread/intent.rs new file mode 100644 index 000000000000..7b65db30cc5b --- /dev/null +++ b/crates/stdx/src/thread/intent.rs @@ -0,0 +1,287 @@ +//! An opaque façade around platform-specific QoS APIs. + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +// Please maintain order from least to most priority for the derived `Ord` impl. +pub enum ThreadIntent { + /// Any thread which does work that isn’t in the critical path of the user typing + /// (e.g. processing Go To Definition). + Worker, + + /// Any thread which does work caused by the user typing + /// (e.g. processing syntax highlighting). + LatencySensitive, +} + +impl ThreadIntent { + // These APIs must remain private; + // we only want consumers to set thread intent + // either during thread creation or using our pool impl. + + pub(super) fn apply_to_current_thread(self) { + let class = thread_intent_to_qos_class(self); + set_current_thread_qos_class(class); + } + + pub(super) fn assert_is_used_on_current_thread(self) { + if IS_QOS_AVAILABLE { + let class = thread_intent_to_qos_class(self); + assert_eq!(get_current_thread_qos_class(), Some(class)); + } + } +} + +use imp::QoSClass; + +const IS_QOS_AVAILABLE: bool = imp::IS_QOS_AVAILABLE; + +fn set_current_thread_qos_class(class: QoSClass) { + imp::set_current_thread_qos_class(class) +} + +fn get_current_thread_qos_class() -> Option { + imp::get_current_thread_qos_class() +} + +fn thread_intent_to_qos_class(intent: ThreadIntent) -> QoSClass { + imp::thread_intent_to_qos_class(intent) +} + +// All Apple platforms use XNU as their kernel +// and thus have the concept of QoS. +#[cfg(target_vendor = "apple")] +mod imp { + use super::ThreadIntent; + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + // Please maintain order from least to most priority for the derived `Ord` impl. + pub(super) enum QoSClass { + // Documentation adapted from https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/include/sys/qos.h#L55 + // + /// TLDR: invisible maintenance tasks + /// + /// Contract: + /// + /// * **You do not care about how long it takes for work to finish.** + /// * **You do not care about work being deferred temporarily.** + /// (e.g. if the device’s battery is in a critical state) + /// + /// Examples: + /// + /// * in a video editor: + /// creating periodic backups of project files + /// * in a browser: + /// cleaning up cached sites which have not been accessed in a long time + /// * in a collaborative word processor: + /// creating a searchable index of all documents + /// + /// Use this QoS class for background tasks + /// which the user did not initiate themselves + /// and which are invisible to the user. + /// It is expected that this work will take significant time to complete: + /// minutes or even hours. + /// + /// This QoS class provides the most energy and thermally-efficient execution possible. + /// All other work is prioritized over background tasks. + Background, + + /// TLDR: tasks that don’t block using your app + /// + /// Contract: + /// + /// * **Your app remains useful even as the task is executing.** + /// + /// Examples: + /// + /// * in a video editor: + /// exporting a video to disk – + /// the user can still work on the timeline + /// * in a browser: + /// automatically extracting a downloaded zip file – + /// the user can still switch tabs + /// * in a collaborative word processor: + /// downloading images embedded in a document – + /// the user can still make edits + /// + /// Use this QoS class for tasks which + /// may or may not be initiated by the user, + /// but whose result is visible. + /// It is expected that this work will take a few seconds to a few minutes. + /// Typically your app will include a progress bar + /// for tasks using this class. + /// + /// This QoS class provides a balance between + /// performance, responsiveness and efficiency. + Utility, + + /// TLDR: tasks that block using your app + /// + /// Contract: + /// + /// * **You need this work to complete + /// before the user can keep interacting with your app.** + /// * **Your work will not take more than a few seconds to complete.** + /// + /// Examples: + /// + /// * in a video editor: + /// opening a saved project + /// * in a browser: + /// loading a list of the user’s bookmarks and top sites + /// when a new tab is created + /// * in a collaborative word processor: + /// running a search on the document’s content + /// + /// Use this QoS class for tasks which were initiated by the user + /// and block the usage of your app while they are in progress. + /// It is expected that this work will take a few seconds or less to complete; + /// not long enough to cause the user to switch to something else. + /// Your app will likely indicate progress on these tasks + /// through the display of placeholder content or modals. + /// + /// This QoS class is not energy-efficient. + /// Rather, it provides responsiveness + /// by prioritizing work above other tasks on the system + /// except for critical user-interactive work. + UserInitiated, + + /// TLDR: render loops and nothing else + /// + /// Contract: + /// + /// * **You absolutely need this work to complete immediately + /// or your app will appear to freeze.** + /// * **Your work will always complete virtually instantaneously.** + /// + /// Examples: + /// + /// * the main thread in a GUI application + /// * the update & render loop in a game + /// * a secondary thread which progresses an animation + /// + /// Use this QoS class for any work which, if delayed, + /// will make your user interface unresponsive. + /// It is expected that this work will be virtually instantaneous. + /// + /// This QoS class is not energy-efficient. + /// Specifying this class is a request to run with + /// nearly all available system CPU and I/O bandwidth even under contention. + UserInteractive, + } + + pub(super) const IS_QOS_AVAILABLE: bool = true; + + pub(super) fn set_current_thread_qos_class(class: QoSClass) { + let c = match class { + QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, + QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED, + QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY, + QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND, + }; + + let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) }; + + if code == 0 { + return; + } + + let errno = unsafe { *libc::__error() }; + + match errno { + libc::EPERM => { + // This thread has been excluded from the QoS system + // due to a previous call to a function such as `pthread_setschedparam` + // which is incompatible with QoS. + // + // Panic instead of returning an error + // to maintain the invariant that we only use QoS APIs. + panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})") + } + + libc::EINVAL => { + // This is returned if we pass something other than a qos_class_t + // to `pthread_set_qos_class_self_np`. + // + // This is impossible, so again panic. + unreachable!( + "invalid qos_class_t value was passed to pthread_set_qos_class_self_np" + ) + } + + _ => { + // `pthread_set_qos_class_self_np`’s documentation + // does not mention any other errors. + unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}") + } + } + } + + pub(super) fn get_current_thread_qos_class() -> Option { + let current_thread = unsafe { libc::pthread_self() }; + let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED; + let code = unsafe { + libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut()) + }; + + if code != 0 { + // `pthread_get_qos_class_np`’s documentation states that + // an error value is placed into errno if the return code is not zero. + // However, it never states what errors are possible. + // Inspecting the source[0] shows that, as of this writing, it always returns zero. + // + // Whatever errors the function could report in future are likely to be + // ones which we cannot handle anyway + // + // 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177 + let errno = unsafe { *libc::__error() }; + unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})"); + } + + match qos_class_raw { + libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive), + libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated), + libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set + libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility), + libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background), + + libc::qos_class_t::QOS_CLASS_UNSPECIFIED => { + // Using manual scheduling APIs causes threads to “opt out” of QoS. + // At this point they become incompatible with QoS, + // and as such have the “unspecified” QoS class. + // + // Panic instead of returning an error + // to maintain the invariant that we only use QoS APIs. + panic!("tried to get QoS of thread which has opted out of QoS") + } + } + } + + pub(super) fn thread_intent_to_qos_class(intent: ThreadIntent) -> QoSClass { + match intent { + ThreadIntent::Worker => QoSClass::Utility, + ThreadIntent::LatencySensitive => QoSClass::UserInitiated, + } + } +} + +// FIXME: Windows has QoS APIs, we should use them! +#[cfg(not(target_vendor = "apple"))] +mod imp { + use super::ThreadIntent; + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub(super) enum QoSClass { + Default, + } + + pub(super) const IS_QOS_AVAILABLE: bool = false; + + pub(super) fn set_current_thread_qos_class(_: QoSClass) {} + + pub(super) fn get_current_thread_qos_class() -> Option { + None + } + + pub(super) fn thread_intent_to_qos_class(_: ThreadIntent) -> QoSClass { + QoSClass::Default + } +} diff --git a/crates/stdx/src/thread/pool.rs b/crates/stdx/src/thread/pool.rs new file mode 100644 index 000000000000..2ddd7da74c29 --- /dev/null +++ b/crates/stdx/src/thread/pool.rs @@ -0,0 +1,92 @@ +//! [`Pool`] implements a basic custom thread pool +//! inspired by the [`threadpool` crate](http://docs.rs/threadpool). +//! When you spawn a task you specify a thread intent +//! so the pool can schedule it to run on a thread with that intent. +//! rust-analyzer uses this to prioritize work based on latency requirements. +//! +//! The thread pool is implemented entirely using +//! the threading utilities in [`crate::thread`]. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use crossbeam_channel::{Receiver, Sender}; + +use super::{Builder, JoinHandle, ThreadIntent}; + +pub struct Pool { + // `_handles` is never read: the field is present + // only for its `Drop` impl. + + // The worker threads exit once the channel closes; + // make sure to keep `job_sender` above `handles` + // so that the channel is actually closed + // before we join the worker threads! + job_sender: Sender, + _handles: Vec, + extant_tasks: Arc, +} + +struct Job { + requested_intent: ThreadIntent, + f: Box, +} + +impl Pool { + pub fn new(threads: usize) -> Pool { + const STACK_SIZE: usize = 8 * 1024 * 1024; + const INITIAL_INTENT: ThreadIntent = ThreadIntent::Worker; + + let (job_sender, job_receiver) = crossbeam_channel::unbounded(); + let extant_tasks = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::with_capacity(threads); + for _ in 0..threads { + let handle = Builder::new(INITIAL_INTENT) + .stack_size(STACK_SIZE) + .name("Worker".into()) + .spawn({ + let extant_tasks = Arc::clone(&extant_tasks); + let job_receiver: Receiver = job_receiver.clone(); + move || { + let mut current_intent = INITIAL_INTENT; + for job in job_receiver { + if job.requested_intent != current_intent { + job.requested_intent.apply_to_current_thread(); + current_intent = job.requested_intent; + } + extant_tasks.fetch_add(1, Ordering::SeqCst); + (job.f)(); + extant_tasks.fetch_sub(1, Ordering::SeqCst); + } + } + }) + .expect("failed to spawn thread"); + + handles.push(handle); + } + + Pool { _handles: handles, extant_tasks, job_sender } + } + + pub fn spawn(&self, intent: ThreadIntent, f: F) + where + F: FnOnce() + Send + 'static, + { + let f = Box::new(move || { + if cfg!(debug_assertions) { + intent.assert_is_used_on_current_thread(); + } + f() + }); + + let job = Job { requested_intent: intent, f }; + self.job_sender.send(job).unwrap(); + } + + pub fn len(&self) -> usize { + self.extant_tasks.load(Ordering::SeqCst) + } +} diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index 26f7a9fc4235..abfc51dfec66 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -34,7 +34,7 @@ impl loader::Handle for NotifyHandle { fn spawn(sender: loader::Sender) -> NotifyHandle { let actor = NotifyActor::new(sender); let (sender, receiver) = unbounded::(); - let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) + let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .name("VfsLoader".to_owned()) .spawn(move || actor.run(receiver)) .expect("failed to spawn thread");