core: Move Exclusive and SharedMutableState to the private mod
This commit is contained in:
parent
2ac64d91ac
commit
d53cfd225a
9 changed files with 428 additions and 429 deletions
|
|
@ -1107,7 +1107,7 @@ impl<T: Send> PortSet<T> : Recv<T> {
|
|||
}
|
||||
|
||||
/// A channel that can be shared between many senders.
|
||||
type SharedChan<T: Send> = unsafe::Exclusive<Chan<T>>;
|
||||
type SharedChan<T: Send> = private::Exclusive<Chan<T>>;
|
||||
|
||||
impl<T: Send> SharedChan<T>: Channel<T> {
|
||||
fn send(+x: T) {
|
||||
|
|
@ -1131,7 +1131,7 @@ impl<T: Send> SharedChan<T>: Channel<T> {
|
|||
|
||||
/// Converts a `chan` into a `shared_chan`.
|
||||
fn SharedChan<T:Send>(+c: Chan<T>) -> SharedChan<T> {
|
||||
unsafe::exclusive(move c)
|
||||
private::exclusive(move c)
|
||||
}
|
||||
|
||||
/// Receive a message from one of two endpoints.
|
||||
|
|
|
|||
|
|
@ -6,20 +6,41 @@
|
|||
|
||||
export chan_from_global_ptr, weaken_task;
|
||||
|
||||
export SharedMutableState, shared_mutable_state, clone_shared_mutable_state;
|
||||
export get_shared_mutable_state, get_shared_immutable_state;
|
||||
export unwrap_shared_mutable_state;
|
||||
export Exclusive, exclusive, unwrap_exclusive;
|
||||
|
||||
use compare_and_swap = rustrt::rust_compare_and_swap_ptr;
|
||||
use task::TaskBuilder;
|
||||
use task::atomically;
|
||||
|
||||
extern mod rustrt {
|
||||
fn rust_task_weaken(ch: rust_port_id);
|
||||
fn rust_task_unweaken(ch: rust_port_id);
|
||||
|
||||
#[rust_stack]
|
||||
fn rust_atomic_increment(p: &mut libc::intptr_t)
|
||||
-> libc::intptr_t;
|
||||
|
||||
#[rust_stack]
|
||||
fn rust_atomic_decrement(p: &mut libc::intptr_t)
|
||||
-> libc::intptr_t;
|
||||
|
||||
#[rust_stack]
|
||||
fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
|
||||
oldval: libc::uintptr_t,
|
||||
newval: libc::uintptr_t) -> bool;
|
||||
|
||||
fn rust_create_little_lock() -> rust_little_lock;
|
||||
fn rust_destroy_little_lock(lock: rust_little_lock);
|
||||
fn rust_lock_little_lock(lock: rust_little_lock);
|
||||
fn rust_unlock_little_lock(lock: rust_little_lock);
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)] // runtime type
|
||||
type rust_port_id = uint;
|
||||
|
||||
extern mod rustrt {
|
||||
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
|
||||
oldval: libc::uintptr_t,
|
||||
newval: libc::uintptr_t) -> bool;
|
||||
fn rust_task_weaken(ch: rust_port_id);
|
||||
fn rust_task_unweaken(ch: rust_port_id);
|
||||
}
|
||||
|
||||
type GlobalPtr = *libc::uintptr_t;
|
||||
|
||||
/**
|
||||
|
|
@ -68,7 +89,8 @@ unsafe fn chan_from_global_ptr<T: Send>(
|
|||
// Install the channel
|
||||
log(debug,~"BEFORE COMPARE AND SWAP");
|
||||
let swapped = compare_and_swap(
|
||||
global, 0u, unsafe::reinterpret_cast(&ch));
|
||||
unsafe::reinterpret_cast(&global),
|
||||
0u, unsafe::reinterpret_cast(&ch));
|
||||
log(debug,fmt!("AFTER .. swapped? %?", swapped));
|
||||
|
||||
if swapped {
|
||||
|
|
@ -262,3 +284,386 @@ fn test_weaken_task_fail() {
|
|||
};
|
||||
assert result::is_err(res);
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
* Shared state & exclusive ARC
|
||||
****************************************************************************/
|
||||
|
||||
// An unwrapper uses this protocol to communicate with the "other" task that
|
||||
// drops the last refcount on an arc. Unfortunately this can't be a proper
|
||||
// pipe protocol because the unwrapper has to access both stages at once.
|
||||
type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
|
||||
|
||||
struct ArcData<T> {
|
||||
mut count: libc::intptr_t,
|
||||
mut unwrapper: libc::uintptr_t, // either a UnwrapProto or 0
|
||||
// FIXME(#3224) should be able to make this non-option to save memory, and
|
||||
// in unwrap() use "let ~ArcData { data: result, _ } = thing" to unwrap it
|
||||
mut data: Option<T>,
|
||||
}
|
||||
|
||||
struct ArcDestruct<T> {
|
||||
mut data: *libc::c_void,
|
||||
drop unsafe {
|
||||
if self.data.is_null() {
|
||||
return; // Happens when destructing an unwrapper's handle.
|
||||
}
|
||||
do task::unkillable {
|
||||
let data: ~ArcData<T> = unsafe::reinterpret_cast(&self.data);
|
||||
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
|
||||
assert new_count >= 0;
|
||||
if new_count == 0 {
|
||||
// Were we really last, or should we hand off to an unwrapper?
|
||||
// It's safe to not xchg because the unwrapper will set the
|
||||
// unwrap lock *before* dropping his/her reference. In effect,
|
||||
// being here means we're the only *awake* task with the data.
|
||||
if data.unwrapper != 0 {
|
||||
let p: UnwrapProto =
|
||||
unsafe::reinterpret_cast(&data.unwrapper);
|
||||
let (message, response) = option::swap_unwrap(p);
|
||||
// Send 'ready' and wait for a response.
|
||||
pipes::send_one(move message, ());
|
||||
// Unkillable wait. Message guaranteed to come.
|
||||
if pipes::recv_one(move response) {
|
||||
// Other task got the data.
|
||||
unsafe::forget(move data);
|
||||
} else {
|
||||
// Other task was killed. drop glue takes over.
|
||||
}
|
||||
} else {
|
||||
// drop glue takes over.
|
||||
}
|
||||
} else {
|
||||
unsafe::forget(move data);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
|
||||
ArcDestruct {
|
||||
data: data
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn unwrap_shared_mutable_state<T: Send>(+rc: SharedMutableState<T>)
|
||||
-> T {
|
||||
struct DeathThroes<T> {
|
||||
mut ptr: Option<~ArcData<T>>,
|
||||
mut response: Option<pipes::ChanOne<bool>>,
|
||||
drop unsafe {
|
||||
let response = option::swap_unwrap(&mut self.response);
|
||||
// In case we get killed early, we need to tell the person who
|
||||
// tried to wake us whether they should hand-off the data to us.
|
||||
if task::failing() {
|
||||
pipes::send_one(move response, false);
|
||||
// Either this swap_unwrap or the one below (at "Got here")
|
||||
// ought to run.
|
||||
unsafe::forget(option::swap_unwrap(&mut self.ptr));
|
||||
} else {
|
||||
assert self.ptr.is_none();
|
||||
pipes::send_one(move response, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
do task::unkillable {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&rc.data);
|
||||
let (c1,p1) = pipes::oneshot(); // ()
|
||||
let (c2,p2) = pipes::oneshot(); // bool
|
||||
let server: UnwrapProto = ~mut Some((move c1,move p2));
|
||||
let serverp: libc::uintptr_t = unsafe::transmute(move server);
|
||||
// Try to put our server end in the unwrapper slot.
|
||||
if rustrt::rust_compare_and_swap_ptr(&mut ptr.unwrapper, 0, serverp) {
|
||||
// Got in. Step 0: Tell destructor not to run. We are now it.
|
||||
rc.data = ptr::null();
|
||||
// Step 1 - drop our own reference.
|
||||
let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
|
||||
assert new_count >= 0;
|
||||
if new_count == 0 {
|
||||
// We were the last owner. Can unwrap immediately.
|
||||
// Also we have to free the server endpoints.
|
||||
let _server: UnwrapProto = unsafe::transmute(move serverp);
|
||||
option::swap_unwrap(&mut ptr.data)
|
||||
// drop glue takes over.
|
||||
} else {
|
||||
// The *next* person who sees the refcount hit 0 will wake us.
|
||||
let end_result =
|
||||
DeathThroes { ptr: Some(move ptr),
|
||||
response: Some(move c2) };
|
||||
let mut p1 = Some(move p1); // argh
|
||||
do task::rekillable {
|
||||
pipes::recv_one(option::swap_unwrap(&mut p1));
|
||||
}
|
||||
// Got here. Back in the 'unkillable' without getting killed.
|
||||
// Recover ownership of ptr, then take the data out.
|
||||
let ptr = option::swap_unwrap(&mut end_result.ptr);
|
||||
option::swap_unwrap(&mut ptr.data)
|
||||
// drop glue takes over.
|
||||
}
|
||||
} else {
|
||||
// Somebody else was trying to unwrap. Avoid guaranteed deadlock.
|
||||
unsafe::forget(move ptr);
|
||||
// Also we have to free the (rejected) server endpoints.
|
||||
let _server: UnwrapProto = unsafe::transmute(move serverp);
|
||||
fail ~"Another task is already unwrapping this ARC!";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
|
||||
*
|
||||
* Data races between tasks can result in crashes and, with sufficient
|
||||
* cleverness, arbitrary type coercion.
|
||||
*/
|
||||
type SharedMutableState<T: Send> = ArcDestruct<T>;
|
||||
|
||||
unsafe fn shared_mutable_state<T: Send>(+data: T) -> SharedMutableState<T> {
|
||||
let data = ~ArcData { count: 1, unwrapper: 0, data: Some(move data) };
|
||||
unsafe {
|
||||
let ptr = unsafe::transmute(move data);
|
||||
ArcDestruct(ptr)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
unsafe fn get_shared_mutable_state<T: Send>(rc: &a/SharedMutableState<T>)
|
||||
-> &a/mut T {
|
||||
unsafe {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&(*rc).data);
|
||||
assert ptr.count > 0;
|
||||
// Cast us back into the correct region
|
||||
let r = unsafe::transmute_region(option::get_ref(&ptr.data));
|
||||
unsafe::forget(move ptr);
|
||||
return unsafe::transmute_mut(r);
|
||||
}
|
||||
}
|
||||
#[inline(always)]
|
||||
unsafe fn get_shared_immutable_state<T: Send>(rc: &a/SharedMutableState<T>)
|
||||
-> &a/T {
|
||||
unsafe {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&(*rc).data);
|
||||
assert ptr.count > 0;
|
||||
// Cast us back into the correct region
|
||||
let r = unsafe::transmute_region(option::get_ref(&ptr.data));
|
||||
unsafe::forget(move ptr);
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
|
||||
-> SharedMutableState<T> {
|
||||
unsafe {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&(*rc).data);
|
||||
let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
|
||||
assert new_count >= 2;
|
||||
unsafe::forget(move ptr);
|
||||
}
|
||||
ArcDestruct((*rc).data)
|
||||
}
|
||||
|
||||
/****************************************************************************/
|
||||
|
||||
#[allow(non_camel_case_types)] // runtime type
|
||||
type rust_little_lock = *libc::c_void;
|
||||
|
||||
struct LittleLock {
|
||||
l: rust_little_lock,
|
||||
drop { rustrt::rust_destroy_little_lock(self.l); }
|
||||
}
|
||||
|
||||
fn LittleLock() -> LittleLock {
|
||||
LittleLock {
|
||||
l: rustrt::rust_create_little_lock()
|
||||
}
|
||||
}
|
||||
|
||||
impl LittleLock {
|
||||
#[inline(always)]
|
||||
unsafe fn lock<T>(f: fn() -> T) -> T {
|
||||
struct Unlock {
|
||||
l: rust_little_lock,
|
||||
drop { rustrt::rust_unlock_little_lock(self.l); }
|
||||
}
|
||||
|
||||
fn Unlock(l: rust_little_lock) -> Unlock {
|
||||
Unlock {
|
||||
l: l
|
||||
}
|
||||
}
|
||||
|
||||
do atomically {
|
||||
rustrt::rust_lock_little_lock(self.l);
|
||||
let _r = Unlock(self.l);
|
||||
f()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ExData<T: Send> { lock: LittleLock, mut failed: bool, mut data: T, }
|
||||
/**
|
||||
* An arc over mutable data that is protected by a lock. For library use only.
|
||||
*/
|
||||
struct Exclusive<T: Send> { x: SharedMutableState<ExData<T>> }
|
||||
|
||||
fn exclusive<T:Send >(+user_data: T) -> Exclusive<T> {
|
||||
let data = ExData {
|
||||
lock: LittleLock(), mut failed: false, mut data: user_data
|
||||
};
|
||||
Exclusive { x: unsafe { shared_mutable_state(move data) } }
|
||||
}
|
||||
|
||||
impl<T: Send> Exclusive<T> {
|
||||
// Duplicate an exclusive ARC, as std::arc::clone.
|
||||
fn clone() -> Exclusive<T> {
|
||||
Exclusive { x: unsafe { clone_shared_mutable_state(&self.x) } }
|
||||
}
|
||||
|
||||
// Exactly like std::arc::mutex_arc,access(), but with the little_lock
|
||||
// instead of a proper mutex. Same reason for being unsafe.
|
||||
//
|
||||
// Currently, scheduling operations (i.e., yielding, receiving on a pipe,
|
||||
// accessing the provided condition variable) are prohibited while inside
|
||||
// the exclusive. Supporting that is a work in progress.
|
||||
#[inline(always)]
|
||||
unsafe fn with<U>(f: fn(x: &mut T) -> U) -> U {
|
||||
let rec = unsafe { get_shared_mutable_state(&self.x) };
|
||||
do rec.lock.lock {
|
||||
if rec.failed {
|
||||
fail ~"Poisoned exclusive - another task failed inside!";
|
||||
}
|
||||
rec.failed = true;
|
||||
let result = f(&mut rec.data);
|
||||
rec.failed = false;
|
||||
move result
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
unsafe fn with_imm<U>(f: fn(x: &T) -> U) -> U {
|
||||
do self.with |x| {
|
||||
f(unsafe::transmute_immut(x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#2585) make this a by-move method on the exclusive
|
||||
fn unwrap_exclusive<T: Send>(+arc: Exclusive<T>) -> T {
|
||||
let Exclusive { x: x } <- arc;
|
||||
let inner = unsafe { unwrap_shared_mutable_state(move x) };
|
||||
let ExData { data: data, _ } <- inner;
|
||||
move data
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
#[test]
|
||||
fn exclusive_arc() {
|
||||
let mut futures = ~[];
|
||||
|
||||
let num_tasks = 10u;
|
||||
let count = 10u;
|
||||
|
||||
let total = exclusive(~mut 0u);
|
||||
|
||||
for uint::range(0u, num_tasks) |_i| {
|
||||
let total = total.clone();
|
||||
vec::push(futures, future::spawn(|| {
|
||||
for uint::range(0u, count) |_i| {
|
||||
do total.with |count| {
|
||||
**count += 1u;
|
||||
}
|
||||
}
|
||||
}));
|
||||
};
|
||||
|
||||
for futures.each |f| { f.get() }
|
||||
|
||||
do total.with |total| {
|
||||
assert **total == num_tasks * count
|
||||
};
|
||||
}
|
||||
|
||||
#[test] #[should_fail] #[ignore(cfg(windows))]
|
||||
fn exclusive_poison() {
|
||||
// Tests that if one task fails inside of an exclusive, subsequent
|
||||
// accesses will also fail.
|
||||
let x = exclusive(1);
|
||||
let x2 = x.clone();
|
||||
do task::try {
|
||||
do x2.with |one| {
|
||||
assert *one == 2;
|
||||
}
|
||||
};
|
||||
do x.with |one| {
|
||||
assert *one == 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exclusive_unwrap_basic() {
|
||||
let x = exclusive(~~"hello");
|
||||
assert unwrap_exclusive(x) == ~~"hello";
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exclusive_unwrap_contended() {
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = ~mut Some(x.clone());
|
||||
do task::spawn {
|
||||
let x2 = option::swap_unwrap(x2);
|
||||
do x2.with |_hello| { }
|
||||
task::yield();
|
||||
}
|
||||
assert unwrap_exclusive(x) == ~~"hello";
|
||||
|
||||
// Now try the same thing, but with the child task blocking.
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = ~mut Some(x.clone());
|
||||
let mut res = None;
|
||||
do task::task().future_result(|+r| res = Some(r)).spawn {
|
||||
let x2 = option::swap_unwrap(x2);
|
||||
assert unwrap_exclusive(x2) == ~~"hello";
|
||||
}
|
||||
// Have to get rid of our reference before blocking.
|
||||
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
|
||||
let res = option::swap_unwrap(&mut res);
|
||||
future::get(&res);
|
||||
}
|
||||
|
||||
#[test] #[should_fail] #[ignore(cfg(windows))]
|
||||
fn exclusive_unwrap_conflict() {
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = ~mut Some(x.clone());
|
||||
let mut res = None;
|
||||
do task::task().future_result(|+r| res = Some(r)).spawn {
|
||||
let x2 = option::swap_unwrap(x2);
|
||||
assert unwrap_exclusive(x2) == ~~"hello";
|
||||
}
|
||||
assert unwrap_exclusive(x) == ~~"hello";
|
||||
let res = option::swap_unwrap(&mut res);
|
||||
future::get(&res);
|
||||
}
|
||||
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn exclusive_unwrap_deadlock() {
|
||||
// This is not guaranteed to get to the deadlock before being killed,
|
||||
// but it will show up sometimes, and if the deadlock were not there,
|
||||
// the test would nondeterministically fail.
|
||||
let result = do task::try {
|
||||
// a task that has two references to the same exclusive will
|
||||
// deadlock when it unwraps. nothing to be done about that.
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = x.clone();
|
||||
do task::spawn {
|
||||
for 10.times { task::yield(); } // try to let the unwrapper go
|
||||
fail; // punt it awake from its deadlock
|
||||
}
|
||||
let _z = unwrap_exclusive(x);
|
||||
do x2.with |_hello| { }
|
||||
};
|
||||
assert result.is_err();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -900,7 +900,7 @@ type TaskGroupData = {
|
|||
// tasks in this group.
|
||||
mut descendants: TaskSet,
|
||||
};
|
||||
type TaskGroupArc = unsafe::Exclusive<Option<TaskGroupData>>;
|
||||
type TaskGroupArc = private::Exclusive<Option<TaskGroupData>>;
|
||||
|
||||
type TaskGroupInner = &mut Option<TaskGroupData>;
|
||||
|
||||
|
|
@ -929,7 +929,7 @@ type AncestorNode = {
|
|||
// Recursive rest of the list.
|
||||
mut ancestors: AncestorList,
|
||||
};
|
||||
enum AncestorList = Option<unsafe::Exclusive<AncestorNode>>;
|
||||
enum AncestorList = Option<private::Exclusive<AncestorNode>>;
|
||||
|
||||
// Accessors for taskgroup arcs and ancestor arcs that wrap the unsafety.
|
||||
#[inline(always)]
|
||||
|
|
@ -938,7 +938,7 @@ fn access_group<U>(x: &TaskGroupArc, blk: fn(TaskGroupInner) -> U) -> U {
|
|||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn access_ancestors<U>(x: &unsafe::Exclusive<AncestorNode>,
|
||||
fn access_ancestors<U>(x: &private::Exclusive<AncestorNode>,
|
||||
blk: fn(x: &mut AncestorNode) -> U) -> U {
|
||||
unsafe { x.with(blk) }
|
||||
}
|
||||
|
|
@ -1222,7 +1222,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
|
|||
let mut members = new_taskset();
|
||||
taskset_insert(&mut members, spawner);
|
||||
let tasks =
|
||||
unsafe::exclusive(Some({ mut members: move members,
|
||||
private::exclusive(Some({ mut members: move members,
|
||||
mut descendants: new_taskset() }));
|
||||
// Main task/group has no ancestors, no notifier, etc.
|
||||
let group =
|
||||
|
|
@ -1244,7 +1244,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
|
|||
(move g, move a, spawner_group.is_main)
|
||||
} else {
|
||||
// Child is in a separate group from spawner.
|
||||
let g = unsafe::exclusive(Some({ mut members: new_taskset(),
|
||||
let g = private::exclusive(Some({ mut members: new_taskset(),
|
||||
mut descendants: new_taskset() }));
|
||||
let a = if supervised {
|
||||
// Child's ancestors start with the spawner.
|
||||
|
|
@ -1259,7 +1259,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
|
|||
};
|
||||
assert new_generation < uint::max_value;
|
||||
// Build a new node in the ancestor list.
|
||||
AncestorList(Some(unsafe::exclusive(
|
||||
AncestorList(Some(private::exclusive(
|
||||
{ generation: new_generation,
|
||||
mut parent_group: Some(spawner_group.tasks.clone()),
|
||||
mut ancestors: move old_ancestors })))
|
||||
|
|
|
|||
|
|
@ -4,14 +4,8 @@ export reinterpret_cast, forget, bump_box_refcount, transmute;
|
|||
export transmute_mut, transmute_immut, transmute_region, transmute_mut_region;
|
||||
export transmute_mut_unsafe, transmute_immut_unsafe;
|
||||
|
||||
export SharedMutableState, shared_mutable_state, clone_shared_mutable_state;
|
||||
export get_shared_mutable_state, get_shared_immutable_state;
|
||||
export unwrap_shared_mutable_state;
|
||||
export Exclusive, exclusive, unwrap_exclusive;
|
||||
export copy_lifetime;
|
||||
|
||||
use task::atomically;
|
||||
|
||||
#[abi = "rust-intrinsic"]
|
||||
extern mod rusti {
|
||||
fn forget<T>(-x: T);
|
||||
|
|
@ -96,298 +90,6 @@ unsafe fn copy_lifetime_to_unsafe<S,T>(_ptr: &a/S, +ptr: *T) -> &a/T {
|
|||
}
|
||||
|
||||
|
||||
/****************************************************************************
|
||||
* Shared state & exclusive ARC
|
||||
****************************************************************************/
|
||||
|
||||
// An unwrapper uses this protocol to communicate with the "other" task that
|
||||
// drops the last refcount on an arc. Unfortunately this can't be a proper
|
||||
// pipe protocol because the unwrapper has to access both stages at once.
|
||||
type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
|
||||
|
||||
struct ArcData<T> {
|
||||
mut count: libc::intptr_t,
|
||||
mut unwrapper: libc::uintptr_t, // either a UnwrapProto or 0
|
||||
// FIXME(#3224) should be able to make this non-option to save memory, and
|
||||
// in unwrap() use "let ~ArcData { data: result, _ } = thing" to unwrap it
|
||||
mut data: Option<T>,
|
||||
}
|
||||
|
||||
struct ArcDestruct<T> {
|
||||
mut data: *libc::c_void,
|
||||
drop unsafe {
|
||||
if self.data.is_null() {
|
||||
return; // Happens when destructing an unwrapper's handle.
|
||||
}
|
||||
do task::unkillable {
|
||||
let data: ~ArcData<T> = unsafe::reinterpret_cast(&self.data);
|
||||
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
|
||||
assert new_count >= 0;
|
||||
if new_count == 0 {
|
||||
// Were we really last, or should we hand off to an unwrapper?
|
||||
// It's safe to not xchg because the unwrapper will set the
|
||||
// unwrap lock *before* dropping his/her reference. In effect,
|
||||
// being here means we're the only *awake* task with the data.
|
||||
if data.unwrapper != 0 {
|
||||
let p: UnwrapProto =
|
||||
unsafe::reinterpret_cast(&data.unwrapper);
|
||||
let (message, response) = option::swap_unwrap(p);
|
||||
// Send 'ready' and wait for a response.
|
||||
pipes::send_one(move message, ());
|
||||
// Unkillable wait. Message guaranteed to come.
|
||||
if pipes::recv_one(move response) {
|
||||
// Other task got the data.
|
||||
unsafe::forget(move data);
|
||||
} else {
|
||||
// Other task was killed. drop glue takes over.
|
||||
}
|
||||
} else {
|
||||
// drop glue takes over.
|
||||
}
|
||||
} else {
|
||||
unsafe::forget(move data);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
|
||||
ArcDestruct {
|
||||
data: data
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn unwrap_shared_mutable_state<T: Send>(+rc: SharedMutableState<T>)
|
||||
-> T {
|
||||
struct DeathThroes<T> {
|
||||
mut ptr: Option<~ArcData<T>>,
|
||||
mut response: Option<pipes::ChanOne<bool>>,
|
||||
drop unsafe {
|
||||
let response = option::swap_unwrap(&mut self.response);
|
||||
// In case we get killed early, we need to tell the person who
|
||||
// tried to wake us whether they should hand-off the data to us.
|
||||
if task::failing() {
|
||||
pipes::send_one(move response, false);
|
||||
// Either this swap_unwrap or the one below (at "Got here")
|
||||
// ought to run.
|
||||
unsafe::forget(option::swap_unwrap(&mut self.ptr));
|
||||
} else {
|
||||
assert self.ptr.is_none();
|
||||
pipes::send_one(move response, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
do task::unkillable {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&rc.data);
|
||||
let (c1,p1) = pipes::oneshot(); // ()
|
||||
let (c2,p2) = pipes::oneshot(); // bool
|
||||
let server: UnwrapProto = ~mut Some((move c1,move p2));
|
||||
let serverp: libc::uintptr_t = unsafe::transmute(move server);
|
||||
// Try to put our server end in the unwrapper slot.
|
||||
if rustrt::rust_compare_and_swap_ptr(&mut ptr.unwrapper, 0, serverp) {
|
||||
// Got in. Step 0: Tell destructor not to run. We are now it.
|
||||
rc.data = ptr::null();
|
||||
// Step 1 - drop our own reference.
|
||||
let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
|
||||
assert new_count >= 0;
|
||||
if new_count == 0 {
|
||||
// We were the last owner. Can unwrap immediately.
|
||||
// Also we have to free the server endpoints.
|
||||
let _server: UnwrapProto = unsafe::transmute(move serverp);
|
||||
option::swap_unwrap(&mut ptr.data)
|
||||
// drop glue takes over.
|
||||
} else {
|
||||
// The *next* person who sees the refcount hit 0 will wake us.
|
||||
let end_result =
|
||||
DeathThroes { ptr: Some(move ptr),
|
||||
response: Some(move c2) };
|
||||
let mut p1 = Some(move p1); // argh
|
||||
do task::rekillable {
|
||||
pipes::recv_one(option::swap_unwrap(&mut p1));
|
||||
}
|
||||
// Got here. Back in the 'unkillable' without getting killed.
|
||||
// Recover ownership of ptr, then take the data out.
|
||||
let ptr = option::swap_unwrap(&mut end_result.ptr);
|
||||
option::swap_unwrap(&mut ptr.data)
|
||||
// drop glue takes over.
|
||||
}
|
||||
} else {
|
||||
// Somebody else was trying to unwrap. Avoid guaranteed deadlock.
|
||||
unsafe::forget(move ptr);
|
||||
// Also we have to free the (rejected) server endpoints.
|
||||
let _server: UnwrapProto = unsafe::transmute(move serverp);
|
||||
fail ~"Another task is already unwrapping this ARC!";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
|
||||
*
|
||||
* Data races between tasks can result in crashes and, with sufficient
|
||||
* cleverness, arbitrary type coercion.
|
||||
*/
|
||||
type SharedMutableState<T: Send> = ArcDestruct<T>;
|
||||
|
||||
unsafe fn shared_mutable_state<T: Send>(+data: T) -> SharedMutableState<T> {
|
||||
let data = ~ArcData { count: 1, unwrapper: 0, data: Some(move data) };
|
||||
unsafe {
|
||||
let ptr = unsafe::transmute(move data);
|
||||
ArcDestruct(ptr)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
unsafe fn get_shared_mutable_state<T: Send>(rc: &a/SharedMutableState<T>)
|
||||
-> &a/mut T {
|
||||
unsafe {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&(*rc).data);
|
||||
assert ptr.count > 0;
|
||||
// Cast us back into the correct region
|
||||
let r = unsafe::transmute_region(option::get_ref(&ptr.data));
|
||||
unsafe::forget(move ptr);
|
||||
return unsafe::transmute_mut(r);
|
||||
}
|
||||
}
|
||||
#[inline(always)]
|
||||
unsafe fn get_shared_immutable_state<T: Send>(rc: &a/SharedMutableState<T>)
|
||||
-> &a/T {
|
||||
unsafe {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&(*rc).data);
|
||||
assert ptr.count > 0;
|
||||
// Cast us back into the correct region
|
||||
let r = unsafe::transmute_region(option::get_ref(&ptr.data));
|
||||
unsafe::forget(move ptr);
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
|
||||
-> SharedMutableState<T> {
|
||||
unsafe {
|
||||
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(&(*rc).data);
|
||||
let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
|
||||
assert new_count >= 2;
|
||||
unsafe::forget(move ptr);
|
||||
}
|
||||
ArcDestruct((*rc).data)
|
||||
}
|
||||
|
||||
/****************************************************************************/
|
||||
|
||||
#[allow(non_camel_case_types)] // runtime type
|
||||
type rust_little_lock = *libc::c_void;
|
||||
|
||||
#[abi = "cdecl"]
|
||||
extern mod rustrt {
|
||||
#[rust_stack]
|
||||
fn rust_atomic_increment(p: &mut libc::intptr_t)
|
||||
-> libc::intptr_t;
|
||||
|
||||
#[rust_stack]
|
||||
fn rust_atomic_decrement(p: &mut libc::intptr_t)
|
||||
-> libc::intptr_t;
|
||||
|
||||
#[rust_stack]
|
||||
fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
|
||||
oldval: libc::uintptr_t,
|
||||
newval: libc::uintptr_t) -> bool;
|
||||
|
||||
fn rust_create_little_lock() -> rust_little_lock;
|
||||
fn rust_destroy_little_lock(lock: rust_little_lock);
|
||||
fn rust_lock_little_lock(lock: rust_little_lock);
|
||||
fn rust_unlock_little_lock(lock: rust_little_lock);
|
||||
}
|
||||
|
||||
struct LittleLock {
|
||||
l: rust_little_lock,
|
||||
drop { rustrt::rust_destroy_little_lock(self.l); }
|
||||
}
|
||||
|
||||
fn LittleLock() -> LittleLock {
|
||||
LittleLock {
|
||||
l: rustrt::rust_create_little_lock()
|
||||
}
|
||||
}
|
||||
|
||||
impl LittleLock {
|
||||
#[inline(always)]
|
||||
unsafe fn lock<T>(f: fn() -> T) -> T {
|
||||
struct Unlock {
|
||||
l: rust_little_lock,
|
||||
drop { rustrt::rust_unlock_little_lock(self.l); }
|
||||
}
|
||||
|
||||
fn Unlock(l: rust_little_lock) -> Unlock {
|
||||
Unlock {
|
||||
l: l
|
||||
}
|
||||
}
|
||||
|
||||
do atomically {
|
||||
rustrt::rust_lock_little_lock(self.l);
|
||||
let _r = Unlock(self.l);
|
||||
f()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ExData<T: Send> { lock: LittleLock, mut failed: bool, mut data: T, }
|
||||
/**
|
||||
* An arc over mutable data that is protected by a lock. For library use only.
|
||||
*/
|
||||
struct Exclusive<T: Send> { x: SharedMutableState<ExData<T>> }
|
||||
|
||||
fn exclusive<T:Send >(+user_data: T) -> Exclusive<T> {
|
||||
let data = ExData {
|
||||
lock: LittleLock(), mut failed: false, mut data: user_data
|
||||
};
|
||||
Exclusive { x: unsafe { shared_mutable_state(move data) } }
|
||||
}
|
||||
|
||||
impl<T: Send> Exclusive<T> {
|
||||
// Duplicate an exclusive ARC, as std::arc::clone.
|
||||
fn clone() -> Exclusive<T> {
|
||||
Exclusive { x: unsafe { clone_shared_mutable_state(&self.x) } }
|
||||
}
|
||||
|
||||
// Exactly like std::arc::mutex_arc,access(), but with the little_lock
|
||||
// instead of a proper mutex. Same reason for being unsafe.
|
||||
//
|
||||
// Currently, scheduling operations (i.e., yielding, receiving on a pipe,
|
||||
// accessing the provided condition variable) are prohibited while inside
|
||||
// the exclusive. Supporting that is a work in progress.
|
||||
#[inline(always)]
|
||||
unsafe fn with<U>(f: fn(x: &mut T) -> U) -> U {
|
||||
let rec = unsafe { get_shared_mutable_state(&self.x) };
|
||||
do rec.lock.lock {
|
||||
if rec.failed {
|
||||
fail ~"Poisoned exclusive - another task failed inside!";
|
||||
}
|
||||
rec.failed = true;
|
||||
let result = f(&mut rec.data);
|
||||
rec.failed = false;
|
||||
move result
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
unsafe fn with_imm<U>(f: fn(x: &T) -> U) -> U {
|
||||
do self.with |x| {
|
||||
f(unsafe::transmute_immut(x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#2585) make this a by-move method on the exclusive
|
||||
fn unwrap_exclusive<T: Send>(+arc: Exclusive<T>) -> T {
|
||||
let Exclusive { x: x } <- arc;
|
||||
let inner = unsafe { unwrap_shared_mutable_state(move x) };
|
||||
let ExData { data: data, _ } <- inner;
|
||||
move data
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
* Tests
|
||||
****************************************************************************/
|
||||
|
|
@ -431,112 +133,4 @@ mod tests {
|
|||
assert ~[76u8, 0u8] == transmute(~"L");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exclusive_arc() {
|
||||
let mut futures = ~[];
|
||||
|
||||
let num_tasks = 10u;
|
||||
let count = 10u;
|
||||
|
||||
let total = exclusive(~mut 0u);
|
||||
|
||||
for uint::range(0u, num_tasks) |_i| {
|
||||
let total = total.clone();
|
||||
vec::push(futures, future::spawn(|| {
|
||||
for uint::range(0u, count) |_i| {
|
||||
do total.with |count| {
|
||||
**count += 1u;
|
||||
}
|
||||
}
|
||||
}));
|
||||
};
|
||||
|
||||
for futures.each |f| { f.get() }
|
||||
|
||||
do total.with |total| {
|
||||
assert **total == num_tasks * count
|
||||
};
|
||||
}
|
||||
|
||||
#[test] #[should_fail] #[ignore(cfg(windows))]
|
||||
fn exclusive_poison() {
|
||||
// Tests that if one task fails inside of an exclusive, subsequent
|
||||
// accesses will also fail.
|
||||
let x = exclusive(1);
|
||||
let x2 = x.clone();
|
||||
do task::try {
|
||||
do x2.with |one| {
|
||||
assert *one == 2;
|
||||
}
|
||||
};
|
||||
do x.with |one| {
|
||||
assert *one == 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exclusive_unwrap_basic() {
|
||||
let x = exclusive(~~"hello");
|
||||
assert unwrap_exclusive(x) == ~~"hello";
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exclusive_unwrap_contended() {
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = ~mut Some(x.clone());
|
||||
do task::spawn {
|
||||
let x2 = option::swap_unwrap(x2);
|
||||
do x2.with |_hello| { }
|
||||
task::yield();
|
||||
}
|
||||
assert unwrap_exclusive(x) == ~~"hello";
|
||||
|
||||
// Now try the same thing, but with the child task blocking.
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = ~mut Some(x.clone());
|
||||
let mut res = None;
|
||||
do task::task().future_result(|+r| res = Some(r)).spawn {
|
||||
let x2 = option::swap_unwrap(x2);
|
||||
assert unwrap_exclusive(x2) == ~~"hello";
|
||||
}
|
||||
// Have to get rid of our reference before blocking.
|
||||
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
|
||||
let res = option::swap_unwrap(&mut res);
|
||||
future::get(&res);
|
||||
}
|
||||
|
||||
#[test] #[should_fail] #[ignore(cfg(windows))]
|
||||
fn exclusive_unwrap_conflict() {
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = ~mut Some(x.clone());
|
||||
let mut res = None;
|
||||
do task::task().future_result(|+r| res = Some(r)).spawn {
|
||||
let x2 = option::swap_unwrap(x2);
|
||||
assert unwrap_exclusive(x2) == ~~"hello";
|
||||
}
|
||||
assert unwrap_exclusive(x) == ~~"hello";
|
||||
let res = option::swap_unwrap(&mut res);
|
||||
future::get(&res);
|
||||
}
|
||||
|
||||
#[test] #[ignore(cfg(windows))]
|
||||
fn exclusive_unwrap_deadlock() {
|
||||
// This is not guaranteed to get to the deadlock before being killed,
|
||||
// but it will show up sometimes, and if the deadlock were not there,
|
||||
// the test would nondeterministically fail.
|
||||
let result = do task::try {
|
||||
// a task that has two references to the same exclusive will
|
||||
// deadlock when it unwraps. nothing to be done about that.
|
||||
let x = exclusive(~~"hello");
|
||||
let x2 = x.clone();
|
||||
do task::spawn {
|
||||
for 10.times { task::yield(); } // try to let the unwrapper go
|
||||
fail; // punt it awake from its deadlock
|
||||
}
|
||||
let _z = unwrap_exclusive(x);
|
||||
do x2.with |_hello| { }
|
||||
};
|
||||
assert result.is_err();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2239,8 +2239,8 @@ mod tests {
|
|||
#[test]
|
||||
fn test_swap_remove_noncopyable() {
|
||||
// Tests that we don't accidentally run destructors twice.
|
||||
let mut v = ~[::unsafe::exclusive(()), ::unsafe::exclusive(()),
|
||||
::unsafe::exclusive(())];
|
||||
let mut v = ~[::private::exclusive(()), ::private::exclusive(()),
|
||||
::private::exclusive(())];
|
||||
let mut _e = swap_remove(v, 0);
|
||||
assert (len(v) == 2);
|
||||
_e = swap_remove(v, 1);
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
* between tasks.
|
||||
*/
|
||||
|
||||
use unsafe::{SharedMutableState, shared_mutable_state,
|
||||
use private::{SharedMutableState, shared_mutable_state,
|
||||
clone_shared_mutable_state, unwrap_shared_mutable_state,
|
||||
get_shared_mutable_state, get_shared_immutable_state};
|
||||
use sync::{Mutex, mutex_with_condvars,
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@
|
|||
export Condvar, Semaphore, Mutex, mutex_with_condvars;
|
||||
export RWlock, rwlock_with_condvars, RWlockReadMode, RWlockWriteMode;
|
||||
|
||||
use unsafe::{Exclusive, exclusive};
|
||||
use private::{Exclusive, exclusive};
|
||||
|
||||
/****************************************************************************
|
||||
* Internals
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
fn main() {
|
||||
let x = Some(unsafe::exclusive(false));
|
||||
let x = Some(private::exclusive(false));
|
||||
match x {
|
||||
Some(copy z) => { //~ ERROR copying a noncopyable value
|
||||
do z.with |b| { assert !*b; }
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
fn main() {
|
||||
let x = Some(unsafe::exclusive(true));
|
||||
let x = Some(private::exclusive(true));
|
||||
match move x {
|
||||
Some(ref z) if z.with(|b| *b) => {
|
||||
do z.with |b| { assert *b; }
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue