Auto merge of #54339 - cramertj:no-cx, r=aturon

Remove spawning from task::Context

r? @aturon

cc https://github.com/rust-lang-nursery/wg-net/issues/56
This commit is contained in:
bors 2018-09-23 10:09:22 +00:00
commit 2287a7a6e2
13 changed files with 93 additions and 568 deletions

View file

@ -60,7 +60,7 @@ use core::borrow;
use core::cmp::Ordering;
use core::convert::From;
use core::fmt;
use core::future::{Future, FutureObj, LocalFutureObj, UnsafeFutureObj};
use core::future::Future;
use core::hash::{Hash, Hasher};
use core::iter::FusedIterator;
use core::marker::{Unpin, Unsize};
@ -68,7 +68,7 @@ use core::mem;
use core::pin::Pin;
use core::ops::{CoerceUnsized, Deref, DerefMut, Generator, GeneratorState};
use core::ptr::{self, NonNull, Unique};
use core::task::{Context, Poll, Spawn, SpawnErrorKind, SpawnObjError};
use core::task::{LocalWaker, Poll};
use raw_vec::RawVec;
use str::from_boxed_utf8_unchecked;
@ -804,70 +804,7 @@ impl<T> Generator for Box<T>
impl<F: ?Sized + Future + Unpin> Future for Box<F> {
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
F::poll(Pin::new(&mut *self), cx)
}
}
#[unstable(feature = "futures_api", issue = "50547")]
unsafe impl<'a, T, F> UnsafeFutureObj<'a, T> for Box<F>
where F: Future<Output = T> + 'a
{
fn into_raw(self) -> *mut () {
Box::into_raw(self) as *mut ()
}
unsafe fn poll(ptr: *mut (), cx: &mut Context) -> Poll<T> {
let ptr = ptr as *mut F;
let pin: Pin<&mut F> = Pin::new_unchecked(&mut *ptr);
F::poll(pin, cx)
}
unsafe fn drop(ptr: *mut ()) {
drop(Box::from_raw(ptr as *mut F))
}
}
#[unstable(feature = "futures_api", issue = "50547")]
impl<Sp> Spawn for Box<Sp>
where Sp: Spawn + ?Sized
{
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnObjError> {
(**self).spawn_obj(future)
}
fn status(&self) -> Result<(), SpawnErrorKind> {
(**self).status()
}
}
#[unstable(feature = "futures_api", issue = "50547")]
impl<'a, F: Future<Output = ()> + Send + 'a> From<Box<F>> for FutureObj<'a, ()> {
fn from(boxed: Box<F>) -> Self {
FutureObj::new(boxed)
}
}
#[unstable(feature = "futures_api", issue = "50547")]
impl<'a, F: Future<Output = ()> + 'a> From<Box<F>> for LocalFutureObj<'a, ()> {
fn from(boxed: Box<F>) -> Self {
LocalFutureObj::new(boxed)
}
}
#[unstable(feature = "futures_api", issue = "50547")]
impl<'a, F: Future<Output = ()> + Send + 'a> From<Pin<Box<F>>> for FutureObj<'a, ()> {
fn from(boxed: Pin<Box<F>>) -> Self {
FutureObj::new(boxed)
}
}
#[unstable(feature = "futures_api", issue = "50547")]
impl<'a, F: Future<Output = ()> + 'a> From<Pin<Box<F>>> for LocalFutureObj<'a, ()> {
fn from(boxed: Pin<Box<F>>) -> Self {
LocalFutureObj::new(boxed)
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
F::poll(Pin::new(&mut *self), lw)
}
}

View file

@ -15,7 +15,7 @@
use marker::Unpin;
use ops;
use pin::Pin;
use task::{self, Poll};
use task::{Poll, LocalWaker};
/// A future represents an asychronous computation.
///
@ -50,18 +50,18 @@ pub trait Future {
///
/// Once a future has finished, clients should not `poll` it again.
///
/// When a future is not ready yet, `poll` returns
/// `Poll::Pending`. The future will *also* register the
/// interest of the current task in the value being produced. For example,
/// if the future represents the availability of data on a socket, then the
/// task is recorded so that when data arrives, it is woken up (via
/// [`cx.waker()`]). Once a task has been woken up,
/// it should attempt to `poll` the future again, which may or may not
/// produce a final value.
/// When a future is not ready yet, `poll` returns `Poll::Pending` and
/// stores a clone of the [`LocalWaker`] to be woken once the future can
/// make progress. For example, a future waiting for a socket to become
/// readable would call `.clone()` on the [`LocalWaker`] and store it.
/// When a signal arrives elsewhere indicating that the socket is readable,
/// `[LocalWaker::wake]` is called and the socket future's task is awoken.
/// Once a task has been woken up, it should attempt to `poll` the future
/// again, which may or may not produce a final value.
///
/// Note that if `Pending` is returned it only means that the *current* task
/// (represented by the argument `cx`) will receive a notification. Tasks
/// from previous calls to `poll` will *not* receive notifications.
/// Note that on multiple calls to `poll`, only the most recent
/// [`LocalWaker`] passed to `poll` should be scheduled to receive a
/// wakeup.
///
/// # Runtime characteristics
///
@ -69,9 +69,9 @@ pub trait Future {
/// progress, meaning that each time the current task is woken up, it should
/// actively re-`poll` pending futures that it still has an interest in.
///
/// The `poll` function is not called repeatedly in a tight loop for
/// futures, but only whenever the future itself is ready, as signaled via
/// the `Waker` inside `task::Context`. If you're familiar with the
/// The `poll` function is not called repeatedly in a tight loop-- instead,
/// it should only be called when the future indicates that it is ready to
/// make progress (by calling `wake()`). If you're familiar with the
/// `poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures
/// typically do *not* suffer the same problems of "all wakeups must poll
/// all events"; they are more like `epoll(4)`.
@ -83,6 +83,16 @@ pub trait Future {
/// thread pool (or something similar) to ensure that `poll` can return
/// quickly.
///
/// # [`LocalWaker`], [`Waker`] and thread-safety
///
/// The `poll` function takes a [`LocalWaker`], an object which knows how to
/// awaken the current task. [`LocalWaker`] is not `Send` nor `Sync`, so in
/// order to make thread-safe futures the [`LocalWaker::into_waker`] method
/// should be used to convert the [`LocalWaker`] into a thread-safe version.
/// [`LocalWaker::wake`] implementations have the ability to be more
/// efficient, however, so when thread safety is not necessary,
/// [`LocalWaker`] should be preferred.
///
/// # Panics
///
/// Once a future has completed (returned `Ready` from `poll`),
@ -92,15 +102,18 @@ pub trait Future {
///
/// [`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
/// [`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
/// [`cx.waker()`]: ../task/struct.Context.html#method.waker
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output>;
/// [`LocalWaker`]: ../task/struct.LocalWaker.html
/// [`LocalWaker::into_waker`]: ../task/struct.LocalWaker.html#method.into_waker
/// [`LocalWaker::wake`]: ../task/struct.LocalWaker.html#method.wake
/// [`Waker`]: ../task/struct.Waker.html
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output>;
}
impl<'a, F: ?Sized + Future + Unpin> Future for &'a mut F {
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), cx)
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), lw)
}
}
@ -111,7 +124,7 @@ where
{
type Output = <<P as ops::Deref>::Target as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(cx)
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(lw)
}
}

View file

