From 8852279a9ecac970e30b6d92d7efdcbd5485769c Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 15 Jan 2013 19:53:35 -0800 Subject: [PATCH] core: Add new weak task API --- src/libcore/pipes.rs | 10 ++ src/libcore/private.rs | 2 + src/libcore/private/weak_task.rs | 187 +++++++++++++++++++++++++++++++ src/rt/rust_builtin.cpp | 12 ++ src/rt/rust_kernel.cpp | 24 ++-- src/rt/rust_kernel.h | 2 + src/rt/rustrt.def.in | 4 +- 7 files changed, 233 insertions(+), 8 deletions(-) create mode 100644 src/libcore/private/weak_task.rs diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 2ff4effbd6ee..2865c9421380 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -1234,6 +1234,16 @@ pub fn oneshot() -> (PortOne, ChanOne) { (port, chan) } +impl PortOne { + fn recv(self) -> T { recv_one(self) } + fn try_recv(self) -> Option { try_recv_one(self) } +} + +impl ChanOne { + fn send(self, data: T) { send_one(self, data) } + fn try_send(self, data: T) -> bool { try_send_one(self, data) } +} + /** * Receive a message from a oneshot pipe, failing if the connection was * closed. diff --git a/src/libcore/private.rs b/src/libcore/private.rs index 3eadce1c30cc..aa976ee745d3 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -34,6 +34,8 @@ pub mod at_exit; pub mod global; #[path = "private/finally.rs"] pub mod finally; +#[path = "private/weak_task.rs"] +pub mod weak_task; extern mod rustrt { #[legacy_exports]; diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs new file mode 100644 index 000000000000..868361b0e607 --- /dev/null +++ b/src/libcore/private/weak_task.rs @@ -0,0 +1,187 @@ +/*! +Weak tasks + +Weak tasks are a runtime feature for building global services that +do not keep the runtime alive. Normally the runtime exits when all +tasks exits, but if a task is weak then the runtime may exit while +it is running, sending a notification to the task that the runtime +is trying to shut down. +*/ + +use option::{Some, None, swap_unwrap}; +use private::at_exit::at_exit; +use private::global::global_data_clone_create; +use private::finally::Finally; +use pipes::{Port, Chan, SharedChan, stream}; +use task::{Task, task, spawn}; +use task::rt::{task_id, get_task_id}; +use send_map::linear::LinearMap; +use ops::Drop; + +type ShutdownMsg = (); + +// XXX: This could be a PortOne but I've experienced bugginess +// with oneshot pipes and try_send +pub unsafe fn weaken_task(f: &fn(Port)) { + let service = global_data_clone_create(global_data_key, + create_global_service); + let (shutdown_port, shutdown_chan) = stream::(); + let shutdown_port = ~mut Some(shutdown_port); + let task = get_task_id(); + // Expect the weak task service to be alive + assert service.try_send(RegisterWeakTask(task, shutdown_chan)); + unsafe { rust_inc_weak_task_count(); } + do fn&() { + let shutdown_port = swap_unwrap(&mut *shutdown_port); + f(shutdown_port) + }.finally || { + unsafe { rust_dec_weak_task_count(); } + // Service my have already exited + service.send(UnregisterWeakTask(task)); + } +} + +type WeakTaskService = SharedChan; +type TaskHandle = task_id; + +fn global_data_key(_v: WeakTaskService) { } + +enum ServiceMsg { + RegisterWeakTask(TaskHandle, Chan), + UnregisterWeakTask(TaskHandle), + Shutdown +} + +fn create_global_service() -> ~WeakTaskService { + + debug!("creating global weak task service"); + let (port, chan) = stream::(); + let port = ~mut Some(port); + let chan = SharedChan(chan); + let chan_clone = chan.clone(); + + do task().unlinked().spawn { + debug!("running global weak task service"); + let port = swap_unwrap(&mut *port); + let port = ~mut Some(port); + do fn&() { + let port = swap_unwrap(&mut *port); + // The weak task service is itself a weak task + debug!("weakening the weak service task"); + unsafe { rust_inc_weak_task_count(); } + run_weak_task_service(port); + }.finally { + debug!("unweakening the weak service task"); + unsafe { rust_dec_weak_task_count(); } + } + } + + do at_exit { + debug!("shutting down weak task service"); + chan.send(Shutdown); + } + + return ~chan_clone; +} + +fn run_weak_task_service(port: Port) { + + let mut shutdown_map = LinearMap(); + + loop { + match port.recv() { + RegisterWeakTask(task, shutdown_chan) => { + let previously_unregistered = + shutdown_map.insert(task, shutdown_chan); + assert previously_unregistered; + } + UnregisterWeakTask(task) => { + match shutdown_map.pop(&task) { + Some(shutdown_chan) => { + // Oneshot pipes must send, even though + // nobody will receive this + shutdown_chan.send(()); + } + None => fail + } + } + Shutdown => break + } + } + + do shutdown_map.consume |_, shutdown_chan| { + // Weak task may have already exited + shutdown_chan.send(()); + } +} + +extern { + unsafe fn rust_inc_weak_task_count(); + unsafe fn rust_dec_weak_task_count(); +} + +#[test] +fn test_simple() unsafe { + let (port, chan) = stream(); + do spawn unsafe { + do weaken_task |_signal| { + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_weak_weak() unsafe { + let (port, chan) = stream(); + do spawn unsafe { + do weaken_task |_signal| { + } + do weaken_task |_signal| { + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_wait_for_signal() unsafe { + do spawn unsafe { + do weaken_task |signal| { + signal.recv(); + } + } +} + +#[test] +fn test_wait_for_signal_many() unsafe { + use uint; + for uint::range(0, 100) |_| { + do spawn unsafe { + do weaken_task |signal| { + signal.recv(); + } + } + } +} + +#[test] +fn test_select_stream_and_oneshot() unsafe { + use pipes::select2i; + use either::{Left, Right}; + + let (port, chan) = stream(); + let (waitport, waitchan) = stream(); + do spawn unsafe { + do weaken_task |signal| { + match select2i(&port, &signal) { + Left(*) => (), + Right(*) => fail + } + } + waitchan.send(()); + } + chan.send(()); + waitport.recv(); +} + diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 221afb89b237..a5e1260d4a55 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -1038,6 +1038,18 @@ rust_get_global_data_ptr() { return &task->kernel->global_data; } +extern "C" void +rust_inc_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->inc_weak_task_count(); +} + +extern "C" void +rust_dec_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->dec_weak_task_count(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 9c6ba9dcda3e..d270ac076331 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -377,17 +377,12 @@ rust_kernel::weaken_task(rust_port_id chan) { KLOG_("Weakening task with channel %" PRIdPTR, chan); weak_task_chans.push_back(chan); } - uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); - KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); - if (new_non_weak_tasks == 0) { - begin_shutdown(); - } + inc_weak_task_count(); } void rust_kernel::unweaken_task(rust_port_id chan) { - uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); - KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + dec_weak_task_count(); { scoped_lock with(weak_task_lock); KLOG_("Unweakening task with channel %" PRIdPTR, chan); @@ -399,6 +394,21 @@ rust_kernel::unweaken_task(rust_port_id chan) { } } +void +rust_kernel::inc_weak_task_count() { + uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + if (new_non_weak_tasks == 0) { + begin_shutdown(); + } +} + +void +rust_kernel::dec_weak_task_count() { + uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); +} + void rust_kernel::end_weak_tasks() { std::vector chancopies; diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 99b230f7872b..f90ecf01a7b3 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -187,6 +187,8 @@ public: void unregister_task(); void weaken_task(rust_port_id chan); void unweaken_task(rust_port_id chan); + void inc_weak_task_count(); + void dec_weak_task_count(); bool send_to_port(rust_port_id chan, void *sptr); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 8c26832f3498..5be823d8fded 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -211,4 +211,6 @@ linenoiseHistoryLoad rust_raw_thread_start rust_raw_thread_join_delete rust_register_exit_function -rust_get_global_data_ptr \ No newline at end of file +rust_get_global_data_ptr +rust_inc_weak_task_count +rust_dec_weak_task_count \ No newline at end of file