std: Second pass stabilization for comm
This commit is a second pass stabilization for the `std::comm` module,
performing the following actions:
* The entire `std::comm` module was moved under `std::sync::mpsc`. This movement
reflects that channels are just yet another synchronization primitive, and
they don't necessarily deserve a special place outside of the other
concurrency primitives that the standard library offers.
* The `send` and `recv` methods have all been removed.
* The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`.
This means that all send/receive operations return a `Result` now indicating
whether the operation was successful or not.
* The error type of `send` is now a `SendError` to implement a custom error
message and allow for `unwrap()`. The error type contains an `into_inner`
method to extract the value.
* The error type of `recv` is now `RecvError` for the same reasons as `send`.
* The `TryRecvError` and `TrySendError` types have had public reexports removed
of their variants and the variant names have been tweaked with enum
namespacing rules.
* The `Messages` iterator is renamed to `Iter`
This functionality is now all `#[stable]`:
* `Sender`
* `SyncSender`
* `Receiver`
* `std::sync::mpsc`
* `channel`
* `sync_channel`
* `Iter`
* `Sender::send`
* `Sender::clone`
* `SyncSender::send`
* `SyncSender::try_send`
* `SyncSender::clone`
* `Receiver::recv`
* `Receiver::try_recv`
* `Receiver::iter`
* `SendError`
* `RecvError`
* `TrySendError::{mod, Full, Disconnected}`
* `TryRecvError::{mod, Empty, Disconnected}`
* `SendError::into_inner`
* `TrySendError::into_inner`
This is a breaking change due to the modification of where this module is
located, as well as the changing of the semantics of `send` and `recv`. Most
programs just need to rename imports of `std::comm` to `std::sync::mpsc` and
add calls to `unwrap` after a send or a receive operation.
[breaking-change]
This commit is contained in:
parent
bb8f4fc3b7
commit
bc83a009f6
109 changed files with 1175 additions and 1206 deletions
|
|
@ -1,205 +0,0 @@
|
|||
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* 1. Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
*
|
||||
* 2. Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
|
||||
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
|
||||
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
||||
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
|
||||
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
* The views and conclusions contained in the software and documentation are
|
||||
* those of the authors and should not be interpreted as representing official
|
||||
* policies, either expressed or implied, of Dmitry Vyukov.
|
||||
*/
|
||||
|
||||
//! A mostly lock-free multi-producer, single consumer queue.
|
||||
//!
|
||||
//! This module contains an implementation of a concurrent MPSC queue. This
|
||||
//! queue can be used to share data between tasks, and is also used as the
|
||||
//! building block of channels in rust.
|
||||
//!
|
||||
//! Note that the current implementation of this queue has a caveat of the `pop`
|
||||
//! method, and see the method for more information about it. Due to this
|
||||
//! caveat, this queue may not be appropriate for all use-cases.
|
||||
|
||||
#![experimental]
|
||||
|
||||
// http://www.1024cores.net/home/lock-free-algorithms
|
||||
// /queues/non-intrusive-mpsc-node-based-queue
|
||||
|
||||
pub use self::PopResult::*;
|
||||
|
||||
use core::prelude::*;
|
||||
|
||||
use alloc::boxed::Box;
|
||||
use core::mem;
|
||||
use core::cell::UnsafeCell;
|
||||
|
||||
use sync::atomic::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
|
||||
|
||||
/// A result of the `pop` function.
|
||||
pub enum PopResult<T> {
|
||||
/// Some data has been popped
|
||||
Data(T),
|
||||
/// The queue is empty
|
||||
Empty,
|
||||
/// The queue is in an inconsistent state. Popping data should succeed, but
|
||||
/// some pushers have yet to make enough progress in order allow a pop to
|
||||
/// succeed. It is recommended that a pop() occur "in the near future" in
|
||||
/// order to see if the sender has made progress or not
|
||||
Inconsistent,
|
||||
}
|
||||
|
||||
struct Node<T> {
|
||||
next: AtomicPtr<Node<T>>,
|
||||
value: Option<T>,
|
||||
}
|
||||
|
||||
/// The multi-producer single-consumer structure. This is not cloneable, but it
|
||||
/// may be safely shared so long as it is guaranteed that there is only one
|
||||
/// popper at a time (many pushers are allowed).
|
||||
pub struct Queue<T> {
|
||||
head: AtomicPtr<Node<T>>,
|
||||
tail: UnsafeCell<*mut Node<T>>,
|
||||
}
|
||||
|
||||
unsafe impl<T:Send> Send for Queue<T> { }
|
||||
unsafe impl<T:Send> Sync for Queue<T> { }
|
||||
|
||||
impl<T> Node<T> {
|
||||
unsafe fn new(v: Option<T>) -> *mut Node<T> {
|
||||
mem::transmute(box Node {
|
||||
next: AtomicPtr::new(0 as *mut Node<T>),
|
||||
value: v,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send> Queue<T> {
|
||||
/// Creates a new queue that is safe to share among multiple producers and
|
||||
/// one consumer.
|
||||
pub fn new() -> Queue<T> {
|
||||
let stub = unsafe { Node::new(None) };
|
||||
Queue {
|
||||
head: AtomicPtr::new(stub),
|
||||
tail: UnsafeCell::new(stub),
|
||||
}
|
||||
}
|
||||
|
||||
/// Pushes a new value onto this queue.
|
||||
pub fn push(&self, t: T) {
|
||||
unsafe {
|
||||
let n = Node::new(Some(t));
|
||||
let prev = self.head.swap(n, AcqRel);
|
||||
(*prev).next.store(n, Release);
|
||||
}
|
||||
}
|
||||
|
||||
/// Pops some data from this queue.
|
||||
///
|
||||
/// Note that the current implementation means that this function cannot
|
||||
/// return `Option<T>`. It is possible for this queue to be in an
|
||||
/// inconsistent state where many pushes have succeeded and completely
|
||||
/// finished, but pops cannot return `Some(t)`. This inconsistent state
|
||||
/// happens when a pusher is pre-empted at an inopportune moment.
|
||||
///
|
||||
/// This inconsistent state means that this queue does indeed have data, but
|
||||
/// it does not currently have access to it at this time.
|
||||
pub fn pop(&self) -> PopResult<T> {
|
||||
unsafe {
|
||||
let tail = *self.tail.get();
|
||||
let next = (*tail).next.load(Acquire);
|
||||
|
||||
if !next.is_null() {
|
||||
*self.tail.get() = next;
|
||||
assert!((*tail).value.is_none());
|
||||
assert!((*next).value.is_some());
|
||||
let ret = (*next).value.take().unwrap();
|
||||
let _: Box<Node<T>> = mem::transmute(tail);
|
||||
return Data(ret);
|
||||
}
|
||||
|
||||
if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<T: Send> Drop for Queue<T> {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let mut cur = *self.tail.get();
|
||||
while !cur.is_null() {
|
||||
let next = (*cur).next.load(Relaxed);
|
||||
let _: Box<Node<T>> = mem::transmute(cur);
|
||||
cur = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use prelude::v1::*;
|
||||
|
||||
use comm::channel;
|
||||
use super::{Queue, Data, Empty, Inconsistent};
|
||||
use sync::Arc;
|
||||
use thread::Thread;
|
||||
|
||||
#[test]
|
||||
fn test_full() {
|
||||
let q = Queue::new();
|
||||
q.push(box 1i);
|
||||
q.push(box 2i);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let nthreads = 8u;
|
||||
let nmsgs = 1000u;
|
||||
let q = Queue::new();
|
||||
match q.pop() {
|
||||
Empty => {}
|
||||
Inconsistent | Data(..) => panic!()
|
||||
}
|
||||
let (tx, rx) = channel();
|
||||
let q = Arc::new(q);
|
||||
|
||||
for _ in range(0, nthreads) {
|
||||
let tx = tx.clone();
|
||||
let q = q.clone();
|
||||
Thread::spawn(move|| {
|
||||
for i in range(0, nmsgs) {
|
||||
q.push(i);
|
||||
}
|
||||
tx.send(());
|
||||
}).detach();
|
||||
}
|
||||
|
||||
let mut i = 0u;
|
||||
while i < nthreads * nmsgs {
|
||||
match q.pop() {
|
||||
Empty | Inconsistent => {},
|
||||
Data(_) => { i += 1 }
|
||||
}
|
||||
}
|
||||
drop(tx);
|
||||
for _ in range(0, nthreads) {
|
||||
rx.recv();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue