Add task::worker. Spawns a task and returns a channel to it
It takes a lot of boilerplate to create a task and establish a way to talk to it. This function simplifies that, allowing you to write something like 'worker(f).chan <| start'. Implementation is very unsafe and only works for a few types of channels, but something like this is very useful.
This commit is contained in:
parent
b82bedb28e
commit
73fed01108
2 changed files with 93 additions and 0 deletions
|
|
@ -52,6 +52,76 @@ fn clone_chan[T](chan[T] c) -> chan[T] {
|
|||
ret unsafe::reinterpret_cast(cloned);
|
||||
}
|
||||
|
||||
// Spawn a task and immediately return a channel for communicating to it
|
||||
fn worker[T](fn(port[T]) f) -> rec(task task, chan[T] chan) {
|
||||
// FIXME: This is frighteningly unsafe and only works for
|
||||
// a few cases
|
||||
|
||||
type opaque = int;
|
||||
|
||||
// FIXME: This terrible hackery is because worktask can't currently
|
||||
// have type params
|
||||
type wordsz1 = int;
|
||||
type wordsz2 = rec(int a, int b);
|
||||
type wordsz3 = rec(int a, int b, int c);
|
||||
type wordsz4 = rec(int a, int b, int c, int d);
|
||||
type opaquechan_1wordsz = chan[chan[wordsz1]];
|
||||
type opaquechan_2wordsz = chan[chan[wordsz2]];
|
||||
type opaquechan_3wordsz = chan[chan[wordsz3]];
|
||||
type opaquechan_4wordsz = chan[chan[wordsz4]];
|
||||
|
||||
fn worktask1(opaquechan_1wordsz setupch, opaque fptr) {
|
||||
let *fn(port[wordsz1]) f = unsafe::reinterpret_cast(fptr);
|
||||
auto p = port[wordsz1]();
|
||||
setupch <| chan(p);
|
||||
(*f)(p);
|
||||
}
|
||||
|
||||
fn worktask2(opaquechan_2wordsz setupch, opaque fptr) {
|
||||
let *fn(port[wordsz2]) f = unsafe::reinterpret_cast(fptr);
|
||||
auto p = port[wordsz2]();
|
||||
setupch <| chan(p);
|
||||
(*f)(p);
|
||||
}
|
||||
|
||||
fn worktask3(opaquechan_3wordsz setupch, opaque fptr) {
|
||||
let *fn(port[wordsz3]) f = unsafe::reinterpret_cast(fptr);
|
||||
auto p = port[wordsz3]();
|
||||
setupch <| chan(p);
|
||||
(*f)(p);
|
||||
}
|
||||
|
||||
fn worktask4(opaquechan_4wordsz setupch, opaque fptr) {
|
||||
let *fn(port[wordsz4]) f = unsafe::reinterpret_cast(fptr);
|
||||
auto p = port[wordsz4]();
|
||||
setupch <| chan(p);
|
||||
(*f)(p);
|
||||
}
|
||||
|
||||
auto p = port[chan[T]]();
|
||||
auto setupch = chan(p);
|
||||
auto fptr = unsafe::reinterpret_cast(ptr::addr_of(f));
|
||||
|
||||
auto Tsz = sys::size_of[T]();
|
||||
auto t = if Tsz == sys::size_of[wordsz1]() {
|
||||
auto setupchptr = unsafe::reinterpret_cast(setupch);
|
||||
spawn worktask1(setupchptr, fptr)
|
||||
} else if Tsz == sys::size_of[wordsz2]() {
|
||||
auto setupchptr = unsafe::reinterpret_cast(setupch);
|
||||
spawn worktask2(setupchptr, fptr)
|
||||
} else if Tsz == sys::size_of[wordsz3]() {
|
||||
auto setupchptr = unsafe::reinterpret_cast(setupch);
|
||||
spawn worktask3(setupchptr, fptr)
|
||||
} else if Tsz == sys::size_of[wordsz4]() {
|
||||
auto setupchptr = unsafe::reinterpret_cast(setupch);
|
||||
spawn worktask4(setupchptr, fptr)
|
||||
} else {
|
||||
fail #fmt("unhandled type size %u in task::worker", Tsz)
|
||||
};
|
||||
auto ch; p |> ch;
|
||||
ret rec(task = t, chan = ch);
|
||||
}
|
||||
|
||||
// Local Variables:
|
||||
// mode: rust;
|
||||
// fill-column: 78;
|
||||
|
|
|
|||
|
|
@ -32,3 +32,26 @@ fn test_join() {
|
|||
|
||||
assert task::join(failtask) == task::tr_failure;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_worker() {
|
||||
task::worker(fn(port[int] p) {
|
||||
auto x; p |> x;
|
||||
assert x == 10;
|
||||
}).chan <| 10;
|
||||
|
||||
task::worker(fn(port[rec(int x, int y)] p) {
|
||||
auto x; p |> x;
|
||||
assert x.y == 20;
|
||||
}).chan <| rec(x = 10, y = 20);
|
||||
|
||||
task::worker(fn(port[rec(int x, int y, int z)] p) {
|
||||
auto x; p |> x;
|
||||
assert x.z == 30;
|
||||
}).chan <| rec(x = 10, y = 20, z = 30);
|
||||
|
||||
task::worker(fn(port[rec(int a, int b, int c, int d)] p) {
|
||||
auto x; p |> x;
|
||||
assert x.d == 40;
|
||||
}).chan <| rec(a = 10, b = 20, c = 30, d = 40);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue