From ceab326e82dfba2f3cd513926c023dea1af4b1c2 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 1 Nov 2013 10:26:43 -0700 Subject: [PATCH] Migrate uv process bindings away from ~fn() --- src/librustuv/lib.rs | 11 +-- src/librustuv/process.rs | 166 +++++++++++++++++++++++++-------------- src/librustuv/timer.rs | 6 +- src/librustuv/uvio.rs | 132 ++----------------------------- 4 files changed, 115 insertions(+), 200 deletions(-) diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 3d0ea4e6d1b8..66abca5924f2 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -139,8 +139,8 @@ pub trait UvHandle { fn install(~self) -> ~Self { unsafe { - let myptr = cast::transmute::<&~Self, *u8>(&self); - uvll::set_data_for_uv_handle(self.uv_handle(), myptr); + let myptr = cast::transmute::<&~Self, &*u8>(&self); + uvll::set_data_for_uv_handle(self.uv_handle(), *myptr); } self } @@ -188,9 +188,6 @@ pub type NullCallback = ~fn(); pub type IdleCallback = ~fn(IdleWatcher, Option); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(&mut FsRequest, Option); -// first int is exit_status, second is term_signal -pub type ExitCallback = ~fn(Process, int, int, Option); -pub type TimerCallback = ~fn(TimerWatcher, Option); pub type AsyncCallback = ~fn(AsyncWatcher, Option); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); pub type UdpSendCallback = ~fn(UdpWatcher, Option); @@ -206,11 +203,9 @@ struct WatcherData { close_cb: Option, alloc_cb: Option, idle_cb: Option, - timer_cb: Option, async_cb: Option, udp_recv_cb: Option, udp_send_cb: Option, - exit_cb: Option, signal_cb: Option, } @@ -242,11 +237,9 @@ impl> WatcherInterop for W { close_cb: None, alloc_cb: None, idle_cb: None, - timer_cb: None, async_cb: None, udp_recv_cb: None, udp_send_cb: None, - exit_cb: None, signal_cb: None, }; let data = transmute::<~WatcherData, *c_void>(data); diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index ce281b656d39..96b08b3f88b6 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -9,58 +9,42 @@ // except according to those terms. use std::cell::Cell; +use std::libc::c_int; use std::libc; use std::ptr; -use std::vec; +use std::rt::BlockedTask; +use std::rt::io::IoError; use std::rt::io::process::*; +use std::rt::local::Local; +use std::rt::rtio::RtioProcess; +use std::rt::sched::{Scheduler, SchedHandle}; +use std::vec; -use super::{Watcher, Loop, NativeHandle, UvError}; -use super::{status_to_maybe_uv_error, ExitCallback}; -use uvio::{UvPipeStream, UvUnboundPipe}; +use super::{Loop, NativeHandle, UvHandle, UvError, uv_error_to_io_error}; +use uvio::{HomingIO, UvPipeStream, UvUnboundPipe}; use uvll; -/// A process wraps the handle of the underlying uv_process_t. -pub struct Process(*uvll::uv_process_t); +pub struct Process { + handle: *uvll::uv_process_t, + home: SchedHandle, -impl Watcher for Process {} + /// Task to wake up (may be null) for when the process exits + to_wake: Option, + + /// Collected from the exit_cb + exit_status: Option, + term_signal: Option, +} impl Process { - /// Creates a new process, ready to spawn inside an event loop - pub fn new() -> Process { - let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) }; - assert!(handle.is_not_null()); - let mut ret: Process = NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - return ret; - } - /// Spawn a new process inside the specified event loop. /// - /// The `config` variable will be passed down to libuv, and the `exit_cb` - /// will be run only once, when the process exits. - /// /// Returns either the corresponding process object or an error which /// occurred. - pub fn spawn(&mut self, loop_: &Loop, config: ProcessConfig, - exit_cb: ExitCallback) - -> Result<~[Option<~UvPipeStream>], UvError> + pub fn spawn(loop_: &Loop, config: ProcessConfig) + -> Result<(~Process, ~[Option<~UvPipeStream>]), UvError> { let cwd = config.cwd.map(|s| s.to_c_str()); - - extern fn on_exit(p: *uvll::uv_process_t, - exit_status: libc::c_int, - term_signal: libc::c_int) { - let mut p: Process = NativeHandle::from_native_handle(p); - let err = match exit_status { - 0 => None, - _ => status_to_maybe_uv_error(-1) - }; - p.get_watcher_data().exit_cb.take_unwrap()(p, - exit_status as int, - term_signal as int, - err); - } - let io = config.io; let mut stdio = vec::with_capacity::(io.len()); let mut ret_io = vec::with_capacity(io.len()); @@ -73,7 +57,6 @@ impl Process { } } - let exit_cb = Cell::new(exit_cb); let ret_io = Cell::new(ret_io); do with_argv(config.program, config.args) |argv| { do with_env(config.env) |envp| { @@ -93,34 +76,47 @@ impl Process { gid: 0, }; + let handle = UvHandle::alloc(None::, uvll::UV_PROCESS); match unsafe { - uvll::uv_spawn(loop_.native_handle(), **self, options) + uvll::uv_spawn(loop_.native_handle(), handle, options) } { 0 => { - (*self).get_watcher_data().exit_cb = Some(exit_cb.take()); - Ok(ret_io.take()) + let process = ~Process { + handle: handle, + home: get_handle_to_current_scheduler!(), + to_wake: None, + exit_status: None, + term_signal: None, + }; + Ok((process.install(), ret_io.take())) + } + err => { + unsafe { uvll::free_handle(handle) } + Err(UvError(err)) } - err => Err(UvError(err)) } } } } +} - /// Sends a signal to this process. - /// - /// This is a wrapper around `uv_process_kill` - pub fn kill(&self, signum: int) -> Result<(), UvError> { - match unsafe { - uvll::uv_process_kill(self.native_handle(), signum as libc::c_int) - } { - 0 => Ok(()), - err => Err(UvError(err)) +extern fn on_exit(handle: *uvll::uv_process_t, + exit_status: libc::c_int, + term_signal: libc::c_int) { + let handle = handle as *uvll::uv_handle_t; + let p: &mut Process = unsafe { UvHandle::from_uv_handle(&handle) }; + + assert!(p.exit_status.is_none()); + assert!(p.term_signal.is_none()); + p.exit_status = Some(exit_status as int); + p.term_signal = Some(term_signal as int); + + match p.to_wake.take() { + Some(task) => { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task); } - } - - /// Returns the process id of a spawned process - pub fn pid(&self) -> libc::pid_t { - unsafe { uvll::process_pid(**self) as libc::pid_t } + None => {} } } @@ -192,11 +188,59 @@ fn with_env(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T { c_envp.as_imm_buf(|buf, _| f(buf)) } -impl NativeHandle<*uvll::uv_process_t> for Process { - fn from_native_handle(handle: *uvll::uv_process_t) -> Process { - Process(handle) +impl HomingIO for Process { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl UvHandle for Process { + fn uv_handle(&self) -> *uvll::uv_process_t { self.handle } +} + +impl RtioProcess for Process { + fn id(&self) -> libc::pid_t { + unsafe { uvll::process_pid(self.handle) as libc::pid_t } } - fn native_handle(&self) -> *uvll::uv_process_t { - match self { &Process(ptr) => ptr } + + fn kill(&mut self, signal: int) -> Result<(), IoError> { + do self.home_for_io |self_| { + match unsafe { + uvll::process_kill(self_.handle, signal as libc::c_int) + } { + 0 => Ok(()), + err => Err(uv_error_to_io_error(UvError(err))) + } + } + } + + fn wait(&mut self) -> int { + // Make sure (on the home scheduler) that we have an exit status listed + do self.home_for_io |self_| { + match self_.exit_status { + Some(*) => {} + None => { + // If there's no exit code previously listed, then the + // process's exit callback has yet to be invoked. We just + // need to deschedule ourselves and wait to be reawoken. + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + assert!(self_.to_wake.is_none()); + self_.to_wake = Some(task); + } + assert!(self_.exit_status.is_some()); + } + } + } + + // FIXME(#10109): this is wrong + self.exit_status.unwrap() + } +} + +impl Drop for Process { + fn drop(&mut self) { + do self.home_for_io |self_| { + assert!(self_.to_wake.is_none()); + self_.close_async_(); + } } } diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index f89a6c5e5c57..f4f2563f0b9e 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -103,9 +103,9 @@ impl RtioTimer for TimerWatcher { extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) { let handle = handle as *uvll::uv_handle_t; - let foo: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; + let timer : &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; - match foo.action.take_unwrap() { + match timer.action.take_unwrap() { WakeTask(task) => { let sched: ~Scheduler = Local::take(); sched.resume_blocked_task_immediately(task); @@ -113,7 +113,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) { SendOnce(chan) => chan.send(()), SendMany(chan) => { chan.send(()); - foo.action = Some(SendMany(chan)); + timer.action = Some(SendMany(chan)); } } } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 5e67e79c020f..226507ff09a4 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -23,7 +23,6 @@ use std::rt::io::net::ip::{SocketAddr, IpAddr}; use std::rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; use std::rt::io::process::ProcessConfig; -use std::rt::BlockedTask; use std::rt::local::Local; use std::rt::rtio::*; use std::rt::sched::{Scheduler, SchedHandle}; @@ -772,54 +771,12 @@ impl IoFactory for UvIoFactory { fn spawn(&mut self, config: ProcessConfig) -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError> { - // Sadly, we must create the UvProcess before we actually call uv_spawn - // so that the exit_cb can close over it and notify it when the process - // has exited. - let mut ret = ~UvProcess { - process: Process::new(), - home: None, - exit_status: None, - term_signal: None, - exit_error: None, - descheduled: None, - }; - let ret_ptr = unsafe { - *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret) - }; - - // The purpose of this exit callback is to record the data about the - // exit and then wake up the task which may be waiting for the process - // to exit. This is all performed in the current io-loop, and the - // implementation of UvProcess ensures that reading these fields always - // occurs on the current io-loop. - let exit_cb: ExitCallback = |_, exit_status, term_signal, error| { - unsafe { - assert!((*ret_ptr).exit_status.is_none()); - (*ret_ptr).exit_status = Some(exit_status); - (*ret_ptr).term_signal = Some(term_signal); - (*ret_ptr).exit_error = error; - match (*ret_ptr).descheduled.take() { - Some(task) => { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); - } - None => {} - } - } - }; - - match ret.process.spawn(self.uv_loop(), config, exit_cb) { - Ok(io) => { - // Only now do we actually get a handle to this scheduler. - ret.home = Some(get_handle_to_current_scheduler!()); - Ok((ret as ~RtioProcess, - io.move_iter().map(|p| p.map(|p| p as ~RtioPipe)).collect())) - } - Err(uverr) => { - // We still need to close the process handle we created, but - // that's taken care for us in the destructor of UvProcess - Err(uv_error_to_io_error(uverr)) + match Process::spawn(self.uv_loop(), config) { + Ok((p, io)) => { + Ok((p as ~RtioProcess, + io.move_iter().map(|i| i.map(|p| p as ~RtioPipe)).collect())) } + Err(e) => Err(uv_error_to_io_error(e)), } } @@ -1511,85 +1468,6 @@ impl RtioFileStream for UvFileStream { } } -pub struct UvProcess { - priv process: process::Process, - - // Sadly, this structure must be created before we return it, so in that - // brief interim the `home` is None. - priv home: Option, - - // All None until the process exits (exit_error may stay None) - priv exit_status: Option, - priv term_signal: Option, - priv exit_error: Option, - - // Used to store which task to wake up from the exit_cb - priv descheduled: Option, -} - -impl HomingIO for UvProcess { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() } -} - -impl Drop for UvProcess { - fn drop(&mut self) { - let close = |self_: &mut UvProcess| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task = Cell::new(task); - do self_.process.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task.take()); - } - } - }; - - // If home is none, then this process never actually successfully - // spawned, so there's no need to switch event loops - if self.home.is_none() { - close(self) - } else { - let _m = self.fire_homing_missile(); - close(self) - } - } -} - -impl RtioProcess for UvProcess { - fn id(&self) -> pid_t { - self.process.pid() - } - - fn kill(&mut self, signal: int) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - match self.process.kill(signal) { - Ok(()) => Ok(()), - Err(uverr) => Err(uv_error_to_io_error(uverr)) - } - } - - fn wait(&mut self) -> int { - // Make sure (on the home scheduler) that we have an exit status listed - let _m = self.fire_homing_missile(); - match self.exit_status { - Some(*) => {} - None => { - // If there's no exit code previously listed, then the - // process's exit callback has yet to be invoked. We just - // need to deschedule ourselves and wait to be reawoken. - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - assert!(self.descheduled.is_none()); - self.descheduled = Some(task); - } - assert!(self.exit_status.is_some()); - } - } - - self.exit_status.unwrap() - } -} - pub struct UvUnixListener { priv inner: UvUnboundPipe }