Be more defensive in pipes (#3098)

This commit is contained in:
Eric Holk 2012-08-03 11:43:10 -07:00
parent 9e68966611
commit 01ca0d1f68
3 changed files with 18 additions and 8 deletions

View file

@ -127,9 +127,9 @@ fn swap_unwrap<T>(opt: &mut option<T>) -> T {
unwrap(util::replace(opt, none))
}
pure fn unwrap_expect<T>(-opt: option<T>, reason: ~str) -> T {
pure fn unwrap_expect<T>(-opt: option<T>, reason: &str) -> T {
//! As unwrap, but with a specified failure message.
if opt.is_none() { fail reason; }
if opt.is_none() { fail reason.to_unique(); }
unwrap(opt)
}

View file

@ -343,7 +343,7 @@ Fails if the sender closes the connection.
*/
fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
option::unwrap(try_recv(p))
option::unwrap_expect(try_recv(p), "connection closed")
}
/** Attempts to receive a message from a pipe.
@ -391,10 +391,13 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
full {
let mut payload = none;
payload <-> p.payload;
p.header.blocked_task = none;
p.header.state = empty;
return some(option::unwrap(payload))
}
terminated {
// This assert detects when we've accidentally unsafely
// casted too big of a number to a state.
assert old_state == terminated;
return none;
}
@ -428,10 +431,13 @@ fn sender_terminate<T: send>(p: *packet<T>) {
}
blocked {
// wake up the target
let target = p.header.blocked_task.get();
rustrt::task_signal_event(target,
ptr::addr_of(p.header) as *libc::c_void);
alt p.header.blocked_task {
some(target) =>
rustrt::task_signal_event(
target,
ptr::addr_of(p.header) as *libc::c_void),
none => { debug!{"receiver is already shutting down"} }
}
// The receiver will eventually clean up.
//unsafe { forget(p) }
}
@ -448,6 +454,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
#[doc(hidden)]
fn receiver_terminate<T: send>(p: *packet<T>) {
let p = unsafe { &*p };
assert p.header.blocked_task == none;
alt swap_state_rel(p.header.state, terminated) {
empty {
// the sender will clean up
@ -514,7 +521,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
for pkts.each |p| { unsafe{ (*p).unblock()} }
debug!{"%?, %?", ready_packet, pkts[ready_packet]};
debug!("%?, %?", ready_packet, pkts[ready_packet]);
unsafe {
assert (*pkts[ready_packet]).state == full

View file

@ -680,6 +680,9 @@ void
rust_task::signal_event(void *event) {
scoped_lock with(lifecycle_lock);
assert(task_state_blocked == state ||
task_state_running == state);
this->event = event;
event_reject = true;
if(task_state_blocked == state) {