diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index d5bfd729eb56..56f6eda53575 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -9,7 +9,6 @@ // except according to those terms. use ai = std::rt::io::net::addrinfo; -use std::cast; use std::libc::c_int; use std::ptr::null; use std::rt::BlockedTask; @@ -17,7 +16,7 @@ use std::rt::local::Local; use std::rt::sched::Scheduler; use net; -use super::{Loop, UvError, Request}; +use super::{Loop, UvError, Request, wait_until_woken_after}; use uvll; struct Addrinfo { @@ -76,7 +75,7 @@ impl GetAddrInfoRequest { } }); let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo); - let req = Request::new(uvll::UV_GETADDRINFO); + let mut req = Request::new(uvll::UV_GETADDRINFO); return match unsafe { uvll::uv_getaddrinfo(loop_.handle, req.handle, @@ -84,12 +83,11 @@ impl GetAddrInfoRequest { hint_ptr) } { 0 => { + req.defuse(); // uv callback now owns this request let mut cx = Ctx { slot: None, status: 0, addrinfo: None }; - req.set_data(&cx); - req.defuse(); - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - cx.slot = Some(task); + + do wait_until_woken_after(&mut cx.slot) { + req.set_data(&cx); } match cx.status { @@ -105,8 +103,8 @@ impl GetAddrInfoRequest { status: c_int, res: *uvll::addrinfo) { let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; cx.status = status; cx.addrinfo = Some(Addrinfo { handle: res }); @@ -191,25 +189,23 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] { mod test { use std::rt::io::net::ip::{SocketAddr, Ipv4Addr}; use super::*; - use super::super::run_uv_loop; + use super::super::local_loop; #[test] fn getaddrinfo_test() { - do run_uv_loop |l| { - match GetAddrInfoRequest::run(l, Some("localhost"), None, None) { - Ok(infos) => { - let mut found_local = false; - let local_addr = &SocketAddr { - ip: Ipv4Addr(127, 0, 0, 1), - port: 0 - }; - for addr in infos.iter() { - found_local = found_local || addr.address == *local_addr; - } - assert!(found_local); + match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) { + Ok(infos) => { + let mut found_local = false; + let local_addr = &SocketAddr { + ip: Ipv4Addr(127, 0, 0, 1), + port: 0 + }; + for addr in infos.iter() { + found_local = found_local || addr.address == *local_addr; } - Err(e) => fail!("{:?}", e), + assert!(found_local); } + Err(e) => fail!("{:?}", e), } } } diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs index 334e154a397f..04e7bce5bd18 100644 --- a/src/librustuv/async.rs +++ b/src/librustuv/async.rs @@ -131,11 +131,12 @@ mod test_remote { use std::rt::tube::Tube; use super::*; - use super::super::run_uv_loop; + use super::super::local_loop; - // Make sure that we can fire watchers in remote threads + // Make sure that we can fire watchers in remote threads and that they + // actually trigger what they say they will. #[test] - fn test_uv_remote() { + fn smoke_test() { struct MyCallback(Option>); impl Callback for MyCallback { fn call(&mut self) { @@ -147,35 +148,15 @@ mod test_remote { } } - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(Some(tube.clone())); - let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback)); + let mut tube = Tube::new(); + let cb = ~MyCallback(Some(tube.clone())); + let watcher = Cell::new(AsyncWatcher::new(local_loop(), cb as ~Callback)); - let thread = do Thread::start { - watcher.take().fire(); - }; + let thread = do Thread::start { + watcher.take().fire(); + }; - assert_eq!(tube.recv(), 1); - thread.join(); - } - } - - #[test] - fn smoke_test() { - static mut hits: uint = 0; - - struct MyCallback; - impl Callback for MyCallback { - fn call(&mut self) { - unsafe { hits += 1; } - } - } - - do run_uv_loop |l| { - let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback); - watcher.fire(); - } - assert!(unsafe { hits > 0 }); + assert_eq!(tube.recv(), 1); + thread.join(); } } diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index ac89ef38e8ec..bdb1429f5b62 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -15,14 +15,14 @@ use std::cast; use std::libc::{c_int, c_char, c_void, c_uint}; use std::libc; use std::rt::BlockedTask; -use std::rt::io; use std::rt::io::{FileStat, IoError}; -use std::rt::rtio; +use std::rt::io; use std::rt::local::Local; +use std::rt::rtio; use std::rt::sched::{Scheduler, SchedHandle}; use std::vec; -use super::{Loop, UvError, uv_error_to_io_error}; +use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after}; use uvio::HomingIO; use uvll; @@ -305,10 +305,8 @@ fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int) 0 => { req.fired = true; let mut slot = None; - unsafe { uvll::set_data_for_req(req.req, &slot) } - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - slot = Some(task); + do wait_until_woken_after(&mut slot) { + unsafe { uvll::set_data_for_req(req.req, &slot) } } match req.get_result() { n if n < 0 => Err(UvError(n)), @@ -454,123 +452,113 @@ mod test { use std::str; use std::vec; use super::*; - use super::super::{run_uv_loop}; + use l = super::super::local_loop; #[test] fn file_test_full_simple_sync() { - do run_uv_loop |l| { - let create_flags = O_RDWR | O_CREAT; - let read_flags = O_RDONLY; - let mode = S_IWUSR | S_IRUSR; - let path_str = "./tmp/file_full_simple_sync.txt"; + let create_flags = O_RDWR | O_CREAT; + let read_flags = O_RDONLY; + let mode = S_IWUSR | S_IRUSR; + let path_str = "./tmp/file_full_simple_sync.txt"; - { - // open/create - let result = FsRequest::open(l, &path_str.to_c_str(), - create_flags as int, mode as int); - assert!(result.is_ok()); - let result = result.unwrap(); - let fd = result.fd; + { + // open/create + let result = FsRequest::open(l(), &path_str.to_c_str(), + create_flags as int, mode as int); + assert!(result.is_ok()); + let result = result.unwrap(); + let fd = result.fd; - // write - let result = FsRequest::write(l, fd, "hello".as_bytes(), -1); - assert!(result.is_ok()); - } - - { - // re-open - let result = FsRequest::open(l, &path_str.to_c_str(), - read_flags as int, 0); - assert!(result.is_ok()); - let result = result.unwrap(); - let fd = result.fd; - - // read - let mut read_mem = vec::from_elem(1000, 0u8); - let result = FsRequest::read(l, fd, read_mem, 0); - assert!(result.is_ok()); - - let nread = result.unwrap(); - assert!(nread > 0); - let read_str = str::from_utf8(read_mem.slice(0, nread as uint)); - assert_eq!(read_str, ~"hello"); - } - // unlink - let result = FsRequest::unlink(l, &path_str.to_c_str()); + // write + let result = FsRequest::write(l(), fd, "hello".as_bytes(), -1); assert!(result.is_ok()); } + + { + // re-open + let result = FsRequest::open(l(), &path_str.to_c_str(), + read_flags as int, 0); + assert!(result.is_ok()); + let result = result.unwrap(); + let fd = result.fd; + + // read + let mut read_mem = vec::from_elem(1000, 0u8); + let result = FsRequest::read(l(), fd, read_mem, 0); + assert!(result.is_ok()); + + let nread = result.unwrap(); + assert!(nread > 0); + let read_str = str::from_utf8(read_mem.slice(0, nread as uint)); + assert_eq!(read_str, ~"hello"); + } + // unlink + let result = FsRequest::unlink(l(), &path_str.to_c_str()); + assert!(result.is_ok()); } #[test] fn file_test_stat() { - do run_uv_loop |l| { - let path = &"./tmp/file_test_stat_simple".to_c_str(); - let create_flags = (O_RDWR | O_CREAT) as int; - let mode = (S_IWUSR | S_IRUSR) as int; + let path = &"./tmp/file_test_stat_simple".to_c_str(); + let create_flags = (O_RDWR | O_CREAT) as int; + let mode = (S_IWUSR | S_IRUSR) as int; - let result = FsRequest::open(l, path, create_flags, mode); - assert!(result.is_ok()); - let file = result.unwrap(); + let result = FsRequest::open(l(), path, create_flags, mode); + assert!(result.is_ok()); + let file = result.unwrap(); - let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0); - assert!(result.is_ok()); + let result = FsRequest::write(l(), file.fd, "hello".as_bytes(), 0); + assert!(result.is_ok()); - let result = FsRequest::stat(l, path); - assert!(result.is_ok()); - assert_eq!(result.unwrap().size, 5); + let result = FsRequest::stat(l(), path); + assert!(result.is_ok()); + assert_eq!(result.unwrap().size, 5); - fn free(_: T) {} - free(file); + fn free(_: T) {} + free(file); - let result = FsRequest::unlink(l, path); - assert!(result.is_ok()); - } + let result = FsRequest::unlink(l(), path); + assert!(result.is_ok()); } #[test] fn file_test_mk_rm_dir() { - do run_uv_loop |l| { - let path = &"./tmp/mk_rm_dir".to_c_str(); - let mode = S_IWUSR | S_IRUSR; + let path = &"./tmp/mk_rm_dir".to_c_str(); + let mode = S_IWUSR | S_IRUSR; - let result = FsRequest::mkdir(l, path, mode); - assert!(result.is_ok()); + let result = FsRequest::mkdir(l(), path, mode); + assert!(result.is_ok()); - let result = FsRequest::stat(l, path); - assert!(result.is_ok()); - assert!(result.unwrap().kind == io::TypeDirectory); + let result = FsRequest::stat(l(), path); + assert!(result.is_ok()); + assert!(result.unwrap().kind == io::TypeDirectory); - let result = FsRequest::rmdir(l, path); - assert!(result.is_ok()); + let result = FsRequest::rmdir(l(), path); + assert!(result.is_ok()); - let result = FsRequest::stat(l, path); - assert!(result.is_err()); - } + let result = FsRequest::stat(l(), path); + assert!(result.is_err()); } #[test] fn file_test_mkdir_chokes_on_double_create() { - do run_uv_loop |l| { - let path = &"./tmp/double_create_dir".to_c_str(); - let mode = S_IWUSR | S_IRUSR; + let path = &"./tmp/double_create_dir".to_c_str(); + let mode = S_IWUSR | S_IRUSR; - let result = FsRequest::stat(l, path); - assert!(result.is_err(), "{:?}", result); - let result = FsRequest::mkdir(l, path, mode as c_int); - assert!(result.is_ok(), "{:?}", result); - let result = FsRequest::mkdir(l, path, mode as c_int); - assert!(result.is_err(), "{:?}", result); - let result = FsRequest::rmdir(l, path); - assert!(result.is_ok(), "{:?}", result); - } + let result = FsRequest::stat(l(), path); + assert!(result.is_err(), "{:?}", result); + let result = FsRequest::mkdir(l(), path, mode as c_int); + assert!(result.is_ok(), "{:?}", result); + let result = FsRequest::mkdir(l(), path, mode as c_int); + assert!(result.is_err(), "{:?}", result); + let result = FsRequest::rmdir(l(), path); + assert!(result.is_ok(), "{:?}", result); } #[test] fn file_test_rmdir_chokes_on_nonexistant_path() { - do run_uv_loop |l| { - let path = &"./tmp/never_existed_dir".to_c_str(); - let result = FsRequest::rmdir(l, path); - assert!(result.is_err()); - } + let path = &"./tmp/never_existed_dir".to_c_str(); + let result = FsRequest::rmdir(l(), path); + assert!(result.is_err()); } } diff --git a/src/librustuv/idle.rs b/src/librustuv/idle.rs index b3527ce9fb42..83fc53dce1cd 100644 --- a/src/librustuv/idle.rs +++ b/src/librustuv/idle.rs @@ -83,7 +83,6 @@ impl UvHandle for IdleWatcher { } extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - if status == uvll::ECANCELED { return } assert_eq!(status, 0); let idle: &mut IdleWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; idle.callback.call(); @@ -101,7 +100,7 @@ mod test { use super::*; use std::rt::tube::Tube; use std::rt::rtio::{Callback, PausibleIdleCallback}; - use super::super::run_uv_loop; + use super::super::local_loop; struct MyCallback(Tube, int); impl Callback for MyCallback { @@ -114,55 +113,47 @@ mod test { #[test] fn not_used() { - do run_uv_loop |l| { - let cb = ~MyCallback(Tube::new(), 1); - let _idle = IdleWatcher::new(l, cb as ~Callback); - } + let cb = ~MyCallback(Tube::new(), 1); + let _idle = IdleWatcher::new(local_loop(), cb as ~Callback); } #[test] fn smoke_test() { - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(l, cb as ~Callback); - idle.resume(); - tube.recv(); - } + let mut tube = Tube::new(); + let cb = ~MyCallback(tube.clone(), 1); + let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + idle.resume(); + tube.recv(); } #[test] fn fun_combinations_of_methods() { - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(l, cb as ~Callback); - idle.resume(); - tube.recv(); - idle.pause(); - idle.resume(); - idle.resume(); - tube.recv(); - idle.pause(); - idle.pause(); - idle.resume(); - tube.recv(); - } + let mut tube = Tube::new(); + let cb = ~MyCallback(tube.clone(), 1); + let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + idle.resume(); + tube.recv(); + idle.pause(); + idle.resume(); + idle.resume(); + tube.recv(); + idle.pause(); + idle.pause(); + idle.resume(); + tube.recv(); } #[test] fn pause_pauses() { - do run_uv_loop |l| { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle1 = IdleWatcher::new(l, cb as ~Callback); - let cb = ~MyCallback(tube.clone(), 2); - let mut idle2 = IdleWatcher::new(l, cb as ~Callback); - idle2.resume(); - assert_eq!(tube.recv(), 2); - idle2.pause(); - idle1.resume(); - assert_eq!(tube.recv(), 1); - } + let mut tube = Tube::new(); + let cb = ~MyCallback(tube.clone(), 1); + let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback); + let cb = ~MyCallback(tube.clone(), 2); + let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback); + idle2.resume(); + assert_eq!(tube.recv(), 2); + idle2.pause(); + idle1.resume(); + assert_eq!(tube.recv(), 1); } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 5bedba08fb0e..4da5ad4275f7 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -45,15 +45,19 @@ via `close` and `delete` methods. #[feature(macro_rules, globs)]; -use std::cast; -use std::str::raw::from_c_str; -use std::vec; -use std::ptr; -use std::str; -use std::libc::{c_void, c_int, malloc, free}; use std::cast::transmute; +use std::cast; +use std::libc::{c_int, malloc, free}; use std::ptr::null; +use std::ptr; +use std::rt::BlockedTask; +use std::rt::local::Local; +use std::rt::sched::Scheduler; +use std::str::raw::from_c_str; +use std::str; +use std::task; use std::unstable::finally::Finally; +use std::vec; use std::rt::io::IoError; @@ -124,27 +128,90 @@ pub trait UvHandle { uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb) } } + + fn close(&mut self) { + let mut slot = None; + + unsafe { + uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb); + uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>()); + + do wait_until_woken_after(&mut slot) { + uvll::set_data_for_uv_handle(self.uv_handle(), &slot); + } + } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + unsafe { + let data = uvll::get_data_for_uv_handle(handle); + uvll::free_handle(handle); + if data == ptr::null() { return } + let slot: &mut Option = cast::transmute(data); + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(slot.take_unwrap()); + } + } + } +} + +pub struct ForbidUnwind { + msg: &'static str, + failing_before: bool, +} + +impl ForbidUnwind { + fn new(s: &'static str) -> ForbidUnwind { + ForbidUnwind { + msg: s, failing_before: task::failing(), + } + } +} + +impl Drop for ForbidUnwind { + fn drop(&mut self) { + assert!(self.failing_before == task::failing(), + "failing sadface {}", self.msg); + } +} + +fn wait_until_woken_after(slot: *mut Option, f: &fn()) { + let _f = ForbidUnwind::new("wait_until_woken_after"); + unsafe { + assert!((*slot).is_none()); + let sched: ~Scheduler = Local::take(); + do sched.deschedule_running_task_and_then |_, task| { + f(); + *slot = Some(task); + } + } } pub struct Request { handle: *uvll::uv_req_t, + priv defused: bool, } impl Request { pub fn new(ty: uvll::uv_req_type) -> Request { - Request::wrap(unsafe { uvll::malloc_req(ty) }) + unsafe { + let handle = uvll::malloc_req(ty); + uvll::set_data_for_req(handle, null::<()>()); + Request::wrap(handle) + } } pub fn wrap(handle: *uvll::uv_req_t) -> Request { - Request { handle: handle } + Request { handle: handle, defused: false } } pub fn set_data(&self, t: *T) { unsafe { uvll::set_data_for_req(self.handle, t) } } - pub fn get_data(&self) -> *c_void { - unsafe { uvll::get_data_for_req(self.handle) } + pub unsafe fn get_data(&self) -> &'static mut T { + let data = uvll::get_data_for_req(self.handle); + assert!(data != null()); + cast::transmute(data) } // This function should be used when the request handle has been given to an @@ -155,17 +222,15 @@ impl Request { // This is still a problem in blocking situations due to linked failure. In // the connection callback the handle should be re-wrapped with the `wrap` // function to ensure its destruction. - pub fn defuse(mut self) { - self.handle = ptr::null(); + pub fn defuse(&mut self) { + self.defused = true; } } impl Drop for Request { fn drop(&mut self) { - unsafe { - if self.handle != ptr::null() { - uvll::free_req(self.handle) - } + if !self.defused { + unsafe { uvll::free_req(self.handle) } } } } @@ -300,23 +365,18 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf { uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t } } -fn run_uv_loop(f: proc(&mut Loop)) { - use std::rt::local::Local; - use std::rt::test::run_in_uv_task; - use std::rt::sched::Scheduler; - use std::cell::Cell; - - let f = Cell::new(f); - do run_in_uv_task { - let mut io = None; - do Local::borrow |sched: &mut Scheduler| { - sched.event_loop.io(|i| unsafe { +#[cfg(test)] +fn local_loop() -> &'static mut Loop { + unsafe { + cast::transmute(do Local::borrow |sched: &mut Scheduler| { + let mut io = None; + do sched.event_loop.io |i| { let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) = cast::transmute(i); io = Some(uvio); - }); - } - f.take()(io.unwrap().uv_loop()); + } + io.unwrap() + }.uv_loop()) } } diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 5d228cd78486..bf5f6c88527e 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -19,11 +19,13 @@ use std::rt::rtio; use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::tube::Tube; use std::str; +use std::task; use std::vec; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, - uv_error_to_io_error, UvHandle, slice_to_uv_buf}; + uv_error_to_io_error, UvHandle, slice_to_uv_buf, + wait_until_woken_after}; use uvio::HomingIO; use uvll; @@ -206,46 +208,46 @@ impl TcpWatcher { { struct Ctx { status: c_int, task: Option } - let tcp = TcpWatcher::new(loop_); - let ret = do socket_addr_as_uv_socket_addr(address) |addr| { - let req = Request::new(uvll::UV_CONNECT); - let result = match addr { - UvIpv4SocketAddr(addr) => unsafe { - uvll::tcp_connect(req.handle, tcp.handle, addr, - connect_cb) - }, - UvIpv6SocketAddr(addr) => unsafe { - uvll::tcp_connect6(req.handle, tcp.handle, addr, - connect_cb) - }, - }; - match result { - 0 => { - let mut cx = Ctx { status: 0, task: None }; - req.set_data(&cx); - req.defuse(); - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); - } - match cx.status { - 0 => Ok(()), - n => Err(UvError(n)), + return do task::unkillable { + let tcp = TcpWatcher::new(loop_); + let ret = do socket_addr_as_uv_socket_addr(address) |addr| { + let mut req = Request::new(uvll::UV_CONNECT); + let result = match addr { + UvIpv4SocketAddr(addr) => unsafe { + uvll::tcp_connect(req.handle, tcp.handle, addr, + connect_cb) + }, + UvIpv6SocketAddr(addr) => unsafe { + uvll::tcp_connect6(req.handle, tcp.handle, addr, + connect_cb) + }, + }; + match result { + 0 => { + req.defuse(); // uv callback now owns this request + let mut cx = Ctx { status: 0, task: None }; + do wait_until_woken_after(&mut cx.task) { + req.set_data(&cx); + } + match cx.status { + 0 => Ok(()), + n => Err(UvError(n)), + } } + n => Err(UvError(n)) } - n => Err(UvError(n)) - } - }; + }; - return match ret { - Ok(()) => Ok(tcp), - Err(e) => Err(e), + match ret { + Ok(()) => Ok(tcp), + Err(e) => Err(e), + } }; extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { let req = Request::wrap(req); - if status == uvll::ECANCELED { return } - let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; cx.status = status; let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(cx.task.take_unwrap()); @@ -310,10 +312,14 @@ impl rtio::RtioTcpStream for TcpWatcher { } } +impl UvHandle for TcpWatcher { + fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle } +} + impl Drop for TcpWatcher { fn drop(&mut self) { let _m = self.fire_homing_missile(); - self.stream.close(); + self.close(); } } @@ -323,25 +329,27 @@ impl TcpListener { pub fn bind(loop_: &mut Loop, address: SocketAddr) -> Result<~TcpListener, UvError> { - let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; - assert_eq!(unsafe { - uvll::uv_tcp_init(loop_.handle, handle) - }, 0); - let l = ~TcpListener { - home: get_handle_to_current_scheduler!(), - handle: handle, - closing_task: None, - outgoing: Tube::new(), - }; - let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr), - UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr), + do task::unkillable { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; + assert_eq!(unsafe { + uvll::uv_tcp_init(loop_.handle, handle) + }, 0); + let l = ~TcpListener { + home: get_handle_to_current_scheduler!(), + handle: handle, + closing_task: None, + outgoing: Tube::new(), + }; + let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr), + UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr), + } + }); + match res { + 0 => Ok(l.install()), + n => Err(UvError(n)) } - }); - match res { - 0 => Ok(l.install()), - n => Err(UvError(n)) } } } @@ -380,6 +388,7 @@ impl rtio::RtioTcpListener for TcpListener { } extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { + assert!(status != uvll::ECANCELED); let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -389,7 +398,6 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0); Ok(~client as ~rtio::RtioTcpStream) } - uvll::ECANCELED => return, n => Err(uv_error_to_io_error(UvError(n))) }; @@ -399,12 +407,8 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { impl Drop for TcpListener { fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - - do sched.deschedule_running_task_and_then |_, task| { - self.closing_task = Some(task); - unsafe { uvll::uv_close(self.handle, listener_close_cb) } - } + let _m = self.fire_homing_missile(); + self.close(); } } @@ -463,26 +467,34 @@ impl UdpWatcher { pub fn bind(loop_: &Loop, address: SocketAddr) -> Result { - let udp = UdpWatcher { - handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, - home: get_handle_to_current_scheduler!(), - }; - assert_eq!(unsafe { - uvll::uv_udp_init(loop_.handle, udp.handle) - }, 0); - let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe { - match addr { - UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32), - UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32), + do task::unkillable { + let udp = UdpWatcher { + handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, + home: get_handle_to_current_scheduler!(), + }; + assert_eq!(unsafe { + uvll::uv_udp_init(loop_.handle, udp.handle) + }, 0); + let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe { + match addr { + UvIpv4SocketAddr(addr) => + uvll::udp_bind(udp.handle, addr, 0u32), + UvIpv6SocketAddr(addr) => + uvll::udp_bind6(udp.handle, addr, 0u32), + } + }); + match result { + 0 => Ok(udp), + n => Err(UvError(n)), } - }); - match result { - 0 => Ok(udp), - n => Err(UvError(n)), } } } +impl UvHandle for UdpWatcher { + fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle } +} + impl HomingIO for UdpWatcher { fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } @@ -505,7 +517,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { } let _m = self.fire_homing_missile(); - return match unsafe { + let a = match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) } { 0 => { @@ -514,10 +526,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { buf: Some(slice_to_uv_buf(buf)), result: None, }; - unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) } - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); + do wait_until_woken_after(&mut cx.task) { + unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) } } match cx.result.take_unwrap() { (n, _) if n < 0 => @@ -527,23 +537,30 @@ impl rtio::RtioUdpSocket for UdpWatcher { } n => Err(uv_error_to_io_error(UvError(n))) }; + return a; extern fn alloc_cb(handle: *uvll::uv_udp_t, _suggested_size: size_t) -> Buf { let cx: &mut Ctx = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; - cx.buf.take().expect("alloc_cb called more than once") + cx.buf.take().expect("recv alloc_cb called more than once") } - extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf, + extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, addr: *uvll::sockaddr, _flags: c_uint) { + assert!(nread != uvll::ECANCELED as ssize_t); + let cx: &mut Ctx = unsafe { + cast::transmute(uvll::get_data_for_uv_handle(handle)) + }; // When there's no data to read the recv callback can be a no-op. // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring // this we just drop back to kqueue and wait for the next callback. - if nread == 0 { return } - if nread == uvll::ECANCELED as ssize_t { return } + if nread == 0 { + cx.buf = Some(buf); + return + } unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) @@ -566,7 +583,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { let _m = self.fire_homing_missile(); - let req = Request::new(uvll::UV_UDP_SEND); + let mut req = Request::new(uvll::UV_UDP_SEND); let buf = slice_to_uv_buf(buf); let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe { match dst { @@ -579,15 +596,11 @@ impl rtio::RtioUdpSocket for UdpWatcher { return match result { 0 => { + req.defuse(); // uv callback now owns this request let mut cx = Ctx { task: None, result: 0 }; - req.set_data(&cx); - req.defuse(); - - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); + do wait_until_woken_after(&mut cx.task) { + req.set_data(&cx); } - match cx.result { 0 => Ok(()), n => Err(uv_error_to_io_error(UvError(n))) @@ -598,7 +611,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { let req = Request::wrap(req); - let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) }; + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; cx.result = status; let sched: ~Scheduler = Local::take(); @@ -679,24 +693,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { impl Drop for UdpWatcher { fn drop(&mut self) { // Send ourselves home to close this handle (blocking while doing so). - let (_m, sched) = self.fire_homing_missile_sched(); - let mut slot = None; - unsafe { - uvll::set_data_for_uv_handle(self.handle, &slot); - uvll::uv_close(self.handle, close_cb); - } - do sched.deschedule_running_task_and_then |_, task| { - slot = Some(task); - } - - extern fn close_cb(handle: *uvll::uv_handle_t) { - let slot: &mut Option = unsafe { - cast::transmute(uvll::get_data_for_uv_handle(handle)) - }; - unsafe { uvll::free_handle(handle) } - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(slot.take_unwrap()); - } + let _m = self.fire_homing_missile(); + self.close(); } } @@ -714,397 +712,357 @@ mod test { use std::task; use super::*; - use super::super::{Loop, run_uv_loop}; + use super::super::local_loop; #[test] fn connect_close_ip4() { - do run_uv_loop |l| { - match TcpWatcher::connect(l, next_test_ip4()) { - Ok(*) => fail!(), - Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), - } + match TcpWatcher::connect(local_loop(), next_test_ip4()) { + Ok(*) => fail!(), + Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), } } #[test] fn connect_close_ip6() { - do run_uv_loop |l| { - match TcpWatcher::connect(l, next_test_ip6()) { - Ok(*) => fail!(), - Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), - } + match TcpWatcher::connect(local_loop(), next_test_ip6()) { + Ok(*) => fail!(), + Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), } } #[test] fn udp_bind_close_ip4() { - do run_uv_loop |l| { - match UdpWatcher::bind(l, next_test_ip4()) { - Ok(*) => {} - Err(*) => fail!() - } + match UdpWatcher::bind(local_loop(), next_test_ip4()) { + Ok(*) => {} + Err(*) => fail!() } } #[test] fn udp_bind_close_ip6() { - do run_uv_loop |l| { - match UdpWatcher::bind(l, next_test_ip6()) { - Ok(*) => {} - Err(*) => fail!() - } + match UdpWatcher::bind(local_loop(), next_test_ip6()) { + Ok(*) => {} + Err(*) => fail!() } } #[test] fn listen_ip4() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let addr = next_test_ip4(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); - let handle = l.handle; - do spawn { - let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - let mut w = match w.listen() { - Ok(w) => w, Err(e) => fail!("{:?}", e), - }; - chan.take().send(()); - match w.accept() { - Ok(mut stream) => { - let mut buf = [0u8, ..10]; - match stream.read(buf) { - Ok(10) => {} e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } - } - Err(e) => fail!("{:?}", e) - } - } - - port.recv(); - let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) { + do spawn { + let w = match TcpListener::bind(local_loop(), addr) { Ok(w) => w, Err(e) => fail!("{:?}", e) }; - match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) + let mut w = match w.listen() { + Ok(w) => w, Err(e) => fail!("{:?}", e), + }; + chan.take().send(()); + match w.accept() { + Ok(mut stream) => { + let mut buf = [0u8, ..10]; + match stream.read(buf) { + Ok(10) => {} e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); + } + } + Err(e) => fail!("{:?}", e) } } + + port.recv(); + let mut w = match TcpWatcher::connect(local_loop(), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } #[test] fn listen_ip6() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let addr = next_test_ip6(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip6(); - let handle = l.handle; - do spawn { - let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - let mut w = match w.listen() { - Ok(w) => w, Err(e) => fail!("{:?}", e), - }; - chan.take().send(()); - match w.accept() { - Ok(mut stream) => { - let mut buf = [0u8, ..10]; - match stream.read(buf) { - Ok(10) => {} e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } - } - Err(e) => fail!("{:?}", e) - } - } - - port.recv(); - let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) { + do spawn { + let w = match TcpListener::bind(local_loop(), addr) { Ok(w) => w, Err(e) => fail!("{:?}", e) }; - match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) + let mut w = match w.listen() { + Ok(w) => w, Err(e) => fail!("{:?}", e), + }; + chan.take().send(()); + match w.accept() { + Ok(mut stream) => { + let mut buf = [0u8, ..10]; + match stream.read(buf) { + Ok(10) => {} e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); + } + } + Err(e) => fail!("{:?}", e) } } + + port.recv(); + let mut w = match TcpWatcher::connect(local_loop(), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } #[test] fn udp_recv_ip4() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let client = next_test_ip4(); - let server = next_test_ip4(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let client = next_test_ip4(); + let server = next_test_ip4(); - let handle = l.handle; - do spawn { - match UdpWatcher::bind(&mut Loop::wrap(handle), server) { - Ok(mut w) => { - chan.take().send(()); - let mut buf = [0u8, ..10]; - match w.recvfrom(buf) { - Ok((10, addr)) => assert_eq!(addr, client), - e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } + do spawn { + match UdpWatcher::bind(local_loop(), server) { + Ok(mut w) => { + chan.take().send(()); + let mut buf = [0u8, ..10]; + match w.recvfrom(buf) { + Ok((10, addr)) => assert_eq!(addr, client), + e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); } - Err(e) => fail!("{:?}", e) } + Err(e) => fail!("{:?}", e) } + } - port.recv(); - let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) - } + port.recv(); + let mut w = match UdpWatcher::bind(local_loop(), client) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) } } #[test] fn udp_recv_ip6() { - do run_uv_loop |l| { - let (port, chan) = oneshot(); - let chan = Cell::new(chan); - let client = next_test_ip6(); - let server = next_test_ip6(); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let client = next_test_ip6(); + let server = next_test_ip6(); - let handle = l.handle; - do spawn { - match UdpWatcher::bind(&mut Loop::wrap(handle), server) { - Ok(mut w) => { - chan.take().send(()); - let mut buf = [0u8, ..10]; - match w.recvfrom(buf) { - Ok((10, addr)) => assert_eq!(addr, client), - e => fail!("{:?}", e), - } - for i in range(0, 10u8) { - assert_eq!(buf[i], i + 1); - } + do spawn { + match UdpWatcher::bind(local_loop(), server) { + Ok(mut w) => { + chan.take().send(()); + let mut buf = [0u8, ..10]; + match w.recvfrom(buf) { + Ok((10, addr)) => assert_eq!(addr, client), + e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); } - Err(e) => fail!("{:?}", e) } + Err(e) => fail!("{:?}", e) } + } - port.recv(); - let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) { - Ok(w) => w, Err(e) => fail!("{:?}", e) - }; - match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { - Ok(()) => {}, Err(e) => fail!("{:?}", e) - } + port.recv(); + let mut w = match UdpWatcher::bind(local_loop(), client) { + Ok(w) => w, Err(e) => fail!("{:?}", e) + }; + match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) } } #[test] fn test_read_read_read() { - do run_uv_loop |l| { - let addr = next_test_ip4(); - static MAX: uint = 500000; - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); + use std::rt::rtio::*; + let addr = next_test_ip4(); + static MAX: uint = 5000; + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let listener = TcpListener::bind(l, addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - chan.take().send(()); - let mut stream = acceptor.accept().unwrap(); - let buf = [1, .. 2048]; - let mut total_bytes_written = 0; - while total_bytes_written < MAX { - stream.write(buf); - total_bytes_written += buf.len(); + do spawn { + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + chan.take().send(()); + let mut stream = acceptor.accept().unwrap(); + let buf = [1, .. 2048]; + let mut total_bytes_written = 0; + while total_bytes_written < MAX { + assert!(stream.write(buf).is_ok()); + uvdebug!("wrote bytes"); + total_bytes_written += buf.len(); + } + } + + do spawn { + port.take().recv(); + let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < MAX { + let nread = stream.read(buf).unwrap(); + total_bytes_read += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); } } - - do spawntask { - let l = &mut Loop::wrap(handle); - port.take().recv(); - let mut stream = TcpWatcher::connect(l, addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < MAX { - let nread = stream.read(buf).unwrap(); - uvdebug!("read {} bytes", nread); - total_bytes_read += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } - } - uvdebug!("read {} bytes total", total_bytes_read); - } + uvdebug!("read {} bytes total", total_bytes_read); } } #[test] - #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send fn test_udp_twice() { - do run_uv_loop |l| { - let server_addr = next_test_ip4(); - let client_addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); + let server_addr = next_test_ip4(); + let client_addr = next_test_ip4(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let mut client = UdpWatcher::bind(l, client_addr).unwrap(); - port.take().recv(); - assert!(client.sendto([1], server_addr).is_ok()); - assert!(client.sendto([2], server_addr).is_ok()); - } - - do spawntask { - let l = &mut Loop::wrap(handle); - let mut server = UdpWatcher::bind(l, server_addr).unwrap(); - chan.take().send(()); - let mut buf1 = [0]; - let mut buf2 = [0]; - let (nread1, src1) = server.recvfrom(buf1).unwrap(); - let (nread2, src2) = server.recvfrom(buf2).unwrap(); - assert_eq!(nread1, 1); - assert_eq!(nread2, 1); - assert_eq!(src1, client_addr); - assert_eq!(src2, client_addr); - assert_eq!(buf1[0], 1); - assert_eq!(buf2[0], 2); - } + do spawn { + let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap(); + port.take().recv(); + assert!(client.sendto([1], server_addr).is_ok()); + assert!(client.sendto([2], server_addr).is_ok()); } + + let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap(); + chan.take().send(()); + let mut buf1 = [0]; + let mut buf2 = [0]; + let (nread1, src1) = server.recvfrom(buf1).unwrap(); + let (nread2, src2) = server.recvfrom(buf2).unwrap(); + assert_eq!(nread1, 1); + assert_eq!(nread2, 1); + assert_eq!(src1, client_addr); + assert_eq!(src2, client_addr); + assert_eq!(buf1[0], 1); + assert_eq!(buf2[0], 2); } #[test] fn test_udp_many_read() { - do run_uv_loop |l| { - let server_out_addr = next_test_ip4(); - let server_in_addr = next_test_ip4(); - let client_out_addr = next_test_ip4(); - let client_in_addr = next_test_ip4(); - static MAX: uint = 500_000; + let server_out_addr = next_test_ip4(); + let server_in_addr = next_test_ip4(); + let client_out_addr = next_test_ip4(); + let client_in_addr = next_test_ip4(); + static MAX: uint = 500_000; - let (p1, c1) = oneshot(); - let (p2, c2) = oneshot(); + let (p1, c1) = oneshot(); + let (p2, c2) = oneshot(); - let first = Cell::new((p1, c2)); - let second = Cell::new((p2, c1)); + let first = Cell::new((p1, c2)); + let second = Cell::new((p2, c1)); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap(); - let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap(); - let (port, chan) = first.take(); - chan.send(()); - port.recv(); - let msg = [1, .. 2048]; - let mut total_bytes_sent = 0; - let mut buf = [1]; - while buf[0] == 1 { - // send more data - assert!(server_out.sendto(msg, client_in_addr).is_ok()); - total_bytes_sent += msg.len(); - // check if the client has received enough - let res = server_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(nread, 1); - assert_eq!(src, client_out_addr); - } - assert!(total_bytes_sent >= MAX); + do spawn { + let l = local_loop(); + let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap(); + let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap(); + let (port, chan) = first.take(); + chan.send(()); + port.recv(); + let msg = [1, .. 2048]; + let mut total_bytes_sent = 0; + let mut buf = [1]; + while buf[0] == 1 { + // send more data + assert!(server_out.sendto(msg, client_in_addr).is_ok()); + total_bytes_sent += msg.len(); + // check if the client has received enough + let res = server_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(nread, 1); + assert_eq!(src, client_out_addr); } + assert!(total_bytes_sent >= MAX); + } - do spawntask { - let l = &mut Loop::wrap(handle); - let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); - let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); - let (port, chan) = second.take(); - port.recv(); - chan.send(()); - let mut total_bytes_recv = 0; - let mut buf = [0, .. 2048]; - while total_bytes_recv < MAX { - // ask for more - assert!(client_out.sendto([1], server_in_addr).is_ok()); - // wait for data - let res = client_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(src, server_out_addr); - total_bytes_recv += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } + do spawn { + let l = local_loop(); + let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); + let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); + let (port, chan) = second.take(); + port.recv(); + chan.send(()); + let mut total_bytes_recv = 0; + let mut buf = [0, .. 2048]; + while total_bytes_recv < MAX { + // ask for more + assert!(client_out.sendto([1], server_in_addr).is_ok()); + // wait for data + let res = client_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(src, server_out_addr); + total_bytes_recv += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); } - // tell the server we're done - assert!(client_out.sendto([0], server_in_addr).is_ok()); } + // tell the server we're done + assert!(client_out.sendto([0], server_in_addr).is_ok()); } } #[test] fn test_read_and_block() { - do run_uv_loop |l| { - let addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); + let addr = next_test_ip4(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); - let handle = l.handle; - do spawntask { - let l = &mut Loop::wrap(handle); - let listener = TcpListener::bind(l, addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let (port2, chan2) = stream(); - chan.take().send(port2); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; + do spawn { + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let (port2, chan2) = stream(); + chan.take().send(port2); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; - let expected = 32; - let mut current = 0; - let mut reads = 0; + let expected = 32; + let mut current = 0; + let mut reads = 0; - while current < expected { - let nread = stream.read(buf).unwrap(); - for i in range(0u, nread) { - let val = buf[i] as uint; - assert_eq!(val, current % 8); - current += 1; - } - reads += 1; - - chan2.send(()); + while current < expected { + let nread = stream.read(buf).unwrap(); + for i in range(0u, nread) { + let val = buf[i] as uint; + assert_eq!(val, current % 8); + current += 1; } + reads += 1; - // Make sure we had multiple reads - assert!(reads > 1); + chan2.send(()); } - do spawntask { - let l = &mut Loop::wrap(handle); - let port2 = port.take().recv(); - let mut stream = TcpWatcher::connect(l, addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - port2.recv(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - port2.recv(); - } + // Make sure we had multiple reads + assert!(reads > 1); + } + + do spawn { + let port2 = port.take().recv(); + let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + port2.recv(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + port2.recv(); } } @@ -1113,27 +1071,23 @@ mod test { let addr = next_test_ip4(); do task::spawn_sched(task::SingleThreaded) { - do run_uv_loop |l| { - let listener = TcpListener::bind(l, addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert_eq!(nread, 8); - for i in range(0u, nread) { - assert_eq!(buf[i], i as u8); - } + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert_eq!(nread, 8); + for i in range(0u, nread) { + assert_eq!(buf[i], i as u8); } } do task::spawn_sched(task::SingleThreaded) { - do run_uv_loop |l| { - let mut stream = TcpWatcher::connect(l, addr); - while stream.is_err() { - stream = TcpWatcher::connect(l, addr); - } - stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); + let mut stream = TcpWatcher::connect(local_loop(), addr); + while stream.is_err() { + stream = TcpWatcher::connect(local_loop(), addr); } + stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); } } @@ -1149,17 +1103,13 @@ mod test { do task::spawn_sched(task::SingleThreaded) { let chan = Cell::new(chan.take()); - do run_uv_loop |l| { - let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap(); - chan.take().send(listener); - } + let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap(); + chan.take().send(listener); } do task::spawn_sched(task::SingleThreaded) { let port = Cell::new(port.take()); - do run_uv_loop |_l| { - port.take().recv(); - } + port.take().recv(); } } @@ -1261,4 +1211,69 @@ mod test { } } + #[should_fail] + #[test] + #[ignore(reason = "linked failure")] + fn linked_failure1() { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); + + do spawn { + let w = TcpListener::bind(local_loop(), addr).unwrap(); + let mut w = w.listen().unwrap(); + chan.take().send(()); + w.accept(); + } + + port.recv(); + fail!(); + } + + #[should_fail] + #[test] + #[ignore(reason = "linked failure")] + fn linked_failure2() { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); + + do spawn { + let w = TcpListener::bind(local_loop(), addr).unwrap(); + let mut w = w.listen().unwrap(); + chan.take().send(()); + let mut buf = [0]; + w.accept().unwrap().read(buf); + } + + port.recv(); + let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); + + fail!(); + } + + #[should_fail] + #[test] + #[ignore(reason = "linked failure")] + fn linked_failure3() { + let (port, chan) = stream(); + let chan = Cell::new(chan); + let addr = next_test_ip4(); + + do spawn { + let chan = chan.take(); + let w = TcpListener::bind(local_loop(), addr).unwrap(); + let mut w = w.listen().unwrap(); + chan.send(()); + let mut conn = w.accept().unwrap(); + chan.send(()); + let buf = [0, ..65536]; + conn.write(buf); + } + + port.recv(); + let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); + port.recv(); + fail!(); + } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 1f28e043dfb4..89a86a2ff7dc 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -9,7 +9,6 @@ // except according to those terms. use std::c_str::CString; -use std::cast; use std::libc; use std::rt::BlockedTask; use std::rt::io::IoError; @@ -17,9 +16,11 @@ use std::rt::local::Local; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::tube::Tube; +use std::task; use stream::StreamWatcher; -use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error}; +use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, + wait_until_woken_after}; use uvio::HomingIO; use uvll; @@ -32,7 +33,6 @@ pub struct PipeWatcher { pub struct PipeListener { home: SchedHandle, pipe: *uvll::uv_pipe_t, - priv closing_task: Option, priv outgoing: Tube>, } @@ -74,36 +74,35 @@ impl PipeWatcher { pub fn connect(loop_: &Loop, name: &CString) -> Result { struct Ctx { task: Option, result: libc::c_int, } - let mut cx = Ctx { task: None, result: 0 }; - let req = Request::new(uvll::UV_CONNECT); - let pipe = PipeWatcher::new(loop_, false); - unsafe { - uvll::set_data_for_req(req.handle, &cx as *Ctx); - uvll::uv_pipe_connect(req.handle, - pipe.handle(), - name.with_ref(|p| p), - connect_cb) - } - req.defuse(); + return do task::unkillable { + let mut cx = Ctx { task: None, result: 0 }; + let mut req = Request::new(uvll::UV_CONNECT); + let pipe = PipeWatcher::new(loop_, false); + + do wait_until_woken_after(&mut cx.task) { + unsafe { + uvll::uv_pipe_connect(req.handle, + pipe.handle(), + name.with_ref(|p| p), + connect_cb) + } + req.set_data(&cx); + req.defuse(); // uv callback now owns this request + } + match cx.result { + 0 => Ok(pipe), + n => Err(UvError(n)) + } - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - cx.task = Some(task); - } - return match cx.result { - 0 => Ok(pipe), - n => Err(UvError(n)) }; - extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { - let _req = Request::wrap(req); - if status == uvll::ECANCELED { return } - unsafe { - let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req)); - cx.result = status; - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(cx.task.take_unwrap()); - } + extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {; + let req = Request::wrap(req); + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; + cx.result = status; + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(cx.task.take_unwrap()); } } @@ -133,11 +132,15 @@ impl HomingIO for PipeWatcher { fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home } } +impl UvHandle for PipeWatcher { + fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle } +} + impl Drop for PipeWatcher { fn drop(&mut self) { if !self.defused { let _m = self.fire_homing_missile(); - self.stream.close(); + self.close(); } } } @@ -150,21 +153,24 @@ extern fn pipe_close_cb(handle: *uvll::uv_handle_t) { impl PipeListener { pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> { - let pipe = PipeWatcher::new(loop_, false); - match unsafe { uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) } { - 0 => { - // If successful, unwrap the PipeWatcher because we control how - // we close the pipe differently. We can't rely on - // StreamWatcher's default close method. - let p = ~PipeListener { - home: get_handle_to_current_scheduler!(), - pipe: pipe.unwrap(), - closing_task: None, - outgoing: Tube::new(), - }; - Ok(p.install()) + do task::unkillable { + let pipe = PipeWatcher::new(loop_, false); + match unsafe { + uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) + } { + 0 => { + // If successful, unwrap the PipeWatcher because we control how + // we close the pipe differently. We can't rely on + // StreamWatcher's default close method. + let p = ~PipeListener { + home: get_handle_to_current_scheduler!(), + pipe: pipe.unwrap(), + outgoing: Tube::new(), + }; + Ok(p.install()) + } + n => Err(UvError(n)) } - n => Err(UvError(n)) } } } @@ -196,6 +202,7 @@ impl UvHandle for PipeListener { } extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { + assert!(status != uvll::ECANCELED); let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { @@ -205,7 +212,6 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0); Ok(~client as ~RtioPipe) } - uvll::ECANCELED => return, n => Err(uv_error_to_io_error(UvError(n))) }; @@ -215,23 +221,11 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { impl Drop for PipeListener { fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - - do sched.deschedule_running_task_and_then |_, task| { - self.closing_task = Some(task); - unsafe { uvll::uv_close(self.pipe, listener_close_cb) } - } + let _m = self.fire_homing_missile(); + self.close(); } } -extern fn listener_close_cb(handle: *uvll::uv_handle_t) { - let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) }; - unsafe { uvll::free_handle(handle) } - - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap()); -} - // PipeAcceptor implementation and traits impl RtioUnixAcceptor for PipeAcceptor { diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index 15d5ae1c33ca..17a7510aa19b 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -19,7 +19,8 @@ use std::rt::rtio::RtioProcess; use std::rt::sched::{Scheduler, SchedHandle}; use std::vec; -use super::{Loop, UvHandle, UvError, uv_error_to_io_error}; +use super::{Loop, UvHandle, UvError, uv_error_to_io_error, + wait_until_woken_after}; use uvio::HomingIO; use uvll; use pipe::PipeWatcher; @@ -222,11 +223,7 @@ impl RtioProcess for Process { // If there's no exit code previously listed, then the // process's exit callback has yet to be invoked. We just // need to deschedule ourselves and wait to be reawoken. - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - assert!(self.to_wake.is_none()); - self.to_wake = Some(task); - } + wait_until_woken_after(&mut self.to_wake, || {}); assert!(self.exit_status.is_some()); } } diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index 745cb5a6fa09..b9ccacf4df70 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -9,12 +9,14 @@ // except according to those terms. use std::cast; -use std::libc::{c_int, size_t, ssize_t, c_void}; +use std::libc::{c_int, size_t, ssize_t}; +use std::ptr; use std::rt::BlockedTask; use std::rt::local::Local; use std::rt::sched::Scheduler; -use super::{UvError, Buf, slice_to_uv_buf, Request}; +use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, + ForbidUnwind}; use uvll; // This is a helper structure which is intended to get embedded into other @@ -63,6 +65,10 @@ impl StreamWatcher { } pub fn read(&mut self, buf: &mut [u8]) -> Result { + // This read operation needs to get canceled on an unwind via libuv's + // uv_read_stop function + let _f = ForbidUnwind::new("stream read"); + // Send off the read request, but don't block until we're sure that the // read request is queued. match unsafe { @@ -74,12 +80,10 @@ impl StreamWatcher { result: 0, task: None, }; - unsafe { - uvll::set_data_for_uv_handle(self.handle, &rcx) - } - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_sched, task| { - rcx.task = Some(task); + do wait_until_woken_after(&mut rcx.task) { + unsafe { + uvll::set_data_for_uv_handle(self.handle, &rcx) + } } match rcx.result { n if n < 0 => Err(UvError(n as c_int)), @@ -91,12 +95,17 @@ impl StreamWatcher { } pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> { + // The ownership of the write request is dubious if this function + // unwinds. I believe that if the write_cb fails to re-schedule the task + // then the write request will be leaked. + let _f = ForbidUnwind::new("stream write"); + // Prepare the write request, either using a cached one or allocating a // new one - if self.last_write_req.is_none() { - self.last_write_req = Some(Request::new(uvll::UV_WRITE)); - } - let req = self.last_write_req.get_ref(); + let mut req = match self.last_write_req.take() { + Some(req) => req, None => Request::new(uvll::UV_WRITE), + }; + req.set_data(ptr::null::<()>()); // Send off the request, but be careful to not block until we're sure // that the write reqeust is queued. If the reqeust couldn't be queued, @@ -107,11 +116,12 @@ impl StreamWatcher { } { 0 => { let mut wcx = WriteContext { result: 0, task: None, }; - req.set_data(&wcx); - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_sched, task| { - wcx.task = Some(task); + req.defuse(); // uv callback now owns this request + + do wait_until_woken_after(&mut wcx.task) { + req.set_data(&wcx); } + self.last_write_req = Some(Request::wrap(req.handle)); match wcx.result { 0 => Ok(()), n => Err(UvError(n)), @@ -120,50 +130,24 @@ impl StreamWatcher { n => Err(UvError(n)), } } - - // This will deallocate an internally used memory, along with closing the - // handle (and freeing it). - pub fn close(&mut self) { - let mut closing_task = None; - unsafe { - uvll::set_data_for_uv_handle(self.handle, &closing_task); - } - - // Wait for this stream to close because it possibly represents a remote - // connection which may have consequences if we close asynchronously. - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |_, task| { - closing_task = Some(task); - unsafe { uvll::uv_close(self.handle, close_cb) } - } - - extern fn close_cb(handle: *uvll::uv_handle_t) { - let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) }; - unsafe { uvll::free_handle(handle) } - - let closing_task: &mut Option = unsafe { - cast::transmute(data) - }; - let task = closing_task.take_unwrap(); - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); - } - } } // This allocation callback expects to be invoked once and only once. It will // unwrap the buffer in the ReadContext stored in the stream and return it. This // will fail if it is called more than once. extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf { + uvdebug!("alloc_cb"); let rcx: &mut ReadContext = unsafe { cast::transmute(uvll::get_data_for_uv_handle(stream)) }; - rcx.buf.take().expect("alloc_cb called more than once") + rcx.buf.take().expect("stream alloc_cb called more than once") } // When a stream has read some data, we will always forcibly stop reading and // return all the data read (even if it didn't fill the whole buffer). extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) { + uvdebug!("read_cb {}", nread); + assert!(nread != uvll::ECANCELED as ssize_t); let rcx: &mut ReadContext = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; @@ -182,11 +166,11 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) { // reading, however, all this does is wake up the blocked task after squirreling // away the error code as a result. extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { - if status == uvll::ECANCELED { return } + let mut req = Request::wrap(req); + assert!(status != uvll::ECANCELED); // Remember to not free the request because it is re-used between writes on // the same stream. - let req = Request::wrap(req); - let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) }; + let wcx: &mut WriteContext = unsafe { req.get_data() }; wcx.result = status; req.defuse(); diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index df35a4892e97..96cf024639f8 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -16,7 +16,7 @@ use std::rt::rtio::RtioTimer; use std::rt::sched::{Scheduler, SchedHandle}; use uvll; -use super::{Loop, UvHandle}; +use super::{Loop, UvHandle, ForbidUnwind}; use uvio::HomingIO; pub struct TimerWatcher { @@ -67,6 +67,11 @@ impl UvHandle for TimerWatcher { impl RtioTimer for TimerWatcher { fn sleep(&mut self, msecs: u64) { let (_m, sched) = self.fire_homing_missile_sched(); + + // If the descheduling operation unwinds after the timer has been + // started, then we need to call stop on the timer. + let _f = ForbidUnwind::new("timer"); + do sched.deschedule_running_task_and_then |_sched, task| { self.action = Some(WakeTask(task)); self.start(msecs, 0); @@ -124,51 +129,43 @@ impl Drop for TimerWatcher { mod test { use super::*; use std::rt::rtio::RtioTimer; - use super::super::run_uv_loop; + use super::super::local_loop; #[test] fn oneshot() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - let port = timer.oneshot(1); - port.recv(); - let port = timer.oneshot(1); - port.recv(); - } + let mut timer = TimerWatcher::new(local_loop()); + let port = timer.oneshot(1); + port.recv(); + let port = timer.oneshot(1); + port.recv(); } #[test] fn override() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - let oport = timer.oneshot(1); - let pport = timer.period(1); - timer.sleep(1); - assert_eq!(oport.try_recv(), None); - assert_eq!(pport.try_recv(), None); - timer.oneshot(1).recv(); - } + let mut timer = TimerWatcher::new(local_loop()); + let oport = timer.oneshot(1); + let pport = timer.period(1); + timer.sleep(1); + assert_eq!(oport.try_recv(), None); + assert_eq!(pport.try_recv(), None); + timer.oneshot(1).recv(); } #[test] fn period() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - let port = timer.period(1); - port.recv(); - port.recv(); - let port = timer.period(1); - port.recv(); - port.recv(); - } + let mut timer = TimerWatcher::new(local_loop()); + let port = timer.period(1); + port.recv(); + port.recv(); + let port = timer.period(1); + port.recv(); + port.recv(); } #[test] fn sleep() { - do run_uv_loop |l| { - let mut timer = TimerWatcher::new(l); - timer.sleep(1); - timer.sleep(1); - } + let mut timer = TimerWatcher::new(local_loop()); + timer.sleep(1); + timer.sleep(1); } } diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs index c072ab515612..04e406ce987e 100644 --- a/src/librustuv/tty.rs +++ b/src/librustuv/tty.rs @@ -103,6 +103,6 @@ impl HomingIO for TtyWatcher { impl Drop for TtyWatcher { fn drop(&mut self) { let _m = self.fire_homing_missile(); - self.stream.close(); + self.close(); } } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 6ae2c174e18b..75ec5f26b336 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::c_str::CString; -use std::comm::{SharedChan, GenericChan}; +use std::comm::SharedChan; use std::libc::c_int; use std::libc; use std::path::Path; @@ -26,7 +26,7 @@ use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, ReadWrite, FileStat}; use std::rt::io::signal::Signum; -use std::task; +use std::util; use ai = std::rt::io::net::addrinfo; #[cfg(test)] use std::unstable::run_in_bare_thread; @@ -44,6 +44,13 @@ pub trait HomingIO { fn go_to_IO_home(&mut self) -> uint { use std::rt::sched::RunOnce; + unsafe { + let task: *mut Task = Local::unsafe_borrow(); + (*task).death.inhibit_kill((*task).unwinder.unwinding); + } + + let _f = ForbidUnwind::new("going home"); + let current_sched_id = do Local::borrow |sched: &mut Scheduler| { sched.sched_id() }; @@ -51,22 +58,17 @@ pub trait HomingIO { // Only need to invoke a context switch if we're not on the right // scheduler. if current_sched_id != self.home().sched_id { - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - /* FIXME(#8674) if the task was already killed then wake - * will return None. In that case, the home pointer will - * never be set. - * - * RESOLUTION IDEA: Since the task is dead, we should - * just abort the IO action. - */ - do task.wake().map |task| { - self.home().send(RunOnce(task)); - }; - } + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + do task.wake().map |task| { + self.home().send(RunOnce(task)); + }; } } + let current_sched_id = do Local::borrow |sched: &mut Scheduler| { + sched.sched_id() + }; + assert!(current_sched_id == self.home().sched_id); self.home().sched_id } @@ -98,25 +100,38 @@ struct HomingMissile { priv io_home: uint, } +impl HomingMissile { + pub fn check(&self, msg: &'static str) { + let local_id = Local::borrow(|sched: &mut Scheduler| sched.sched_id()); + assert!(local_id == self.io_home, "{}", msg); + } +} + impl Drop for HomingMissile { fn drop(&mut self) { + let f = ForbidUnwind::new("leaving home"); + // It would truly be a sad day if we had moved off the home I/O // scheduler while we were doing I/O. - assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()), - self.io_home); + self.check("task moved away from the home scheduler"); // If we were a homed task, then we must send ourselves back to the // original scheduler. Otherwise, we can just return and keep running if !Task::on_appropriate_sched() { - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - do task.wake().map |task| { - Scheduler::run_task(task); - }; - } + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + do task.wake().map |task| { + Scheduler::run_task(task); + }; } } + + util::ignore(f); + + unsafe { + let task: *mut Task = Local::unsafe_borrow(); + (*task).death.allow_kill((*task).unwinder.unwinding); + } } }