@ -1,203 +0,0 @@
// Copyright 2018 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
use fmt;
use future::Future;
use marker::{PhantomData, Unpin};
use ops;
use pin::Pin;
use task::{Context, Poll};
/// A custom trait object for polling futures, roughly akin to
/// `Box<dyn Future<Output = T> + 'a>`.
///
/// This custom trait object was introduced for two reasons:
/// - Currently it is not possible to take `dyn Trait` by value and
/// `Box<dyn Trait>` is not available in no_std contexts.
/// - The `Future` trait is currently not object safe: The `Future::poll`
/// method makes uses the arbitrary self types feature and traits in which
/// this feature is used are currently not object safe due to current compiler
/// limitations. (See tracking issue for arbitrary self types for more
/// information #44874)
pub struct LocalFutureObj<'a, T> {
ptr: *mut (),
poll_fn: unsafe fn(*mut (), &mut Context) -> Poll<T>,
drop_fn: unsafe fn(*mut ()),
_marker: PhantomData<&'a ()>,
}
impl<'a, T> Unpin for LocalFutureObj<'a, T> {}
impl<'a, T> LocalFutureObj<'a, T> {
/// Create a `LocalFutureObj` from a custom trait object representation.
#[inline]
pub fn new<F: UnsafeFutureObj<'a, T> + 'a>(f: F) -> LocalFutureObj<'a, T> {
LocalFutureObj {
ptr: f.into_raw(),
poll_fn: F::poll,
drop_fn: F::drop,
_marker: PhantomData,
}
}
/// Converts the `LocalFutureObj` into a `FutureObj`
/// To make this operation safe one has to ensure that the `UnsafeFutureObj`
/// instance from which this `LocalFutureObj` was created actually
/// implements `Send`.
#[inline]
pub unsafe fn into_future_obj(self) -> FutureObj<'a, T> {
FutureObj(self)
}
}
impl<'a, T> fmt::Debug for LocalFutureObj<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("LocalFutureObj")
.finish()
}
}
impl<'a, T> From<FutureObj<'a, T>> for LocalFutureObj<'a, T> {
#[inline]
fn from(f: FutureObj<'a, T>) -> LocalFutureObj<'a, T> {
f.0
}
}
impl<'a, T> Future for LocalFutureObj<'a, T> {
type Output = T;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<T> {
unsafe {
((*self).poll_fn)((*self).ptr, cx)
}
}
}
impl<'a, T> Drop for LocalFutureObj<'a, T> {
fn drop(&mut self) {
unsafe {
(self.drop_fn)(self.ptr)
}
}
}
/// A custom trait object for polling futures, roughly akin to
/// `Box<dyn Future<Output = T> + Send + 'a>`.
///
/// This custom trait object was introduced for two reasons:
/// - Currently it is not possible to take `dyn Trait` by value and
/// `Box<dyn Trait>` is not available in no_std contexts.
/// - The `Future` trait is currently not object safe: The `Future::poll`
/// method makes uses the arbitrary self types feature and traits in which
/// this feature is used are currently not object safe due to current compiler
/// limitations. (See tracking issue for arbitrary self types for more
/// information #44874)
pub struct FutureObj<'a, T>(LocalFutureObj<'a, T>);
impl<'a, T> Unpin for FutureObj<'a, T> {}
unsafe impl<'a, T> Send for FutureObj<'a, T> {}
impl<'a, T> FutureObj<'a, T> {
/// Create a `FutureObj` from a custom trait object representation.
#[inline]
pub fn new<F: UnsafeFutureObj<'a, T> + Send>(f: F) -> FutureObj<'a, T> {
FutureObj(LocalFutureObj::new(f))
}
}
impl<'a, T> fmt::Debug for FutureObj<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FutureObj")
.finish()
}
}
impl<'a, T> Future for FutureObj<'a, T> {
type Output = T;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<T> {
let pinned_field: Pin<&mut LocalFutureObj<'a, T>> = unsafe {
Pin::map_unchecked_mut(self, |x| &mut x.0)
};
LocalFutureObj::poll(pinned_field, cx)
}
}
/// A custom implementation of a future trait object for `FutureObj`, providing
/// a hand-rolled vtable.
///
/// This custom representation is typically used only in `no_std` contexts,
/// where the default `Box`-based implementation is not available.
///
/// The implementor must guarantee that it is safe to call `poll` repeatedly (in
/// a non-concurrent fashion) with the result of `into_raw` until `drop` is
/// called.
pub unsafe trait UnsafeFutureObj<'a, T>: 'a {
/// Convert an owned instance into a (conceptually owned) void pointer.
fn into_raw(self) -> *mut ();
/// Poll the future represented by the given void pointer.
///
/// # Safety
///
/// The trait implementor must guarantee that it is safe to repeatedly call
/// `poll` with the result of `into_raw` until `drop` is called; such calls
/// are not, however, allowed to race with each other or with calls to
/// `drop`.
unsafe fn poll(ptr: *mut (), cx: &mut Context) -> Poll<T>;
/// Drops the future represented by the given void pointer.
///
/// # Safety
///
/// The trait implementor must guarantee that it is safe to call this
/// function once per `into_raw` invocation; that call cannot race with
/// other calls to `drop` or `poll`.
unsafe fn drop(ptr: *mut ());
}
unsafe impl<'a, T, F> UnsafeFutureObj<'a, T> for &'a mut F
where F: Future<Output = T> + Unpin + 'a
{
fn into_raw(self) -> *mut () {
self as *mut F as *mut ()
}
unsafe fn poll(ptr: *mut (), cx: &mut Context) -> Poll<T> {
let p: Pin<&mut F> = Pin::new_unchecked(&mut *(ptr as *mut F));
F::poll(p, cx)
}
unsafe fn drop(_ptr: *mut ()) {}
}
#[unstable(feature = "futures_api", issue = "50547")]
unsafe impl<'a, T, P, F> UnsafeFutureObj<'a, T> for Pin<P> where
P: ops::DerefMut<Target = F> + 'a,
F: Future<Output = T> + 'a,
{
fn into_raw(mut self) -> *mut () {
unsafe { Pin::get_mut_unchecked(Pin::as_mut(&mut self)) as *mut F as *mut () }
}
unsafe fn poll(ptr: *mut (), cx: &mut Context) -> Poll<T> {
let future: Pin<&mut F> = Pin::new_unchecked(&mut *(ptr as *mut F));
F::poll(future, cx)
}
unsafe fn drop(_ptr: *mut ()) {}
}

