diff --git a/src/lib/comm.rs b/src/lib/comm.rs new file mode 100644 index 000000000000..7fcac262a5c1 --- /dev/null +++ b/src/lib/comm.rs @@ -0,0 +1,48 @@ +import sys; + +export _chan; +export _port; + +export mk_port; +export mk_chan; + +native "rust" mod rustrt { + type rust_chan; + type rust_port; + + fn new_chan(po : *rust_port) -> *rust_chan; + fn del_chan(ch : *rust_chan); + fn drop_chan(ch : *rust_chan); + + fn new_port(unit_sz : uint) -> *rust_port; + fn del_port(po : *rust_port); + fn drop_port(po : *rust_port); +} + +resource chan_ptr(ch: *rustrt::rust_chan) { + rustrt::drop_chan(ch); + rustrt::drop_chan(ch); // FIXME: We shouldn't have to do this + // twice. + rustrt::del_chan(ch); +} + +tag _chan[T] { _chan(@chan_dtor); } + +resource port_ptr(po: *rustrt::rust_port) { + rustrt::drop_port(po); + rustrt::del_port(po); +} + +tag _port[T] { _port(@port_dtor); } + +fn mk_port[T]() -> _port[T] { + _port(@port_dtor(rustrt::new_port(sys::size_of[T]()))) +} + +fn mk_chan[T](po : &_port[T]) -> _chan[T] { + alt po { + _port(_po) { + _chan(@chan_dtor(rustrt::new_chan(**_po))) + } + } +} diff --git a/src/lib/std.rc b/src/lib/std.rc index 622a9563c874..822997149626 100644 --- a/src/lib/std.rc +++ b/src/lib/std.rc @@ -23,6 +23,7 @@ mod io; mod ioivec; mod sys; mod task; +mod comm; // Utility modules. diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 662ec3318d54..143a0b22f73e 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -868,6 +868,53 @@ sched_threads(rust_task *task) { return task->kernel->num_threads; } +extern "C" CDECL rust_port* +new_port(rust_task *task, size_t unit_sz) { + LOG(task, comm, "new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", + (uintptr_t) task, task->name, unit_sz); + // take a reference on behalf of the port + task->ref(); + return new (task->kernel, "rust_port") rust_port(task, unit_sz); +} + +extern "C" CDECL void +del_port(rust_task *task, rust_port *port) { + LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port); + I(task->sched, !port->ref_count); + delete port; + + // FIXME: this should happen in the port. + task->deref(); +} + +extern "C" CDECL rust_chan* +new_chan(rust_task *task, rust_port *port) { + rust_scheduler *sched = task->sched; + LOG(task, comm, "new_chan(" + "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", + (uintptr_t) task, task->name, port); + I(sched, port); + return new (task->kernel, "rust_chan") + rust_chan(task->kernel, port, port->unit_sz); +} + +extern "C" CDECL +void del_chan(rust_task *task, rust_chan *chan) { + LOG(task, comm, "del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); + chan->destroy(); +} + +extern "C" CDECL +void drop_chan(rust_task *task, rust_chan *chan) { + chan->ref_count--; +} + +extern "C" CDECL +void drop_port(rust_task *, rust_port *port) { + port->ref_count--; +>>>>>>> Started working on a library-based comm system. Creating and deleting ports work. +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 57512fd8c2ae..c8d71165a8a3 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -95,40 +95,33 @@ upcall_trace_str(rust_task *task, char const *c) { task->sched->log(task, 2, "trace: %s", c); } +extern "C" CDECL rust_port* +new_port(rust_task *task, size_t unit_sz); extern "C" CDECL rust_port* upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); - // take a reference on behalf of the port - task->ref(); - return new (task->kernel, "rust_port") rust_port(task, unit_sz); + return new_port(task, unit_sz); } +extern "C" CDECL void +del_port(rust_task *task, rust_port *port); extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); - LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); - I(task->sched, !port->ref_count); - delete port; - - // FIXME: this should happen in the port. - task->deref(); + return del_port(task, port); } /** * Creates a new channel pointing to a given port. */ extern "C" CDECL rust_chan* +new_chan(rust_task *task, rust_port *port); +extern "C" CDECL rust_chan* upcall_new_chan(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); - rust_scheduler *sched = task->sched; - LOG(task, comm, "upcall_new_chan(" - "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", - (uintptr_t) task, task->name, port); - I(sched, port); - return new (task->kernel, "rust_chan") - rust_chan(task->kernel, port, port->unit_sz); + return new_chan(task, port); } /** @@ -148,11 +141,11 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) { * appear to be live, causing modify-after-free errors. */ extern "C" CDECL +void del_chan(rust_task *task, rust_chan *chan); +extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - - LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - chan->destroy(); + del_chan(task, chan); } /** diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index ffe2a8228285..49e2cbc9c1e5 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -5,11 +5,15 @@ debug_box debug_fn debug_obj debug_opaque +del_chan +del_port debug_ptrcast debug_tag debug_trap debug_tydesc do_gc +drop_chan +drop_port get_time hack_allow_leaks ivec_copy_from_buf @@ -20,6 +24,8 @@ ivec_reserve_shared ivec_to_ptr last_os_error nano_time +new_chan +new_port pin_task unpin_task rand_free diff --git a/src/test/run-pass/task-comm-chan-chan.rs b/src/test/run-pass/task-comm-chan-chan.rs new file mode 100644 index 000000000000..5c05b16ce62d --- /dev/null +++ b/src/test/run-pass/task-comm-chan-chan.rs @@ -0,0 +1,53 @@ +// xfail-stage0 +// xfail-stage1 +// xfail-stage2 +// xfail-stage3 + +// Test case for issue #763, provided by robarnold. + +use std; +import std::task; + +tag request { + quit; + close(int, chan[bool]); +} + +type ctx = chan[request]; + +fn request_task(c : chan[ctx]) { + let p = port(); + c <| chan(p); + let req; + while (true) { + p |> req; + alt (req) { + quit. { + ret; + } + close(what, status) { + log "closing now"; + log what; + status <| true; + } + } + } +} + +fn new() -> ctx { + let p = port(); + let t = spawn request_task(chan(p)); + let cx; + p |> cx; + ret cx; +} + +fn main() { + let cx = new(); + + let p = port(); + cx <| close(4, chan(p)); + let result; + p |> result; + cx <| quit; +} diff --git a/src/test/stdtest/comm.rs b/src/test/stdtest/comm.rs new file mode 100644 index 000000000000..040593be9ba7 --- /dev/null +++ b/src/test/stdtest/comm.rs @@ -0,0 +1,8 @@ +use std; +import std::comm; + +#[test] +fn create_port_and_chan() { + let p = comm::mk_port[int](); + let c = comm::mk_chan(p); +} diff --git a/src/test/stdtest/stdtest.rc b/src/test/stdtest/stdtest.rc index e313b77bd0c2..bdce3b5ec532 100644 --- a/src/test/stdtest/stdtest.rc +++ b/src/test/stdtest/stdtest.rc @@ -2,6 +2,7 @@ use std; mod bitv; mod box; +mod comm; mod deque; mod either; mod fs;