From df4c0b8e4349d50f317553de5a47d0cd56cdc227 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 7 Nov 2013 15:13:06 -0800 Subject: [PATCH] Make the uv bindings resilient to linked failure In the ideal world, uv I/O could be canceled safely at any time. In reality, however, we are unable to do this. Right now linked failure is fairly flaky as implemented in the runtime, making it very difficult to test whether the linked failure mechanisms inside of the uv bindings are ready for this kind of interaction. Right now, all constructors will execute in a task::unkillable block, and all homing I/O operations will prevent linked failure in the duration of the homing operation. What this means is that tasks which perform I/O are still susceptible to linked failure, but the I/O operations themselves will never get interrupted. Instead, the linked failure will be received at the edge of the I/O operation. --- src/librustuv/addrinfo.rs | 44 +- src/librustuv/async.rs | 43 +- src/librustuv/file.rs | 172 ++++---- src/librustuv/idle.rs | 71 ++- src/librustuv/lib.rs | 120 ++++-- src/librustuv/net.rs | 877 +++++++++++++++++++------------------- src/librustuv/pipe.rs | 114 +++-- src/librustuv/process.rs | 9 +- src/librustuv/stream.rs | 82 ++-- src/librustuv/timer.rs | 61 ++- src/librustuv/tty.rs | 2 +- src/librustuv/uvio.rs | 65 +-- 12 files changed, 839 insertions(+), 821 deletions(-) diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index d5bfd729eb56..56f6eda53575 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -9,7 +9,6 @@ // except according to those terms. use ai = std::rt::io::net::addrinfo; -use std::cast; use std::libc::c_int; use std::ptr::null; use std::rt::BlockedTask; @@ -17,7 +16,7 @@ use std::rt::local::Local; use std::rt::sched::Scheduler; use net; -use super::{Loop, UvError, Request}; +use super::{Loop, UvError, Request, wait_until_woken_after}; use uvll; struct Addrinfo { @@ -76,7 +75,7 @@ impl GetAddrInfoRequest { } }); let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo); - let req = Request::new(uvll::UV_GETADDRINFO); + let mut req = Request::new(uvll::UV_GETADDRINFO); return match unsafe { uvll::uv_getaddrinfo(loop_.handle, req.handle, @@ -84,12 +83,11 @@ impl GetAddrInfoRequest { hint_ptr) } { 0 => { + req.defuse(); // uv callback now owns this request let mut cx = Ctx { slot: None, status: 0, addrinfo: None }; - req.set_data(&cx); - req.defuse(); - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - cx.slot = Some(task); + + do wait_until_woken_after(&mut cx.slot) { + req.set_data(&cx); } match cx.status { @@ -105,8 +103,8 @@ impl GetAddrInfoRequest { status: c_int, res: *uvll::addrinfo) { let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; cx.status = status; cx.addrinfo = Some(Addrinfo { handle: res }); @@ -191,25 +189,23 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] { mod test { use std::rt::io::net::ip::{SocketAddr, Ipv4Addr}; use super::*; - use super::super::run_uv_loop; + use super::super::local_loop; #[test] fn getaddrinfo_test() { - do run_uv_loop |l| { - match GetAddrInfoRequest::run(l, Some("localhost"), None, None) { - Ok(infos) => { - let mut found_local = false; - let local_addr = &SocketAddr { - ip: Ipv4Addr(127, 0, 0, 1), - port: 0 - }; - for addr in infos.iter() { - found_local = found_local || addr.address == *local_addr; - } - assert!(found_local); + match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) { + Ok(infos) => { + let mut found_local = false; + let local_addr = &SocketAddr { + ip: Ipv4Addr(127, 0, 0, 1), + port: 0 + }; + for addr in infos.iter() { + found_local = found_local || addr.address == *local_addr; } - Err(e) => fail!("{:?}", e), + assert!(found_local); } + Err(e) => fail!("{:?}", e), } } } diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs index 334e154a397f..04e7bce5bd18 100644 --- a/src/librustuv/async.rs +++ b/src/librustuv/async.rs @@ -131,11 +131,12 @@ mod test_remote { use std::rt::tube::Tube; use super::*; - use super::super::run_uv_loop; + use super::super::local_loop; - // Make sure that we can fire watchers in remote threads + // Make sure that we can fire watchers in remote threads and that they + // actually trigger what they say they will. #[test] - fn test_uv_remote() { + fn smoke_test() { struct MyCallback(Option>); impl Callback for MyCallback { fn call(&mut self) { @@ -147,35 +148,15 @@ mod test_remote { } } - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(Some(tube.clone())); - let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback)); + let mut tube = Tube::new(); + let cb = ~MyCallback(Some(tube.clone())); + let watcher = Cell::new(AsyncWatcher::new(local_loop(), cb as ~Callback)); - let thread = do Thread::start { - watcher.take().fire(); - }; + let thread = do Thread::start { + watcher.take().fire(); + }; - assert_eq!(tube.recv(), 1); - thread.join(); - } - } - - #[test] - fn smoke_test() { - static mut hits: uint = 0; - - struct MyCallback; - impl Callback for MyCallback { - fn call(&mut self) { - unsafe { hits += 1; } - } - } - - do run_uv_loop |l| { - let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback); - watcher.fire(); - } - assert!(unsafe { hits > 0 }); + assert_eq!(tube.recv(), 1); + thread.join(); } } diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index ac89ef38e8ec..bdb1429f5b62 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -15,14 +15,14 @@ use std::cast; use std::libc::{c_int, c_char, c_void, c_uint}; use std::libc; use std::rt::BlockedTask; -use std::rt::io; use std::rt::io::{FileStat, IoError}; -use std::rt::rtio; +use std::rt::io; use std::rt::local::Local; +use std::rt::rtio; use std::rt::sched::{Scheduler, SchedHandle}; use std::vec; -use super::{Loop, UvError, uv_error_to_io_error}; +use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after}; use uvio::HomingIO; use uvll; @@ -305,10 +305,8 @@ fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int) 0 => { req.fired = true; let mut slot = None; - unsafe { uvll::set_data_for_req(req.req, &slot) } - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - slot = Some(task); + do wait_until_woken_after(&mut slot) { + unsafe { uvll::set_data_for_req(req.req, &slot) } } match req.get_result() { n if n < 0 => Err(UvError(n)), @@ -454,123 +452,113 @@ mod test { use std::str; use std::vec; use super::*; - use super::super::{run_uv_loop}; + use l = super::super::local_loop; #[test] fn file_test_full_simple_sync() { - do run_uv_loop |l| { - let create_flags = O_RDWR | O_CREAT; - let read_flags = O_RDONLY; - let mode = S_IWUSR | S_IRUSR; - let path_str = "./tmp/file_full_simple_sync.txt"; + let create_flags = O_RDWR | O_CREAT; + let read_flags = O_RDONLY; + let mode = S_IWUSR | S_IRUSR; + let path_str = "./tmp/file_full_simple_sync.txt"; - { - // open/create - let result = FsRequest::open(l, &path_str.to_c_str(), - create_flags as int, mode as int); - assert!(result.is_ok()); - let result = result.unwrap(); - let fd = result.fd; + { + // open/create + let result = FsRequest::open(l(), &path_str.to_c_str(), + create_flags as int, mode as int); + assert!(result.is_ok()); + let result = result.unwrap(); + let fd = result.fd; - // write - let result = FsRequest::write(l, fd, "hello".as_bytes(), -1); - assert!(result.is_ok()); - } - - { - // re-open - let result = FsRequest::open(l, &path_str.to_c_str(), - read_flags as int, 0); - assert!(result.is_ok()); - let result = result.unwrap(); - let fd = result.fd; - - // read - let mut read_mem = vec::from_elem(1000, 0u8); - let result = FsRequest::read(l, fd, read_mem, 0); - assert!(result.is_ok()); - - let nread = result.unwrap(); - assert!(nread > 0); - let read_str = str::from_utf8(read_mem.slice(0, nread as uint)); - assert_eq!(read_str, ~"hello"); - } - // unlink - let result = FsRequest::unlink(l, &path_str.to_c_str()); + // write + let result = FsRequest::write(l(), fd, "hello".as_bytes(), -1); assert!(result.is_ok()); } + + { + // re-open + let result = FsRequest::open(l(), &path_str.to_c_str(), + read_flags as int, 0); + assert!(result.is_ok()); + let result = result.unwrap(); + let fd = result.fd; + + // read + let mut read_mem = vec::from_elem(1000, 0u8); + let result = FsRequest::read(l(), fd, read_mem, 0); + assert!(result.is_ok()); + + let nread = result.unwrap(); + assert!(nread > 0); + let read_str = str::from_utf8(read_mem.slice(0, nread as uint)); + assert_eq!(read_str, ~"hello"); + } + // unlink + let result = FsRequest::unlink(l(), &path_str.to_c_str()); + assert!(result.is_ok()); } #[test] fn file_test_stat() { - do run_uv_loop |l| { - let path = &"./tmp/file_test_stat_simple".to_c_str(); - let create_flags = (O_RDWR | O_CREAT) as int; - let mode = (S_IWUSR | S_IRUSR) as int; + let path = &"./tmp/file_test_stat_simple".to_c_str(); + let create_flags = (O_RDWR | O_CREAT) as int; + let mode = (S_IWUSR | S_IRUSR) as int; - let result = FsRequest::open(l, path, create_flags, mode); - assert!(result.is_ok()); - let file = result.unwrap(); + let result = FsRequest::open(l(), path, create_flags, mode); + assert!(result.is_ok()); + let file = result.unwrap(); - let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0); - assert!(result.is_ok()); + let result = FsRequest::write(l(), file.fd, "hello".as_bytes(), 0); + assert!(result.is_ok()); - let result = FsRequest::stat(l, path); - assert!(result.is_ok()); - assert_eq!(result.unwrap().size, 5); + let result = FsRequest::stat(l(), path); + assert!(result.is_ok()); + assert_eq!(result.unwrap().size, 5); - fn free(_: T) {} - free(file); + fn free(_: T) {} + free(file); - let result = FsRequest::unlink(l, path); - assert!(result.is_ok()); - } + let result = FsRequest::unlink(l(), path); + assert!(result.is_ok()); } #[test] fn file_test_mk_rm_dir() { - do run_uv_loop |l| { - let path = &"./tmp/mk_rm_dir".to_c_str(); - let mode = S_IWUSR | S_IRUSR; + let path = &"./tmp/mk_rm_dir".to_c_str(); + let mode = S_IWUSR | S_IRUSR; - let result = FsRequest::mkdir(l, path, mode); - assert!(result.is_ok()); + let result = FsRequest::mkdir(l(), path, mode); + assert!(result.is_ok()); - let result = FsRequest::stat(l, path); - assert!(result.is_ok()); - assert!(result.unwrap().kind == io::TypeDirectory); + let result = FsRequest::stat(l(), path); + assert!(result.is_ok()); + assert!(result.unwrap().kind == io::TypeDirectory); - let result = FsRequest::rmdir(l, path); - assert!(result.is_ok()); + let result = FsRequest::rmdir(l(), path); + assert!(result.is_ok()); - let result = FsRequest::stat(l, path); - assert!(result.is_err()); - } + let result = FsRequest::stat(l(), path); + assert!(result.is_err()); } #[test] fn file_test_mkdir_chokes_on_double_create() { - do run_uv_loop |l| { - let path = &"./tmp/double_create_dir".to_c_str(); - let mode = S_IWUSR | S_IRUSR; + let path = &"./tmp/double_create_dir".to_c_str(); + let mode = S_IWUSR | S_IRUSR; - let result = FsRequest::stat(l, path); - assert!(result.is_err(), "{:?}", result); - let result = FsRequest::mkdir(l, path, mode as c_int); - assert!(result.is_ok(), "{:?}", result); - let result = FsRequest::mkdir(l, path, mode as c_int); - assert!(result.is_err(), "{:?}", result); - let result = FsRequest::rmdir(l, path); - assert!(result.is_ok(), "{:?}", result); - } + let result = FsRequest::stat(l(), path); + assert!(result.is_err(), "{:?}", result); + let result = FsRequest::mkdir(l(), path, mode as c_int); + assert!(result.is_ok(), "{:?}", result); + let result = FsRequest::mkdir(l(), path, mode as c_int); + assert!(result.is_err(), "{:?}", result); + let result = FsRequest::rmdir(l(), path); + assert!(result.is_ok(), "{:?}", result); } #[test] fn file_test_rmdir_chokes_on_nonexistant_path() { - do run_uv_loop |l| { - let path = &"./tmp/never_existed_dir".to_c_str(); - let result = FsRequest::rmdir(l, path); - assert!(result.is_err()); - } + let path = &"./tmp/never_existed_dir".to_c_str(); + let result = FsRequest::rmdir(l(), path); + assert!(result.is_err()); } } diff --git a/src/librustuv/idle.rs b/src/librustuv/idle.rs index b3527ce9fb42..83fc53dce1cd 100644 --- a/src/librustuv/idle.rs +++ b/src/librustuv/idle.rs @@ -83,7 +83,6 @@ impl UvHandle for IdleWatcher { } extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - if status == uvll::ECANCELED { return } assert_eq!(status, 0); let idle: &mut IdleWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; idle.callback.call(); @@ -101,7 +100,7 @@ mod test { use super::*; use std::rt::tube::Tube; use std::rt::rtio::{Callback, PausibleIdleCallback}; - use super::super::run_uv_loop; + use super::super::local_loop; struct MyCallback(Tube, int); impl Callback for MyCallback { @@ -114,55 +113,47 @@ mod test { #[test] fn not_used() { - do run_uv_loop |l| { - let cb = ~MyCallback(Tube::new(), 1); - let _idle = IdleWatcher::new(l, cb as ~Callback); - } + let cb = ~MyCallback(Tube::new(), 1); + let _idle = IdleWatcher::new(local_loop(), cb as ~Callback); } #[test] fn smoke_test() { - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(l, cb as ~Callback); - idle.resume(); - tube.recv(); - } + let mut tube = Tube::new(); + let cb = ~MyCallback(tube.clone(), 1); + let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + idle.resume(); + tube.recv(); } #[test] fn fun_combinations_of_methods() { - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(l, cb as ~Callback); - idle.resume(); - tube.recv(); - idle.pause(); - idle.resume(); - idle.resume(); - tube.recv(); - idle.pause(); - idle.pause(); - idle.resume(); - tube.recv(); - } + let mut tube = Tube::new(); + let cb = ~MyCallback(tube.clone(), 1); + let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + idle.resume(); + tube.recv(); + idle.pause(); + idle.resume(); + idle.resume(); + tube.recv(); + idle.pause(); + idle.pause(); + idle.resume(); + tube.recv(); } #[test] fn pause_pauses() { - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle1 = IdleWatcher::new(l, cb as ~Callback); - let cb = ~MyCallback(tube.clone(), 2); - let mut idle2 = IdleWatcher::new(l, cb as ~Callback); - idle2.resume(); - assert_eq!(tube.recv(), 2); - idle2.pause(); - idle1.resume(); - assert_eq!(tube.recv(), 1); - } + let mut tube = Tube::new(); + let cb = ~MyCallback(tube.clone(), 1); + let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback); + let cb = ~MyCallback(tube.clone(), 2); + let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback); + idle2.resume(); + assert_eq!(tube.recv(), 2); + idle2.pause(); + idle1.resume(); + assert_eq!(tube.recv(), 1); } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 5bedba08fb0e..4da5ad4275f7 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -45,15 +45,19 @@ via `close` and `delete` methods. #[feature(macro_rules, globs)]; -use std::cast; -use std::str::raw::from_c_str; -use std::vec; -use std::ptr; -use std::str; -use std::libc::{c_void, c_int, malloc, free}; use std::cast::transmute; +use std::cast; +use std::libc::{c_int, malloc, free}; use std::ptr::null; +use std::ptr; +use std::rt::BlockedTask; +use std::rt::local::Local; +use std::rt::sched::Scheduler; +use std::str::raw::from_c_str; +use std::str; +use std::task; use std::unstable::finally::Finally; +use std::vec; use std::rt::io::IoError; @@ -124,27 +128,90 @@ pub trait UvHandle { uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb) } } + + fn close(&mut self) { + let mut slot = None; + + unsafe { + uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb); + uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>()); + + do wait_until_woken_after(&mut slot) { + uvll::set_data_for_uv_handle(self.uv_handle(), &slot); + } + } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + unsafe { + let data = uvll::get_data_for_uv_handle(handle); + uvll::free_handle(handle); + if data == ptr::null() { return } + let slot: &mut Option = cast::transmute(data); + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(slot.take_unwrap()); + } + } + } +} + +pub struct ForbidUnwind { + msg: &'static str, + failing_before: bool, +} + +impl ForbidUnwind { + fn new(s: &'static str) -> ForbidUnwind { + ForbidUnwind { + msg: s, failing_before: task::failing(), + } + } +} + +impl Drop for ForbidUnwind { + fn drop(&mut self) { + assert!(self.failing_before == task::failing(), + "failing sadface {}", self.msg); + } +} + +fn wait_until_woken_after(slot: *mut Option, f: &fn()) { + let _f = ForbidUnwind::new("wait_until_woken_after"); + unsafe { + assert!((*slot).is_none()); + let sched: ~Scheduler = Local::take(); + do sched.deschedule_running_task_and_then |_, task| { + f(); + *slot = Some(task); + } + } } pub struct Request { handle: *uvll::uv_req_t, + priv defused: bool, } impl Request { pub fn new(ty: uvll::uv_req_type) -> Request { - Request::wrap(unsafe { uvll::malloc_req(ty) }) + unsafe { + let handle = uvll::malloc_req(ty); + uvll::set_data_for_req(handle, null::<()>()); + Request::wrap(handle) + } } pub fn wrap(handle: *uvll::uv_req_t) -> Request { - Request { handle: handle } + Request { handle: handle, defused: false } } pub fn set_data(&self, t: *T) { unsafe { uvll::set_data_for_req(self.handle, t) } } - pub fn get_data(&self) -> *c_void { - unsafe { uvll::get_data_for_req(self.handle) } + pub unsafe fn get_data(&self) -> &'static mut T { + let data = uvll::get_data_for_req(self.handle); + assert!(data != null()); + cast::transmute(data) } // This function should be used when the request handle has been given to an @@ -155,17 +222,15 @@ impl Request { // This is still a problem in blocking situations due to linked failure. In // the connection callback the handle should be re-wrapped with the `wrap` // function to ensure its destruction. - pub fn defuse(mut self) { - self.handle = ptr::null(); + pub fn defuse(&mut self) { + self.defused = true; } } impl Drop for Request { fn drop(&mut self) { - unsafe { - if self.handle != ptr::null() { - uvll::free_req(self.handle) - } + if !self.defused { + unsafe { uvll::free_req(self.handle) } } } } @@ -300,23 +365,18 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf { uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t } } -fn run_uv_loop(f: proc(&mut Loop)) { - use std::rt::local::Local; - use std::rt::test::run_in_uv_task; - use std::rt::sched::Scheduler; - use std::cell::Cell; - - let f = Cell::new(f); - do run_in_uv_task { - let mut io = None; - do Local::borrow |sched: &mut Scheduler| { - sched.event_loop.io(|i| unsafe { +#[cfg(test)] +fn local_loop() -> &'static mut Loop { + unsafe { + cast::transmute(do Local::borrow |sched: &mut Scheduler| { + let mut io = None; + do sched.event_loop.io |i| { let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) = cast::transmute(i); io = Some(uvio); - }); - } - f.take()(io.unwrap().uv_loop()); + } + io.unwrap() + }.uv_loop()) } } diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 5d228cd78486..bf5f6c88527e 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -19,11 +19,13 @@ use std::rt::rtio; use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::tube::Tube; use std::str; +use std::task; use std::vec; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, - uv_error_to_io_error, UvHandle, slice_to_uv_buf}; + uv_error_to_io_error, UvHandle, slice_to_uv_buf, + wait_until_woken_after}; use uvio::HomingIO; use uvll; @@ -206,46 +208,46 @@ impl TcpWatcher { { struct Ctx { status: c_int, task: Option } - let tcp = TcpWatcher::new(loop_); - let ret = do socket_addr_as_uv_socket_addr(address) |addr| { - let req = Request::new(uvll::UV_CONNECT); - let result = match addr { - UvIpv4SocketAddr(addr) => unsafe { - uvll::tcp_connect(req.handle, tcp.handle, addr, - connect_cb) - }, - UvIpv6SocketAddr(addr) => unsafe { - uvll::tcp_connect6(req.handle, tcp.handle, addr, - connect_cb) - }, - }; - match result { - 0 => { - let mut cx = Ctx { status: 0, task: None }; - req.set_data(&cx); - req.defuse(); - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); - } - match cx.status { - 0 => Ok(()), - n => Err(UvError(n)), + return do task::unkillable { + let tcp = TcpWatcher::new(loop_); + let ret = do socket_addr_as_uv_socket_addr(address) |addr| { + let mut req = Request::new(uvll::UV_CONNECT); + let result = match addr { + UvIpv4SocketAddr(addr) => unsafe { + uvll::tcp_connect(req.handle, tcp.handle, addr, + connect_cb) + }, + UvIpv6SocketAddr(addr) => unsafe { + uvll::tcp_connect6(req.handle, tcp.handle, addr, + connect_cb) + }, + }; + match result { + 0 => { + req.defuse(); // uv callback now owns this request + let mut cx = Ctx { status: 0, task: None }; + do wait_until_woken_after(&mut cx.task) { + req.set_data(&cx); + } + match cx.status { + 0 => Ok(()), + n => Err(UvError(n)), + } } + n => Err(UvError(n)) } - n => Err(UvError(n)) - } - }; + }; - return match ret { - Ok(()) => Ok(tcp), - Err(e) => Err(e), + match ret { + Ok(()) => Ok(tcp), + Err(e) => Err(e), + } }; extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; cx.status = status; let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(cx.task.take_unwrap()); @@ -310,10 +312,14 @@ impl rtio::RtioTcpStream for TcpWatcher { } } +impl UvHandle for TcpWatcher { + fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle } +} + impl Drop for TcpWatcher { fn drop(&mut self) { let _m = self.fire_homing_missile(); - self.stream.close(); + self.close(); } } @@ -323,25 +329,27 @@ impl TcpListener { pub fn bind(loop_: &mut Loop, address: SocketAddr) -> Result<~TcpListener, UvError> { - let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; - assert_eq!(unsafe { - uvll::uv_tcp_init(loop_.handle, handle) - }, 0); - let l = ~TcpListener { - home: get_handle_to_current_scheduler!(), - handle: handle, - closing_task: None, - outgoing: Tube::new(), - }; - let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr), - UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr), + do task::unkillable { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; + assert_eq!(unsafe { + uvll::uv_tcp_init(loop_.handle, handle) + }, 0); + let l = ~TcpListener { + home: get_handle_to_current_scheduler!(), + handle: handle, + closing_task: None, + outgoing: Tube::new(), + }; + let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr), + UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr), + } + }); + match res { + 0 => Ok(l.install()), + n => Err(UvError(n)) } - }); - match res { - 0 => Ok(l.install()), - n => Err(UvError(n)) } } } @@ -380,6 +388,7 @@ impl rtio::RtioTcpListener for TcpListener { } extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { + assert!(status != uvll::ECANCELED); let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -389,7 +398,6 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0); Ok(~client as ~rtio::RtioTcpStream) } - uvll::ECANCELED => return, n => Err(uv_error_to_io_error(UvError(n))) }; @@ -399,12 +407,8 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { impl Drop for TcpListener { fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - - do sched.deschedule_running_task_and_then |_, task| { - self.closing_task = Some(task); - unsafe { uvll::uv_close(self.handle, listener_close_cb) } - } + let _m = self.fire_homing_missile(); + self.close(); } } @@ -463,26 +467,34 @@ impl UdpWatcher { pub fn bind(loop_: &Loop, address: SocketAddr) -> Result { - let udp = UdpWatcher { - handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, - home: get_handle_to_current_scheduler!(), - }; - assert_eq!(unsafe { - uvll::uv_udp_init(loop_.handle, udp.handle) - }, 0); - let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32), - UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32), + do task::unkillable { + let udp = UdpWatcher { + handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, + home: get_handle_to_current_scheduler!(), + }; + assert_eq!(unsafe { + uvll::uv_udp_init(loop_.handle, udp.handle) + }, 0); + let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe { + match addr { + UvIpv4SocketAddr(addr) => + uvll::udp_bind(udp.handle, addr, 0u32), + UvIpv6SocketAddr(addr) => + uvll::udp_bind6(udp.handle, addr, 0u32), + } + }); + match result { + 0 => Ok(udp), + n => Err(UvError(n)), } - }); - match result { - 0 => Ok(udp), - n => Err(UvError(n)), } } } +impl UvHandle for UdpWatcher { + fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle } +} + impl HomingIO for UdpWatcher { fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } @@ -505,7 +517,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { } let _m = self.fire_homing_missile(); - return match unsafe { + let a = match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) } { 0 => { @@ -514,10 +526,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { buf: Some(slice_to_uv_buf(buf)), result: None, }; - unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) } - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); + do wait_until_woken_after(&mut cx.task) { + unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) } } match cx.result.take_unwrap() { (n, _) if n < 0 => @@ -527,23 +537,30 @@ impl rtio::RtioUdpSocket for UdpWatcher { } n => Err(uv_error_to_io_error(UvError(n))) }; + return a; extern fn alloc_cb(handle: *uvll::uv_udp_t, _suggested_size: size_t) -> Buf { let cx: &mut Ctx = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; - cx.buf.take().expect("alloc_cb called more than once") + cx.buf.take().expect("recv alloc_cb called more than once") } - extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf, + extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, addr: *uvll::sockaddr, _flags: c_uint) { + assert!(nread != uvll::ECANCELED as ssize_t); + let cx: &mut Ctx = unsafe { + cast::transmute(uvll::get_data_for_uv_handle(handle)) + }; // When there's no data to read the recv callback can be a no-op. // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring // this we just drop back to kqueue and wait for the next callback. - if nread == 0 { return } - if nread == uvll::ECANCELED as ssize_t { return } + if nread == 0 { + cx.buf = Some(buf); + return + } unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) @@ -566,7 +583,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { let _m = self.fire_homing_missile(); - let req = Request::new(uvll::UV_UDP_SEND); + let mut req = Request::new(uvll::UV_UDP_SEND); let buf = slice_to_uv_buf(buf); let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe { match dst { @@ -579,15 +596,11 @@ impl rtio::RtioUdpSocket for UdpWatcher { return match result { 0 => { + req.defuse(); // uv callback now owns this request let mut cx = Ctx { task: None, result: 0 }; - req.set_data(&cx); - req.defuse(); - - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); + do wait_until_woken_after(&mut cx.task) { + req.set_data(&cx); } - match cx.result { 0 => Ok(()), n => Err(uv_error_to_io_error(UvError(n))) @@ -598,7 +611,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { let req = Request::wrap(req); - let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; cx.result = status; let sched: ~Scheduler = Local::take(); @@ -679,24 +693,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { impl Drop for UdpWatcher { fn drop(&mut self) { // Send ourselves home to close this handle (blocking while doing so). - let (_m, sched) = self.fire_homing_missile_sched(); - let mut slot = None; - unsafe { - uvll::set_data_for_uv_handle(self.handle, &slot); - uvll::uv_close(self.handle, close_cb); - } - do sched.deschedule_running_task_and_then |_, task| { - slot = Some(task); - } - - extern fn close_cb(handle: *uvll::uv_handle_t) { - let slot: &mut Option = unsafe { - cast::transmute(uvll::get_data_for_uv_handle(handle)) - }; - unsafe { uvll::free_handle(handle) } - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(slot.take_unwrap()); - } + let _m = self.fire_homing_missile(); + self.close(); } } @@ -714,397 +712,357 @@ mod test { use std::task; use super::*; - use super::super::{Loop, run_uv_loop}; + use super::super::local_loop; #[test] fn connect_close_ip4() { - do run_uv_loop |l| { - match TcpWatcher::connect(l, next_test_ip4()) { - Ok(*) => fail!(), - Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), - } + match TcpWatcher::connect(local_loop(), next_test_ip4()) { + Ok(*) => fail!(), + Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), } } #[test] fn connect_close_ip6() { - do run_uv_loop |l| { - match TcpWatcher::connect(l, next_test_ip6()) { - Ok(*) => fail!(), - Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), - } + match TcpWatcher::connect(local_loop(), next_test_ip6()) { + Ok(*) => fail!(), + Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), } } #[test] fn udp_bind_close_ip4() { - do run_uv_loop |l| { - match UdpWatcher::bind(l, next_test_ip4()) { - Ok(*) => {} - Err(*) => fail!() - } + match UdpWatcher::bind(local_loop(), next_test_ip4()) { + Ok(*) => {} + Err(*) => fail!() } } #[test] fn udp_bind_close_ip6() { - do run_uv_loop |l| { - match UdpWatcher::bind(l, next_test_ip6()) { - Ok(*) => {} - Err(*) => fail!() - } + match UdpWatcher::bind(local_loop(), next_test_ip6()) { + Ok(*) => {} + Err(*) => fail!() } } #[test] fn listen_ip4() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let addr = next_test_ip4(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); - let handle = l.handle; - do spawn { - let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - let mut w = match w.listen() { - Ok(w) => w, Err(e) => fail!("{:?}", e), - }; - chan.take().send(()); - match w.accept() { - Ok(mut stream) => { - let mut buf = [0u8, ..10]; - match stream.read(buf) { - Ok(10) => {} e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } - } - Err(e) => fail!("{:?}", e) - } - } - - port.recv(); - let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) { + do spawn { + let w = match TcpListener::bind(local_loop(), addr) { Ok(w) => w, Err(e) => fail!("{:?}", e) }; - match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) + let mut w = match w.listen() { + Ok(w) => w, Err(e) => fail!("{:?}", e), + }; + chan.take().send(()); + match w.accept() { + Ok(mut stream) => { + let mut buf = [0u8, ..10]; + match stream.read(buf) { + Ok(10) => {} e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); + } + } + Err(e) => fail!("{:?}", e) } } + + port.recv(); + let mut w = match TcpWatcher::connect(local_loop(), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } #[test] fn listen_ip6() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let addr = next_test_ip6(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip6(); - let handle = l.handle; - do spawn { - let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - let mut w = match w.listen() { - Ok(w) => w, Err(e) => fail!("{:?}", e), - }; - chan.take().send(()); - match w.accept() { - Ok(mut stream) => { - let mut buf = [0u8, ..10]; - match stream.read(buf) { - Ok(10) => {} e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } - } - Err(e) => fail!("{:?}", e) - } - } - - port.recv(); - let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) { + do spawn { + let w = match TcpListener::bind(local_loop(), addr) { Ok(w) => w, Err(e) => fail!("{:?}", e) }; - match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) + let mut w = match w.listen() { + Ok(w) => w, Err(e) => fail!("{:?}", e), + }; + chan.take().send(()); + match w.accept() { + Ok(mut stream) => { + let mut buf = [0u8, ..10]; + match stream.read(buf) { + Ok(10) => {} e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); + } + } + Err(e) => fail!("{:?}", e) } } + + port.recv(); + let mut w = match TcpWatcher::connect(local_loop(), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } #[test] fn udp_recv_ip4() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let client = next_test_ip4(); - let server = next_test_ip4(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let client = next_test_ip4(); + let server = next_test_ip4(); - let handle = l.handle; - do spawn { - match UdpWatcher::bind(&mut Loop::wrap(handle), server) { - Ok(mut w) => { - chan.take().send(()); - let mut buf = [0u8, ..10]; - match w.recvfrom(buf) { - Ok((10, addr)) => assert_eq!(addr, client), - e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } + do spawn { + match UdpWatcher::bind(local_loop(), server) { + Ok(mut w) => { + chan.take().send(()); + let mut buf = [0u8, ..10]; + match w.recvfrom(buf) { + Ok((10, addr)) => assert_eq!(addr, client), + e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); } - Err(e) => fail!("{:?}", e) } + Err(e) => fail!("{:?}", e) } + } - port.recv(); - let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) - } + port.recv(); + let mut w = match UdpWatcher::bind(local_loop(), client) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) } } #[test] fn udp_recv_ip6() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let client = next_test_ip6(); - let server = next_test_ip6(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let client = next_test_ip6(); + let server = next_test_ip6(); - let handle = l.handle; - do spawn { - match UdpWatcher::bind(&mut Loop::wrap(handle), server) { - Ok(mut w) => { - chan.take().send(()); - let mut buf = [0u8, ..10]; - match w.recvfrom(buf) { - Ok((10, addr)) => assert_eq!(addr, client), - e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } + do spawn { + match UdpWatcher::bind(local_loop(), server) { + Ok(mut w) => { + chan.take().send(()); + let mut buf = [0u8, ..10]; + match w.recvfrom(buf) { + Ok((10, addr)) => assert_eq!(addr, client), + e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); } - Err(e) => fail!("{:?}", e) } + Err(e) => fail!("{:?}", e) } + } - port.recv(); - let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) - } + port.recv(); + let mut w = match UdpWatcher::bind(local_loop(), client) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) } } #[test] fn test_read_read_read() { - do run_uv_loop |l| { - let addr = next_test_ip4(); - static MAX: uint = 500000; - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); + use std::rt::rtio::*; + let addr = next_test_ip4(); + static MAX: uint = 5000; + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let listener = TcpListener::bind(l, addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - chan.take().send(()); - let mut stream = acceptor.accept().unwrap(); - let buf = [1, .. 2048]; - let mut total_bytes_written = 0; - while total_bytes_written < MAX { - stream.write(buf); - total_bytes_written += buf.len(); + do spawn { + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + chan.take().send(()); + let mut stream = acceptor.accept().unwrap(); + let buf = [1, .. 2048]; + let mut total_bytes_written = 0; + while total_bytes_written < MAX { + assert!(stream.write(buf).is_ok()); + uvdebug!("wrote bytes"); + total_bytes_written += buf.len(); + } + } + + do spawn { + port.take().recv(); + let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < MAX { + let nread = stream.read(buf).unwrap(); + total_bytes_read += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); } } - - do spawntask { - let l = &mut Loop::wrap(handle); - port.take().recv(); - let mut stream = TcpWatcher::connect(l, addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < MAX { - let nread = stream.read(buf).unwrap(); - uvdebug!("read {} bytes", nread); - total_bytes_read += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } - } - uvdebug!("read {} bytes total", total_bytes_read); - } + uvdebug!("read {} bytes total", total_bytes_read); } } #[test] - #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send fn test_udp_twice() { - do run_uv_loop |l| { - let server_addr = next_test_ip4(); - let client_addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); + let server_addr = next_test_ip4(); + let client_addr = next_test_ip4(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let mut client = UdpWatcher::bind(l, client_addr).unwrap(); - port.take().recv(); - assert!(client.sendto([1], server_addr).is_ok()); - assert!(client.sendto([2], server_addr).is_ok()); - } - - do spawntask { - let l = &mut Loop::wrap(handle); - let mut server = UdpWatcher::bind(l, server_addr).unwrap(); - chan.take().send(()); - let mut buf1 = [0]; - let mut buf2 = [0]; - let (nread1, src1) = server.recvfrom(buf1).unwrap(); - let (nread2, src2) = server.recvfrom(buf2).unwrap(); - assert_eq!(nread1, 1); - assert_eq!(nread2, 1); - assert_eq!(src1, client_addr); - assert_eq!(src2, client_addr); - assert_eq!(buf1[0], 1); - assert_eq!(buf2[0], 2); - } + do spawn { + let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap(); + port.take().recv(); + assert!(client.sendto([1], server_addr).is_ok()); + assert!(client.sendto([2], server_addr).is_ok()); } + + let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap(); + chan.take().send(()); + let mut buf1 = [0]; + let mut buf2 = [0]; + let (nread1, src1) = server.recvfrom(buf1).unwrap(); + let (nread2, src2) = server.recvfrom(buf2).unwrap(); + assert_eq!(nread1, 1); + assert_eq!(nread2, 1); + assert_eq!(src1, client_addr); + assert_eq!(src2, client_addr); + assert_eq!(buf1[0], 1); + assert_eq!(buf2[0], 2); } #[test] fn test_udp_many_read() { - do run_uv_loop |l| { - let server_out_addr = next_test_ip4(); - let server_in_addr = next_test_ip4(); - let client_out_addr = next_test_ip4(); - let client_in_addr = next_test_ip4(); - static MAX: uint = 500_000; + let server_out_addr = next_test_ip4(); + let server_in_addr = next_test_ip4(); + let client_out_addr = next_test_ip4(); + let client_in_addr = next_test_ip4(); + static MAX: uint = 500_000; - let (p1, c1) = oneshot(); - let (p2, c2) = oneshot(); + let (p1, c1) = oneshot(); + let (p2, c2) = oneshot(); - let first = Cell::new((p1, c2)); - let second = Cell::new((p2, c1)); + let first = Cell::new((p1, c2)); + let second = Cell::new((p2, c1)); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap(); - let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap(); - let (port, chan) = first.take(); - chan.send(()); - port.recv(); - let msg = [1, .. 2048]; - let mut total_bytes_sent = 0; - let mut buf = [1]; - while buf[0] == 1 { - // send more data - assert!(server_out.sendto(msg, client_in_addr).is_ok()); - total_bytes_sent += msg.len(); - // check if the client has received enough - let res = server_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(nread, 1); - assert_eq!(src, client_out_addr); - } - assert!(total_bytes_sent >= MAX); + do spawn { + let l = local_loop(); + let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap(); + let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap(); + let (port, chan) = first.take(); + chan.send(()); + port.recv(); + let msg = [1, .. 2048]; + let mut total_bytes_sent = 0; + let mut buf = [1]; + while buf[0] == 1 { + // send more data + assert!(server_out.sendto(msg, client_in_addr).is_ok()); + total_bytes_sent += msg.len(); + // check if the client has received enough + let res = server_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(nread, 1); + assert_eq!(src, client_out_addr); } + assert!(total_bytes_sent >= MAX); + } - do spawntask { - let l = &mut Loop::wrap(handle); - let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); - let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); - let (port, chan) = second.take(); - port.recv(); - chan.send(()); - let mut total_bytes_recv = 0; - let mut buf = [0, .. 2048]; - while total_bytes_recv < MAX { - // ask for more - assert!(client_out.sendto([1], server_in_addr).is_ok()); - // wait for data - let res = client_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(src, server_out_addr); - total_bytes_recv += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } + do spawn { + let l = local_loop(); + let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); + let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); + let (port, chan) = second.take(); + port.recv(); + chan.send(()); + let mut total_bytes_recv = 0; + let mut buf = [0, .. 2048]; + while total_bytes_recv < MAX { + // ask for more + assert!(client_out.sendto([1], server_in_addr).is_ok()); + // wait for data + let res = client_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(src, server_out_addr); + total_bytes_recv += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); } - // tell the server we're done - assert!(client_out.sendto([0], server_in_addr).is_ok()); } + // tell the server we're done + assert!(client_out.sendto([0], server_in_addr).is_ok()); } } #[test] fn test_read_and_block() { - do run_uv_loop |l| { - let addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); + let addr = next_test_ip4(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let listener = TcpListener::bind(l, addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let (port2, chan2) = stream(); - chan.take().send(port2); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; + do spawn { + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let (port2, chan2) = stream(); + chan.take().send(port2); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; - let expected = 32; - let mut current = 0; - let mut reads = 0; + let expected = 32; + let mut current = 0; + let mut reads = 0; - while current < expected { - let nread = stream.read(buf).unwrap(); - for i in range(0u, nread) { - let val = buf[i] as uint; - assert_eq!(val, current % 8); - current += 1; - } - reads += 1; - - chan2.send(()); + while current < expected { + let nread = stream.read(buf).unwrap(); + for i in range(0u, nread) { + let val = buf[i] as uint; + assert_eq!(val, current % 8); + current += 1; } + reads += 1; - // Make sure we had multiple reads - assert!(reads > 1); + chan2.send(()); } - do spawntask { - let l = &mut Loop::wrap(handle); - let port2 = port.take().recv(); - let mut stream = TcpWatcher::connect(l, addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - port2.recv(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - port2.recv(); - } + // Make sure we had multiple reads + assert!(reads > 1); + } + + do spawn { + let port2 = port.take().recv(); + let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + port2.recv(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + port2.recv(); } } @@ -1113,27 +1071,23 @@ mod test { let addr = next_test_ip4(); do task::spawn_sched(task::SingleThreaded) { - do run_uv_loop |l| { - let listener = TcpListener::bind(l, addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert_eq!(nread, 8); - for i in range(0u, nread) { - assert_eq!(buf[i], i as u8); - } + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert_eq!(nread, 8); + for i in range(0u, nread) { + assert_eq!(buf[i], i as u8); } } do task::spawn_sched(task::SingleThreaded) { - do run_uv_loop |l| { - let mut stream = TcpWatcher::connect(l, addr); - while stream.is_err() { - stream = TcpWatcher::connect(l, addr); - } - stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); + let mut stream = TcpWatcher::connect(local_loop(), addr); + while stream.is_err() { + stream = TcpWatcher::connect(local_loop(), addr); } + stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); } } @@ -1149,17 +1103,13 @@ mod test { do task::spawn_sched(task::SingleThreaded) { let chan = Cell::new(chan.take()); - do run_uv_loop |l| { - let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap(); - chan.take().send(listener); - } + let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap(); + chan.take().send(listener); } do task::spawn_sched(task::SingleThreaded) { let port = Cell::new(port.take()); - do run_uv_loop |_l| { - port.take().recv(); - } + port.take().recv(); } } @@ -1261,4 +1211,69 @@ mod test { } } + #[should_fail] + #[test] + #[ignore(reason = "linked failure")] + fn linked_failure1() { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); + + do spawn { + let w = TcpListener::bind(local_loop(), addr).unwrap(); + let mut w = w.listen().unwrap(); + chan.take().send(()); + w.accept(); + } + + port.recv(); + fail!(); + } + + #[should_fail] + #[test] + #[ignore(reason = "linked failure")] + fn linked_failure2() { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); + + do spawn { + let w = TcpListener::bind(local_loop(), addr).unwrap(); + let mut w = w.listen().unwrap(); + chan.take().send(()); + let mut buf = [0]; + w.accept().unwrap().read(buf); + } + + port.recv(); + let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); + + fail!(); + } + + #[should_fail] + #[test] + #[ignore(reason = "linked failure")] + fn linked_failure3() { + let (port, chan) = stream(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); + + do spawn { + let chan = chan.take(); + let w = TcpListener::bind(local_loop(), addr).unwrap(); + let mut w = w.listen().unwrap(); + chan.send(()); + let mut conn = w.accept().unwrap(); + chan.send(()); + let buf = [0, ..65536]; + conn.write(buf); + } + + port.recv(); + let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); + port.recv(); + fail!(); + } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 1f28e043dfb4..89a86a2ff7dc 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -9,7 +9,6 @@ // except according to those terms. use std::c_str::CString; -use std::cast; use std::libc; use std::rt::BlockedTask; use std::rt::io::IoError; @@ -17,9 +16,11 @@ use std::rt::local::Local; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::tube::Tube; +use std::task; use stream::StreamWatcher; -use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error}; +use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, + wait_until_woken_after}; use uvio::HomingIO; use uvll; @@ -32,7 +33,6 @@ pub struct PipeWatcher { pub struct PipeListener { home: SchedHandle, pipe: *uvll::uv_pipe_t, - priv closing_task: Option, priv outgoing: Tube>, } @@ -74,36 +74,35 @@ impl PipeWatcher { pub fn connect(loop_: &Loop, name: &CString) -> Result { struct Ctx { task: Option, result: libc::c_int, } - let mut cx = Ctx { task: None, result: 0 }; - let req = Request::new(uvll::UV_CONNECT); - let pipe = PipeWatcher::new(loop_, false); - unsafe { - uvll::set_data_for_req(req.handle, &cx as *Ctx); - uvll::uv_pipe_connect(req.handle, - pipe.handle(), - name.with_ref(|p| p), - connect_cb) - } - req.defuse(); + return do task::unkillable { + let mut cx = Ctx { task: None, result: 0 }; + let mut req = Request::new(uvll::UV_CONNECT); + let pipe = PipeWatcher::new(loop_, false); + + do wait_until_woken_after(&mut cx.task) { + unsafe { + uvll::uv_pipe_connect(req.handle, + pipe.handle(), + name.with_ref(|p| p), + connect_cb) + } + req.set_data(&cx); + req.defuse(); // uv callback now owns this request + } + match cx.result { + 0 => Ok(pipe), + n => Err(UvError(n)) + } - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); - } - return match cx.result { - 0 => Ok(pipe), - n => Err(UvError(n)) }; - extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { - let _req = Request::wrap(req); - if status == uvll::ECANCELED { return } - unsafe { - let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req)); - cx.result = status; - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(cx.task.take_unwrap()); - } + extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {; + let req = Request::wrap(req); + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; + cx.result = status; + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(cx.task.take_unwrap()); } } @@ -133,11 +132,15 @@ impl HomingIO for PipeWatcher { fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home } } +impl UvHandle for PipeWatcher { + fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle } +} + impl Drop for PipeWatcher { fn drop(&mut self) { if !self.defused { let _m = self.fire_homing_missile(); - self.stream.close(); + self.close(); } } } @@ -150,21 +153,24 @@ extern fn pipe_close_cb(handle: *uvll::uv_handle_t) { impl PipeListener { pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> { - let pipe = PipeWatcher::new(loop_, false); - match unsafe { uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) } { - 0 => { - // If successful, unwrap the PipeWatcher because we control how - // we close the pipe differently. We can't rely on - // StreamWatcher's default close method. - let p = ~PipeListener { - home: get_handle_to_current_scheduler!(), - pipe: pipe.unwrap(), - closing_task: None, - outgoing: Tube::new(), - }; - Ok(p.install()) + do task::unkillable { + let pipe = PipeWatcher::new(loop_, false); + match unsafe { + uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) + } { + 0 => { + // If successful, unwrap the PipeWatcher because we control how + // we close the pipe differently. We can't rely on + // StreamWatcher's default close method. + let p = ~PipeListener { + home: get_handle_to_current_scheduler!(), + pipe: pipe.unwrap(), + outgoing: Tube::new(), + }; + Ok(p.install()) + } + n => Err(UvError(n)) } - n => Err(UvError(n)) } } } @@ -196,6 +202,7 @@ impl UvHandle for PipeListener { } extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { + assert!(status != uvll::ECANCELED); let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -205,7 +212,6 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0); Ok(~client as ~RtioPipe) } - uvll::ECANCELED => return, n => Err(uv_error_to_io_error(UvError(n))) }; @@ -215,23 +221,11 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { impl Drop for PipeListener { fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - - do sched.deschedule_running_task_and_then |_, task| { - self.closing_task = Some(task); - unsafe { uvll::uv_close(self.pipe, listener_close_cb) } - } + let _m = self.fire_homing_missile(); + self.close(); } } -extern fn listener_close_cb(handle: *uvll::uv_handle_t) { - let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) }; - unsafe { uvll::free_handle(handle) } - - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap()); -} - // PipeAcceptor implementation and traits impl RtioUnixAcceptor for PipeAcceptor { diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index 15d5ae1c33ca..17a7510aa19b 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -19,7 +19,8 @@ use std::rt::rtio::RtioProcess; use std::rt::sched::{Scheduler, SchedHandle}; use std::vec; -use super::{Loop, UvHandle, UvError, uv_error_to_io_error}; +use super::{Loop, UvHandle, UvError, uv_error_to_io_error, + wait_until_woken_after}; use uvio::HomingIO; use uvll; use pipe::PipeWatcher; @@ -222,11 +223,7 @@ impl RtioProcess for Process { // If there's no exit code previously listed, then the // process's exit callback has yet to be invoked. We just // need to deschedule ourselves and wait to be reawoken. - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - assert!(self.to_wake.is_none()); - self.to_wake = Some(task); - } + wait_until_woken_after(&mut self.to_wake, || {}); assert!(self.exit_status.is_some()); } } diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index 745cb5a6fa09..b9ccacf4df70 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -9,12 +9,14 @@ // except according to those terms. use std::cast; -use std::libc::{c_int, size_t, ssize_t, c_void}; +use std::libc::{c_int, size_t, ssize_t}; +use std::ptr; use std::rt::BlockedTask; use std::rt::local::Local; use std::rt::sched::Scheduler; -use super::{UvError, Buf, slice_to_uv_buf, Request}; +use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, + ForbidUnwind}; use uvll; // This is a helper structure which is intended to get embedded into other @@ -63,6 +65,10 @@ impl StreamWatcher { } pub fn read(&mut self, buf: &mut [u8]) -> Result { + // This read operation needs to get canceled on an unwind via libuv's + // uv_read_stop function + let _f = ForbidUnwind::new("stream read"); + // Send off the read request, but don't block until we're sure that the // read request is queued. match unsafe { @@ -74,12 +80,10 @@ impl StreamWatcher { result: 0, task: None, }; - unsafe { - uvll::set_data_for_uv_handle(self.handle, &rcx) - } - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_sched, task| { - rcx.task = Some(task); + do wait_until_woken_after(&mut rcx.task) { + unsafe { + uvll::set_data_for_uv_handle(self.handle, &rcx) + } } match rcx.result { n if n < 0 => Err(UvError(n as c_int)), @@ -91,12 +95,17 @@ impl StreamWatcher { } pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> { + // The ownership of the write request is dubious if this function + // unwinds. I believe that if the write_cb fails to re-schedule the task + // then the write request will be leaked. + let _f = ForbidUnwind::new("stream write"); + // Prepare the write request, either using a cached one or allocating a // new one - if self.last_write_req.is_none() { - self.last_write_req = Some(Request::new(uvll::UV_WRITE)); - } - let req = self.last_write_req.get_ref(); + let mut req = match self.last_write_req.take() { + Some(req) => req, None => Request::new(uvll::UV_WRITE), + }; + req.set_data(ptr::null::<()>()); // Send off the request, but be careful to not block until we're sure // that the write reqeust is queued. If the reqeust couldn't be queued, @@ -107,11 +116,12 @@ impl StreamWatcher { } { 0 => { let mut wcx = WriteContext { result: 0, task: None, }; - req.set_data(&wcx); - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_sched, task| { - wcx.task = Some(task); + req.defuse(); // uv callback now owns this request + + do wait_until_woken_after(&mut wcx.task) { + req.set_data(&wcx); } + self.last_write_req = Some(Request::wrap(req.handle)); match wcx.result { 0 => Ok(()), n => Err(UvError(n)), @@ -120,50 +130,24 @@ impl StreamWatcher { n => Err(UvError(n)), } } - - // This will deallocate an internally used memory, along with closing the - // handle (and freeing it). - pub fn close(&mut self) { - let mut closing_task = None; - unsafe { - uvll::set_data_for_uv_handle(self.handle, &closing_task); - } - - // Wait for this stream to close because it possibly represents a remote - // connection which may have consequences if we close asynchronously. - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - closing_task = Some(task); - unsafe { uvll::uv_close(self.handle, close_cb) } - } - - extern fn close_cb(handle: *uvll::uv_handle_t) { - let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) }; - unsafe { uvll::free_handle(handle) } - - let closing_task: &mut Option = unsafe { - cast::transmute(data) - }; - let task = closing_task.take_unwrap(); - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); - } - } } // This allocation callback expects to be invoked once and only once. It will // unwrap the buffer in the ReadContext stored in the stream and return it. This // will fail if it is called more than once. extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf { + uvdebug!("alloc_cb"); let rcx: &mut ReadContext = unsafe { cast::transmute(uvll::get_data_for_uv_handle(stream)) }; - rcx.buf.take().expect("alloc_cb called more than once") + rcx.buf.take().expect("stream alloc_cb called more than once") } // When a stream has read some data, we will always forcibly stop reading and // return all the data read (even if it didn't fill the whole buffer). extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) { + uvdebug!("read_cb {}", nread); + assert!(nread != uvll::ECANCELED as ssize_t); let rcx: &mut ReadContext = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; @@ -182,11 +166,11 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) { // reading, however, all this does is wake up the blocked task after squirreling // away the error code as a result. extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { - if status == uvll::ECANCELED { return } + let mut req = Request::wrap(req); + assert!(status != uvll::ECANCELED); // Remember to not free the request because it is re-used between writes on // the same stream. - let req = Request::wrap(req); - let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) }; + let wcx: &mut WriteContext = unsafe { req.get_data() }; wcx.result = status; req.defuse(); diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index df35a4892e97..96cf024639f8 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -16,7 +16,7 @@ use std::rt::rtio::RtioTimer; use std::rt::sched::{Scheduler, SchedHandle}; use uvll; -use super::{Loop, UvHandle}; +use super::{Loop, UvHandle, ForbidUnwind}; use uvio::HomingIO; pub struct TimerWatcher { @@ -67,6 +67,11 @@ impl UvHandle for TimerWatcher { impl RtioTimer for TimerWatcher { fn sleep(&mut self, msecs: u64) { let (_m, sched) = self.fire_homing_missile_sched(); + + // If the descheduling operation unwinds after the timer has been + // started, then we need to call stop on the timer. + let _f = ForbidUnwind::new("timer"); + do sched.deschedule_running_task_and_then |_sched, task| { self.action = Some(WakeTask(task)); self.start(msecs, 0); @@ -124,51 +129,43 @@ impl Drop for TimerWatcher { mod test { use super::*; use std::rt::rtio::RtioTimer; - use super::super::run_uv_loop; + use super::super::local_loop; #[test] fn oneshot() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - let port = timer.oneshot(1); - port.recv(); - let port = timer.oneshot(1); - port.recv(); - } + let mut timer = TimerWatcher::new(local_loop()); + let port = timer.oneshot(1); + port.recv(); + let port = timer.oneshot(1); + port.recv(); } #[test] fn override() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - let oport = timer.oneshot(1); - let pport = timer.period(1); - timer.sleep(1); - assert_eq!(oport.try_recv(), None); - assert_eq!(pport.try_recv(), None); - timer.oneshot(1).recv(); - } + let mut timer = TimerWatcher::new(local_loop()); + let oport = timer.oneshot(1); + let pport = timer.period(1); + timer.sleep(1); + assert_eq!(oport.try_recv(), None); + assert_eq!(pport.try_recv(), None); + timer.oneshot(1).recv(); } #[test] fn period() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - let port = timer.period(1); - port.recv(); - port.recv(); - let port = timer.period(1); - port.recv(); - port.recv(); - } + let mut timer = TimerWatcher::new(local_loop()); + let port = timer.period(1); + port.recv(); + port.recv(); + let port = timer.period(1); + port.recv(); + port.recv(); } #[test] fn sleep() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - timer.sleep(1); - timer.sleep(1); - } + let mut timer = TimerWatcher::new(local_loop()); + timer.sleep(1); + timer.sleep(1); } } diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs index c072ab515612..04e406ce987e 100644 --- a/src/librustuv/tty.rs +++ b/src/librustuv/tty.rs @@ -103,6 +103,6 @@ impl HomingIO for TtyWatcher { impl Drop for TtyWatcher { fn drop(&mut self) { let _m = self.fire_homing_missile(); - self.stream.close(); + self.close(); } } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 6ae2c174e18b..75ec5f26b336 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::c_str::CString; -use std::comm::{SharedChan, GenericChan}; +use std::comm::SharedChan; use std::libc::c_int; use std::libc; use std::path::Path; @@ -26,7 +26,7 @@ use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, ReadWrite, FileStat}; use std::rt::io::signal::Signum; -use std::task; +use std::util; use ai = std::rt::io::net::addrinfo; #[cfg(test)] use std::unstable::run_in_bare_thread; @@ -44,6 +44,13 @@ pub trait HomingIO { fn go_to_IO_home(&mut self) -> uint { use std::rt::sched::RunOnce; + unsafe { + let task: *mut Task = Local::unsafe_borrow(); + (*task).death.inhibit_kill((*task).unwinder.unwinding); + } + + let _f = ForbidUnwind::new("going home"); + let current_sched_id = do Local::borrow |sched: &mut Scheduler| { sched.sched_id() }; @@ -51,22 +58,17 @@ pub trait HomingIO { // Only need to invoke a context switch if we're not on the right // scheduler. if current_sched_id != self.home().sched_id { - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - /* FIXME(#8674) if the task was already killed then wake - * will return None. In that case, the home pointer will - * never be set. - * - * RESOLUTION IDEA: Since the task is dead, we should - * just abort the IO action. - */ - do task.wake().map |task| { - self.home().send(RunOnce(task)); - }; - } + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + do task.wake().map |task| { + self.home().send(RunOnce(task)); + }; } } + let current_sched_id = do Local::borrow |sched: &mut Scheduler| { + sched.sched_id() + }; + assert!(current_sched_id == self.home().sched_id); self.home().sched_id } @@ -98,25 +100,38 @@ struct HomingMissile { priv io_home: uint, } +impl HomingMissile { + pub fn check(&self, msg: &'static str) { + let local_id = Local::borrow(|sched: &mut Scheduler| sched.sched_id()); + assert!(local_id == self.io_home, "{}", msg); + } +} + impl Drop for HomingMissile { fn drop(&mut self) { + let f = ForbidUnwind::new("leaving home"); + // It would truly be a sad day if we had moved off the home I/O // scheduler while we were doing I/O. - assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()), - self.io_home); + self.check("task moved away from the home scheduler"); // If we were a homed task, then we must send ourselves back to the // original scheduler. Otherwise, we can just return and keep running if !Task::on_appropriate_sched() { - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - do task.wake().map |task| { - Scheduler::run_task(task); - }; - } + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + do task.wake().map |task| { + Scheduler::run_task(task); + }; } } + + util::ignore(f); + + unsafe { + let task: *mut Task = Local::unsafe_borrow(); + (*task).death.allow_kill((*task).unwinder.unwinding); + } } }