Port ID-based channels.

This commit is contained in:
Eric Holk 2011-08-09 16:07:49 -07:00
parent 04af99ecb0
commit 39b16077bb
9 changed files with 96 additions and 7 deletions

View file

@ -1,12 +1,15 @@
import sys;
import ptr;
import unsafe;
import task;
import task::task_id;
export _chan;
export _port;
export mk_port;
export chan_from_unsafe_ptr;
export send;
native "rust" mod rustrt {
type void;
@ -17,16 +20,28 @@ native "rust" mod rustrt {
fn take_chan(ch : *rust_chan);
fn drop_chan(ch : *rust_chan);
fn chan_send(ch: *rust_chan, v : *void);
// FIXME: data should be -T, not &T, but this doesn't seem to be
// supported yet.
fn chan_id_send[~T](target_task : task_id, target_port : port_id,
data : &T);
fn new_port(unit_sz : uint) -> *rust_port;
fn del_port(po : *rust_port);
fn drop_port(po : *rust_port);
fn get_port_id(po : *rust_port) -> port_id;
}
native "rust-intrinsic" mod rusti {
fn recv[T](port : *rustrt::rust_port) -> T;
fn recv[~T](port : *rustrt::rust_port) -> T;
}
type port_id = int;
type chan_t[~T] = {
task : task_id,
port : port_id
};
resource chan_ptr(ch: *rustrt::rust_chan) {
rustrt::drop_chan(ch);
}
@ -36,7 +51,7 @@ resource port_ptr(po: *rustrt::rust_port) {
rustrt::del_port(po);
}
obj _chan[T](raw_chan : @chan_ptr) {
obj _chan[~T](raw_chan : @chan_ptr) {
fn send(v : &T) {
rustrt::chan_send(**raw_chan,
unsafe::reinterpret_cast(ptr::addr_of(v)));
@ -49,20 +64,33 @@ obj _chan[T](raw_chan : @chan_ptr) {
}
}
fn chan_from_unsafe_ptr[T](ch : *u8) -> _chan[T] {
fn chan_from_unsafe_ptr[~T](ch : *u8) -> _chan[T] {
_chan(@chan_ptr(unsafe::reinterpret_cast(ch)))
}
obj _port[T](raw_port : @port_ptr) {
obj _port[~T](raw_port : @port_ptr) {
fn mk_chan() -> _chan[T] {
_chan(@chan_ptr(rustrt::new_chan(**raw_port)))
}
// FIXME: rename this to chan once chan is not a keyword.
fn mk_chan2() -> chan_t[T] {
{
task: task::get_task_id(),
port: rustrt::get_port_id(**raw_port)
}
}
fn recv() -> T {
ret rusti::recv(**raw_port)
}
}
fn mk_port[T]() -> _port[T] {
fn mk_port[~T]() -> _port[T] {
_port(@port_ptr(rustrt::new_port(sys::size_of[T]())))
}
// FIXME: make data move-mode once the snapshot is updated.
fn send[~T](ch : chan_t[T], data : &T) {
rustrt::chan_id_send(ch.task, ch.port, data);
}

View file

@ -5,6 +5,7 @@ native "rust" mod rustrt {
fn unsupervise();
fn pin_task();
fn unpin_task();
fn get_task_id() -> task_id;
fn clone_chan(c: *rust_chan) -> *rust_chan;
type rust_chan;
@ -12,6 +13,12 @@ native "rust" mod rustrt {
fn set_min_stack(stack_size: uint);
}
type task_id = int;
fn get_task_id() -> task_id {
rustrt::get_task_id()
}
/**
* Hints the scheduler to yield this task for a specified ammount of time.
*
@ -33,6 +40,7 @@ fn pin() { rustrt::pin_task(); }
fn unpin() { rustrt::unpin_task(); }
// FIXME: remove this
fn clone_chan[T](c: chan[T]) -> chan[T] {
let cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c));
ret unsafe::reinterpret_cast(cloned);

View file

@ -702,6 +702,11 @@ unpin_task(rust_task *task) {
task->unpin();
}
extern "C" CDECL rust_task_id
get_task_id(rust_task *task) {
return task->id;
}
extern "C" CDECL rust_chan *
clone_chan(rust_task *task, rust_chan *chan) {
return chan->clone(task);
@ -738,6 +743,11 @@ del_port(rust_task *task, rust_port *port) {
task->deref();
}
extern "C" CDECL rust_port_id
get_port_id(rust_task *task, rust_port *port) {
return port->id;
}
extern "C" CDECL rust_chan*
new_chan(rust_task *task, rust_port *port) {
rust_scheduler *sched = task->sched;
@ -775,6 +785,19 @@ chan_send(rust_task *task, rust_chan *chan, void *sptr) {
chan->send(sptr);
}
extern "C" CDECL void
chan_id_send(rust_task *task, type_desc *t, rust_task_id target_task_id,
rust_port_id target_port_id, void *sptr) {
// FIXME: make sure this is thread-safe
rust_task *target_task = task->kernel->get_task_by_id(target_task_id);
if(target_task) {
rust_port *port = target_task->get_port_by_id(target_port_id);
if(port) {
port->remote_chan->send(sptr);
}
}
}
extern "C" CDECL void
port_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
{

View file

@ -1,6 +1,9 @@
#include "rust_internal.h"
#include "rust_port.h"
extern "C" CDECL rust_chan*
new_chan(rust_task *task, rust_port *port);
rust_port::rust_port(rust_task *task, size_t unit_sz)
: ref_count(1), kernel(task->kernel), task(task),
unit_sz(unit_sz), writers(task), chans(task) {
@ -10,6 +13,7 @@ rust_port::rust_port(rust_task *task, size_t unit_sz)
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
id = task->register_port(this);
remote_chan = new_chan(task, this);
}
rust_port::~rust_port() {
@ -22,6 +26,9 @@ rust_port::~rust_port() {
chan->disassociate();
}
remote_chan->deref();
remote_chan = NULL;
task->release_port(id);
}

View file

@ -5,12 +5,11 @@ class rust_port : public kernel_owned<rust_port>, public rust_cond {
public:
RUST_REFCOUNTED(rust_port);
private:
rust_port_id id;
public:
rust_kernel *kernel;
rust_task *task;
rust_chan *remote_chan;
size_t unit_sz;
ptr_vec<rust_token> writers;
ptr_vec<rust_chan> chans;

View file

@ -9,6 +9,7 @@ aio_serve
aio_stop
aio_writedata
align_of
chan_id_send
chan_send
check_claims
clone_chan
@ -25,6 +26,8 @@ debug_tydesc
do_gc
drop_chan
drop_port
get_port_id
get_task_id
get_time
hack_allow_leaks
ivec_copy_from_buf

View file

@ -1,3 +1,10 @@
// FIXME: this test is xfailed until sending strings is legal again.
//xfail-stage0
//xfail-stage1
//xfail-stage2
//xfail-stage3
use std;
import std::task;
import std::comm;

View file

@ -32,6 +32,8 @@ fn test_vec() {
}
fn test_str() {
// FIXME: re-enable this once strings are unique and sendable
/*
let po = comm::mk_port();
let ch = po.mk_chan();
let s0: str = "test";
@ -42,6 +44,7 @@ fn test_str() {
assert (s1.(1) as u8 == 'e' as u8);
assert (s1.(2) as u8 == 's' as u8);
assert (s1.(3) as u8 == 't' as u8);
*/
}
fn test_tag() {

View file

@ -17,3 +17,14 @@ fn send_recv() {
log_err v;
assert(42 == v);
}
#[test]
fn send_recv2() {
let p = comm::mk_port[int]();
let c = p.mk_chan2();
comm::send(c, 42);
let v = p.recv();
log_err v;
assert(42 == v);
}