From b2cfb7ef8262ebe47514f016f59054ebcfe15d61 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 14 Feb 2012 22:23:16 -0800 Subject: [PATCH] rt: Add rust_port_select function --- mk/rt.mk | 2 + src/rt/rust_builtin.cpp | 8 ++++ src/rt/rust_port.cpp | 34 ++++++++++---- src/rt/rust_port.h | 2 + src/rt/rust_port_selector.cpp | 83 +++++++++++++++++++++++++++++++++++ src/rt/rust_port_selector.h | 27 ++++++++++++ src/rt/rust_task.h | 5 +++ src/rt/rustrt.def.in | 1 + 8 files changed, 153 insertions(+), 9 deletions(-) create mode 100644 src/rt/rust_port_selector.cpp create mode 100644 src/rt/rust_port_selector.h diff --git a/mk/rt.mk b/mk/rt.mk index e43a68e5211d..26f35ea90dbe 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -52,6 +52,7 @@ RUNTIME_CS_$(1) := \ rt/rust_uv.cpp \ rt/rust_uvtmp.cpp \ rt/rust_log.cpp \ + rt/rust_port_selector.cpp \ rt/circular_buffer.cpp \ rt/isaac/randport.cpp \ rt/rust_srv.cpp \ @@ -88,6 +89,7 @@ RUNTIME_HDR_$(1) := rt/globals.h \ rt/rust_stack.h \ rt/rust_task_list.h \ rt/rust_log.h \ + rt/rust_port_selector.h \ rt/circular_buffer.h \ rt/util/array_list.h \ rt/util/indexed_list.h \ diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 628389f10cee..ecc73204f29c 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -593,6 +593,14 @@ port_recv(uintptr_t *dptr, rust_port *port, return; } +extern "C" CDECL void +rust_port_select(rust_port **dptr, rust_port **ports, + size_t n_ports, uintptr_t *yield) { + rust_task *task = rust_task_thread::get_task(); + rust_port_selector *selector = task->get_port_selector(); + selector->select(task, dptr, ports, n_ports, yield); +} + extern "C" CDECL void rust_set_exit_status(intptr_t code) { rust_task *task = rust_task_thread::get_task(); diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index a917c12e151a..5f46b9c4ca0a 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -30,18 +30,34 @@ void rust_port::detach() { void rust_port::send(void *sptr) { I(task->thread, !lock.lock_held_by_current_thread()); - scoped_lock with(lock); + bool did_rendezvous = false; + { + scoped_lock with(lock); - buffer.enqueue(sptr); + buffer.enqueue(sptr); - A(kernel, !buffer.is_empty(), - "rust_chan::transmit with nothing to send."); + A(kernel, !buffer.is_empty(), + "rust_chan::transmit with nothing to send."); - if (task->blocked_on(this)) { - KLOG(kernel, comm, "dequeued in rendezvous_ptr"); - buffer.dequeue(task->rendezvous_ptr); - task->rendezvous_ptr = 0; - task->wakeup(this); + if (task->blocked_on(this)) { + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); + buffer.dequeue(task->rendezvous_ptr); + task->rendezvous_ptr = 0; + task->wakeup(this); + did_rendezvous = true; + } + } + + if (!did_rendezvous) { + // If the task wasn't waiting specifically on this port, + // it may be waiting on a group of ports + + rust_port_selector *port_selector = task->get_port_selector(); + // This check is not definitive. The port selector will take a lock + // and check again whether the task is still blocked. + if (task->blocked_on(port_selector)) { + port_selector->msg_sent_on(this); + } } } diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 44bd686650dc..92ece8a7841e 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -1,6 +1,8 @@ #ifndef RUST_PORT_H #define RUST_PORT_H +#include "rust_internal.h" + class rust_port : public kernel_owned, public rust_cond { public: RUST_REFCOUNTED(rust_port) diff --git a/src/rt/rust_port_selector.cpp b/src/rt/rust_port_selector.cpp new file mode 100644 index 000000000000..e9c351738f74 --- /dev/null +++ b/src/rt/rust_port_selector.cpp @@ -0,0 +1,83 @@ +#include "rust_port.h" +#include "rust_port_selector.h" + +rust_port_selector::rust_port_selector() + : ports(NULL), n_ports(0) { +} + +void +rust_port_selector::select(rust_task *task, rust_port **dptr, + rust_port **ports, + size_t n_ports, uintptr_t *yield) { + + I(task->thread, this->ports == NULL); + I(task->thread, this->n_ports == 0); + I(task->thread, dptr != NULL); + I(task->thread, ports != NULL); + I(task->thread, n_ports != 0); + I(task->thread, yield != NULL); + + *yield = false; + size_t locks_taken = 0; + bool found_msg = false; + + // Take each port's lock as we iterate through them because + // if none of them contain a usable message then we need to + // block the task before any of them can try to send another + // message. + + for (size_t i = 0; i < n_ports; i++) { + rust_port *port = ports[i]; + I(task->thread, port != NULL); + + port->lock.lock(); + locks_taken++; + + if (port->buffer.size() > 0) { + *dptr = port; + found_msg = true; + break; + } + } + + if (!found_msg) { + this->ports = ports; + this->n_ports = n_ports; + I(task->thread, task->rendezvous_ptr == NULL); + task->rendezvous_ptr = (uintptr_t*)dptr; + *yield = true; + task->block(this, "waiting for select rendezvous"); + } + + for (size_t i = 0; i < locks_taken; i++) { + rust_port *port = ports[i]; + port->lock.unlock(); + } +} + +void +rust_port_selector::msg_sent_on(rust_port *port) { + rust_task *task = port->task; + + I(task->thread, !task->lock.lock_held_by_current_thread()); + I(task->thread, !port->lock.lock_held_by_current_thread()); + I(task->thread, !rendezvous_lock.lock_held_by_current_thread()); + + // Prevent two ports from trying to wake up the task + // simultaneously + scoped_lock with(rendezvous_lock); + + if (task->blocked_on(this)) { + for (size_t i = 0; i < n_ports; i++) { + if (port == ports[i]) { + // This was one of the ports we were waiting on + ports = NULL; + n_ports = 0; + *task->rendezvous_ptr = (uintptr_t) port; + task->rendezvous_ptr = NULL; + task->wakeup(this); + return; + } + } + } +} diff --git a/src/rt/rust_port_selector.h b/src/rt/rust_port_selector.h new file mode 100644 index 000000000000..8b4d902a2493 --- /dev/null +++ b/src/rt/rust_port_selector.h @@ -0,0 +1,27 @@ +#ifndef RUST_PORT_SELECTOR_H +#define RUST_PORT_SELECTOR_H + +#include "rust_internal.h" + +struct rust_task; +class rust_port; + +class rust_port_selector : public rust_cond { + private: + rust_port **ports; + size_t n_ports; + lock_and_signal rendezvous_lock; + + public: + rust_port_selector(); + + void select(rust_task *task, + rust_port **dptr, + rust_port **ports, + size_t n_ports, + uintptr_t *yield); + + void msg_sent_on(rust_port *port); +}; + +#endif /* RUST_PORT_SELECTOR_H */ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 7594e677bb0d..fe1b94d6ea54 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -16,6 +16,7 @@ #include "rust_obstack.h" #include "boxed_region.h" #include "rust_stack.h" +#include "rust_port_selector.h" // Corresponds to the rust chan (currently _chan) type. struct chan_handle { @@ -116,6 +117,8 @@ private: uintptr_t next_c_sp; uintptr_t next_rust_sp; + rust_port_selector port_selector; + // Called when the atomic refcount reaches zero void delete_this(); @@ -206,6 +209,8 @@ public: void call_on_c_stack(void *args, void *fn_ptr); void call_on_rust_stack(void *args, void *fn_ptr); bool have_c_stack() { return c_stack != NULL; } + + rust_port_selector *get_port_selector() { return &port_selector; } }; // This stuff is on the stack-switching fast path diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index bcdb2079d971..2030f3207067 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -17,6 +17,7 @@ nano_time new_port new_task port_recv +rust_port_select rand_free rand_new rand_next