From 3d76922f973caa358cf803d5539dbfadeb3ca830 Mon Sep 17 00:00:00 2001 From: Donovan Preston Date: Fri, 27 Jan 2012 14:04:13 -0800 Subject: [PATCH] Implement timers. --- src/libstd/uvtmp.rs | 25 +++++++++++----- src/rt/rust_uvtmp.cpp | 66 +++++++++++++++++++++++++++++++++++++++++-- src/rt/rustrt.def.in | 1 + 3 files changed, 83 insertions(+), 9 deletions(-) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index fb9df4bfbd39..e1d155e7c164 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -27,6 +27,11 @@ native mod rustrt { thread: thread, req_id: u32, chan: comm::chan); + fn rust_uvtmp_timer( + thread: thread, + timeout: u32, + req_id: u32, + chan: comm::chan); fn rust_uvtmp_delete_buf(buf: *u8); fn rust_uvtmp_get_req_id(cd: connect_data) -> u32; } @@ -39,7 +44,9 @@ enum iomsg { whatever, connected(connect_data), wrote(connect_data), - read(connect_data, *u8, ctypes::ssize_t) + read(connect_data, *u8, ctypes::ssize_t), + timer(u32), + exit } fn create_thread() -> thread { @@ -58,8 +65,7 @@ fn delete_thread(thread: thread) { rustrt::rust_uvtmp_delete_thread(thread) } -fn connect(thread: thread, req_id: u32, - ip: str, ch: comm::chan) -> connect_data { +fn connect(thread: thread, req_id: u32, ip: str, ch: comm::chan) -> connect_data { str::as_buf(ip) {|ipbuf| rustrt::rust_uvtmp_connect(thread, req_id, ipbuf, ch) } @@ -80,6 +86,11 @@ fn read_start(thread: thread, req_id: u32, rustrt::rust_uvtmp_read_start(thread, req_id, chan); } +fn timer_start(thread: thread, timeout: u32, req_id: u32, + chan: comm::chan) { + rustrt::rust_uvtmp_timer(thread, timeout, req_id, chan); +} + fn delete_buf(buf: *u8) { rustrt::rust_uvtmp_delete_buf(buf); } @@ -106,7 +117,7 @@ fn test_connect() { connect(thread, 0u32, "74.125.224.146", chan); alt comm::recv(port) { connected(cd) { - close_connection(thread, 0u32); + close_connection(thread, cd); } } join_thread(thread); @@ -123,10 +134,10 @@ fn test_http() { connect(thread, 0u32, "74.125.224.146", chan); alt comm::recv(port) { connected(cd) { - write(thread, 0u32, str::bytes("GET / HTTP/1.0\n\n"), chan); + write(thread, cd, str::bytes("GET / HTTP/1.0\n\n"), chan); alt comm::recv(port) { wrote(cd) { - read_start(thread, 0u32, chan); + read_start(thread, cd, chan); let keep_going = true; while keep_going { alt comm::recv(port) { @@ -146,7 +157,7 @@ fn test_http() { } } } - close_connection(thread, 0u32); + close_connection(thread, cd); } } } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp index 27e0021bc6c0..63dcd7b64f2b 100644 --- a/src/rt/rust_uvtmp.cpp +++ b/src/rt/rust_uvtmp.cpp @@ -15,9 +15,12 @@ struct connect_data { chan_handle chan; }; +const intptr_t whatever_tag = 0; const intptr_t connected_tag = 1; const intptr_t wrote_tag = 2; const intptr_t read_tag = 3; +const intptr_t timer_tag = 4; +const intptr_t exit_tag = 5; struct iomsg { intptr_t tag; @@ -29,6 +32,7 @@ struct iomsg { uint8_t *buf; ssize_t nread; } read_val; + uint32_t timer_req_id; } val; }; @@ -44,6 +48,13 @@ struct read_start_data { chan_handle chan; }; +struct timer_start_data { + rust_uvtmp_thread *thread; + uint32_t timeout; + uint32_t req_id; + chan_handle chan; +}; + // FIXME: Copied from rust_builtins.cpp. Could bitrot easily static void send(rust_task *task, chan_handle chan, void *data) { @@ -72,7 +83,7 @@ private: std::queue close_connection_queue; std::queue write_queue; std::queue read_start_queue; - + std::queue timer_start_queue; public: rust_uvtmp_thread() { @@ -139,6 +150,17 @@ public: read_start_queue.push(rd); } + void + timer(uint32_t timeout, uint32_t req_id, chan_handle chan) { + scoped_lock with(lock); + + timer_start_data *td = new timer_start_data(); + td->timeout = timeout; + td->req_id = req_id; + td->chan = chan; + timer_start_queue.push(td); + } + private: virtual void @@ -159,6 +181,7 @@ private: close_connections(); write_buffers(); start_reads(); + start_timers(); close_idle_if_stop(); } @@ -246,7 +269,7 @@ private: void on_write(uv_write_t *handle, write_data *wd) { iomsg msg; - msg.tag = wrote_tag; + msg.tag = timer_tag; msg.val.wrote_val = wd->cd; send(task, wd->chan, &msg); @@ -299,6 +322,40 @@ private: } } + void + start_timers() { + assert (lock.lock_held_by_current_thread()); + while (!timer_start_queue.empty()) { + timer_start_data *td = timer_start_queue.front(); + timer_start_queue.pop(); + + td->thread = this; + + uv_timer_t *timer = (uv_timer_t *)malloc(sizeof(uv_timer_t)); + timer->data = td; + int result = uv_timer_init(loop, timer); + result = uv_timer_start(timer, timer_cb, td->timeout, 0); + } + } + + static void + timer_cb(uv_timer_t *handle, int what) { + timer_start_data *td = (timer_start_data*)handle->data; + rust_uvtmp_thread *self = td->thread; + self->on_timer(td); + free(handle); + } + + void + on_timer(timer_start_data *rd) { + iomsg msg; + msg.tag = timer_tag; + msg.val.timer_req_id = rd->req_id; + + send(task, rd->chan, &msg); + delete rd; + } + void close_idle_if_stop() { assert(lock.lock_held_by_current_thread()); @@ -353,6 +410,11 @@ rust_uvtmp_read_start(rust_uvtmp_thread *thread, uint32_t req_id, thread->read_start(req_id, *chan); } +extern "C" void +rust_uvtmp_timer(rust_uvtmp_thread *thread, uint32_t timeout, uint32_t req_id, chan_handle *chan) { + thread->timer(timeout, req_id, *chan); +} + extern "C" void rust_uvtmp_delete_buf(uint8_t *buf) { delete [] buf; diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 17a0274d96e5..052f4a42779c 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -96,6 +96,7 @@ rust_uvtmp_connect rust_uvtmp_close_connection rust_uvtmp_write rust_uvtmp_read_start +rust_uvtmp_timer rust_uvtmp_delete_buf rust_uvtmp_get_req_id