Rollup merge of #145895 - RalfJung:unpark, r=joboet
thread parking: fix docs and examples Fixes https://github.com/rust-lang/rust/issues/145816 r? ```@joboet``` Cc ```@m-ou-se``` ```@Amanieu```
This commit is contained in:
commit
1037c082bc
4 changed files with 59 additions and 15 deletions
|
|
@ -276,7 +276,9 @@ fn wait(
|
|||
// If the managing thread happens to signal and unpark us before we
|
||||
// can park ourselves, the result could be this thread never gets
|
||||
// unparked. Luckily `park` comes with the guarantee that if it got
|
||||
// an `unpark` just before on an unparked thread it does not park.
|
||||
// an `unpark` just before on an unparked thread it does not park. Crucially, we know
|
||||
// the `unpark` must have happened between the `compare_exchange_weak` above and here,
|
||||
// and there's no other `park` in that code that could steal our token.
|
||||
// SAFETY: we retrieved this handle on the current thread above.
|
||||
unsafe { node.thread.park() }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1021,13 +1021,23 @@ impl Drop for PanicGuard {
|
|||
/// specifying a maximum time to block the thread for.
|
||||
///
|
||||
/// * The [`unpark`] method on a [`Thread`] atomically makes the token available
|
||||
/// if it wasn't already. Because the token is initially absent, [`unpark`]
|
||||
/// followed by [`park`] will result in the second call returning immediately.
|
||||
/// if it wasn't already. Because the token can be held by a thread even if it is currently not
|
||||
/// parked, [`unpark`] followed by [`park`] will result in the second call returning immediately.
|
||||
/// However, note that to rely on this guarantee, you need to make sure that your `unpark` happens
|
||||
/// after all `park` that may be done by other data structures!
|
||||
///
|
||||
/// The API is typically used by acquiring a handle to the current thread,
|
||||
/// placing that handle in a shared data structure so that other threads can
|
||||
/// find it, and then `park`ing in a loop. When some desired condition is met, another
|
||||
/// thread calls [`unpark`] on the handle.
|
||||
/// The API is typically used by acquiring a handle to the current thread, placing that handle in a
|
||||
/// shared data structure so that other threads can find it, and then `park`ing in a loop. When some
|
||||
/// desired condition is met, another thread calls [`unpark`] on the handle. The last bullet point
|
||||
/// above guarantees that even if the `unpark` occurs before the thread is finished `park`ing, it
|
||||
/// will be woken up properly.
|
||||
///
|
||||
/// Note that the coordination via the shared data structure is crucial: If you `unpark` a thread
|
||||
/// without first establishing that it is about to be `park`ing within your code, that `unpark` may
|
||||
/// get consumed by a *different* `park` in the same thread, leading to a deadlock. This also means
|
||||
/// you must not call unknown code between setting up for parking and calling `park`; for instance,
|
||||
/// if you invoke `println!`, that may itself call `park` and thus consume your `unpark` and cause a
|
||||
/// deadlock.
|
||||
///
|
||||
/// The motivation for this design is twofold:
|
||||
///
|
||||
|
|
@ -1058,21 +1068,24 @@ impl Drop for PanicGuard {
|
|||
///
|
||||
/// ```
|
||||
/// use std::thread;
|
||||
/// use std::sync::{Arc, atomic::{Ordering, AtomicBool}};
|
||||
/// use std::sync::atomic::{Ordering, AtomicBool};
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let flag = Arc::new(AtomicBool::new(false));
|
||||
/// let flag2 = Arc::clone(&flag);
|
||||
/// static QUEUED: AtomicBool = AtomicBool::new(false);
|
||||
/// static FLAG: AtomicBool = AtomicBool::new(false);
|
||||
///
|
||||
/// let parked_thread = thread::spawn(move || {
|
||||
/// println!("Thread spawned");
|
||||
/// // Signal that we are going to `park`. Between this store and our `park`, there may
|
||||
/// // be no other `park`, or else that `park` could consume our `unpark` token!
|
||||
/// QUEUED.store(true, Ordering::Release);
|
||||
/// // We want to wait until the flag is set. We *could* just spin, but using
|
||||
/// // park/unpark is more efficient.
|
||||
/// while !flag2.load(Ordering::Relaxed) {
|
||||
/// println!("Parking thread");
|
||||
/// while !FLAG.load(Ordering::Acquire) {
|
||||
/// // We can *not* use `println!` here since that could use thread parking internally.
|
||||
/// thread::park();
|
||||
/// // We *could* get here spuriously, i.e., way before the 10ms below are over!
|
||||
/// // But that is no problem, we are in a loop until the flag is set anyway.
|
||||
/// println!("Thread unparked");
|
||||
/// }
|
||||
/// println!("Flag received");
|
||||
/// });
|
||||
|
|
@ -1080,11 +1093,22 @@ impl Drop for PanicGuard {
|
|||
/// // Let some time pass for the thread to be spawned.
|
||||
/// thread::sleep(Duration::from_millis(10));
|
||||
///
|
||||
/// // Ensure the thread is about to park.
|
||||
/// // This is crucial! It guarantees that the `unpark` below is not consumed
|
||||
/// // by some other code in the parked thread (e.g. inside `println!`).
|
||||
/// while !QUEUED.load(Ordering::Acquire) {
|
||||
/// // Spinning is of course inefficient; in practice, this would more likely be
|
||||
/// // a dequeue where we have no work to do if there's nobody queued.
|
||||
/// std::hint::spin_loop();
|
||||
/// }
|
||||
///
|
||||
/// // Set the flag, and let the thread wake up.
|
||||
/// // There is no race condition here, if `unpark`
|
||||
/// // There is no race condition here: if `unpark`
|
||||
/// // happens first, `park` will return immediately.
|
||||
/// // There is also no other `park` that could consume this token,
|
||||
/// // since we waited until the other thread got queued.
|
||||
/// // Hence there is no risk of a deadlock.
|
||||
/// flag.store(true, Ordering::Relaxed);
|
||||
/// FLAG.store(true, Ordering::Release);
|
||||
/// println!("Unpark the thread");
|
||||
/// parked_thread.thread().unpark();
|
||||
///
|
||||
|
|
@ -1494,10 +1518,14 @@ impl Thread {
|
|||
/// ```
|
||||
/// use std::thread;
|
||||
/// use std::time::Duration;
|
||||
/// use std::sync::atomic::{AtomicBool, Ordering};
|
||||
///
|
||||
/// static QUEUED: AtomicBool = AtomicBool::new(false);
|
||||
///
|
||||
/// let parked_thread = thread::Builder::new()
|
||||
/// .spawn(|| {
|
||||
/// println!("Parking thread");
|
||||
/// QUEUED.store(true, Ordering::Release);
|
||||
/// thread::park();
|
||||
/// println!("Thread unparked");
|
||||
/// })
|
||||
|
|
@ -1506,6 +1534,15 @@ impl Thread {
|
|||
/// // Let some time pass for the thread to be spawned.
|
||||
/// thread::sleep(Duration::from_millis(10));
|
||||
///
|
||||
/// // Wait until the other thread is queued.
|
||||
/// // This is crucial! It guarantees that the `unpark` below is not consumed
|
||||
/// // by some other code in the parked thread (e.g. inside `println!`).
|
||||
/// while !QUEUED.load(Ordering::Acquire) {
|
||||
/// // Spinning is of course inefficient; in practice, this would more likely be
|
||||
/// // a dequeue where we have no work to do if there's nobody queued.
|
||||
/// std::hint::spin_loop();
|
||||
/// }
|
||||
///
|
||||
/// println!("Unpark the thread");
|
||||
/// parked_thread.thread().unpark();
|
||||
///
|
||||
|
|
|
|||
|
|
@ -287,6 +287,8 @@ fn test_park_unpark_called_other_thread() {
|
|||
for _ in 0..10 {
|
||||
let th = thread::current();
|
||||
|
||||
// Here we rely on `thread::spawn` (specifically the part that runs after spawning
|
||||
// the thread) to not consume the parking token.
|
||||
let _guard = thread::spawn(move || {
|
||||
super::sleep(Duration::from_millis(50));
|
||||
th.unpark();
|
||||
|
|
@ -316,6 +318,8 @@ fn test_park_timeout_unpark_called_other_thread() {
|
|||
for _ in 0..10 {
|
||||
let th = thread::current();
|
||||
|
||||
// Here we rely on `thread::spawn` (specifically the part that runs after spawning
|
||||
// the thread) to not consume the parking token.
|
||||
let _guard = thread::spawn(move || {
|
||||
super::sleep(Duration::from_millis(50));
|
||||
th.unpark();
|
||||
|
|
|
|||
|
|
@ -606,6 +606,7 @@ Definite bugs found:
|
|||
* [A bug in the new `RwLock::downgrade` implementation](https://rust-lang.zulipchat.com/#narrow/channel/269128-miri/topic/Miri.20error.20library.20test) (caught by Miri before it landed in the Rust repo)
|
||||
* [Mockall reading uninitialized memory when mocking `std::io::Read::read`, even if all expectations are satisfied](https://github.com/asomers/mockall/issues/647) (caught by Miri running Tokio's test suite)
|
||||
* [`ReentrantLock` not correctly dealing with reuse of addresses for TLS storage of different threads](https://github.com/rust-lang/rust/pull/141248)
|
||||
* [Rare Deadlock in the thread (un)parking example code](https://github.com/rust-lang/rust/issues/145816)
|
||||
|
||||
Violations of [Stacked Borrows] found that are likely bugs (but Stacked Borrows is currently just an experiment):
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue