diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index 965e97893b64..d5bfd729eb56 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -189,28 +189,27 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] { #[cfg(test)] mod test { - use Loop; use std::rt::io::net::ip::{SocketAddr, Ipv4Addr}; use super::*; + use super::super::run_uv_loop; #[test] fn getaddrinfo_test() { - let mut loop_ = Loop::new(); - let mut req = GetAddrInfoRequest::new(); - do req.getaddrinfo(&loop_, Some("localhost"), None, None) |_, addrinfo, _| { - let sockaddrs = accum_addrinfo(addrinfo); - let mut found_local = false; - let local_addr = &SocketAddr { - ip: Ipv4Addr(127, 0, 0, 1), - port: 0 - }; - for addr in sockaddrs.iter() { - found_local = found_local || addr.address == *local_addr; + do run_uv_loop |l| { + match GetAddrInfoRequest::run(l, Some("localhost"), None, None) { + Ok(infos) => { + let mut found_local = false; + let local_addr = &SocketAddr { + ip: Ipv4Addr(127, 0, 0, 1), + port: 0 + }; + for addr in infos.iter() { + found_local = found_local || addr.address == *local_addr; + } + assert!(found_local); + } + Err(e) => fail!("{:?}", e), } - assert!(found_local); } - loop_.run(); - loop_.close(); - req.delete(); } } diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs index f4c7f633ee26..334e154a397f 100644 --- a/src/librustuv/async.rs +++ b/src/librustuv/async.rs @@ -126,62 +126,56 @@ impl Drop for AsyncWatcher { #[cfg(test)] mod test_remote { use std::cell::Cell; - use std::rt::test::*; + use std::rt::rtio::Callback; use std::rt::thread::Thread; use std::rt::tube::Tube; - use std::rt::rtio::EventLoop; - use std::rt::local::Local; - use std::rt::sched::Scheduler; + use super::*; + use super::super::run_uv_loop; + + // Make sure that we can fire watchers in remote threads #[test] fn test_uv_remote() { - do run_in_mt_newsched_task { - let mut tube = Tube::new(); - let tube_clone = tube.clone(); - let remote_cell = Cell::new_empty(); - do Local::borrow |sched: &mut Scheduler| { - let tube_clone = tube_clone.clone(); - let tube_clone_cell = Cell::new(tube_clone); - let remote = do sched.event_loop.remote_callback { - // This could be called multiple times - if !tube_clone_cell.is_empty() { - tube_clone_cell.take().send(1); - } - }; - remote_cell.put_back(remote); + struct MyCallback(Option>); + impl Callback for MyCallback { + fn call(&mut self) { + // this can get called more than once, but we only want to send + // once + if self.is_some() { + self.take_unwrap().send(1); + } } + } + + do run_uv_loop |l| { + let mut tube = Tube::new(); + let cb = ~MyCallback(Some(tube.clone())); + let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback)); + let thread = do Thread::start { - remote_cell.take().fire(); + watcher.take().fire(); }; - assert!(tube.recv() == 1); + assert_eq!(tube.recv(), 1); thread.join(); } } -} - -#[cfg(test)] -mod test { - - use super::*; - use Loop; - use std::unstable::run_in_bare_thread; - use std::rt::thread::Thread; - use std::cell::Cell; #[test] fn smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) ); - let watcher_cell = Cell::new(watcher); - let thread = do Thread::start { - let mut watcher = watcher_cell.take(); - watcher.send(); - }; - loop_.run(); - loop_.close(); - thread.join(); + static mut hits: uint = 0; + + struct MyCallback; + impl Callback for MyCallback { + fn call(&mut self) { + unsafe { hits += 1; } + } } + + do run_uv_loop |l| { + let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback); + watcher.fire(); + } + assert!(unsafe { hits > 0 }); } } diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index 45f4125d7920..3b4760e0ff4e 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -455,297 +455,136 @@ impl rtio::RtioFileStream for FileWatcher { #[cfg(test)] mod test { - use super::*; - //use std::rt::test::*; - use std::libc::{STDOUT_FILENO, c_int}; - use std::vec; - use std::str; - use std::unstable::run_in_bare_thread; - use super::super::{Loop, Buf, slice_to_uv_buf}; + use std::libc::c_int; use std::libc::{O_CREAT, O_RDWR, O_RDONLY, S_IWUSR, S_IRUSR}; - - #[test] - fn file_test_full_simple() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let create_flags = O_RDWR | O_CREAT; - let read_flags = O_RDONLY; - // 0644 BZZT! WRONG! 0600! See below. - let mode = S_IWUSR |S_IRUSR; - // these aren't defined in std::libc :( - //map_mode(S_IRGRP) | - //map_mode(S_IROTH); - let path_str = "./tmp/file_full_simple.txt"; - let write_val = "hello".as_bytes().to_owned(); - let write_buf = slice_to_uv_buf(write_val); - let write_buf_ptr: *Buf = &write_buf; - let read_buf_len = 1028; - let read_mem = vec::from_elem(read_buf_len, 0u8); - let read_buf = slice_to_uv_buf(read_mem); - let read_buf_ptr: *Buf = &read_buf; - let open_req = FsRequest::new(); - do open_req.open(&loop_, &path_str.to_c_str(), create_flags as int, - mode as int) |req, uverr| { - assert!(uverr.is_none()); - let fd = req.get_result(); - let buf = unsafe { *write_buf_ptr }; - let write_req = FsRequest::new(); - do write_req.write(&req.get_loop(), fd, buf, -1) |req, uverr| { - let close_req = FsRequest::new(); - do close_req.close(&req.get_loop(), fd) |req, _| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let open_req = FsRequest::new(); - do open_req.open(&loop_, &path_str.to_c_str(), - read_flags as int,0) |req, uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let fd = req.get_result(); - let read_buf = unsafe { *read_buf_ptr }; - let read_req = FsRequest::new(); - do read_req.read(&loop_, fd, read_buf, 0) |req, uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - // we know nread >=0 because uverr is none.. - let nread = req.get_result() as uint; - // nread == 0 would be EOF - if nread > 0 { - let read_str = unsafe { - let read_buf = *read_buf_ptr; - str::from_utf8( - vec::from_buf( - read_buf.base, nread)) - }; - assert!(read_str == ~"hello"); - let close_req = FsRequest::new(); - do close_req.close(&loop_, fd) |req,uverr| { - assert!(uverr.is_none()); - let loop_ = &req.get_loop(); - let unlink_req = FsRequest::new(); - do unlink_req.unlink(loop_, - &path_str.to_c_str()) - |_,uverr| { - assert!(uverr.is_none()); - }; - }; - }; - }; - }; - }; - }; - }; - loop_.run(); - loop_.close(); - } - } + use std::rt::io; + use std::str; + use std::vec; + use super::*; + use super::super::{run_uv_loop}; #[test] fn file_test_full_simple_sync() { - do run_in_bare_thread { - // setup - let mut loop_ = Loop::new(); - let create_flags = O_RDWR | - O_CREAT; + do run_uv_loop |l| { + let create_flags = O_RDWR | O_CREAT; let read_flags = O_RDONLY; - // 0644 - let mode = S_IWUSR | - S_IRUSR; - //S_IRGRP | - //S_IROTH; + let mode = S_IWUSR | S_IRUSR; let path_str = "./tmp/file_full_simple_sync.txt"; - let write_val = "hello".as_bytes().to_owned(); - let write_buf = slice_to_uv_buf(write_val); - // open/create - let open_req = FsRequest::new(); - let result = open_req.open_sync(&loop_, &path_str.to_c_str(), - create_flags as int, mode as int); - assert!(result.is_ok()); - let fd = result.unwrap(); - // write - let write_req = FsRequest::new(); - let result = write_req.write_sync(&loop_, fd, write_buf, -1); - assert!(result.is_ok()); - // close - let close_req = FsRequest::new(); - let result = close_req.close_sync(&loop_, fd); - assert!(result.is_ok()); - // re-open - let open_req = FsRequest::new(); - let result = open_req.open_sync(&loop_, &path_str.to_c_str(), - read_flags as int,0); - assert!(result.is_ok()); - let len = 1028; - let fd = result.unwrap(); - // read - let read_mem: ~[u8] = vec::from_elem(len, 0u8); - let buf = slice_to_uv_buf(read_mem); - let read_req = FsRequest::new(); - let result = read_req.read_sync(&loop_, fd, buf, 0); - assert!(result.is_ok()); - let nread = result.unwrap(); - // nread == 0 would be EOF.. we know it's >= zero because otherwise - // the above assert would fail - if nread > 0 { - let read_str = str::from_utf8( - read_mem.slice(0, nread as uint)); - assert!(read_str == ~"hello"); + + { + // open/create + let result = FsRequest::open(l, &path_str.to_c_str(), + create_flags as int, mode as int); + assert!(result.is_ok()); + let result = result.unwrap(); + let fd = result.fd; + + // write + let result = FsRequest::write(l, fd, "hello".as_bytes(), -1); + assert!(result.is_ok()); + // close - let close_req = FsRequest::new(); - let result = close_req.close_sync(&loop_, fd); + let result = FsRequest::close(l, fd, true); assert!(result.is_ok()); + } + + { + // re-open + let result = FsRequest::open(l, &path_str.to_c_str(), + read_flags as int, 0); + assert!(result.is_ok()); + let result = result.unwrap(); + let fd = result.fd; + + // read + let mut read_mem = vec::from_elem(1000, 0u8); + let result = FsRequest::read(l, fd, read_mem, 0); + assert!(result.is_ok()); + + let nread = result.unwrap(); + assert!(nread > 0); + let read_str = str::from_utf8(read_mem.slice(0, nread as uint)); + assert_eq!(read_str, ~"hello"); + + // close + let result = FsRequest::close(l, fd, true); + assert!(result.is_ok()); + // unlink - let unlink_req = FsRequest::new(); - let result = unlink_req.unlink_sync(&loop_, &path_str.to_c_str()); + let result = FsRequest::unlink(l, &path_str.to_c_str()); assert!(result.is_ok()); - } else { fail!("nread was 0.. wudn't expectin' that."); } - loop_.close(); + } } } - fn naive_print(loop_: &Loop, input: &str) { - let write_val = input.as_bytes(); - let write_buf = slice_to_uv_buf(write_val); - let write_req = FsRequest::new(); - write_req.write_sync(loop_, STDOUT_FILENO, write_buf, -1); - } + #[test] + fn file_test_stat() { + do run_uv_loop |l| { + let path = &"./tmp/file_test_stat_simple".to_c_str(); + let create_flags = (O_RDWR | O_CREAT) as int; + let mode = (S_IWUSR | S_IRUSR) as int; - #[test] - fn file_test_write_to_stdout() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - naive_print(&loop_, "zanzibar!\n"); - loop_.run(); - loop_.close(); - }; - } - #[test] - fn file_test_stat_simple() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let path = "./tmp/file_test_stat_simple.txt"; - let create_flags = O_RDWR | - O_CREAT; - let mode = S_IWUSR | - S_IRUSR; - let write_val = "hello".as_bytes().to_owned(); - let write_buf = slice_to_uv_buf(write_val); - let write_buf_ptr: *Buf = &write_buf; - let open_req = FsRequest::new(); - do open_req.open(&loop_, &path.to_c_str(), create_flags as int, - mode as int) |req, uverr| { - assert!(uverr.is_none()); - let fd = req.get_result(); - let buf = unsafe { *write_buf_ptr }; - let write_req = FsRequest::new(); - do write_req.write(&req.get_loop(), fd, buf, 0) |req, uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let stat = req.get_stat(); - let sz: uint = stat.st_size as uint; - assert!(sz > 0); - let close_req = FsRequest::new(); - do close_req.close(&loop_, fd) |req, uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let unlink_req = FsRequest::new(); - do unlink_req.unlink(&loop_, - &path.to_c_str()) |req,uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, - &path.to_c_str()) |_, uverr| { - // should cause an error because the - // file doesn't exist anymore - assert!(uverr.is_some()); - }; - }; - }; - }; - }; - }; - loop_.run(); - loop_.close(); + let result = FsRequest::open(l, path, create_flags, mode); + assert!(result.is_ok()); + let file = result.unwrap(); + + let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0); + assert!(result.is_ok()); + + let result = FsRequest::stat(l, path); + assert!(result.is_ok()); + assert_eq!(result.unwrap().size, 5); + + fn free(_: T) {} + free(file); + + let result = FsRequest::unlink(l, path); + assert!(result.is_ok()); } } #[test] fn file_test_mk_rm_dir() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let path = "./tmp/mk_rm_dir"; - let mode = S_IWUSR | - S_IRUSR; - let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path.to_c_str(), - mode as c_int) |req,uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let stat = req.get_stat(); - naive_print(&loop_, format!("{:?}", stat)); - assert!(stat.is_dir()); - let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path.to_c_str()) |_req, uverr| { - assert!(uverr.is_some()); - } - } - } - } - loop_.run(); - loop_.close(); + do run_uv_loop |l| { + let path = &"./tmp/mk_rm_dir".to_c_str(); + let mode = S_IWUSR | S_IRUSR; + + let result = FsRequest::mkdir(l, path, mode); + assert!(result.is_ok()); + + let result = FsRequest::stat(l, path); + assert!(result.is_ok()); + assert!(result.unwrap().kind == io::TypeDirectory); + + let result = FsRequest::rmdir(l, path); + assert!(result.is_ok()); + + let result = FsRequest::stat(l, path); + assert!(result.is_err()); } } + #[test] fn file_test_mkdir_chokes_on_double_create() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let path = "./tmp/double_create_dir"; - let mode = S_IWUSR | - S_IRUSR; - let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path.to_c_str(), mode as c_int) |req,uverr| { - assert!(uverr.is_none()); - let loop_ = req.get_loop(); - let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path.to_c_str(), - mode as c_int) |req,uverr| { - assert!(uverr.is_some()); - let loop_ = req.get_loop(); - let _stat = req.get_stat(); - let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| { - assert!(uverr.is_none()); - let _loop = req.get_loop(); - } - } - } - loop_.run(); - loop_.close(); + do run_uv_loop |l| { + let path = &"./tmp/double_create_dir".to_c_str(); + let mode = S_IWUSR | S_IRUSR; + + let result = FsRequest::mkdir(l, path, mode as c_int); + assert!(result.is_ok()); + let result = FsRequest::mkdir(l, path, mode as c_int); + assert!(result.is_err()); + let result = FsRequest::rmdir(l, path); + assert!(result.is_ok()); } } + #[test] fn file_test_rmdir_chokes_on_nonexistant_path() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let path = "./tmp/never_existed_dir"; - let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path.to_c_str()) |_req, uverr| { - assert!(uverr.is_some()); - } - loop_.run(); - loop_.close(); + do run_uv_loop |l| { + let path = &"./tmp/never_existed_dir".to_c_str(); + let result = FsRequest::rmdir(l, path); + assert!(result.is_err()); } } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 1afc9b1d0ea6..5bedba08fb0e 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -50,15 +50,13 @@ use std::str::raw::from_c_str; use std::vec; use std::ptr; use std::str; -use std::libc::{c_void, c_int, size_t, malloc, free}; +use std::libc::{c_void, c_int, malloc, free}; use std::cast::transmute; use std::ptr::null; use std::unstable::finally::Finally; use std::rt::io::IoError; -//#[cfg(test)] use unstable::run_in_bare_thread; - pub use self::async::AsyncWatcher; pub use self::file::{FsRequest, FileWatcher}; pub use self::idle::IdleWatcher; @@ -302,62 +300,58 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf { uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t } } -// XXX: Do these conversions without copying +fn run_uv_loop(f: proc(&mut Loop)) { + use std::rt::local::Local; + use std::rt::test::run_in_uv_task; + use std::rt::sched::Scheduler; + use std::cell::Cell; -/// Transmute an owned vector to a Buf -pub fn vec_to_uv_buf(v: ~[u8]) -> Buf { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - let data = malloc(v.len() as size_t) as *u8; - assert!(data.is_not_null()); - do v.as_imm_buf |b, l| { - let data = data as *mut u8; - ptr::copy_memory(data, b, l) + let f = Cell::new(f); + do run_in_uv_task { + let mut io = None; + do Local::borrow |sched: &mut Scheduler| { + sched.event_loop.io(|i| unsafe { + let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) = + cast::transmute(i); + io = Some(uvio); + }); } - uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t } + f.take()(io.unwrap().uv_loop()); } } -/// Transmute a Buf that was once a ~[u8] back to ~[u8] -pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { - #[fixed_stack_segment]; #[inline(never)]; +#[cfg(test)] +mod test { + use std::cast::transmute; + use std::ptr; + use std::unstable::run_in_bare_thread; - if !(buf.len == 0 && buf.base.is_null()) { - let v = unsafe { vec::from_buf(buf.base, buf.len as uint) }; - unsafe { free(buf.base as *c_void) }; - return Some(v); - } else { - // No buffer - uvdebug!("No buffer!"); - return None; - } -} -/* -#[test] -fn test_slice_to_uv_buf() { - let slice = [0, .. 20]; - let buf = slice_to_uv_buf(slice); + use super::{slice_to_uv_buf, Loop}; - assert!(buf.len == 20); + #[test] + fn test_slice_to_uv_buf() { + let slice = [0, .. 20]; + let buf = slice_to_uv_buf(slice); - unsafe { - let base = transmute::<*u8, *mut u8>(buf.base); - (*base) = 1; - (*ptr::mut_offset(base, 1)) = 2; + assert_eq!(buf.len, 20); + + unsafe { + let base = transmute::<*u8, *mut u8>(buf.base); + (*base) = 1; + (*ptr::mut_offset(base, 1)) = 2; + } + + assert!(slice[0] == 1); + assert!(slice[1] == 2); } - assert!(slice[0] == 1); - assert!(slice[1] == 2); -} - -#[test] -fn loop_smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - loop_.run(); - loop_.close(); + #[test] + fn loop_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + loop_.run(); + loop_.close(); + } } } -*/ diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 28c2c4df12a0..9fd771b97395 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -705,350 +705,559 @@ impl Drop for UdpWatcher { #[cfg(test)] mod test { - use super::*; - use std::util::ignore; use std::cell::Cell; - use std::vec; - use std::unstable::run_in_bare_thread; - use std::rt::thread::Thread; + use std::comm::oneshot; use std::rt::test::*; - use super::super::{Loop, AllocCallback}; - use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf}; + use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, + RtioUdpSocket}; + use std::task; + + use super::*; + use super::super::{Loop, run_uv_loop}; #[test] fn connect_close_ip4() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - // Connect to a port where nobody is listening - let addr = next_test_ip4(); - do tcp_watcher.connect(addr) |stream_watcher, status| { - uvdebug!("tcp_watcher.connect!"); - assert!(status.is_some()); - assert_eq!(status.unwrap().name(), ~"ECONNREFUSED"); - stream_watcher.close(||()); + do run_uv_loop |l| { + match TcpWatcher::connect(l, next_test_ip4()) { + Ok(*) => fail!(), + Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), } - loop_.run(); - loop_.close(); } } #[test] fn connect_close_ip6() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - // Connect to a port where nobody is listening - let addr = next_test_ip6(); - do tcp_watcher.connect(addr) |stream_watcher, status| { - uvdebug!("tcp_watcher.connect!"); - assert!(status.is_some()); - assert_eq!(status.unwrap().name(), ~"ECONNREFUSED"); - stream_watcher.close(||()); + do run_uv_loop |l| { + match TcpWatcher::connect(l, next_test_ip6()) { + Ok(*) => fail!(), + Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"), } - loop_.run(); - loop_.close(); } } #[test] fn udp_bind_close_ip4() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut udp_watcher = { UdpWatcher::new(&mut loop_) }; - let addr = next_test_ip4(); - udp_watcher.bind(addr); - udp_watcher.close(||()); - loop_.run(); - loop_.close(); + do run_uv_loop |l| { + match UdpWatcher::bind(l, next_test_ip4()) { + Ok(*) => {} + Err(*) => fail!() + } } } #[test] fn udp_bind_close_ip6() { - do run_in_bare_thread() { - let mut loop_ = Loop::new(); - let mut udp_watcher = { UdpWatcher::new(&mut loop_) }; - let addr = next_test_ip6(); - udp_watcher.bind(addr); - udp_watcher.close(||()); - loop_.run(); - loop_.close(); + do run_uv_loop |l| { + match UdpWatcher::bind(l, next_test_ip6()) { + Ok(*) => {} + Err(*) => fail!() + } } } #[test] fn listen_ip4() { - do run_in_bare_thread() { - static MAX: int = 10; - let mut loop_ = Loop::new(); - let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do run_uv_loop |l| { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); let addr = next_test_ip4(); - server_tcp_watcher.bind(addr); - let loop_ = loop_; - uvdebug!("listening"); - let mut stream = server_tcp_watcher.as_stream(); - let res = do stream.listen |mut server_stream_watcher, status| { - uvdebug!("listened!"); - assert!(status.is_none()); - let mut loop_ = loop_; - let client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - server_stream_watcher.accept(client_tcp_watcher); - let count_cell = Cell::new(0); - let server_stream_watcher = server_stream_watcher; - uvdebug!("starting read"); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0u8)) + + let handle = l.handle; + do spawn { + let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) }; - do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| { - - uvdebug!("i'm reading!"); - let buf = vec_from_uv_buf(buf); - let mut count = count_cell.take(); - if status.is_none() { - uvdebug!("got {} bytes", nread); - let buf = buf.unwrap(); - for byte in buf.slice(0, nread as uint).iter() { - assert!(*byte == count as u8); - uvdebug!("{}", *byte as uint); - count += 1; + let mut w = match w.listen() { + Ok(w) => w, Err(e) => fail!("{:?}", e), + }; + chan.take().send(()); + match w.accept() { + Ok(mut stream) => { + let mut buf = [0u8, ..10]; + match stream.read(buf) { + Ok(10) => {} e => fail!("{:?}", e), } - } else { - assert_eq!(count, MAX); - do stream_watcher.close { - server_stream_watcher.close(||()); + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); } } - count_cell.put_back(count); + Err(e) => fail!("{:?}", e) } + } + + port.recv(); + let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) }; - - assert!(res.is_ok()); - - let client_thread = do Thread::start { - uvdebug!("starting client thread"); - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - do tcp_watcher.connect(addr) |mut stream_watcher, status| { - uvdebug!("connecting"); - assert!(status.is_none()); - let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; - let buf = slice_to_uv_buf(msg); - let msg_cell = Cell::new(msg); - do stream_watcher.write(buf) |stream_watcher, status| { - uvdebug!("writing"); - assert!(status.is_none()); - let msg_cell = Cell::new(msg_cell.take()); - stream_watcher.close(||ignore(msg_cell.take())); - } - } - loop_.run(); - loop_.close(); - }; - - let mut loop_ = loop_; - loop_.run(); - loop_.close(); - client_thread.join(); - }; + match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } + } } #[test] fn listen_ip6() { - do run_in_bare_thread() { - static MAX: int = 10; - let mut loop_ = Loop::new(); - let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do run_uv_loop |l| { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); let addr = next_test_ip6(); - server_tcp_watcher.bind(addr); - let loop_ = loop_; - uvdebug!("listening"); - let mut stream = server_tcp_watcher.as_stream(); - let res = do stream.listen |mut server_stream_watcher, status| { - uvdebug!("listened!"); - assert!(status.is_none()); - let mut loop_ = loop_; - let client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - server_stream_watcher.accept(client_tcp_watcher); - let count_cell = Cell::new(0); - let server_stream_watcher = server_stream_watcher; - uvdebug!("starting read"); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0u8)) + + let handle = l.handle; + do spawn { + let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) }; - do client_tcp_watcher.read_start(alloc) - |stream_watcher, nread, buf, status| { - - uvdebug!("i'm reading!"); - let buf = vec_from_uv_buf(buf); - let mut count = count_cell.take(); - if status.is_none() { - uvdebug!("got {} bytes", nread); - let buf = buf.unwrap(); - let r = buf.slice(0, nread as uint); - for byte in r.iter() { - assert!(*byte == count as u8); - uvdebug!("{}", *byte as uint); - count += 1; + let mut w = match w.listen() { + Ok(w) => w, Err(e) => fail!("{:?}", e), + }; + chan.take().send(()); + match w.accept() { + Ok(mut stream) => { + let mut buf = [0u8, ..10]; + match stream.read(buf) { + Ok(10) => {} e => fail!("{:?}", e), } - } else { - assert_eq!(count, MAX); - do stream_watcher.close { - server_stream_watcher.close(||()); + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); } } - count_cell.put_back(count); + Err(e) => fail!("{:?}", e) } - }; - assert!(res.is_ok()); + } - let client_thread = do Thread::start { - uvdebug!("starting client thread"); - let mut loop_ = Loop::new(); - let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; - do tcp_watcher.connect(addr) |mut stream_watcher, status| { - uvdebug!("connecting"); - assert!(status.is_none()); - let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; - let buf = slice_to_uv_buf(msg); - let msg_cell = Cell::new(msg); - do stream_watcher.write(buf) |stream_watcher, status| { - uvdebug!("writing"); - assert!(status.is_none()); - let msg_cell = Cell::new(msg_cell.take()); - stream_watcher.close(||ignore(msg_cell.take())); - } - } - loop_.run(); - loop_.close(); + port.recv(); + let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) { + Ok(w) => w, Err(e) => fail!("{:?}", e) }; - - let mut loop_ = loop_; - loop_.run(); - loop_.close(); - client_thread.join(); + match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } } #[test] fn udp_recv_ip4() { - do run_in_bare_thread() { - static MAX: int = 10; - let mut loop_ = Loop::new(); - let server_addr = next_test_ip4(); - let client_addr = next_test_ip4(); + do run_uv_loop |l| { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let client = next_test_ip4(); + let server = next_test_ip4(); - let mut server = UdpWatcher::new(&loop_); - assert!(server.bind(server_addr).is_ok()); - - uvdebug!("starting read"); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0u8)) - }; - - do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| { - server.recv_stop(); - uvdebug!("i'm reading!"); - assert!(status.is_none()); - assert_eq!(flags, 0); - assert_eq!(src, client_addr); - - let buf = vec_from_uv_buf(buf); - let mut count = 0; - uvdebug!("got {} bytes", nread); - - let buf = buf.unwrap(); - for &byte in buf.slice(0, nread as uint).iter() { - assert!(byte == count as u8); - uvdebug!("{}", byte as uint); - count += 1; + let handle = l.handle; + do spawn { + match UdpWatcher::bind(&mut Loop::wrap(handle), server) { + Ok(mut w) => { + chan.take().send(()); + let mut buf = [0u8, ..10]; + match w.recvfrom(buf) { + Ok((10, addr)) => assert_eq!(addr, client), + e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); + } + } + Err(e) => fail!("{:?}", e) } - assert_eq!(count, MAX); - - server.close(||{}); } - let thread = do Thread::start { - let mut loop_ = Loop::new(); - let mut client = UdpWatcher::new(&loop_); - assert!(client.bind(client_addr).is_ok()); - let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; - let buf = slice_to_uv_buf(msg); - do client.send(buf, server_addr) |client, status| { - uvdebug!("writing"); - assert!(status.is_none()); - client.close(||{}); - } - - loop_.run(); - loop_.close(); + port.recv(); + let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) { + Ok(w) => w, Err(e) => fail!("{:?}", e) }; - - loop_.run(); - loop_.close(); - thread.join(); + match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } } #[test] fn udp_recv_ip6() { - do run_in_bare_thread() { - static MAX: int = 10; - let mut loop_ = Loop::new(); - let server_addr = next_test_ip6(); - let client_addr = next_test_ip6(); + do run_uv_loop |l| { + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let client = next_test_ip6(); + let server = next_test_ip6(); - let mut server = UdpWatcher::new(&loop_); - assert!(server.bind(server_addr).is_ok()); - - uvdebug!("starting read"); - let alloc: AllocCallback = |size| { - vec_to_uv_buf(vec::from_elem(size, 0u8)) - }; - - do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| { - server.recv_stop(); - uvdebug!("i'm reading!"); - assert!(status.is_none()); - assert_eq!(flags, 0); - assert_eq!(src, client_addr); - - let buf = vec_from_uv_buf(buf); - let mut count = 0; - uvdebug!("got {} bytes", nread); - - let buf = buf.unwrap(); - for &byte in buf.slice(0, nread as uint).iter() { - assert!(byte == count as u8); - uvdebug!("{}", byte as uint); - count += 1; + let handle = l.handle; + do spawn { + match UdpWatcher::bind(&mut Loop::wrap(handle), server) { + Ok(mut w) => { + chan.take().send(()); + let mut buf = [0u8, ..10]; + match w.recvfrom(buf) { + Ok((10, addr)) => assert_eq!(addr, client), + e => fail!("{:?}", e), + } + for i in range(0, 10u8) { + assert_eq!(buf[i], i + 1); + } + } + Err(e) => fail!("{:?}", e) } - assert_eq!(count, MAX); - - server.close(||{}); } - let thread = do Thread::start { - let mut loop_ = Loop::new(); - let mut client = UdpWatcher::new(&loop_); - assert!(client.bind(client_addr).is_ok()); - let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; - let buf = slice_to_uv_buf(msg); - do client.send(buf, server_addr) |client, status| { - uvdebug!("writing"); - assert!(status.is_none()); - client.close(||{}); - } - - loop_.run(); - loop_.close(); + port.recv(); + let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) { + Ok(w) => w, Err(e) => fail!("{:?}", e) }; - - loop_.run(); - loop_.close(); - thread.join(); + match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) { + Ok(()) => {}, Err(e) => fail!("{:?}", e) + } } } + + #[test] + fn test_read_read_read() { + do run_uv_loop |l| { + let addr = next_test_ip4(); + static MAX: uint = 500000; + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let handle = l.handle; + do spawntask { + let l = &mut Loop::wrap(handle); + let listener = TcpListener::bind(l, addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + chan.take().send(()); + let mut stream = acceptor.accept().unwrap(); + let buf = [1, .. 2048]; + let mut total_bytes_written = 0; + while total_bytes_written < MAX { + stream.write(buf); + total_bytes_written += buf.len(); + } + } + + do spawntask { + let l = &mut Loop::wrap(handle); + port.take().recv(); + let mut stream = TcpWatcher::connect(l, addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < MAX { + let nread = stream.read(buf).unwrap(); + uvdebug!("read {} bytes", nread); + total_bytes_read += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); + } + } + uvdebug!("read {} bytes total", total_bytes_read); + } + } + } + + #[test] + #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send + fn test_udp_twice() { + do run_uv_loop |l| { + let server_addr = next_test_ip4(); + let client_addr = next_test_ip4(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let handle = l.handle; + do spawntask { + let l = &mut Loop::wrap(handle); + let mut client = UdpWatcher::bind(l, client_addr).unwrap(); + port.take().recv(); + assert!(client.sendto([1], server_addr).is_ok()); + assert!(client.sendto([2], server_addr).is_ok()); + } + + do spawntask { + let l = &mut Loop::wrap(handle); + let mut server = UdpWatcher::bind(l, server_addr).unwrap(); + chan.take().send(()); + let mut buf1 = [0]; + let mut buf2 = [0]; + let (nread1, src1) = server.recvfrom(buf1).unwrap(); + let (nread2, src2) = server.recvfrom(buf2).unwrap(); + assert_eq!(nread1, 1); + assert_eq!(nread2, 1); + assert_eq!(src1, client_addr); + assert_eq!(src2, client_addr); + assert_eq!(buf1[0], 1); + assert_eq!(buf2[0], 2); + } + } + } + + #[test] + fn test_udp_many_read() { + do run_uv_loop |l| { + let server_out_addr = next_test_ip4(); + let server_in_addr = next_test_ip4(); + let client_out_addr = next_test_ip4(); + let client_in_addr = next_test_ip4(); + static MAX: uint = 500_000; + + let (p1, c1) = oneshot(); + let (p2, c2) = oneshot(); + + let first = Cell::new((p1, c2)); + let second = Cell::new((p2, c1)); + + let handle = l.handle; + do spawntask { + let l = &mut Loop::wrap(handle); + let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap(); + let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap(); + let (port, chan) = first.take(); + chan.send(()); + port.recv(); + let msg = [1, .. 2048]; + let mut total_bytes_sent = 0; + let mut buf = [1]; + while buf[0] == 1 { + // send more data + assert!(server_out.sendto(msg, client_in_addr).is_ok()); + total_bytes_sent += msg.len(); + // check if the client has received enough + let res = server_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(nread, 1); + assert_eq!(src, client_out_addr); + } + assert!(total_bytes_sent >= MAX); + } + + do spawntask { + let l = &mut Loop::wrap(handle); + let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); + let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); + let (port, chan) = second.take(); + port.recv(); + chan.send(()); + let mut total_bytes_recv = 0; + let mut buf = [0, .. 2048]; + while total_bytes_recv < MAX { + // ask for more + assert!(client_out.sendto([1], server_in_addr).is_ok()); + // wait for data + let res = client_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(src, server_out_addr); + total_bytes_recv += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); + } + } + // tell the server we're done + assert!(client_out.sendto([0], server_in_addr).is_ok()); + } + } + } + + #[test] + fn test_read_and_block() { + do run_uv_loop |l| { + let addr = next_test_ip4(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let handle = l.handle; + do spawntask { + let l = &mut Loop::wrap(handle); + let listener = TcpListener::bind(l, addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let (port2, chan2) = stream(); + chan.take().send(port2); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; + + let expected = 32; + let mut current = 0; + let mut reads = 0; + + while current < expected { + let nread = stream.read(buf).unwrap(); + for i in range(0u, nread) { + let val = buf[i] as uint; + assert_eq!(val, current % 8); + current += 1; + } + reads += 1; + + chan2.send(()); + } + + // Make sure we had multiple reads + assert!(reads > 1); + } + + do spawntask { + let l = &mut Loop::wrap(handle); + let port2 = port.take().recv(); + let mut stream = TcpWatcher::connect(l, addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + port2.recv(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + port2.recv(); + } + } + } + + #[test] + fn test_simple_tcp_server_and_client_on_diff_threads() { + let addr = next_test_ip4(); + + do task::spawn_sched(task::SingleThreaded) { + do run_uv_loop |l| { + let listener = TcpListener::bind(l, addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert_eq!(nread, 8); + for i in range(0u, nread) { + assert_eq!(buf[i], i as u8); + } + } + } + + do task::spawn_sched(task::SingleThreaded) { + do run_uv_loop |l| { + let mut stream = TcpWatcher::connect(l, addr); + while stream.is_err() { + stream = TcpWatcher::connect(l, addr); + } + stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); + } + } + } + + // On one thread, create a udp socket. Then send that socket to another + // thread and destroy the socket on the remote thread. This should make sure + // that homing kicks in for the socket to go back home to the original + // thread, close itself, and then come back to the last thread. + #[test] + fn test_homing_closes_correctly() { + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do task::spawn_sched(task::SingleThreaded) { + let chan = Cell::new(chan.take()); + do run_uv_loop |l| { + let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap(); + chan.take().send(listener); + } + } + + do task::spawn_sched(task::SingleThreaded) { + let port = Cell::new(port.take()); + do run_uv_loop |_l| { + port.take().recv(); + } + } + } + + // This is a bit of a crufty old test, but it has its uses. + #[test] + fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { + use std::cast; + use std::rt::local::Local; + use std::rt::rtio::{EventLoop, IoFactory}; + use std::rt::sched::Scheduler; + use std::rt::sched::{Shutdown, TaskFromFriend}; + use std::rt::sleeper_list::SleeperList; + use std::rt::task::Task; + use std::rt::task::UnwindResult; + use std::rt::thread::Thread; + use std::rt::work_queue::WorkQueue; + use std::unstable::run_in_bare_thread; + use uvio::UvEventLoop; + + do run_in_bare_thread { + let sleepers = SleeperList::new(); + let work_queue1 = WorkQueue::new(); + let work_queue2 = WorkQueue::new(); + let queues = ~[work_queue1.clone(), work_queue2.clone()]; + + let loop1 = ~UvEventLoop::new() as ~EventLoop; + let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), + sleepers.clone()); + let loop2 = ~UvEventLoop::new() as ~EventLoop; + let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), + sleepers.clone()); + + let handle1 = Cell::new(sched1.make_handle()); + let handle2 = Cell::new(sched2.make_handle()); + let tasksFriendHandle = Cell::new(sched2.make_handle()); + + let on_exit: ~fn(UnwindResult) = |exit_status| { + handle1.take().send(Shutdown); + handle2.take().send(Shutdown); + assert!(exit_status.is_success()); + }; + + unsafe fn local_io() -> &'static mut IoFactory { + do Local::borrow |sched: &mut Scheduler| { + let mut io = None; + sched.event_loop.io(|i| io = Some(i)); + cast::transmute(io.unwrap()) + } + } + + let test_function: ~fn() = || { + let io = unsafe { local_io() }; + let addr = next_test_ip4(); + let maybe_socket = io.udp_bind(addr); + // this socket is bound to this event loop + assert!(maybe_socket.is_ok()); + + // block self on sched1 + do task::unkillable { // FIXME(#8674) + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + // unblock task + do task.wake().map |task| { + // send self to sched2 + tasksFriendHandle.take().send(TaskFromFriend(task)); + }; + // sched1 should now sleep since it has nothing else to do + } + } + // sched2 will wake up and get the task as we do nothing else, + // the function ends and the socket goes out of scope sched2 + // will start to run the destructor the destructor will first + // block the task, set it's home as sched1, then enqueue it + // sched2 will dequeue the task, see that it has a home, and + // send it to sched1 sched1 will wake up, exec the close + // function on the correct loop, and then we're done + }; + + let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, + test_function); + main_task.death.on_exit = Some(on_exit); + let main_task = Cell::new(main_task); + + let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, + None) || {}); + + let sched1 = Cell::new(sched1); + let sched2 = Cell::new(sched2); + + let thread1 = do Thread::start { + sched1.take().bootstrap(main_task.take()); + }; + let thread2 = do Thread::start { + sched2.take().bootstrap(null_task.take()); + }; + + thread1.join(); + thread2.join(); + } + } + } diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 18b05073e830..bf24ec405c2f 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -123,92 +123,52 @@ impl Drop for TimerWatcher { #[cfg(test)] mod test { use super::*; - use Loop; - use std::unstable::run_in_bare_thread; + use std::rt::rtio::RtioTimer; + use super::super::run_uv_loop; #[test] - fn smoke_test() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - let mut loop_ = Loop::new(); - let mut timer = TimerWatcher::new(&mut loop_); - do timer.start(10, 0) |timer, status| { - assert!(status.is_none()); - unsafe { *count_ptr += 1 }; - timer.close(||()); - } - loop_.run(); - loop_.close(); - assert!(count == 1); + fn oneshot() { + do run_uv_loop |l| { + let mut timer = TimerWatcher::new(l); + let port = timer.oneshot(1); + port.recv(); + let port = timer.oneshot(1); + port.recv(); } } #[test] - fn start_twice() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - let mut loop_ = Loop::new(); - let mut timer = TimerWatcher::new(&mut loop_); - do timer.start(10, 0) |timer, status| { - let mut timer = timer; - assert!(status.is_none()); - unsafe { *count_ptr += 1 }; - do timer.start(10, 0) |timer, status| { - assert!(status.is_none()); - unsafe { *count_ptr += 1 }; - timer.close(||()); - } - } - loop_.run(); - loop_.close(); - assert!(count == 2); + fn override() { + do run_uv_loop |l| { + let mut timer = TimerWatcher::new(l); + let oport = timer.oneshot(1); + let pport = timer.period(1); + timer.sleep(1); + assert_eq!(oport.try_recv(), None); + assert_eq!(pport.try_recv(), None); + timer.oneshot(1).recv(); } } #[test] - fn repeat_stop() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - let mut loop_ = Loop::new(); - let mut timer = TimerWatcher::new(&mut loop_); - do timer.start(1, 2) |timer, status| { - assert!(status.is_none()); - unsafe { - *count_ptr += 1; - - if *count_ptr == 10 { - - // Stop the timer and do something else - let mut timer = timer; - timer.stop(); - // Freeze timer so it can be captured - let timer = timer; - - let mut loop_ = timer.event_loop(); - let mut timer2 = TimerWatcher::new(&mut loop_); - do timer2.start(10, 0) |timer2, _| { - - *count_ptr += 1; - - timer2.close(||()); - - // Restart the original timer - let mut timer = timer; - do timer.start(1, 0) |timer, _| { - *count_ptr += 1; - timer.close(||()); - } - } - } - }; - } - loop_.run(); - loop_.close(); - assert!(count == 12); + fn period() { + do run_uv_loop |l| { + let mut timer = TimerWatcher::new(l); + let port = timer.period(1); + port.recv(); + port.recv(); + let port = timer.period(1); + port.recv(); + port.recv(); } } + #[test] + fn sleep() { + do run_uv_loop |l| { + let mut timer = TimerWatcher::new(l); + timer.sleep(1); + timer.sleep(1); + } + } } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 2aac43072dd0..e9d8aab2e8b6 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -9,8 +9,6 @@ // except according to those terms. use std::c_str::CString; -use std::cast::transmute; -use std::cast; use std::comm::{SharedChan, GenericChan}; use std::libc::c_int; use std::libc; @@ -23,7 +21,6 @@ use std::rt::local::Local; use std::rt::rtio::*; use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::task::Task; -use std::str; use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR, S_IWUSR}; use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, @@ -33,10 +30,6 @@ use std::task; use ai = std::rt::io::net::addrinfo; #[cfg(test)] use std::unstable::run_in_bare_thread; -#[cfg(test)] use std::rt::test::{spawntask, - next_test_ip4, - run_in_mt_newsched_task}; -#[cfg(test)] use std::rt::comm::oneshot; use super::*; use addrinfo::GetAddrInfoRequest; @@ -370,626 +363,3 @@ impl IoFactory for UvIoFactory { } } } - -// this function is full of lies -unsafe fn local_io() -> &'static mut IoFactory { - do Local::borrow |sched: &mut Scheduler| { - let mut io = None; - sched.event_loop.io(|i| io = Some(i)); - cast::transmute(io.unwrap()) - } -} - -#[test] -fn test_simple_io_no_connect() { - do run_in_mt_newsched_task { - unsafe { - let io = local_io(); - let addr = next_test_ip4(); - let maybe_chan = io.tcp_connect(addr); - assert!(maybe_chan.is_err()); - } - } -} - -#[test] -fn test_simple_udp_io_bind_only() { - do run_in_mt_newsched_task { - unsafe { - let io = local_io(); - let addr = next_test_ip4(); - let maybe_socket = io.udp_bind(addr); - assert!(maybe_socket.is_ok()); - } - } -} - -#[test] -fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { - use std::rt::sleeper_list::SleeperList; - use std::rt::work_queue::WorkQueue; - use std::rt::thread::Thread; - use std::rt::task::Task; - use std::rt::sched::{Shutdown, TaskFromFriend}; - use std::rt::task::UnwindResult; - do run_in_bare_thread { - let sleepers = SleeperList::new(); - let work_queue1 = WorkQueue::new(); - let work_queue2 = WorkQueue::new(); - let queues = ~[work_queue1.clone(), work_queue2.clone()]; - - let loop1 = ~UvEventLoop::new() as ~EventLoop; - let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), - sleepers.clone()); - let loop2 = ~UvEventLoop::new() as ~EventLoop; - let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), - sleepers.clone()); - - let handle1 = Cell::new(sched1.make_handle()); - let handle2 = Cell::new(sched2.make_handle()); - let tasksFriendHandle = Cell::new(sched2.make_handle()); - - let on_exit: ~fn(UnwindResult) = |exit_status| { - handle1.take().send(Shutdown); - handle2.take().send(Shutdown); - assert!(exit_status.is_success()); - }; - - let test_function: ~fn() = || { - let io = unsafe { local_io() }; - let addr = next_test_ip4(); - let maybe_socket = io.udp_bind(addr); - // this socket is bound to this event loop - assert!(maybe_socket.is_ok()); - - // block self on sched1 - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - // unblock task - do task.wake().map |task| { - // send self to sched2 - tasksFriendHandle.take().send(TaskFromFriend(task)); - }; - // sched1 should now sleep since it has nothing else to do - } - } - // sched2 will wake up and get the task - // as we do nothing else, the function ends and the socket goes out of scope - // sched2 will start to run the destructor - // the destructor will first block the task, set it's home as sched1, then enqueue it - // sched2 will dequeue the task, see that it has a home, and send it to sched1 - // sched1 will wake up, exec the close function on the correct loop, and then we're done - }; - - let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function); - main_task.death.on_exit = Some(on_exit); - let main_task = Cell::new(main_task); - - let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {}); - - let sched1 = Cell::new(sched1); - let sched2 = Cell::new(sched2); - - let thread1 = do Thread::start { - sched1.take().bootstrap(main_task.take()); - }; - let thread2 = do Thread::start { - sched2.take().bootstrap(null_task.take()); - }; - - thread1.join(); - thread2.join(); - } -} - -#[test] -fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() { - use std::rt::sleeper_list::SleeperList; - use std::rt::work_queue::WorkQueue; - use std::rt::thread::Thread; - use std::rt::task::Task; - use std::rt::comm::oneshot; - use std::rt::sched::Shutdown; - use std::rt::task::UnwindResult; - do run_in_bare_thread { - let sleepers = SleeperList::new(); - let work_queue1 = WorkQueue::new(); - let work_queue2 = WorkQueue::new(); - let queues = ~[work_queue1.clone(), work_queue2.clone()]; - - let loop1 = ~UvEventLoop::new() as ~EventLoop; - let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), - sleepers.clone()); - let loop2 = ~UvEventLoop::new() as ~EventLoop; - let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), - sleepers.clone()); - - let handle1 = Cell::new(sched1.make_handle()); - let handle2 = Cell::new(sched2.make_handle()); - - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - let body1: ~fn() = || { - let io = unsafe { local_io() }; - let addr = next_test_ip4(); - let socket = io.udp_bind(addr); - assert!(socket.is_ok()); - chan.take().send(socket); - }; - - let body2: ~fn() = || { - let socket = port.take().recv(); - assert!(socket.is_ok()); - /* The socket goes out of scope and the destructor is called. - * The destructor: - * - sends itself back to sched1 - * - frees the socket - * - resets the home of the task to whatever it was previously - */ - }; - - let on_exit: ~fn(UnwindResult) = |exit| { - handle1.take().send(Shutdown); - handle2.take().send(Shutdown); - assert!(exit.is_success()); - }; - - let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1)); - - let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2); - task2.death.on_exit = Some(on_exit); - let task2 = Cell::new(task2); - - let sched1 = Cell::new(sched1); - let sched2 = Cell::new(sched2); - - let thread1 = do Thread::start { - sched1.take().bootstrap(task1.take()); - }; - let thread2 = do Thread::start { - sched2.take().bootstrap(task2.take()); - }; - - thread1.join(); - thread2.join(); - } -} - -#[test] -fn test_simple_tcp_server_and_client() { - do run_in_mt_newsched_task { - let addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - // Start the server first so it's listening when we connect - do spawntask { - unsafe { - let io = local_io(); - let listener = io.tcp_bind(addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - chan.take().send(()); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert_eq!(nread, 8); - for i in range(0u, nread) { - uvdebug!("{}", buf[i]); - assert_eq!(buf[i], i as u8); - } - } - } - - do spawntask { - unsafe { - port.take().recv(); - let io = local_io(); - let mut stream = io.tcp_connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - } - } - } -} - -#[test] -fn test_simple_tcp_server_and_client_on_diff_threads() { - use std::rt::sleeper_list::SleeperList; - use std::rt::work_queue::WorkQueue; - use std::rt::thread::Thread; - use std::rt::task::Task; - use std::rt::sched::{Shutdown}; - use std::rt::task::UnwindResult; - do run_in_bare_thread { - let sleepers = SleeperList::new(); - - let server_addr = next_test_ip4(); - let client_addr = server_addr.clone(); - - let server_work_queue = WorkQueue::new(); - let client_work_queue = WorkQueue::new(); - let queues = ~[server_work_queue.clone(), client_work_queue.clone()]; - - let sloop = ~UvEventLoop::new() as ~EventLoop; - let mut server_sched = ~Scheduler::new(sloop, server_work_queue, - queues.clone(), sleepers.clone()); - let cloop = ~UvEventLoop::new() as ~EventLoop; - let mut client_sched = ~Scheduler::new(cloop, client_work_queue, - queues.clone(), sleepers.clone()); - - let server_handle = Cell::new(server_sched.make_handle()); - let client_handle = Cell::new(client_sched.make_handle()); - - let server_on_exit: ~fn(UnwindResult) = |exit_status| { - server_handle.take().send(Shutdown); - assert!(exit_status.is_success()); - }; - - let client_on_exit: ~fn(UnwindResult) = |exit_status| { - client_handle.take().send(Shutdown); - assert!(exit_status.is_success()); - }; - - let server_fn: ~fn() = || { - let io = unsafe { local_io() }; - let listener = io.tcp_bind(server_addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert_eq!(nread, 8); - for i in range(0u, nread) { - assert_eq!(buf[i], i as u8); - } - }; - - let client_fn: ~fn() = || { - let io = unsafe { local_io() }; - let mut stream = io.tcp_connect(client_addr); - while stream.is_err() { - stream = io.tcp_connect(client_addr); - } - stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); - }; - - let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn); - server_task.death.on_exit = Some(server_on_exit); - let server_task = Cell::new(server_task); - - let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn); - client_task.death.on_exit = Some(client_on_exit); - let client_task = Cell::new(client_task); - - let server_sched = Cell::new(server_sched); - let client_sched = Cell::new(client_sched); - - let server_thread = do Thread::start { - server_sched.take().bootstrap(server_task.take()); - }; - let client_thread = do Thread::start { - client_sched.take().bootstrap(client_task.take()); - }; - - server_thread.join(); - client_thread.join(); - } -} - -#[test] -fn test_simple_udp_server_and_client() { - do run_in_mt_newsched_task { - let server_addr = next_test_ip4(); - let client_addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - do spawntask { - unsafe { - let io = local_io(); - let mut server_socket = io.udp_bind(server_addr).unwrap(); - chan.take().send(()); - let mut buf = [0, .. 2048]; - let (nread,src) = server_socket.recvfrom(buf).unwrap(); - assert_eq!(nread, 8); - for i in range(0u, nread) { - uvdebug!("{}", buf[i]); - assert_eq!(buf[i], i as u8); - } - assert_eq!(src, client_addr); - } - } - - do spawntask { - unsafe { - let io = local_io(); - let mut client_socket = io.udp_bind(client_addr).unwrap(); - port.take().recv(); - client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr); - } - } - } -} - -#[test] #[ignore(reason = "busted")] -fn test_read_and_block() { - do run_in_mt_newsched_task { - let addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - do spawntask { - let io = unsafe { local_io() }; - let listener = io.tcp_bind(addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - chan.take().send(()); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; - - let expected = 32; - let mut current = 0; - let mut reads = 0; - - while current < expected { - let nread = stream.read(buf).unwrap(); - for i in range(0u, nread) { - let val = buf[i] as uint; - assert_eq!(val, current % 8); - current += 1; - } - reads += 1; - - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - // Yield to the other task in hopes that it - // will trigger a read callback while we are - // not ready for it - do scheduler.deschedule_running_task_and_then |sched, task| { - let task = Cell::new(task); - sched.enqueue_blocked_task(task.take()); - } - } - } - - // Make sure we had multiple reads - assert!(reads > 1); - } - - do spawntask { - unsafe { - port.take().recv(); - let io = local_io(); - let mut stream = io.tcp_connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - } - } - - } -} - -#[test] -fn test_read_read_read() { - do run_in_mt_newsched_task { - let addr = next_test_ip4(); - static MAX: uint = 500000; - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - do spawntask { - unsafe { - let io = local_io(); - let listener = io.tcp_bind(addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - chan.take().send(()); - let mut stream = acceptor.accept().unwrap(); - let buf = [1, .. 2048]; - let mut total_bytes_written = 0; - while total_bytes_written < MAX { - stream.write(buf); - total_bytes_written += buf.len(); - } - } - } - - do spawntask { - unsafe { - port.take().recv(); - let io = local_io(); - let mut stream = io.tcp_connect(addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < MAX { - let nread = stream.read(buf).unwrap(); - uvdebug!("read {} bytes", nread); - total_bytes_read += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } - } - uvdebug!("read {} bytes total", total_bytes_read); - } - } - } -} - -#[test] -#[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send -fn test_udp_twice() { - do run_in_mt_newsched_task { - let server_addr = next_test_ip4(); - let client_addr = next_test_ip4(); - let (port, chan) = oneshot(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - do spawntask { - unsafe { - let io = local_io(); - let mut client = io.udp_bind(client_addr).unwrap(); - port.take().recv(); - assert!(client.sendto([1], server_addr).is_ok()); - assert!(client.sendto([2], server_addr).is_ok()); - } - } - - do spawntask { - unsafe { - let io = local_io(); - let mut server = io.udp_bind(server_addr).unwrap(); - chan.take().send(()); - let mut buf1 = [0]; - let mut buf2 = [0]; - let (nread1, src1) = server.recvfrom(buf1).unwrap(); - let (nread2, src2) = server.recvfrom(buf2).unwrap(); - assert_eq!(nread1, 1); - assert_eq!(nread2, 1); - assert_eq!(src1, client_addr); - assert_eq!(src2, client_addr); - assert_eq!(buf1[0], 1); - assert_eq!(buf2[0], 2); - } - } - } -} - -#[test] -fn test_udp_many_read() { - do run_in_mt_newsched_task { - let server_out_addr = next_test_ip4(); - let server_in_addr = next_test_ip4(); - let client_out_addr = next_test_ip4(); - let client_in_addr = next_test_ip4(); - static MAX: uint = 500_000; - - let (p1, c1) = oneshot(); - let (p2, c2) = oneshot(); - - let first = Cell::new((p1, c2)); - let second = Cell::new((p2, c1)); - - do spawntask { - unsafe { - let io = local_io(); - let mut server_out = io.udp_bind(server_out_addr).unwrap(); - let mut server_in = io.udp_bind(server_in_addr).unwrap(); - let (port, chan) = first.take(); - chan.send(()); - port.recv(); - let msg = [1, .. 2048]; - let mut total_bytes_sent = 0; - let mut buf = [1]; - while buf[0] == 1 { - // send more data - assert!(server_out.sendto(msg, client_in_addr).is_ok()); - total_bytes_sent += msg.len(); - // check if the client has received enough - let res = server_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(nread, 1); - assert_eq!(src, client_out_addr); - } - assert!(total_bytes_sent >= MAX); - } - } - - do spawntask { - unsafe { - let io = local_io(); - let mut client_out = io.udp_bind(client_out_addr).unwrap(); - let mut client_in = io.udp_bind(client_in_addr).unwrap(); - let (port, chan) = second.take(); - port.recv(); - chan.send(()); - let mut total_bytes_recv = 0; - let mut buf = [0, .. 2048]; - while total_bytes_recv < MAX { - // ask for more - assert!(client_out.sendto([1], server_in_addr).is_ok()); - // wait for data - let res = client_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(src, server_out_addr); - total_bytes_recv += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } - } - // tell the server we're done - assert!(client_out.sendto([0], server_in_addr).is_ok()); - } - } - } -} - -#[test] -fn test_timer_sleep_simple() { - do run_in_mt_newsched_task { - unsafe { - let io = local_io(); - let timer = io.timer_init(); - do timer.map |mut t| { t.sleep(1) }; - } - } -} - -fn file_test_uvio_full_simple_impl() { - use std::rt::io::{Open, ReadWrite, Read}; - unsafe { - let io = local_io(); - let write_val = "hello uvio!"; - let path = "./tmp/file_test_uvio_full.txt"; - { - let create_fm = Open; - let create_fa = ReadWrite; - let mut fd = io.fs_open(&path.to_c_str(), create_fm, create_fa).unwrap(); - let write_buf = write_val.as_bytes(); - fd.write(write_buf); - } - { - let ro_fm = Open; - let ro_fa = Read; - let mut fd = io.fs_open(&path.to_c_str(), ro_fm, ro_fa).unwrap(); - let mut read_vec = [0, .. 1028]; - let nread = fd.read(read_vec).unwrap(); - let read_val = str::from_utf8(read_vec.slice(0, nread as uint)); - assert!(read_val == write_val.to_owned()); - } - io.fs_unlink(&path.to_c_str()); - } -} - -#[test] -fn file_test_uvio_full_simple() { - do run_in_mt_newsched_task { - file_test_uvio_full_simple_impl(); - } -} - -fn uvio_naive_print(input: &str) { - unsafe { - use std::libc::{STDOUT_FILENO}; - let io = local_io(); - { - let mut fd = io.fs_from_raw_fd(STDOUT_FILENO, DontClose); - let write_buf = input.as_bytes(); - fd.write(write_buf); - } - } -} - -#[test] -fn file_test_uvio_write_to_stdout() { - do run_in_mt_newsched_task { - uvio_naive_print("jubilation\n"); - } -}