diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 5113c28aa085..771b15588d04 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -136,6 +136,9 @@ mod work_queue; /// A parallel queue. mod message_queue; +/// A mostly lock-free multi-producer, single consumer queue. +mod mpsc_queue; + /// A parallel data structure for tracking sleeping schedulers. mod sleeper_list; diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs new file mode 100644 index 000000000000..57b7d4f469b8 --- /dev/null +++ b/src/libstd/rt/mpsc_queue.rs @@ -0,0 +1,203 @@ +/* Multi-producer/single-consumer queue + * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. + +use unstable::sync::UnsafeArc; +use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; +use ptr::{mut_null, to_mut_unsafe_ptr}; +use cast; +use option::*; +use clone::Clone; +use default::Default; +use kinds::Send; +use fmt; + +struct Node { + next: AtomicPtr>, + value: Option, +} + +impl Node { + fn new(value: T) -> Node { + Node{next: AtomicPtr::new(mut_null()), value: Some(value)} + } +} + +impl Default for Node { + fn default() -> Node { + Node{next: AtomicPtr::new(mut_null()), value: None} + } +} + +struct State { + stub: Node, + head: AtomicPtr>, + tail: *mut Node, +} + +struct Queue { + priv state: UnsafeArc>, +} + +impl Clone for Queue { + fn clone(&self) -> Queue { + Queue { + state: self.state.clone() + } + } +} + +impl fmt::Default for Queue { + fn fmt(value: &Queue, f: &mut fmt::Formatter) { + write!(f.buf, "Queue({})", value.state.get()); + } +} + +impl Queue { + pub fn new() -> Queue { + let mut q = Queue{state: UnsafeArc::new(State { + stub: Default::default(), + head: AtomicPtr::new(mut_null()), + tail: mut_null(), + })}; + let stub = q.get_stub_unsafe(); + q.get_head().store(stub, Relaxed); + q.set_tail(stub); + q + } + + pub fn push(&mut self, value: T) { + unsafe { + let node = cast::transmute(~Node::new(value)); + self.push_node(node); + } + } + + fn push_node(&mut self, node: *mut Node) { + unsafe { + (*node).next.store(mut_null(), Release); + let prev = (*self.state.get()).head.swap(node, Relaxed); + (*prev).next.store(node, Release); + } + } + + fn get_stub_unsafe(&mut self) -> *mut Node { + unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) } + } + + fn get_head(&mut self) -> &mut AtomicPtr> { + unsafe { &mut (*self.state.get()).head } + } + + fn get_tail(&mut self) -> *mut Node { + unsafe { (*self.state.get()).tail } + } + + fn set_tail(&mut self, tail: *mut Node) { + unsafe { (*self.state.get()).tail = tail } + } + + pub fn casual_pop(&mut self) -> Option { + self.pop() + } + + pub fn pop(&mut self) -> Option { + unsafe { + let mut tail = self.get_tail(); + let mut next = (*tail).next.load(Acquire); + let stub = self.get_stub_unsafe(); + if tail == stub { + if mut_null() == next { + return None + } + self.set_tail(next); + tail = next; + next = (*next).next.load(Acquire); + } + if next != mut_null() { + let tail: ~Node = cast::transmute(tail); + self.set_tail(next); + return tail.value + } + let head = self.get_head().load(Relaxed); + if tail != head { + return None + } + self.push_node(stub); + next = (*tail).next.load(Acquire); + if next != mut_null() { + let tail: ~Node = cast::transmute(tail); + self.set_tail(next); + return tail.value + } + } + None + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use option::*; + use task; + use comm; + use fmt; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let mut q = Queue::new(); + assert_eq!(None, q.pop()); + + for _ in range(0, nthreads) { + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + for i in range(0, nmsgs) { + q.push(i); + } + } + } + + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nthreads*nmsgs { break } + } + } + } + } +} + diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index b008a8a74f2c..e739eed32fed 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -19,7 +19,7 @@ use super::stack::{StackPool}; use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; -use super::message_queue::MessageQueue; +use super::mpsc_queue::Queue; use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; @@ -47,7 +47,7 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - priv message_queue: MessageQueue, + priv message_queue: Queue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -137,7 +137,7 @@ impl Scheduler { let mut sched = Scheduler { sleeper_list: sleeper_list, - message_queue: MessageQueue::new(), + message_queue: Queue::new(), sleepy: false, no_sleep: false, event_loop: event_loop, @@ -802,7 +802,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: MessageQueue, + priv queue: Queue, sched_id: uint }