From 26e6eb3d14d7ff3bcbfa5ca442a6928776982e98 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Tue, 10 Jul 2012 10:58:44 -0700 Subject: [PATCH] Handle failure conditions correctly in pipes. --- src/libcore/pipes.rs | 33 ++++++++++++++++++--------- src/libcore/task.rs | 1 + src/rt/rust_builtin.cpp | 4 ++-- src/rt/rust_task.cpp | 6 ++--- src/rt/rust_task.h | 2 +- src/test/run-pass/pipe-detect-term.rs | 21 ++++++++++++++++- 6 files changed, 48 insertions(+), 19 deletions(-) diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 441f323d7bf6..7e05f048dad9 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -47,7 +47,7 @@ extern mod rustrt { #[rust_stack] fn task_clear_event_reject(task: *rust_task); - fn task_wait_event(this: *rust_task) -> *libc::c_void; + fn task_wait_event(this: *rust_task, killed: &mut bool) -> *libc::c_void; fn task_signal_event(target: *rust_task, event: *libc::c_void); } @@ -57,6 +57,16 @@ unsafe fn uniquify(x: *T) -> ~T { unsafe { unsafe::reinterpret_cast(x) } } +fn wait_event(this: *rust_task) -> *libc::c_void { + let mut killed = false; + + let res = rustrt::task_wait_event(this, &mut killed); + if killed && !task::failing() { + fail "killed" + } + res +} + fn swap_state_acq(&dst: state, src: state) -> state { unsafe { reinterpret_cast(rusti::atomic_xchng_acq( @@ -113,23 +123,23 @@ fn recv(-p: recv_packet) -> option { let this = rustrt::rust_get_task(); rustrt::task_clear_event_reject(this); p.header.blocked_task = some(this); + let mut first = true; loop { + rustrt::task_clear_event_reject(this); let old_state = swap_state_acq(p.header.state, blocked); #debug("%?", old_state); alt old_state { empty { #debug("no data available on %?, going to sleep.", p_); - rustrt::task_wait_event(this); + wait_event(this); #debug("woke up, p.state = %?", p.header.state); - if p.header.state == full { - let mut payload = none; - payload <-> (*p).payload; - p.header.state = terminated; - ret some(option::unwrap(payload)) + } + blocked { + if first { + fail "blocking on already blocked packet" } } - blocked { fail "blocking on already blocked packet" } full { let mut payload = none; payload <-> (*p).payload; @@ -141,11 +151,12 @@ fn recv(-p: recv_packet) -> option { ret none; } } + first = false; } } /// Returns true if messages are available. -fn peek(p: recv_packet) -> bool { +pure fn peek(p: recv_packet) -> bool { alt p.header().state { empty { false } blocked { fail "peeking on blocked packet" } @@ -236,7 +247,7 @@ fn wait_many(pkts: ~[&a.packet_header]) -> uint { while !data_avail { #debug("sleeping on %? packets", pkts.len()); - let event = rustrt::task_wait_event(this) as *packet_header; + let event = wait_event(this) as *packet_header; let pos = vec::position(pkts, |p| ptr::addr_of(*p) == event); alt pos { @@ -356,7 +367,7 @@ class recv_packet { option::unwrap(p) } - fn header() -> &self.packet_header { + pure fn header() -> &self.packet_header { alt self.p { some(packet) { unsafe { diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 45ed620b30e8..3d05611aa98f 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -46,6 +46,7 @@ export future_result; export future_task; export unsupervise; export run_listener; +export run_with; export spawn; export spawn_with; diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 732dbaa32931..aa314d2811e9 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -930,11 +930,11 @@ task_clear_event_reject(rust_task *task) { // Waits on an event, returning the pointer to the event that unblocked this // task. extern "C" void * -task_wait_event(rust_task *task) { +task_wait_event(rust_task *task, bool *killed) { // TODO: we should assert that the passed in task is the currently running // task. We wouldn't want to wait some other task. - return task->wait_event(); + return task->wait_event(killed); } extern "C" void diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 6a9f5cf50012..f5e2fcc9a085 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -713,16 +713,14 @@ rust_task::allow_kill() { } void * -rust_task::wait_event() { +rust_task::wait_event(bool *killed) { scoped_lock with(state_lock); if(!event_reject) { block_locked(&event_cond, "waiting on event"); - bool killed = false; state_lock.unlock(); - yield(&killed); + yield(killed); state_lock.lock(); - // TODO: what is the right thing to do if we are killed? } event_reject = false; diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index c5bdc50e4308..6cb6dbaa6535 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -316,7 +316,7 @@ public: this->event_reject = false; } - void *wait_event(); + void *wait_event(bool *killed); void signal_event(void *event); void cleanup_after_turn(); diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index 6a80fd46f34a..ae9e178e74c2 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -26,5 +26,24 @@ fn main() { } }); - sleep(iotask, 1000); + sleep(iotask, 100); + + let b = task::builder(); + task::unsupervise(b); + task::run(b, failtest); +} + +// Make sure the right thing happens during failure. +fn failtest() { + let iotask = uv::global_loop::get(); + + let (c, p) = oneshot::init(); + + do task::spawn_with(c) |_c| { + fail; + } + + #error("%?", recv(p)); + // make sure we get killed if we missed it in the receive. + loop { task::yield() } }