diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index 04c1c5a9fb8c..f4c12c6f2a39 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -11,12 +11,10 @@ use ai = std::io::net::addrinfo; use std::libc::c_int; use std::ptr::null; -use std::rt::BlockedTask; -use std::rt::local::Local; -use std::rt::sched::Scheduler; +use std::rt::task::BlockedTask; use net; -use super::{Loop, UvError, Request, wait_until_woken_after}; +use super::{Loop, UvError, Request, wait_until_woken_after, wakeup}; use uvll; struct Addrinfo { @@ -108,8 +106,7 @@ impl GetAddrInfoRequest { cx.status = status; cx.addrinfo = Some(Addrinfo { handle: res }); - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(cx.slot.take_unwrap()); + wakeup(&mut cx.slot); } } } @@ -188,7 +185,6 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] { #[cfg(test, not(target_os="android"))] mod test { use std::io::net::ip::{SocketAddr, Ipv4Addr}; - use super::*; use super::super::local_loop; #[test] diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs index 5a0db8313fb5..2d770ff6be1f 100644 --- a/src/librustuv/async.rs +++ b/src/librustuv/async.rs @@ -129,7 +129,6 @@ mod test_remote { use std::rt::thread::Thread; use std::rt::tube::Tube; - use super::*; use super::super::local_loop; // Make sure that we can fire watchers in remote threads and that they diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index 3a8af71e019e..cebf4f199e49 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -14,15 +14,15 @@ use std::cast::transmute; use std::cast; use std::libc::{c_int, c_char, c_void, size_t}; use std::libc; -use std::rt::BlockedTask; +use std::rt::task::BlockedTask; use std::io::{FileStat, IoError}; use std::io; -use std::rt::local::Local; use std::rt::rtio; -use std::rt::sched::{Scheduler, SchedHandle}; +use std::vec; -use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after}; -use uvio::HomingIO; +use homing::{HomingIO, HomeHandle}; +use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after, wakeup}; +use uvio::UvIoFactory; use uvll; pub struct FsRequest { @@ -34,19 +34,19 @@ pub struct FileWatcher { priv loop_: Loop, priv fd: c_int, priv close: rtio::CloseBehavior, - priv home: SchedHandle, + priv home: HomeHandle, } impl FsRequest { - pub fn open(loop_: &Loop, path: &CString, flags: int, mode: int) + pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int) -> Result { execute(|req, cb| unsafe { - uvll::uv_fs_open(loop_.handle, + uvll::uv_fs_open(io.uv_loop(), req, path.with_ref(|p| p), flags as c_int, mode as c_int, cb) }).map(|req| - FileWatcher::new(*loop_, req.get_result() as c_int, + FileWatcher::new(io, req.get_result() as c_int, rtio::CloseSynchronously) ) } @@ -320,8 +320,7 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int) let slot: &mut Option = unsafe { cast::transmute(uvll::get_data_for_req(req)) }; - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(slot.take_unwrap()); + wakeup(slot); } } @@ -331,16 +330,17 @@ fn execute_nop(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int) } impl HomingIO for FileWatcher { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl FileWatcher { - pub fn new(loop_: Loop, fd: c_int, close: rtio::CloseBehavior) -> FileWatcher { + pub fn new(io: &mut UvIoFactory, fd: c_int, + close: rtio::CloseBehavior) -> FileWatcher { FileWatcher { - loop_: loop_, + loop_: Loop::wrap(io.uv_loop()), fd: fd, close: close, - home: get_handle_to_current_scheduler!() + home: io.make_handle(), } } @@ -448,7 +448,6 @@ mod test { use std::io; use std::str; use std::vec; - use super::*; use l = super::super::local_loop; #[test] diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs new file mode 100644 index 000000000000..7a12c4ad06d6 --- /dev/null +++ b/src/librustuv/homing.rs @@ -0,0 +1,144 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Homing I/O implementation +//! +//! In libuv, whenever a handle is created on an I/O loop it is illegal to use +//! that handle outside of that I/O loop. We use libuv I/O with our green +//! scheduler, and each green scheduler corresponds to a different I/O loop on a +//! different OS thread. Green tasks are also free to roam among schedulers, +//! which implies that it is possible to create an I/O handle on one event loop +//! and then attempt to use it on another. +//! +//! In order to solve this problem, this module implements the notion of a +//! "homing operation" which will transplant a task from its currently running +//! scheduler back onto the original I/O loop. This is accomplished entirely at +//! the librustuv layer with very little cooperation from the scheduler (which +//! we don't even know exists technically). +//! +//! These homing operations are completed by first realizing that we're on the +//! wrong I/O loop, then descheduling ourselves, sending ourselves to the +//! correct I/O loop, and then waking up the I/O loop in order to process its +//! local queue of tasks which need to run. +//! +//! This enqueueing is done with a concurrent queue from libstd, and the +//! signalling is achieved with an async handle. + +use std::rt::local::Local; +use std::rt::rtio::LocalIo; +use std::rt::task::{Task, BlockedTask}; + +use ForbidUnwind; +use queue::{Queue, QueuePool}; + +/// A handle to a remote libuv event loop. This handle will keep the event loop +/// alive while active in order to ensure that a homing operation can always be +/// completed. +/// +/// Handles are clone-able in order to derive new handles from existing handles +/// (very useful for when accepting a socket from a server). +pub struct HomeHandle { + priv queue: Queue, + priv id: uint, +} + +impl HomeHandle { + pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle { + HomeHandle { queue: pool.queue(), id: id } + } + + fn send(&mut self, task: BlockedTask) { + self.queue.push(task); + } +} + +impl Clone for HomeHandle { + fn clone(&self) -> HomeHandle { + HomeHandle { + queue: self.queue.clone(), + id: self.id, + } + } +} + +pub trait HomingIO { + fn home<'r>(&'r mut self) -> &'r mut HomeHandle; + + /// This function will move tasks to run on their home I/O scheduler. Note + /// that this function does *not* pin the task to the I/O scheduler, but + /// rather it simply moves it to running on the I/O scheduler. + fn go_to_IO_home(&mut self) -> uint { + let _f = ForbidUnwind::new("going home"); + + let mut cur_task: ~Task = Local::take(); + let cur_loop_id = { + let mut io = cur_task.local_io().expect("libuv must have I/O"); + io.get().id() + }; + + // Try at all costs to avoid the homing operation because it is quite + // expensive. Hence, we only deschedule/send if we're not on the correct + // event loop. If we're already on the home event loop, then we're good + // to go (remember we have no preemption, so we're guaranteed to stay on + // this event loop as long as we avoid the scheduler). + if cur_loop_id != self.home().id { + cur_task.deschedule(1, |task| { + self.home().send(task); + Ok(()) + }); + + // Once we wake up, assert that we're in the right location + let cur_loop_id = { + let mut io = LocalIo::borrow().expect("libuv must have I/O"); + io.get().id() + }; + assert_eq!(cur_loop_id, self.home().id); + + cur_loop_id + } else { + Local::put(cur_task); + cur_loop_id + } + } + + /// Fires a single homing missile, returning another missile targeted back + /// at the original home of this task. In other words, this function will + /// move the local task to its I/O scheduler and then return an RAII wrapper + /// which will return the task home. + fn fire_homing_missile(&mut self) -> HomingMissile { + HomingMissile { io_home: self.go_to_IO_home() } + } +} + +/// After a homing operation has been completed, this will return the current +/// task back to its appropriate home (if applicable). The field is used to +/// assert that we are where we think we are. +struct HomingMissile { + priv io_home: uint, +} + +impl HomingMissile { + /// Check at runtime that the task has *not* transplanted itself to a + /// different I/O loop while executing. + pub fn check(&self, msg: &'static str) { + let mut io = LocalIo::borrow().expect("libuv must have I/O"); + assert!(io.get().id() == self.io_home, "{}", msg); + } +} + +impl Drop for HomingMissile { + fn drop(&mut self) { + let _f = ForbidUnwind::new("leaving home"); + + // It would truly be a sad day if we had moved off the home I/O + // scheduler while we were doing I/O. + self.check("task moved away from the home scheduler"); + } +} diff --git a/src/librustuv/idle.rs b/src/librustuv/idle.rs index 32c7699a3084..2445932c026b 100644 --- a/src/librustuv/idle.rs +++ b/src/librustuv/idle.rs @@ -97,7 +97,6 @@ impl Drop for IdleWatcher { #[cfg(test)] mod test { - use super::*; use std::rt::tube::Tube; use std::rt::rtio::{Callback, PausableIdleCallback}; use super::super::local_loop; diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 2e5b7254769f..2715f0bd02a1 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -41,23 +41,22 @@ via `close` and `delete` methods. #[crate_type = "rlib"]; #[crate_type = "dylib"]; -#[feature(macro_rules, globs)]; +#[feature(macro_rules)]; -use std::cast::transmute; use std::cast; +use std::io; +use std::io::IoError; use std::libc::{c_int, malloc}; use std::ptr::null; use std::ptr; -use std::rt::BlockedTask; use std::rt::local::Local; -use std::rt::sched::Scheduler; +use std::rt::task::{BlockedTask, Task}; +use std::rt::rtio::LocalIo; use std::str::raw::from_c_str; use std::str; use std::task; use std::unstable::finally::Finally; -use std::io::IoError; - pub use self::async::AsyncWatcher; pub use self::file::{FsRequest, FileWatcher}; pub use self::idle::IdleWatcher; @@ -70,6 +69,9 @@ pub use self::tty::TtyWatcher; mod macros; +mod queue; +mod homing; + /// The implementation of `rtio` for libuv pub mod uvio; @@ -144,32 +146,31 @@ pub trait UvHandle { uvll::free_handle(handle); if data == ptr::null() { return } let slot: &mut Option = cast::transmute(data); - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(slot.take_unwrap()); + wakeup(slot); } } } } pub struct ForbidSwitch { - msg: &'static str, - sched: uint, + priv msg: &'static str, + priv io: uint, } impl ForbidSwitch { fn new(s: &'static str) -> ForbidSwitch { - let mut sched = Local::borrow(None::); + let mut io = LocalIo::borrow().expect("libuv must have local I/O"); ForbidSwitch { msg: s, - sched: sched.get().sched_id(), + io: io.get().id(), } } } impl Drop for ForbidSwitch { fn drop(&mut self) { - let mut sched = Local::borrow(None::); - assert!(self.sched == sched.get().sched_id(), + let mut io = LocalIo::borrow().expect("libuv must have local I/O"); + assert!(self.io == io.get().id(), "didnt want a scheduler switch: {}", self.msg); } @@ -199,14 +200,20 @@ fn wait_until_woken_after(slot: *mut Option, f: ||) { let _f = ForbidUnwind::new("wait_until_woken_after"); unsafe { assert!((*slot).is_none()); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|_, task| { - f(); + let task: ~Task = Local::take(); + task.deschedule(1, |task| { *slot = Some(task); - }) + f(); + Ok(()) + }); } } +fn wakeup(slot: &mut Option) { + assert!(slot.is_some()); + slot.take_unwrap().wake().map(|t| t.reawaken(true)); +} + pub struct Request { handle: *uvll::uv_req_t, priv defused: bool, @@ -325,28 +332,26 @@ fn error_smoke_test() { pub fn uv_error_to_io_error(uverr: UvError) -> IoError { unsafe { // Importing error constants - use uvll::*; - use std::io::*; // uv error descriptions are static let c_desc = uvll::uv_strerror(*uverr); let desc = str::raw::c_str_to_static_slice(c_desc); let kind = match *uverr { - UNKNOWN => OtherIoError, - OK => OtherIoError, - EOF => EndOfFile, - EACCES => PermissionDenied, - ECONNREFUSED => ConnectionRefused, - ECONNRESET => ConnectionReset, - ENOENT => FileNotFound, - ENOTCONN => NotConnected, - EPIPE => BrokenPipe, - ECONNABORTED => ConnectionAborted, + uvll::UNKNOWN => io::OtherIoError, + uvll::OK => io::OtherIoError, + uvll::EOF => io::EndOfFile, + uvll::EACCES => io::PermissionDenied, + uvll::ECONNREFUSED => io::ConnectionRefused, + uvll::ECONNRESET => io::ConnectionReset, + uvll::ENOTCONN => io::NotConnected, + uvll::ENOENT => io::FileNotFound, + uvll::EPIPE => io::BrokenPipe, + uvll::ECONNABORTED => io::ConnectionAborted, err => { uvdebug!("uverr.code {}", err as int); // XXX: Need to map remaining uv error types - OtherIoError + io::OtherIoError } }; diff --git a/src/librustuv/macros.rs b/src/librustuv/macros.rs index a63dcc6de310..61b4de576559 100644 --- a/src/librustuv/macros.rs +++ b/src/librustuv/macros.rs @@ -27,18 +27,21 @@ macro_rules! uvdebug ( }) ) -// get a handle for the current scheduler -macro_rules! get_handle_to_current_scheduler( - () => ({ - let mut sched = Local::borrow(None::); - sched.get().make_handle() - }) -) - pub fn dumb_println(args: &fmt::Arguments) { - use std::io::native::file::FileDesc; use std::io; use std::libc; - let mut out = FileDesc::new(libc::STDERR_FILENO, false); - fmt::writeln(&mut out as &mut io::Writer, args); + use std::vec; + + struct Stderr; + impl io::Writer for Stderr { + fn write(&mut self, data: &[u8]) { + unsafe { + libc::write(libc::STDERR_FILENO, + vec::raw::to_ptr(data) as *libc::c_void, + data.len() as libc::size_t); + } + } + } + let mut w = Stderr; + fmt::writeln(&mut w as &mut io::Writer, args); } diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index ce543eafd2f6..caf9ee0aac63 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -9,24 +9,22 @@ // except according to those terms. use std::cast; -use std::libc; -use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char}; -use std::ptr; -use std::rt::BlockedTask; use std::io::IoError; use std::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr}; -use std::rt::local::Local; +use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char}; +use std::libc; +use std::ptr; use std::rt::rtio; -use std::rt::sched::{Scheduler, SchedHandle}; -use std::rt::tube::Tube; +use std::rt::task::BlockedTask; use std::str; use std::vec; +use homing::{HomingIO, HomeHandle}; use stream::StreamWatcher; use super::{Loop, Request, UvError, Buf, status_to_io_result, uv_error_to_io_error, UvHandle, slice_to_uv_buf, - wait_until_woken_after}; -use uvio::HomingIO; + wait_until_woken_after, wakeup}; +use uvio::UvIoFactory; use uvll; use uvll::sockaddr; @@ -145,42 +143,47 @@ fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result, - priv outgoing: Tube>, + priv outgoing: Chan>, + priv incoming: Port>, } pub struct TcpAcceptor { listener: ~TcpListener, - priv incoming: Tube>, } // TCP watchers (clients/streams) impl TcpWatcher { - pub fn new(loop_: &Loop) -> TcpWatcher { + pub fn new(io: &mut UvIoFactory) -> TcpWatcher { + let handle = io.make_handle(); + TcpWatcher::new_home(&io.loop_, handle) + } + + fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher { let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; assert_eq!(unsafe { uvll::uv_tcp_init(loop_.handle, handle) }, 0); TcpWatcher { - home: get_handle_to_current_scheduler!(), + home: home, handle: handle, stream: StreamWatcher::new(handle), } } - pub fn connect(loop_: &mut Loop, address: SocketAddr) + pub fn connect(io: &mut UvIoFactory, address: SocketAddr) -> Result { struct Ctx { status: c_int, task: Option } - let tcp = TcpWatcher::new(loop_); + let tcp = TcpWatcher::new(io); let ret = socket_addr_as_sockaddr(address, |addr| { let mut req = Request::new(uvll::UV_CONNECT); let result = unsafe { @@ -213,14 +216,13 @@ impl TcpWatcher { assert!(status != uvll::ECANCELED); let cx: &mut Ctx = unsafe { req.get_data() }; cx.status = status; - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(cx.task.take_unwrap()); + wakeup(&mut cx.task); } } } impl HomingIO for TcpWatcher { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl rtio::RtioSocket for TcpWatcher { @@ -290,17 +292,19 @@ impl Drop for TcpWatcher { // TCP listeners (unbound servers) impl TcpListener { - pub fn bind(loop_: &mut Loop, address: SocketAddr) + pub fn bind(io: &mut UvIoFactory, address: SocketAddr) -> Result<~TcpListener, UvError> { let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; assert_eq!(unsafe { - uvll::uv_tcp_init(loop_.handle, handle) + uvll::uv_tcp_init(io.uv_loop(), handle) }, 0); + let (port, chan) = Chan::new(); let l = ~TcpListener { - home: get_handle_to_current_scheduler!(), + home: io.make_handle(), handle: handle, closing_task: None, - outgoing: Tube::new(), + outgoing: chan, + incoming: port, }; let res = socket_addr_as_sockaddr(address, |addr| unsafe { uvll::uv_tcp_bind(l.handle, addr) @@ -313,7 +317,7 @@ impl TcpListener { } impl HomingIO for TcpListener { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl UvHandle for TcpListener { @@ -330,11 +334,7 @@ impl rtio::RtioSocket for TcpListener { impl rtio::RtioTcpListener for TcpListener { fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> { // create the acceptor object from ourselves - let incoming = self.outgoing.clone(); - let mut acceptor = ~TcpAcceptor { - listener: self, - incoming: incoming, - }; + let mut acceptor = ~TcpAcceptor { listener: self }; let _m = acceptor.fire_homing_missile(); // XXX: the 128 backlog should be configurable @@ -347,19 +347,18 @@ impl rtio::RtioTcpListener for TcpListener { extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) { assert!(status != uvll::ECANCELED); + let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(server) }); - let client = TcpWatcher::new(&loop_); + let client = TcpWatcher::new_home(&loop_, tcp.home().clone()); assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0); Ok(~client as ~rtio::RtioTcpStream) } n => Err(uv_error_to_io_error(UvError(n))) }; - - let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) }; tcp.outgoing.send(msg); } @@ -373,7 +372,7 @@ impl Drop for TcpListener { // TCP acceptors (bound servers) impl HomingIO for TcpAcceptor { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() } } impl rtio::RtioSocket for TcpAcceptor { @@ -385,8 +384,7 @@ impl rtio::RtioSocket for TcpAcceptor { impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> { - let _m = self.fire_homing_missile(); - self.incoming.recv() + self.listener.incoming.recv() } fn accept_simultaneously(&mut self) -> Result<(), IoError> { @@ -410,18 +408,18 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { pub struct UdpWatcher { handle: *uvll::uv_udp_t, - home: SchedHandle, + home: HomeHandle, } impl UdpWatcher { - pub fn bind(loop_: &Loop, address: SocketAddr) + pub fn bind(io: &mut UvIoFactory, address: SocketAddr) -> Result { let udp = UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, - home: get_handle_to_current_scheduler!(), + home: io.make_handle(), }; assert_eq!(unsafe { - uvll::uv_udp_init(loop_.handle, udp.handle) + uvll::uv_udp_init(io.uv_loop(), udp.handle) }, 0); let result = socket_addr_as_sockaddr(address, |addr| unsafe { uvll::uv_udp_bind(udp.handle, addr, 0u32) @@ -438,7 +436,7 @@ impl UvHandle for UdpWatcher { } impl HomingIO for UdpWatcher { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl rtio::RtioSocket for UdpWatcher { @@ -519,9 +517,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { Some(sockaddr_to_socket_addr(addr)) }; cx.result = Some((nread, addr)); - - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(cx.task.take_unwrap()); + wakeup(&mut cx.task); } } @@ -556,9 +552,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { assert!(status != uvll::ECANCELED); let cx: &mut Ctx = unsafe { req.get_data() }; cx.result = status; - - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(cx.task.take_unwrap()); + wakeup(&mut cx.task); } } @@ -646,12 +640,10 @@ impl Drop for UdpWatcher { #[cfg(test)] mod test { - use std::rt::test::*; use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, RtioUdpSocket}; use std::task; - use super::*; use super::super::local_loop; #[test] @@ -824,7 +816,6 @@ mod test { #[test] fn test_read_read_read() { - use std::rt::rtio::*; let addr = next_test_ip4(); static MAX: uint = 5000; let (port, chan) = Chan::new(); diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 814205cbbf1c..6975ef26bd76 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -9,35 +9,33 @@ // except according to those terms. use std::c_str::CString; -use std::libc; -use std::rt::BlockedTask; use std::io::IoError; -use std::rt::local::Local; +use std::libc; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; -use std::rt::sched::{Scheduler, SchedHandle}; -use std::rt::tube::Tube; +use std::rt::task::BlockedTask; +use homing::{HomingIO, HomeHandle}; use stream::StreamWatcher; use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, - wait_until_woken_after}; -use uvio::HomingIO; + wait_until_woken_after, wakeup}; +use uvio::UvIoFactory; use uvll; pub struct PipeWatcher { stream: StreamWatcher, - home: SchedHandle, + home: HomeHandle, priv defused: bool, } pub struct PipeListener { - home: SchedHandle, + home: HomeHandle, pipe: *uvll::uv_pipe_t, - priv outgoing: Tube>, + priv outgoing: Chan>, + priv incoming: Port>, } pub struct PipeAcceptor { listener: ~PipeListener, - priv incoming: Tube>, } // PipeWatcher implementation and traits @@ -46,7 +44,12 @@ impl PipeWatcher { // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to // get bound to some other source (this is normally a helper method paired // with another call). - pub fn new(loop_: &Loop, ipc: bool) -> PipeWatcher { + pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher { + let home = io.make_handle(); + PipeWatcher::new_home(&io.loop_, home, ipc) + } + + pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher { let handle = unsafe { let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE); assert!(!handle.is_null()); @@ -56,26 +59,28 @@ impl PipeWatcher { }; PipeWatcher { stream: StreamWatcher::new(handle), - home: get_handle_to_current_scheduler!(), + home: home, defused: false, } } - pub fn open(loop_: &Loop, file: libc::c_int) -> Result + pub fn open(io: &mut UvIoFactory, file: libc::c_int) + -> Result { - let pipe = PipeWatcher::new(loop_, false); + let pipe = PipeWatcher::new(io, false); match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } { 0 => Ok(pipe), n => Err(UvError(n)) } } - pub fn connect(loop_: &Loop, name: &CString) -> Result + pub fn connect(io: &mut UvIoFactory, name: &CString) + -> Result { struct Ctx { task: Option, result: libc::c_int, } let mut cx = Ctx { task: None, result: 0 }; let mut req = Request::new(uvll::UV_CONNECT); - let pipe = PipeWatcher::new(loop_, false); + let pipe = PipeWatcher::new(io, false); wait_until_woken_after(&mut cx.task, || { unsafe { @@ -97,8 +102,7 @@ impl PipeWatcher { assert!(status != uvll::ECANCELED); let cx: &mut Ctx = unsafe { req.get_data() }; cx.result = status; - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(cx.task.take_unwrap()); + wakeup(&mut cx.task); } } @@ -125,7 +129,7 @@ impl RtioPipe for PipeWatcher { } impl HomingIO for PipeWatcher { - fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home } + fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home } } impl UvHandle for PipeWatcher { @@ -144,8 +148,10 @@ impl Drop for PipeWatcher { // PipeListener implementation and traits impl PipeListener { - pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> { - let pipe = PipeWatcher::new(loop_, false); + pub fn bind(io: &mut UvIoFactory, name: &CString) + -> Result<~PipeListener, UvError> + { + let pipe = PipeWatcher::new(io, false); match unsafe { uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) } { @@ -153,10 +159,12 @@ impl PipeListener { // If successful, unwrap the PipeWatcher because we control how // we close the pipe differently. We can't rely on // StreamWatcher's default close method. + let (port, chan) = Chan::new(); let p = ~PipeListener { - home: get_handle_to_current_scheduler!(), + home: io.make_handle(), pipe: pipe.unwrap(), - outgoing: Tube::new(), + incoming: port, + outgoing: chan, }; Ok(p.install()) } @@ -168,11 +176,7 @@ impl PipeListener { impl RtioUnixListener for PipeListener { fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> { // create the acceptor object from ourselves - let incoming = self.outgoing.clone(); - let mut acceptor = ~PipeAcceptor { - listener: self, - incoming: incoming, - }; + let mut acceptor = ~PipeAcceptor { listener: self }; let _m = acceptor.fire_homing_missile(); // XXX: the 128 backlog should be configurable @@ -184,7 +188,7 @@ impl RtioUnixListener for PipeListener { } impl HomingIO for PipeListener { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl UvHandle for PipeListener { @@ -193,20 +197,20 @@ impl UvHandle for PipeListener { extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { assert!(status != uvll::ECANCELED); + + let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; let msg = match status { 0 => { let loop_ = Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(server) }); - let client = PipeWatcher::new(&loop_, false); + let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false); assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0); Ok(~client as ~RtioPipe) } n => Err(uv_error_to_io_error(UvError(n))) }; - - let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; - pipe.outgoing.send(msg); + pipe.outgoing.send_deferred(msg); } impl Drop for PipeListener { @@ -220,13 +224,12 @@ impl Drop for PipeListener { impl RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> Result<~RtioPipe, IoError> { - let _m = self.fire_homing_missile(); - self.incoming.recv() + self.listener.incoming.recv() } } impl HomingIO for PipeAcceptor { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home } } #[cfg(test)] @@ -234,7 +237,6 @@ mod tests { use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe}; use std::rt::test::next_test_unix; - use super::*; use super::super::local_loop; #[test] diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index 9e359e26f03d..7b7a16d7084e 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -8,32 +8,31 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use std::io::IoError; +use std::io::process; use std::libc::c_int; use std::libc; use std::ptr; -use std::rt::BlockedTask; -use std::io::IoError; -use std::io::process::*; -use std::rt::local::Local; use std::rt::rtio::RtioProcess; -use std::rt::sched::{Scheduler, SchedHandle}; +use std::rt::task::BlockedTask; use std::vec; -use super::{Loop, UvHandle, UvError, uv_error_to_io_error, - wait_until_woken_after}; -use uvio::HomingIO; -use uvll; +use homing::{HomingIO, HomeHandle}; use pipe::PipeWatcher; +use super::{UvHandle, UvError, uv_error_to_io_error, + wait_until_woken_after, wakeup}; +use uvio::UvIoFactory; +use uvll; pub struct Process { handle: *uvll::uv_process_t, - home: SchedHandle, + home: HomeHandle, /// Task to wake up (may be null) for when the process exits to_wake: Option, /// Collected from the exit_cb - exit_status: Option, + exit_status: Option, } impl Process { @@ -41,7 +40,7 @@ impl Process { /// /// Returns either the corresponding process object or an error which /// occurred. - pub fn spawn(loop_: &Loop, config: ProcessConfig) + pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig) -> Result<(~Process, ~[Option]), UvError> { let cwd = config.cwd.map(|s| s.to_c_str()); @@ -52,7 +51,7 @@ impl Process { stdio.set_len(io.len()); for (slot, other) in stdio.iter().zip(io.iter()) { let io = set_stdio(slot as *uvll::uv_stdio_container_t, other, - loop_); + io_loop); ret_io.push(io); } } @@ -78,12 +77,12 @@ impl Process { let handle = UvHandle::alloc(None::, uvll::UV_PROCESS); let process = ~Process { handle: handle, - home: get_handle_to_current_scheduler!(), + home: io_loop.make_handle(), to_wake: None, exit_status: None, }; match unsafe { - uvll::uv_spawn(loop_.handle, handle, &options) + uvll::uv_spawn(io_loop.uv_loop(), handle, &options) } { 0 => Ok(process.install()), err => Err(UvError(err)), @@ -105,33 +104,28 @@ extern fn on_exit(handle: *uvll::uv_process_t, assert!(p.exit_status.is_none()); p.exit_status = Some(match term_signal { - 0 => ExitStatus(exit_status as int), - n => ExitSignal(n as int), + 0 => process::ExitStatus(exit_status as int), + n => process::ExitSignal(n as int), }); - match p.to_wake.take() { - Some(task) => { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); - } - None => {} - } + if p.to_wake.is_none() { return } + wakeup(&mut p.to_wake); } unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, - io: &StdioContainer, - loop_: &Loop) -> Option { + io: &process::StdioContainer, + io_loop: &mut UvIoFactory) -> Option { match *io { - Ignored => { + process::Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); None } - InheritFd(fd) => { + process::InheritFd(fd) => { uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD); uvll::set_stdio_container_fd(dst, fd); None } - CreatePipe(readable, writable) => { + process::CreatePipe(readable, writable) => { let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; if readable { flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; @@ -139,7 +133,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, if writable { flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; } - let pipe = PipeWatcher::new(loop_, false); + let pipe = PipeWatcher::new(io_loop, false); uvll::set_stdio_container_flags(dst, flags); uvll::set_stdio_container_stream(dst, pipe.handle()); Some(pipe) @@ -186,7 +180,7 @@ fn with_env(env: Option<&[(~str, ~str)]>, f: |**libc::c_char| -> T) -> T { } impl HomingIO for Process { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl UvHandle for Process { @@ -208,7 +202,7 @@ impl RtioProcess for Process { } } - fn wait(&mut self) -> ProcessExit { + fn wait(&mut self) -> process::ProcessExit { // Make sure (on the home scheduler) that we have an exit status listed let _m = self.fire_homing_missile(); match self.exit_status { diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs new file mode 100644 index 000000000000..22e7925b2110 --- /dev/null +++ b/src/librustuv/queue.rs @@ -0,0 +1,184 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A concurrent queue used to signal remote event loops +//! +//! This queue implementation is used to send tasks among event loops. This is +//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t +//! handles (to wake up a remote event loop). +//! +//! The uv_async_t is stored next to the event loop, so in order to not keep the +//! event loop alive we use uv_ref and uv_unref in order to control when the +//! async handle is active or not. + +use std::cast; +use std::libc::{c_void, c_int}; +use std::rt::task::BlockedTask; +use std::unstable::sync::LittleLock; +use mpsc = std::sync::mpsc_queue; + +use async::AsyncWatcher; +use super::{Loop, UvHandle}; +use uvll; + +enum Message { + Task(BlockedTask), + Increment, + Decrement, +} + +struct State { + handle: *uvll::uv_async_t, + lock: LittleLock, // see comments in async_cb for why this is needed +} + +/// This structure is intended to be stored next to the event loop, and it is +/// used to create new `Queue` structures. +pub struct QueuePool { + priv producer: mpsc::Producer, + priv consumer: mpsc::Consumer, + priv refcnt: uint, +} + +/// This type is used to send messages back to the original event loop. +pub struct Queue { + priv queue: mpsc::Producer, +} + +extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { + assert_eq!(status, 0); + let state: &mut QueuePool = unsafe { + cast::transmute(uvll::get_data_for_uv_handle(handle)) + }; + let packet = unsafe { state.consumer.packet() }; + + // Remember that there is no guarantee about how many times an async + // callback is called with relation to the number of sends, so process the + // entire queue in a loop. + loop { + match state.consumer.pop() { + mpsc::Data(Task(task)) => { + task.wake().map(|t| t.reawaken(true)); + } + mpsc::Data(Increment) => unsafe { + if state.refcnt == 0 { + uvll::uv_ref((*packet).handle); + } + state.refcnt += 1; + }, + mpsc::Data(Decrement) => unsafe { + state.refcnt -= 1; + if state.refcnt == 0 { + uvll::uv_unref((*packet).handle); + } + }, + mpsc::Empty | mpsc::Inconsistent => break + }; + } + + // If the refcount is now zero after processing the queue, then there is no + // longer a reference on the async handle and it is possible that this event + // loop can exit. What we're not guaranteed, however, is that a producer in + // the middle of dropping itself is yet done with the handle. It could be + // possible that we saw their Decrement message but they have yet to signal + // on the async handle. If we were to return immediately, the entire uv loop + // could be destroyed meaning the call to uv_async_send would abort() + // + // In order to fix this, an OS mutex is used to wait for the other end to + // finish before we continue. The drop block on a handle will acquire a + // mutex and then drop it after both the push and send have been completed. + // If we acquire the mutex here, then we are guaranteed that there are no + // longer any senders which are holding on to their handles, so we can + // safely allow the event loop to exit. + if state.refcnt == 0 { + unsafe { + let _l = (*packet).lock.lock(); + } + } +} + +impl QueuePool { + pub fn new(loop_: &mut Loop) -> ~QueuePool { + let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); + let (c, p) = mpsc::queue(State { + handle: handle, + lock: LittleLock::new(), + }); + let q = ~QueuePool { + producer: p, + consumer: c, + refcnt: 0, + }; + + unsafe { + assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0); + uvll::uv_unref(handle); + let data: *c_void = *cast::transmute::<&~QueuePool, &*c_void>(&q); + uvll::set_data_for_uv_handle(handle, data); + } + + return q; + } + + pub fn queue(&mut self) -> Queue { + unsafe { + if self.refcnt == 0 { + uvll::uv_ref((*self.producer.packet()).handle); + } + self.refcnt += 1; + } + Queue { queue: self.producer.clone() } + } +} + +impl Queue { + pub fn push(&mut self, task: BlockedTask) { + self.queue.push(Task(task)); + unsafe { + uvll::uv_async_send((*self.queue.packet()).handle); + } + } +} + +impl Clone for Queue { + fn clone(&self) -> Queue { + // Push a request to increment on the queue, but there's no need to + // signal the event loop to process it at this time. We're guaranteed + // that the count is at least one (because we have a queue right here), + // and if the queue is dropped later on it'll see the increment for the + // decrement anyway. + unsafe { + cast::transmute_mut(self).queue.push(Increment); + } + Queue { queue: self.queue.clone() } + } +} + +impl Drop for Queue { + fn drop(&mut self) { + // See the comments in the async_cb function for why there is a lock + // that is acquired only on a drop. + unsafe { + let state = self.queue.packet(); + let _l = (*state).lock.lock(); + self.queue.push(Decrement); + uvll::uv_async_send((*state).handle); + } + } +} + +impl Drop for State { + fn drop(&mut self) { + unsafe { + uvll::uv_close(self.handle, cast::transmute(0)); + uvll::free_handle(self.handle); + } + } +} diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index f082aef003c6..27dbc0fe3bb0 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -10,34 +10,33 @@ use std::libc::c_int; use std::io::signal::Signum; -use std::rt::sched::{SchedHandle, Scheduler}; use std::comm::SharedChan; -use std::rt::local::Local; use std::rt::rtio::RtioSignal; -use super::{Loop, UvError, UvHandle}; +use homing::{HomingIO, HomeHandle}; +use super::{UvError, UvHandle}; use uvll; -use uvio::HomingIO; +use uvio::UvIoFactory; pub struct SignalWatcher { handle: *uvll::uv_signal_t, - home: SchedHandle, + home: HomeHandle, channel: SharedChan, signal: Signum, } impl SignalWatcher { - pub fn new(loop_: &mut Loop, signum: Signum, + pub fn new(io: &mut UvIoFactory, signum: Signum, channel: SharedChan) -> Result<~SignalWatcher, UvError> { let s = ~SignalWatcher { handle: UvHandle::alloc(None::, uvll::UV_SIGNAL), - home: get_handle_to_current_scheduler!(), + home: io.make_handle(), channel: channel, signal: signum, }; assert_eq!(unsafe { - uvll::uv_signal_init(loop_.handle, s.handle) + uvll::uv_signal_init(io.uv_loop(), s.handle) }, 0); match unsafe { @@ -57,7 +56,7 @@ extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { } impl HomingIO for SignalWatcher { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl UvHandle for SignalWatcher { @@ -75,7 +74,6 @@ impl Drop for SignalWatcher { #[cfg(test)] mod test { - use super::*; use super::super::local_loop; use std::io::signal; diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index 0304b89dd6fd..73173fc677e8 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -11,12 +11,10 @@ use std::cast; use std::libc::{c_int, size_t, ssize_t}; use std::ptr; -use std::rt::BlockedTask; -use std::rt::local::Local; -use std::rt::sched::Scheduler; +use std::rt::task::BlockedTask; use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, - ForbidUnwind}; + ForbidUnwind, wakeup}; use uvll; // This is a helper structure which is intended to get embedded into other @@ -164,8 +162,7 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) { unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); } rcx.result = nread; - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap()); + wakeup(&mut rcx.task); } // Unlike reading, the WriteContext is stored in the uv_write_t request. Like @@ -180,6 +177,5 @@ extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { wcx.result = status; req.defuse(); - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(wcx.task.take_unwrap()); + wakeup(&mut wcx.task); } diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index ab143d6e8b07..1e70c5c55e06 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -9,19 +9,19 @@ // except according to those terms. use std::libc::c_int; -use std::rt::BlockedTask; use std::rt::local::Local; use std::rt::rtio::RtioTimer; -use std::rt::sched::{Scheduler, SchedHandle}; +use std::rt::task::{BlockedTask, Task}; use std::util; +use homing::{HomeHandle, HomingIO}; +use super::{UvHandle, ForbidUnwind, ForbidSwitch}; +use uvio::UvIoFactory; use uvll; -use super::{Loop, UvHandle, ForbidUnwind, ForbidSwitch}; -use uvio::HomingIO; pub struct TimerWatcher { handle: *uvll::uv_timer_t, - home: SchedHandle, + home: HomeHandle, action: Option, id: uint, // see comments in timer_cb } @@ -33,15 +33,15 @@ pub enum NextAction { } impl TimerWatcher { - pub fn new(loop_: &mut Loop) -> ~TimerWatcher { + pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher { let handle = UvHandle::alloc(None::, uvll::UV_TIMER); assert_eq!(unsafe { - uvll::uv_timer_init(loop_.handle, handle) + uvll::uv_timer_init(io.uv_loop(), handle) }, 0); let me = ~TimerWatcher { handle: handle, action: None, - home: get_handle_to_current_scheduler!(), + home: io.make_handle(), id: 0, }; return me.install(); @@ -59,7 +59,7 @@ impl TimerWatcher { } impl HomingIO for TimerWatcher { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } + fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } impl UvHandle for TimerWatcher { @@ -89,10 +89,11 @@ impl RtioTimer for TimerWatcher { // started, then we need to call stop on the timer. let _f = ForbidUnwind::new("timer"); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|_sched, task| { + let task: ~Task = Local::take(); + task.deschedule(1, |task| { self.action = Some(WakeTask(task)); self.start(msecs, 0); + Ok(()) }); self.stop(); } @@ -137,8 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { match timer.action.take_unwrap() { WakeTask(task) => { - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(task); + task.wake().map(|t| t.reawaken(true)); } SendOnce(chan) => { chan.try_send_deferred(()); } SendMany(chan, id) => { @@ -177,7 +177,6 @@ impl Drop for TimerWatcher { #[cfg(test)] mod test { - use super::*; use std::rt::rtio::RtioTimer; use super::super::local_loop; diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs index fcad62965799..0e76ed9feb93 100644 --- a/src/librustuv/tty.rs +++ b/src/librustuv/tty.rs @@ -10,24 +10,23 @@ use std::libc; use std::io::IoError; -use std::rt::local::Local; use std::rt::rtio::RtioTTY; -use std::rt::sched::{Scheduler, SchedHandle}; +use homing::{HomingIO, HomeHandle}; use stream::StreamWatcher; -use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; -use uvio::HomingIO; +use super::{UvError, UvHandle, uv_error_to_io_error}; +use uvio::UvIoFactory; use uvll; pub struct TtyWatcher{ tty: *uvll::uv_tty_t, stream: StreamWatcher, - home: SchedHandle, + home: HomeHandle, fd: libc::c_int, } impl TtyWatcher { - pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool) + pub fn new(io: &mut UvIoFactory, fd: libc::c_int, readable: bool) -> Result { // libuv may succeed in giving us a handle (via uv_tty_init), but if the @@ -56,14 +55,14 @@ impl TtyWatcher { // with attempting to open it as a tty. let handle = UvHandle::alloc(None::, uvll::UV_TTY); match unsafe { - uvll::uv_tty_init(loop_.handle, handle, fd as libc::c_int, + uvll::uv_tty_init(io.uv_loop(), handle, fd as libc::c_int, readable as libc::c_int) } { 0 => { Ok(TtyWatcher { tty: handle, stream: StreamWatcher::new(handle), - home: get_handle_to_current_scheduler!(), + home: io.make_handle(), fd: fd, }) } @@ -120,7 +119,7 @@ impl UvHandle for TtyWatcher { } impl HomingIO for TtyWatcher { - fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home } + fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home } } impl Drop for TtyWatcher { diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index c556b96671ab..52e0b5ed77b2 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -9,121 +9,41 @@ // except according to those terms. use std::c_str::CString; +use std::cast; use std::comm::SharedChan; -use std::libc::c_int; -use std::libc; -use std::path::Path; use std::io::IoError; use std::io::net::ip::SocketAddr; use std::io::process::ProcessConfig; -use std::io; -use std::rt::local::Local; -use std::rt::rtio::*; -use std::rt::sched::{Scheduler, SchedHandle}; -use std::rt::task::Task; -use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, - S_IRUSR, S_IWUSR}; -use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, - ReadWrite, FileStat}; use std::io::signal::Signum; +use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, + ReadWrite, FileStat}; +use std::io; +use std::libc::c_int; +use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR, + S_IWUSR}; +use std::libc; +use std::path::Path; +use std::rt::rtio; +use std::rt::rtio::IoFactory; use ai = std::io::net::addrinfo; #[cfg(test)] use std::unstable::run_in_bare_thread; -use super::*; +use super::{uv_error_to_io_error, Loop}; + use addrinfo::GetAddrInfoRequest; - -pub trait HomingIO { - - fn home<'r>(&'r mut self) -> &'r mut SchedHandle; - - /// This function will move tasks to run on their home I/O scheduler. Note - /// that this function does *not* pin the task to the I/O scheduler, but - /// rather it simply moves it to running on the I/O scheduler. - fn go_to_IO_home(&mut self) -> uint { - use std::rt::sched::RunOnce; - - let _f = ForbidUnwind::new("going home"); - - let current_sched_id = { - let mut sched = Local::borrow(None::); - sched.get().sched_id() - }; - - // Only need to invoke a context switch if we're not on the right - // scheduler. - if current_sched_id != self.home().sched_id { - let scheduler: ~Scheduler = Local::take(); - scheduler.deschedule_running_task_and_then(|_, task| { - task.wake().map(|task| { - self.home().send(RunOnce(task)); - }); - }) - } - let current_sched_id = { - let mut sched = Local::borrow(None::); - sched.get().sched_id() - }; - assert!(current_sched_id == self.home().sched_id); - - self.home().sched_id - } - - /// Fires a single homing missile, returning another missile targeted back - /// at the original home of this task. In other words, this function will - /// move the local task to its I/O scheduler and then return an RAII wrapper - /// which will return the task home. - fn fire_homing_missile(&mut self) -> HomingMissile { - HomingMissile { io_home: self.go_to_IO_home() } - } - - /// Same as `fire_homing_missile`, but returns the local I/O scheduler as - /// well (the one that was homed to). - fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) { - // First, transplant ourselves to the home I/O scheduler - let missile = self.fire_homing_missile(); - // Next (must happen next), grab the local I/O scheduler - let io_sched: ~Scheduler = Local::take(); - - (missile, io_sched) - } -} - -/// After a homing operation has been completed, this will return the current -/// task back to its appropriate home (if applicable). The field is used to -/// assert that we are where we think we are. -struct HomingMissile { - priv io_home: uint, -} - -impl HomingMissile { - pub fn check(&self, msg: &'static str) { - let mut sched = Local::borrow(None::); - let local_id = sched.get().sched_id(); - assert!(local_id == self.io_home, "{}", msg); - } -} - -impl Drop for HomingMissile { - fn drop(&mut self) { - let _f = ForbidUnwind::new("leaving home"); - - // It would truly be a sad day if we had moved off the home I/O - // scheduler while we were doing I/O. - self.check("task moved away from the home scheduler"); - - // If we were a homed task, then we must send ourselves back to the - // original scheduler. Otherwise, we can just return and keep running - if !Task::on_appropriate_sched() { - let scheduler: ~Scheduler = Local::take(); - scheduler.deschedule_running_task_and_then(|_, task| { - task.wake().map(|task| { - Scheduler::run_task(task); - }); - }) - } - } -} +use async::AsyncWatcher; +use file::{FsRequest, FileWatcher}; +use queue::QueuePool; +use homing::HomeHandle; +use idle::IdleWatcher; +use net::{TcpWatcher, TcpListener, UdpWatcher}; +use pipe::{PipeWatcher, PipeListener}; +use process::Process; +use signal::SignalWatcher; +use timer::TimerWatcher; +use tty::TtyWatcher; +use uvll; // Obviously an Event Loop is always home. pub struct UvEventLoop { @@ -132,45 +52,52 @@ pub struct UvEventLoop { impl UvEventLoop { pub fn new() -> UvEventLoop { + let mut loop_ = Loop::new(); + let handle_pool = QueuePool::new(&mut loop_); UvEventLoop { - uvio: UvIoFactory(Loop::new()) + uvio: UvIoFactory { + loop_: loop_, + handle_pool: handle_pool, + } } } } impl Drop for UvEventLoop { fn drop(&mut self) { - self.uvio.uv_loop().close(); + self.uvio.loop_.close(); } } -impl EventLoop for UvEventLoop { +impl rtio::EventLoop for UvEventLoop { fn run(&mut self) { - self.uvio.uv_loop().run(); + self.uvio.loop_.run(); } fn callback(&mut self, f: proc()) { - IdleWatcher::onetime(self.uvio.uv_loop(), f); + IdleWatcher::onetime(&mut self.uvio.loop_, f); } - fn pausable_idle_callback(&mut self, cb: ~Callback) -> ~PausableIdleCallback { - IdleWatcher::new(self.uvio.uv_loop(), cb) as ~PausableIdleCallback + fn pausible_idle_callback(&mut self, cb: ~rtio::Callback) + -> ~rtio::PausibleIdleCallback + { + IdleWatcher::new(&mut self.uvio.loop_, cb) as ~rtio::PausibleIdleCallback } - fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback { - ~AsyncWatcher::new(self.uvio.uv_loop(), f) as ~RemoteCallback + fn remote_callback(&mut self, f: ~rtio::Callback) -> ~rtio::RemoteCallback { + ~AsyncWatcher::new(&mut self.uvio.loop_, f) as ~rtio::RemoteCallback } - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { - let factory = &mut self.uvio as &mut IoFactory; + fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> { + let factory = &mut self.uvio as &mut rtio::IoFactory; Some(factory) } } #[cfg(not(test))] #[lang = "event_loop_factory"] -pub extern "C" fn new_loop() -> ~EventLoop { - ~UvEventLoop::new() as ~EventLoop +pub extern "C" fn new_loop() -> ~rtio::EventLoop { + ~UvEventLoop::new() as ~rtio::EventLoop } #[test] @@ -187,59 +114,65 @@ fn test_callback_run_once() { } } -pub struct UvIoFactory(Loop); +pub struct UvIoFactory { + loop_: Loop, + priv handle_pool: ~QueuePool, +} impl UvIoFactory { - pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop { - match self { &UvIoFactory(ref mut ptr) => ptr } + pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle } + + pub fn make_handle(&mut self) -> HomeHandle { + HomeHandle::new(self.id(), &mut *self.handle_pool) } } impl IoFactory for UvIoFactory { + fn id(&self) -> uint { unsafe { cast::transmute(self) } } + // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future fn tcp_connect(&mut self, addr: SocketAddr) - -> Result<~RtioTcpStream, IoError> + -> Result<~rtio::RtioTcpStream, IoError> { - match TcpWatcher::connect(self.uv_loop(), addr) { - Ok(t) => Ok(~t as ~RtioTcpStream), + match TcpWatcher::connect(self, addr) { + Ok(t) => Ok(~t as ~rtio::RtioTcpStream), Err(e) => Err(uv_error_to_io_error(e)), } } - fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> { - match TcpListener::bind(self.uv_loop(), addr) { - Ok(t) => Ok(t as ~RtioTcpListener), + fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioTcpListener, IoError> { + match TcpListener::bind(self, addr) { + Ok(t) => Ok(t as ~rtio::RtioTcpListener), Err(e) => Err(uv_error_to_io_error(e)), } } - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> { - match UdpWatcher::bind(self.uv_loop(), addr) { - Ok(u) => Ok(~u as ~RtioUdpSocket), + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioUdpSocket, IoError> { + match UdpWatcher::bind(self, addr) { + Ok(u) => Ok(~u as ~rtio::RtioUdpSocket), Err(e) => Err(uv_error_to_io_error(e)), } } - fn timer_init(&mut self) -> Result<~RtioTimer, IoError> { - Ok(TimerWatcher::new(self.uv_loop()) as ~RtioTimer) + fn timer_init(&mut self) -> Result<~rtio::RtioTimer, IoError> { + Ok(TimerWatcher::new(self) as ~rtio::RtioTimer) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> Result<~[ai::Info], IoError> { - let r = GetAddrInfoRequest::run(self.uv_loop(), host, servname, hint); + let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint); r.map_err(uv_error_to_io_error) } fn fs_from_raw_fd(&mut self, fd: c_int, - close: CloseBehavior) -> ~RtioFileStream { - let loop_ = Loop::wrap(self.uv_loop().handle); - ~FileWatcher::new(loop_, fd, close) as ~RtioFileStream + close: rtio::CloseBehavior) -> ~rtio::RtioFileStream { + ~FileWatcher::new(self, fd, close) as ~rtio::RtioFileStream } fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) - -> Result<~RtioFileStream, IoError> { + -> Result<~rtio::RtioFileStream, IoError> { let flags = match fm { io::Open => 0, io::Append => libc::O_APPEND, @@ -254,117 +187,117 @@ impl IoFactory for UvIoFactory { libc::S_IRUSR | libc::S_IWUSR), }; - match FsRequest::open(self.uv_loop(), path, flags as int, mode as int) { - Ok(fs) => Ok(~fs as ~RtioFileStream), + match FsRequest::open(self, path, flags as int, mode as int) { + Ok(fs) => Ok(~fs as ~rtio::RtioFileStream), Err(e) => Err(uv_error_to_io_error(e)) } } fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> { - let r = FsRequest::unlink(self.uv_loop(), path); + let r = FsRequest::unlink(&self.loop_, path); r.map_err(uv_error_to_io_error) } fn fs_lstat(&mut self, path: &CString) -> Result { - let r = FsRequest::lstat(self.uv_loop(), path); + let r = FsRequest::lstat(&self.loop_, path); r.map_err(uv_error_to_io_error) } fn fs_stat(&mut self, path: &CString) -> Result { - let r = FsRequest::stat(self.uv_loop(), path); + let r = FsRequest::stat(&self.loop_, path); r.map_err(uv_error_to_io_error) } fn fs_mkdir(&mut self, path: &CString, perm: io::FilePermission) -> Result<(), IoError> { - let r = FsRequest::mkdir(self.uv_loop(), path, perm as c_int); + let r = FsRequest::mkdir(&self.loop_, path, perm as c_int); r.map_err(uv_error_to_io_error) } fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> { - let r = FsRequest::rmdir(self.uv_loop(), path); + let r = FsRequest::rmdir(&self.loop_, path); r.map_err(uv_error_to_io_error) } fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> { - let r = FsRequest::rename(self.uv_loop(), path, to); + let r = FsRequest::rename(&self.loop_, path, to); r.map_err(uv_error_to_io_error) } fn fs_chmod(&mut self, path: &CString, perm: io::FilePermission) -> Result<(), IoError> { - let r = FsRequest::chmod(self.uv_loop(), path, perm as c_int); + let r = FsRequest::chmod(&self.loop_, path, perm as c_int); r.map_err(uv_error_to_io_error) } fn fs_readdir(&mut self, path: &CString, flags: c_int) -> Result<~[Path], IoError> { - let r = FsRequest::readdir(self.uv_loop(), path, flags); + let r = FsRequest::readdir(&self.loop_, path, flags); r.map_err(uv_error_to_io_error) } fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> { - let r = FsRequest::link(self.uv_loop(), src, dst); + let r = FsRequest::link(&self.loop_, src, dst); r.map_err(uv_error_to_io_error) } fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> { - let r = FsRequest::symlink(self.uv_loop(), src, dst); + let r = FsRequest::symlink(&self.loop_, src, dst); r.map_err(uv_error_to_io_error) } fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> { - let r = FsRequest::chown(self.uv_loop(), path, uid, gid); + let r = FsRequest::chown(&self.loop_, path, uid, gid); r.map_err(uv_error_to_io_error) } fn fs_readlink(&mut self, path: &CString) -> Result { - let r = FsRequest::readlink(self.uv_loop(), path); + let r = FsRequest::readlink(&self.loop_, path); r.map_err(uv_error_to_io_error) } fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64) -> Result<(), IoError> { - let r = FsRequest::utime(self.uv_loop(), path, atime, mtime); + let r = FsRequest::utime(&self.loop_, path, atime, mtime); r.map_err(uv_error_to_io_error) } fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError> + -> Result<(~rtio::RtioProcess, ~[Option<~rtio::RtioPipe>]), IoError> { - match Process::spawn(self.uv_loop(), config) { + match Process::spawn(self, config) { Ok((p, io)) => { - Ok((p as ~RtioProcess, - io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect())) + Ok((p as ~rtio::RtioProcess, + io.move_iter().map(|i| i.map(|p| ~p as ~rtio::RtioPipe)).collect())) } Err(e) => Err(uv_error_to_io_error(e)), } } - fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError> + fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener, IoError> { - match PipeListener::bind(self.uv_loop(), path) { - Ok(p) => Ok(p as ~RtioUnixListener), + match PipeListener::bind(self, path) { + Ok(p) => Ok(p as ~rtio::RtioUnixListener), Err(e) => Err(uv_error_to_io_error(e)), } } - fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> { - match PipeWatcher::connect(self.uv_loop(), path) { - Ok(p) => Ok(~p as ~RtioPipe), + fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe, IoError> { + match PipeWatcher::connect(self, path) { + Ok(p) => Ok(~p as ~rtio::RtioPipe), Err(e) => Err(uv_error_to_io_error(e)), } } fn tty_open(&mut self, fd: c_int, readable: bool) - -> Result<~RtioTTY, IoError> { - match TtyWatcher::new(self.uv_loop(), fd, readable) { - Ok(tty) => Ok(~tty as ~RtioTTY), + -> Result<~rtio::RtioTTY, IoError> { + match TtyWatcher::new(self, fd, readable) { + Ok(tty) => Ok(~tty as ~rtio::RtioTTY), Err(e) => Err(uv_error_to_io_error(e)) } } - fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> { - match PipeWatcher::open(self.uv_loop(), fd) { - Ok(s) => Ok(~s as ~RtioPipe), + fn pipe_open(&mut self, fd: c_int) -> Result<~rtio::RtioPipe, IoError> { + match PipeWatcher::open(self, fd) { + Ok(s) => Ok(~s as ~rtio::RtioPipe), Err(e) => Err(uv_error_to_io_error(e)) } } fn signal(&mut self, signum: Signum, channel: SharedChan) - -> Result<~RtioSignal, IoError> { - match SignalWatcher::new(self.uv_loop(), signum, channel) { - Ok(s) => Ok(s as ~RtioSignal), + -> Result<~rtio::RtioSignal, IoError> { + match SignalWatcher::new(self, signum, channel) { + Ok(s) => Ok(s as ~rtio::RtioSignal), Err(e) => Err(uv_error_to_io_error(e)), } } diff --git a/src/librustuv/uvll.rs b/src/librustuv/uvll.rs index dea90a40fa9f..fa0bb85faed1 100644 --- a/src/librustuv/uvll.rs +++ b/src/librustuv/uvll.rs @@ -37,7 +37,8 @@ use std::libc; #[cfg(test)] use std::libc::uintptr_t; -pub use self::errors::*; +pub use self::errors::{EACCES, ECONNREFUSED, ECONNRESET, EPIPE, ECONNABORTED, + ECANCELED, EBADF, ENOTCONN}; pub static OK: c_int = 0; pub static EOF: c_int = -4095; @@ -576,6 +577,8 @@ extern { // generic uv functions pub fn uv_loop_delete(l: *uv_loop_t); + pub fn uv_ref(t: *uv_handle_t); + pub fn uv_unref(t: *uv_handle_t); pub fn uv_handle_size(ty: uv_handle_type) -> size_t; pub fn uv_req_size(ty: uv_req_type) -> size_t; pub fn uv_run(l: *uv_loop_t, mode: uv_run_mode) -> c_int;