View file

@ -16,6 +16,3 @@
mod future;
pub use self::future::Future;
mod future_obj;
pub use self::future_obj::{FutureObj, LocalFutureObj, UnsafeFutureObj};

View file

@ -1,98 +0,0 @@
// Copyright 2018 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
use fmt;
use super::{Spawn, Waker, LocalWaker};
/// Information about the currently-running task.
///
/// Contexts are always tied to the stack, since they are set up specifically
/// when performing a single `poll` step on a task.
pub struct Context<'a> {
local_waker: &'a LocalWaker,
spawner: &'a mut dyn Spawn,
}
impl<'a> fmt::Debug for Context<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Context")
.finish()
}
}
impl<'a> Context<'a> {
/// Create a new task `Context` with the provided `local_waker`, `waker`,
/// and `spawner`.
#[inline]
pub fn new(
local_waker: &'a LocalWaker,
spawner: &'a mut dyn Spawn,
) -> Context<'a> {
Context { local_waker, spawner }
}
/// Get the `LocalWaker` associated with the current task.
#[inline]
pub fn local_waker(&self) -> &'a LocalWaker {
self.local_waker
}
/// Get the `Waker` associated with the current task.
#[inline]
pub fn waker(&self) -> &'a Waker {
unsafe { &*(self.local_waker as *const LocalWaker as *const Waker) }
}
/// Get the spawner associated with this task.
///
/// This method is useful primarily if you want to explicitly handle
/// spawn failures.
#[inline]
pub fn spawner(&mut self) -> &mut dyn Spawn {
self.spawner
}
/// Produce a context like the current one, but using the given waker
/// instead.
///
/// This advanced method is primarily used when building "internal
/// schedulers" within a task, where you want to provide some customized
/// wakeup logic.
#[inline]
pub fn with_waker<'b>(
&'b mut self,
local_waker: &'b LocalWaker,
) -> Context<'b> {
Context {
local_waker,
spawner: self.spawner,
}
}
/// Produce a context like the current one, but using the given spawner
/// instead.
///
/// This advanced method is primarily used when building "internal
/// schedulers" within a task.
#[inline]
pub fn with_spawner<'b, Sp: Spawn>(
&'b mut self,
spawner: &'b mut Sp,
) -> Context<'b> {
Context {
local_waker: self.local_waker,
spawner,
}
}
}

