Fallout from stabilization

This commit is contained in:
Aaron Turon 2015-02-17 15:10:25 -08:00
parent d8f8f7a58c
commit d0de2b46e9
89 changed files with 578 additions and 558 deletions

View file

@ -10,7 +10,7 @@
//! Generic support for building blocking abstractions.
use thread::Thread;
use thread::{self, Thread};
use sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering};
use sync::Arc;
use marker::{Sync, Send};
@ -40,7 +40,7 @@ impl !Sync for WaitToken {}
pub fn tokens() -> (WaitToken, SignalToken) {
let inner = Arc::new(Inner {
thread: Thread::current(),
thread: thread::current(),
woken: ATOMIC_BOOL_INIT,
});
let wait_token = WaitToken {
@ -80,7 +80,7 @@ impl SignalToken {
impl WaitToken {
pub fn wait(self) {
while !self.inner.woken.load(Ordering::SeqCst) {
Thread::park()
thread::park()
}
}
}

View file

@ -53,12 +53,12 @@
//! Simple usage:
//!
//! ```
//! use std::thread::Thread;
//! use std::thread;
//! use std::sync::mpsc::channel;
//!
//! // Create a simple streaming channel
//! let (tx, rx) = channel();
//! Thread::spawn(move|| {
//! thread::spawn(move|| {
//! tx.send(10).unwrap();
//! });
//! assert_eq!(rx.recv().unwrap(), 10);
@ -67,7 +67,7 @@
//! Shared usage:
//!
//! ```
//! use std::thread::Thread;
//! use std::thread;
//! use std::sync::mpsc::channel;
//!
//! // Create a shared channel that can be sent along from many threads
@ -76,7 +76,7 @@
//! let (tx, rx) = channel();
//! for i in 0..10 {
//! let tx = tx.clone();
//! Thread::spawn(move|| {
//! thread::spawn(move|| {
//! tx.send(i).unwrap();
//! });
//! }
@ -102,11 +102,11 @@
//! Synchronous channels:
//!
//! ```
//! use std::thread::Thread;
//! use std::thread;
//! use std::sync::mpsc::sync_channel;
//!
//! let (tx, rx) = sync_channel::<int>(0);
//! Thread::spawn(move|| {
//! thread::spawn(move|| {
//! // This will wait for the parent task to start receiving
//! tx.send(53).unwrap();
//! });
@ -467,14 +467,14 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
///
/// ```
/// use std::sync::mpsc::channel;
/// use std::thread::Thread;
/// use std::thread;
///
/// // tx is is the sending half (tx for transmission), and rx is the receiving
/// // half (rx for receiving).
/// let (tx, rx) = channel();
///
/// // Spawn off an expensive computation
/// Thread::spawn(move|| {
/// thread::spawn(move|| {
/// # fn expensive_computation() {}
/// tx.send(expensive_computation()).unwrap();
/// });
@ -509,14 +509,14 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread::Thread;
/// use std::thread;
///
/// let (tx, rx) = sync_channel(1);
///
/// // this returns immediately
/// tx.send(1).unwrap();
///
/// Thread::spawn(move|| {
/// thread::spawn(move|| {
/// // this will block until the previous message has been received
/// tx.send(2).unwrap();
/// });
@ -1026,7 +1026,7 @@ mod test {
use std::env;
use super::*;
use thread::Thread;
use thread;
pub fn stress_factor() -> uint {
match env::var("RUST_TEST_STRESS") {
@ -1069,7 +1069,7 @@ mod test {
#[test]
fn smoke_threads() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
tx.send(1).unwrap();
});
assert_eq!(rx.recv().unwrap(), 1);
@ -1101,7 +1101,7 @@ mod test {
#[test]
fn port_gone_concurrent() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx.recv().unwrap();
});
while tx.send(1).is_ok() {}
@ -1111,7 +1111,7 @@ mod test {
fn port_gone_concurrent_shared() {
let (tx, rx) = channel::<int>();
let tx2 = tx.clone();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx.recv().unwrap();
});
while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
@ -1136,7 +1136,7 @@ mod test {
#[test]
fn chan_gone_concurrent() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
tx.send(1).unwrap();
tx.send(1).unwrap();
});
@ -1146,7 +1146,7 @@ mod test {
#[test]
fn stress() {
let (tx, rx) = channel::<int>();
let t = Thread::scoped(move|| {
let t = thread::spawn(move|| {
for _ in 0u..10000 { tx.send(1).unwrap(); }
});
for _ in 0u..10000 {
@ -1161,7 +1161,7 @@ mod test {
static NTHREADS: uint = 8;
let (tx, rx) = channel::<int>();
let t = Thread::scoped(move|| {
let t = thread::spawn(move|| {
for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
@ -1173,7 +1173,7 @@ mod test {
for _ in 0..NTHREADS {
let tx = tx.clone();
Thread::spawn(move|| {
thread::spawn(move|| {
for _ in 0..AMT { tx.send(1).unwrap(); }
});
}
@ -1185,14 +1185,14 @@ mod test {
fn send_from_outside_runtime() {
let (tx1, rx1) = channel::<()>();
let (tx2, rx2) = channel::<int>();
let t1 = Thread::scoped(move|| {
let t1 = thread::spawn(move|| {
tx1.send(()).unwrap();
for _ in 0..40 {
assert_eq!(rx2.recv().unwrap(), 1);
}
});
rx1.recv().unwrap();
let t2 = Thread::scoped(move|| {
let t2 = thread::spawn(move|| {
for _ in 0..40 {
tx2.send(1).unwrap();
}
@ -1204,7 +1204,7 @@ mod test {
#[test]
fn recv_from_outside_runtime() {
let (tx, rx) = channel::<int>();
let t = Thread::scoped(move|| {
let t = thread::spawn(move|| {
for _ in 0..40 {
assert_eq!(rx.recv().unwrap(), 1);
}
@ -1219,11 +1219,11 @@ mod test {
fn no_runtime() {
let (tx1, rx1) = channel::<int>();
let (tx2, rx2) = channel::<int>();
let t1 = Thread::scoped(move|| {
let t1 = thread::spawn(move|| {
assert_eq!(rx1.recv().unwrap(), 1);
tx2.send(2).unwrap();
});
let t2 = Thread::scoped(move|| {
let t2 = thread::spawn(move|| {
tx1.send(1).unwrap();
assert_eq!(rx2.recv().unwrap(), 2);
});
@ -1256,7 +1256,7 @@ mod test {
#[test]
fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will panic
let res = Thread::scoped(move|| {
let res = thread::spawn(move|| {
let (tx, rx) = channel::<int>();
drop(tx);
rx.recv().unwrap();
@ -1325,7 +1325,7 @@ mod test {
#[test]
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = channel::<Box<int>>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
assert!(rx.recv().unwrap() == box 10);
});
@ -1335,10 +1335,10 @@ mod test {
#[test]
fn oneshot_multi_task_recv_then_close() {
let (tx, rx) = channel::<Box<int>>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
drop(tx);
});
let res = Thread::scoped(move|| {
let res = thread::spawn(move|| {
assert!(rx.recv().unwrap() == box 10);
}).join();
assert!(res.is_err());
@ -1348,7 +1348,7 @@ mod test {
fn oneshot_multi_thread_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
drop(rx);
});
drop(tx);
@ -1359,10 +1359,10 @@ mod test {
fn oneshot_multi_thread_send_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
drop(rx);
});
let _ = Thread::scoped(move|| {
let _ = thread::spawn(move|| {
tx.send(1).unwrap();
}).join();
}
@ -1372,14 +1372,14 @@ mod test {
fn oneshot_multi_thread_recv_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<int>();
Thread::spawn(move|| {
let res = Thread::scoped(move|| {
thread::spawn(move|| {
let res = thread::spawn(move|| {
rx.recv().unwrap();
}).join();
assert!(res.is_err());
});
let _t = Thread::spawn(move|| {
Thread::spawn(move|| {
let _t = thread::spawn(move|| {
thread::spawn(move|| {
drop(tx);
});
});
@ -1390,7 +1390,7 @@ mod test {
fn oneshot_multi_thread_send_recv_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
tx.send(box 10).unwrap();
});
assert!(rx.recv().unwrap() == box 10);
@ -1408,7 +1408,7 @@ mod test {
fn send(tx: Sender<Box<int>>, i: int) {
if i == 10 { return }
Thread::spawn(move|| {
thread::spawn(move|| {
tx.send(box i).unwrap();
send(tx, i + 1);
});
@ -1417,7 +1417,7 @@ mod test {
fn recv(rx: Receiver<Box<int>>, i: int) {
if i == 10 { return }
Thread::spawn(move|| {
thread::spawn(move|| {
assert!(rx.recv().unwrap() == box i);
recv(rx, i + 1);
});
@ -1439,7 +1439,7 @@ mod test {
let total = stress_factor() + 100;
for _ in 0..total {
let tx = tx.clone();
Thread::spawn(move|| {
thread::spawn(move|| {
tx.send(()).unwrap();
});
}
@ -1454,7 +1454,7 @@ mod test {
let (tx, rx) = channel::<int>();
let (total_tx, total_rx) = channel::<int>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
let mut acc = 0;
for x in rx.iter() {
acc += x;
@ -1474,7 +1474,7 @@ mod test {
let (tx, rx) = channel::<int>();
let (count_tx, count_rx) = channel();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
let mut count = 0;
for x in rx.iter() {
if count >= 3 {
@ -1499,7 +1499,7 @@ mod test {
let (tx1, rx1) = channel::<int>();
let (tx2, rx2) = channel::<()>();
let (tx3, rx3) = channel::<()>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx2.recv().unwrap();
tx1.send(1).unwrap();
tx3.send(()).unwrap();
@ -1524,13 +1524,13 @@ mod test {
fn destroy_upgraded_shared_port_when_sender_still_active() {
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx.recv().unwrap(); // wait on a oneshot
drop(rx); // destroy a shared
tx2.send(()).unwrap();
});
// make sure the other task has gone to sleep
for _ in 0u..5000 { Thread::yield_now(); }
for _ in 0u..5000 { thread::yield_now(); }
// upgrade to a shared chan and send a message
let t = tx.clone();
@ -1547,7 +1547,7 @@ mod sync_tests {
use prelude::v1::*;
use std::env;
use thread::Thread;
use thread;
use super::*;
pub fn stress_factor() -> uint {
@ -1583,7 +1583,7 @@ mod sync_tests {
#[test]
fn smoke_threads() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
tx.send(1).unwrap();
});
assert_eq!(rx.recv().unwrap(), 1);
@ -1608,7 +1608,7 @@ mod sync_tests {
#[test]
fn port_gone_concurrent() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx.recv().unwrap();
});
while tx.send(1).is_ok() {}
@ -1618,7 +1618,7 @@ mod sync_tests {
fn port_gone_concurrent_shared() {
let (tx, rx) = sync_channel::<int>(0);
let tx2 = tx.clone();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx.recv().unwrap();
});
while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
@ -1643,7 +1643,7 @@ mod sync_tests {
#[test]
fn chan_gone_concurrent() {
let (tx, rx) = sync_channel::<int>(0);
Thread::spawn(move|| {
thread::spawn(move|| {
tx.send(1).unwrap();
tx.send(1).unwrap();
});
@ -1653,7 +1653,7 @@ mod sync_tests {
#[test]
fn stress() {
let (tx, rx) = sync_channel::<int>(0);
Thread::spawn(move|| {
thread::spawn(move|| {
for _ in 0u..10000 { tx.send(1).unwrap(); }
});
for _ in 0u..10000 {
@ -1668,7 +1668,7 @@ mod sync_tests {
let (tx, rx) = sync_channel::<int>(0);
let (dtx, drx) = sync_channel::<()>(0);
Thread::spawn(move|| {
thread::spawn(move|| {
for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
@ -1681,7 +1681,7 @@ mod sync_tests {
for _ in 0..NTHREADS {
let tx = tx.clone();
Thread::spawn(move|| {
thread::spawn(move|| {
for _ in 0..AMT { tx.send(1).unwrap(); }
});
}
@ -1714,7 +1714,7 @@ mod sync_tests {
#[test]
fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will panic
let res = Thread::scoped(move|| {
let res = thread::spawn(move|| {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
rx.recv().unwrap();
@ -1789,7 +1789,7 @@ mod sync_tests {
#[test]
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = sync_channel::<Box<int>>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
assert!(rx.recv().unwrap() == box 10);
});
@ -1799,10 +1799,10 @@ mod sync_tests {
#[test]
fn oneshot_multi_task_recv_then_close() {
let (tx, rx) = sync_channel::<Box<int>>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
drop(tx);
});
let res = Thread::scoped(move|| {
let res = thread::spawn(move|| {
assert!(rx.recv().unwrap() == box 10);
}).join();
assert!(res.is_err());
@ -1812,7 +1812,7 @@ mod sync_tests {
fn oneshot_multi_thread_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
drop(rx);
});
drop(tx);
@ -1823,10 +1823,10 @@ mod sync_tests {
fn oneshot_multi_thread_send_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
drop(rx);
});
let _ = Thread::scoped(move || {
let _ = thread::spawn(move || {
tx.send(1).unwrap();
}).join();
}
@ -1836,14 +1836,14 @@ mod sync_tests {
fn oneshot_multi_thread_recv_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let res = Thread::scoped(move|| {
let _t = thread::spawn(move|| {
let res = thread::spawn(move|| {
rx.recv().unwrap();
}).join();
assert!(res.is_err());
});
let _t = Thread::spawn(move|| {
Thread::spawn(move|| {
let _t = thread::spawn(move|| {
thread::spawn(move|| {
drop(tx);
});
});
@ -1854,7 +1854,7 @@ mod sync_tests {
fn oneshot_multi_thread_send_recv_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<Box<int>>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
tx.send(box 10).unwrap();
});
assert!(rx.recv().unwrap() == box 10);
@ -1872,7 +1872,7 @@ mod sync_tests {
fn send(tx: SyncSender<Box<int>>, i: int) {
if i == 10 { return }
Thread::spawn(move|| {
thread::spawn(move|| {
tx.send(box i).unwrap();
send(tx, i + 1);
});
@ -1881,7 +1881,7 @@ mod sync_tests {
fn recv(rx: Receiver<Box<int>>, i: int) {
if i == 10 { return }
Thread::spawn(move|| {
thread::spawn(move|| {
assert!(rx.recv().unwrap() == box i);
recv(rx, i + 1);
});
@ -1903,7 +1903,7 @@ mod sync_tests {
let total = stress_factor() + 100;
for _ in 0..total {
let tx = tx.clone();
Thread::spawn(move|| {
thread::spawn(move|| {
tx.send(()).unwrap();
});
}
@ -1918,7 +1918,7 @@ mod sync_tests {
let (tx, rx) = sync_channel::<int>(0);
let (total_tx, total_rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
let mut acc = 0;
for x in rx.iter() {
acc += x;
@ -1938,7 +1938,7 @@ mod sync_tests {
let (tx, rx) = sync_channel::<int>(0);
let (count_tx, count_rx) = sync_channel(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
let mut count = 0;
for x in rx.iter() {
if count >= 3 {
@ -1963,7 +1963,7 @@ mod sync_tests {
let (tx1, rx1) = sync_channel::<int>(1);
let (tx2, rx2) = sync_channel::<()>(1);
let (tx3, rx3) = sync_channel::<()>(1);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx2.recv().unwrap();
tx1.send(1).unwrap();
tx3.send(()).unwrap();
@ -1988,13 +1988,13 @@ mod sync_tests {
fn destroy_upgraded_shared_port_when_sender_still_active() {
let (tx, rx) = sync_channel::<()>(0);
let (tx2, rx2) = sync_channel::<()>(0);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx.recv().unwrap(); // wait on a oneshot
drop(rx); // destroy a shared
tx2.send(()).unwrap();
});
// make sure the other task has gone to sleep
for _ in 0u..5000 { Thread::yield_now(); }
for _ in 0u..5000 { thread::yield_now(); }
// upgrade to a shared chan and send a message
let t = tx.clone();
@ -2008,14 +2008,14 @@ mod sync_tests {
#[test]
fn send1() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
let _t = thread::spawn(move|| { rx.recv().unwrap(); });
assert_eq!(tx.send(1), Ok(()));
}
#[test]
fn send2() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| { drop(rx); });
let _t = thread::spawn(move|| { drop(rx); });
assert!(tx.send(1).is_err());
}
@ -2023,7 +2023,7 @@ mod sync_tests {
fn send3() {
let (tx, rx) = sync_channel::<int>(1);
assert_eq!(tx.send(1), Ok(()));
let _t =Thread::spawn(move|| { drop(rx); });
let _t =thread::spawn(move|| { drop(rx); });
assert!(tx.send(1).is_err());
}
@ -2033,11 +2033,11 @@ mod sync_tests {
let tx2 = tx.clone();
let (done, donerx) = channel();
let done2 = done.clone();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
assert!(tx.send(1).is_err());
done.send(()).unwrap();
});
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
assert!(tx2.send(2).is_err());
done2.send(()).unwrap();
});
@ -2073,7 +2073,7 @@ mod sync_tests {
let (tx1, rx1) = sync_channel::<()>(3);
let (tx2, rx2) = sync_channel::<()>(3);
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx1.recv().unwrap();
tx2.try_send(()).unwrap();
});

View file

@ -160,7 +160,7 @@ mod tests {
use sync::mpsc::channel;
use super::{Queue, Data, Empty, Inconsistent};
use sync::Arc;
use thread::Thread;
use thread;
#[test]
fn test_full() {
@ -184,7 +184,7 @@ mod tests {
for _ in 0..nthreads {
let tx = tx.clone();
let q = q.clone();
Thread::spawn(move|| {
thread::spawn(move|| {
for i in 0..nmsgs {
q.push(i);
}

View file

@ -347,7 +347,7 @@ impl Iterator for Packets {
mod test {
use prelude::v1::*;
use thread::Thread;
use thread;
use sync::mpsc::*;
// Don't use the libstd version so we can pull in the right Select structure
@ -427,11 +427,11 @@ mod test {
let (_tx2, rx2) = channel::<int>();
let (tx3, rx3) = channel::<int>();
let _t = Thread::spawn(move|| {
for _ in 0u..20 { Thread::yield_now(); }
let _t = thread::spawn(move|| {
for _ in 0u..20 { thread::yield_now(); }
tx1.send(1).unwrap();
rx3.recv().unwrap();
for _ in 0u..20 { Thread::yield_now(); }
for _ in 0u..20 { thread::yield_now(); }
});
select! {
@ -451,8 +451,8 @@ mod test {
let (tx2, rx2) = channel::<int>();
let (tx3, rx3) = channel::<()>();
let _t = Thread::spawn(move|| {
for _ in 0u..20 { Thread::yield_now(); }
let _t = thread::spawn(move|| {
for _ in 0u..20 { thread::yield_now(); }
tx1.send(1).unwrap();
tx2.send(2).unwrap();
rx3.recv().unwrap();
@ -478,7 +478,7 @@ mod test {
let (tx2, rx2) = channel::<int>();
let (tx3, rx3) = channel::<()>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
for i in 0..AMT {
if i % 2 == 0 {
tx1.send(i).unwrap();
@ -504,7 +504,7 @@ mod test {
let (_tx2, rx2) = channel::<int>();
let (tx3, rx3) = channel::<()>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx3.recv().unwrap();
tx1.clone();
assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
@ -526,7 +526,7 @@ mod test {
let (_tx2, rx2) = channel::<int>();
let (tx3, rx3) = channel::<()>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
rx3.recv().unwrap();
tx1.clone();
assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
@ -547,7 +547,7 @@ mod test {
let (tx1, rx1) = channel::<()>();
let (tx2, rx2) = channel::<()>();
let (tx3, rx3) = channel::<()>();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
let s = Select::new();
let mut h1 = s.handle(&rx1);
let mut h2 = s.handle(&rx2);
@ -557,7 +557,7 @@ mod test {
tx3.send(()).unwrap();
});
for _ in 0u..1000 { Thread::yield_now(); }
for _ in 0u..1000 { thread::yield_now(); }
drop(tx1.clone());
tx2.send(()).unwrap();
rx3.recv().unwrap();
@ -663,14 +663,14 @@ mod test {
fn oneshot_data_waiting() {
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
select! {
_n = rx1.recv() => {}
}
tx2.send(()).unwrap();
});
for _ in 0u..100 { Thread::yield_now() }
for _ in 0u..100 { thread::yield_now() }
tx1.send(()).unwrap();
rx2.recv().unwrap();
}
@ -683,14 +683,14 @@ mod test {
tx1.send(()).unwrap();
rx1.recv().unwrap();
rx1.recv().unwrap();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
select! {
_n = rx1.recv() => {}
}
tx2.send(()).unwrap();
});
for _ in 0u..100 { Thread::yield_now() }
for _ in 0u..100 { thread::yield_now() }
tx1.send(()).unwrap();
rx2.recv().unwrap();
}
@ -702,14 +702,14 @@ mod test {
drop(tx1.clone());
tx1.send(()).unwrap();
rx1.recv().unwrap();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
select! {
_n = rx1.recv() => {}
}
tx2.send(()).unwrap();
});
for _ in 0u..100 { Thread::yield_now() }
for _ in 0u..100 { thread::yield_now() }
tx1.send(()).unwrap();
rx2.recv().unwrap();
}
@ -726,8 +726,8 @@ mod test {
#[test]
fn sync2() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
for _ in 0u..100 { Thread::yield_now() }
let _t = thread::spawn(move|| {
for _ in 0u..100 { thread::yield_now() }
tx.send(1).unwrap();
});
select! {
@ -739,8 +739,8 @@ mod test {
fn sync3() {
let (tx1, rx1) = sync_channel::<int>(0);
let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
let _t = Thread::spawn(move|| { tx1.send(1).unwrap(); });
let _t = Thread::spawn(move|| { tx2.send(2).unwrap(); });
let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
select! {
n = rx1.recv() => {
let n = n.unwrap();

View file

@ -31,7 +31,7 @@ use sync::mpsc::mpsc_queue as mpsc;
use sync::mpsc::select::StartResult::*;
use sync::mpsc::select::StartResult;
use sync::{Mutex, MutexGuard};
use thread::Thread;
use thread;
const DISCONNECTED: isize = isize::MIN;
const FUDGE: isize = 1024;
@ -194,7 +194,7 @@ impl<T: Send> Packet<T> {
match self.queue.pop() {
mpsc::Data(..) => {}
mpsc::Empty => break,
mpsc::Inconsistent => Thread::yield_now(),
mpsc::Inconsistent => thread::yield_now(),
}
}
// maybe we're done, if we're not the last ones
@ -283,7 +283,7 @@ impl<T: Send> Packet<T> {
mpsc::Inconsistent => {
let data;
loop {
Thread::yield_now();
thread::yield_now();
match self.queue.pop() {
mpsc::Data(t) => { data = t; break }
mpsc::Empty => panic!("inconsistent => empty"),
@ -460,7 +460,7 @@ impl<T: Send> Packet<T> {
drop(self.take_to_wake());
} else {
while self.to_wake.load(Ordering::SeqCst) != 0 {
Thread::yield_now();
thread::yield_now();
}
}
// if the number of steals is -1, it was the pre-emptive -1 steal

View file

@ -246,7 +246,7 @@ mod test {
use sync::Arc;
use super::Queue;
use thread::Thread;
use thread;
use sync::mpsc::channel;
#[test]
@ -324,7 +324,7 @@ mod test {
let (tx, rx) = channel();
let q2 = q.clone();
let _t = Thread::spawn(move|| {
let _t = thread::spawn(move|| {
for _ in 0u..100000 {
loop {
match q2.pop() {

View file

@ -26,7 +26,7 @@ use core::prelude::*;
use core::cmp;
use core::isize;
use thread::Thread;
use thread;
use sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool};
use sync::mpsc::Receiver;
@ -440,7 +440,7 @@ impl<T: Send> Packet<T> {
drop(self.take_to_wake());
} else {
while self.to_wake.load(Ordering::SeqCst) != 0 {
Thread::yield_now();
thread::yield_now();
}
}
assert_eq!(self.steals, 0);