View file

@ -14,12 +14,6 @@
//! Types and Traits for working with asynchronous tasks.
mod context;
pub use self::context::Context;
mod spawn;
pub use self::spawn::{Spawn, SpawnErrorKind, SpawnObjError, SpawnLocalObjError};
mod poll;
pub use self::poll::Poll;

View file

@ -1,93 +0,0 @@
// Copyright 2018 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
use fmt;
use future::{FutureObj, LocalFutureObj};
/// Spawns tasks that poll futures to completion onto its associated task
/// executor.
///
/// The term "task" refers to a kind of lightweight "thread". Task executors
/// are responsible for scheduling the execution of tasks on operating system
/// threads.
pub trait Spawn {
/// Spawns a new task with the given future. The future will be polled until
/// completion.
///
/// # Errors
///
/// The executor may be unable to spawn tasks, either because it has
/// been shut down or is resource-constrained.
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnObjError>;
/// Determines whether the executor is able to spawn new tasks.
///
/// # Returns
///
/// An `Ok` return means the executor is *likely* (but not guaranteed)
/// to accept a subsequent spawn attempt. Likewise, an `Err` return
/// means that `spawn` is likely, but not guaranteed, to yield an error.
#[inline]
fn status(&self) -> Result<(), SpawnErrorKind> {
Ok(())
}
}
/// Provides the reason that an executor was unable to spawn.
pub struct SpawnErrorKind {
_hidden: (),
}
impl fmt::Debug for SpawnErrorKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("SpawnErrorKind")
.field(&"shutdown")
.finish()
}
}
impl SpawnErrorKind {
/// Spawning is failing because the executor has been shut down.
pub fn shutdown() -> SpawnErrorKind {
SpawnErrorKind { _hidden: () }
}
/// Check whether this error is the `shutdown` error.
pub fn is_shutdown(&self) -> bool {
true
}
}
/// The result of a failed spawn
#[derive(Debug)]
pub struct SpawnObjError {
/// The kind of error
pub kind: SpawnErrorKind,
/// The future for which spawning inside a task was attempted
pub future: FutureObj<'static, ()>,
}
/// The result of a failed spawn
#[derive(Debug)]
pub struct SpawnLocalObjError {
/// The kind of error
pub kind: SpawnErrorKind,
/// The future for which spawning inside a task was attempted
pub future: LocalFutureObj<'static, ()>,
}

View file

@ -123,6 +123,15 @@ impl LocalWaker {
LocalWaker { inner }
}
/// Converts this `LocalWaker` into a `Waker`.
///
/// `Waker` is nearly identical to `LocalWaker`, but is threadsafe
/// (implements `Send` and `Sync`).
#[inline]
pub fn into_waker(self) -> Waker {
self.into()
}
/// Wake up the task associated with this `LocalWaker`.
#[inline]
pub fn wake(&self) {

View file

@ -15,7 +15,7 @@ use core::marker::Unpin;
use core::pin::Pin;
use core::option::Option;
use core::ptr::NonNull;
use core::task::{self, Poll};
use core::task::{LocalWaker, Poll};
use core::ops::{Drop, Generator, GeneratorState};
#[doc(inline)]
@ -42,8 +42,8 @@ impl<T: Generator<Yield = ()>> !Unpin for GenFuture<T> {}
#[unstable(feature = "gen_future", issue = "50547")]
impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
type Output = T::Return;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
set_task_cx(cx, || match unsafe { Pin::get_mut_unchecked(self).0.resume() } {
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
set_task_waker(lw, || match unsafe { Pin::get_mut_unchecked(self).0.resume() } {
GeneratorState::Yielded(()) => Poll::Pending,
GeneratorState::Complete(x) => Poll::Ready(x),
})
@ -51,66 +51,61 @@ impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
}
thread_local! {
static TLS_CX: Cell<Option<NonNull<task::Context<'static>>>> = Cell::new(None);
static TLS_WAKER: Cell<Option<NonNull<LocalWaker>>> = Cell::new(None);
}
struct SetOnDrop(Option<NonNull<task::Context<'static>>>);
struct SetOnDrop(Option<NonNull<LocalWaker>>);
impl Drop for SetOnDrop {
fn drop(&mut self) {
TLS_CX.with(|tls_cx| {
tls_cx.set(self.0.take());
TLS_WAKER.with(|tls_waker| {
tls_waker.set(self.0.take());
});
}
}
#[unstable(feature = "gen_future", issue = "50547")]
/// Sets the thread-local task context used by async/await futures.
pub fn set_task_cx<F, R>(cx: &mut task::Context, f: F) -> R
pub fn set_task_waker<F, R>(lw: &LocalWaker, f: F) -> R
where
F: FnOnce() -> R
{
let old_cx = TLS_CX.with(|tls_cx| {
tls_cx.replace(NonNull::new(
cx
as *mut task::Context
as *mut ()
as *mut task::Context<'static>
))
let old_waker = TLS_WAKER.with(|tls_waker| {
tls_waker.replace(Some(NonNull::from(lw)))
});
let _reset_cx = SetOnDrop(old_cx);
let _reset_waker = SetOnDrop(old_waker);
f()
}
#[unstable(feature = "gen_future", issue = "50547")]
/// Retrieves the thread-local task context used by async/await futures.
/// Retrieves the thread-local task waker used by async/await futures.
///
/// This function acquires exclusive access to the task context.
/// This function acquires exclusive access to the task waker.
///
/// Panics if no task has been set or if the task context has already been
/// retrieved by a surrounding call to get_task_cx.
pub fn get_task_cx<F, R>(f: F) -> R
/// Panics if no waker has been set or if the waker has already been
/// retrieved by a surrounding call to get_task_waker.
pub fn get_task_waker<F, R>(f: F) -> R
where
F: FnOnce(&mut task::Context) -> R
F: FnOnce(&LocalWaker) -> R
{
let cx_ptr = TLS_CX.with(|tls_cx| {
// Clear the entry so that nested `with_get_cx` calls
let waker_ptr = TLS_WAKER.with(|tls_waker| {
// Clear the entry so that nested `get_task_waker` calls
// will fail or set their own value.
tls_cx.replace(None)
tls_waker.replace(None)
});
let _reset_cx = SetOnDrop(cx_ptr);
let _reset_waker = SetOnDrop(waker_ptr);
let mut cx_ptr = cx_ptr.expect(
"TLS task::Context not set. This is a rustc bug. \
let mut waker_ptr = waker_ptr.expect(
"TLS LocalWaker not set. This is a rustc bug. \
Please file an issue on https://github.com/rust-lang/rust.");
unsafe { f(cx_ptr.as_mut()) }
unsafe { f(waker_ptr.as_mut()) }
}
#[unstable(feature = "gen_future", issue = "50547")]
/// Polls a future in the current thread-local task context.
pub fn poll_in_task_cx<F>(f: Pin<&mut F>) -> Poll<F::Output>
/// Polls a future in the current thread-local task waker.
pub fn poll_with_tls_waker<F>(f: Pin<&mut F>) -> Poll<F::Output>
where
F: Future
{
get_task_cx(|cx| F::poll(f, cx))
get_task_waker(|lw| F::poll(f, lw))
}

View file

@ -229,7 +229,7 @@ macro_rules! await {
let mut pinned = $e;
loop {
if let $crate::task::Poll::Ready(x) =
$crate::future::poll_in_task_cx(unsafe {
$crate::future::poll_with_tls_waker(unsafe {
$crate::pin::Pin::new_unchecked(&mut pinned)
})
{

View file

@ -22,7 +22,7 @@ use panicking;
use ptr::{Unique, NonNull};
use rc::Rc;
use sync::{Arc, Mutex, RwLock, atomic};
use task::{self, Poll};
use task::{LocalWaker, Poll};
use thread::Result;
#[stable(feature = "panic_hooks", since = "1.10.0")]
@ -327,9 +327,9 @@ impl<T: fmt::Debug> fmt::Debug for AssertUnwindSafe<T> {
impl<'a, F: Future> Future for AssertUnwindSafe<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) };
F::poll(pinned_field, cx)
F::poll(pinned_field, lw)
}
}

View file

@ -18,10 +18,8 @@ use std::sync::{
Arc,
atomic::{self, AtomicUsize},
};
use std::future::FutureObj;
use std::task::{
Context, Poll, Wake,
Spawn, SpawnObjError,
LocalWaker, Poll, Wake,
local_waker_from_nonlocal,
};
@ -35,24 +33,17 @@ impl Wake for Counter {
}
}
struct NoopSpawner;
impl Spawn for NoopSpawner {
fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> {
Ok(())
}
}
struct WakeOnceThenComplete(bool);
fn wake_and_yield_once() -> WakeOnceThenComplete { WakeOnceThenComplete(false) }
impl Future for WakeOnceThenComplete {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<()> {
if self.0 {
Poll::Ready(())
} else {
cx.waker().wake();
lw.wake();
self.0 = true;
Poll::Pending
}
@ -150,13 +141,10 @@ where
let mut fut = Box::pinned(f(9));
let counter = Arc::new(Counter { wakes: AtomicUsize::new(0) });
let waker = local_waker_from_nonlocal(counter.clone());
let spawner = &mut NoopSpawner;
let cx = &mut Context::new(&waker, spawner);
assert_eq!(0, counter.wakes.load(atomic::Ordering::SeqCst));
assert_eq!(Poll::Pending, fut.as_mut().poll(cx));
assert_eq!(Poll::Pending, fut.as_mut().poll(&waker));
assert_eq!(1, counter.wakes.load(atomic::Ordering::SeqCst));
assert_eq!(Poll::Ready(9), fut.as_mut().poll(cx));
assert_eq!(Poll::Ready(9), fut.as_mut().poll(&waker));
}
fn main() {

View file

@ -18,11 +18,8 @@ use std::sync::{
Arc,
atomic::{self, AtomicUsize},
};
use std::future::FutureObj;
use std::task::{
Context, Poll,
Wake, Waker, LocalWaker,
Spawn, SpawnObjError,
Poll, Wake, Waker, LocalWaker,
local_waker, local_waker_from_nonlocal,
};
@ -41,24 +38,17 @@ impl Wake for Counter {
}
}
struct NoopSpawner;
impl Spawn for NoopSpawner {
fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> {
Ok(())
}
}
struct MyFuture;
impl Future for MyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Ensure all the methods work appropriately
cx.waker().wake();
cx.waker().wake();
cx.local_waker().wake();
cx.spawner().spawn_obj(Box::pinned(MyFuture).into()).unwrap();
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
// Wake once locally
lw.wake();
// Wake twice non-locally
let waker = lw.clone().into_waker();
waker.wake();
waker.wake();
Poll::Ready(())
}
}
@ -69,9 +59,7 @@ fn test_local_waker() {
nonlocal_wakes: AtomicUsize::new(0),
});
let waker = unsafe { local_waker(counter.clone()) };
let spawner = &mut NoopSpawner;
let cx = &mut Context::new(&waker, spawner);
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(cx));
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker));
assert_eq!(1, counter.local_wakes.load(atomic::Ordering::SeqCst));
assert_eq!(2, counter.nonlocal_wakes.load(atomic::Ordering::SeqCst));
}
@ -82,9 +70,7 @@ fn test_local_as_nonlocal_waker() {
nonlocal_wakes: AtomicUsize::new(0),
});
let waker: LocalWaker = local_waker_from_nonlocal(counter.clone());
let spawner = &mut NoopSpawner;
let cx = &mut Context::new(&waker, spawner);
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(cx));
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker));
assert_eq!(0, counter.local_wakes.load(atomic::Ordering::SeqCst));
assert_eq!(3, counter.nonlocal_wakes.load(atomic::Ordering::SeqCst));
}