diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index 0ff4543a116b..1994c0a54199 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -8,406 +8,437 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::ptr::null; -use std::c_str; use std::c_str::CString; +use std::c_str; use std::cast::transmute; -use std::libc; +use std::cast; use std::libc::{c_int, c_char, c_void, c_uint}; +use std::libc; +use std::rt::BlockedTask; +use std::rt::io; +use std::rt::io::{FileStat, IoError}; +use std::rt::rtio; +use std::rt::local::Local; +use std::rt::sched::{Scheduler, SchedHandle}; +use std::vec; -use super::{Request, NativeHandle, Loop, FsCallback, Buf, - status_to_maybe_uv_error, UvError}; +use super::{NativeHandle, Loop, UvError, uv_error_to_io_error}; +use uvio::HomingIO; use uvll; -use uvll::*; -pub struct FsRequest(*uvll::uv_fs_t); -impl Request for FsRequest {} +pub struct FsRequest { + req: *uvll::uv_fs_t, + priv fired: bool, +} -pub struct RequestData { - priv complete_cb: Option +pub struct FileWatcher { + priv loop_: Loop, + priv fd: c_int, + priv close: rtio::CloseBehavior, + priv home: SchedHandle, } impl FsRequest { - pub fn new() -> FsRequest { - let fs_req = unsafe { malloc_req(UV_FS) }; - assert!(fs_req.is_not_null()); - let fs_req: FsRequest = NativeHandle::from_native_handle(fs_req); - fs_req - } - - pub fn open(self, loop_: &Loop, path: &CString, flags: int, mode: int, - cb: FsCallback) { - let complete_cb_ptr = { - let mut me = self; - me.req_boilerplate(Some(cb)) - }; - let ret = path.with_ref(|p| unsafe { + pub fn open(loop_: &Loop, path: &CString, flags: int, mode: int) + -> Result + { + execute(|req, cb| unsafe { uvll::uv_fs_open(loop_.native_handle(), - self.native_handle(), p, flags as c_int, - mode as c_int, complete_cb_ptr) - }); - assert_eq!(ret, 0); + req, path.with_ref(|p| p), flags as c_int, + mode as c_int, cb) + }).map(|req| + FileWatcher::new(*loop_, req.get_result() as c_int, + rtio::CloseSynchronously) + ) } - pub fn open_sync(mut self, loop_: &Loop, path: &CString, - flags: int, mode: int) -> Result { - let complete_cb_ptr = self.req_boilerplate(None); - let result = path.with_ref(|p| unsafe { - uvll::uv_fs_open(loop_.native_handle(), - self.native_handle(), p, flags as c_int, - mode as c_int, complete_cb_ptr) - }); - self.sync_cleanup(result) + pub fn unlink(loop_: &Loop, path: &CString) -> Result<(), UvError> { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_unlink(loop_.native_handle(), req, path.with_ref(|p| p), + cb) + }) } - pub fn unlink(mut self, loop_: &Loop, path: &CString, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - let ret = path.with_ref(|p| unsafe { - uvll::uv_fs_unlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }); - assert_eq!(ret, 0); + pub fn lstat(loop_: &Loop, path: &CString) -> Result { + execute(|req, cb| unsafe { + uvll::uv_fs_lstat(loop_.native_handle(), req, path.with_ref(|p| p), + cb) + }).map(|req| req.mkstat()) } - pub fn unlink_sync(mut self, loop_: &Loop, path: &CString) - -> Result { - let complete_cb_ptr = self.req_boilerplate(None); - let result = path.with_ref(|p| unsafe { - uvll::uv_fs_unlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }); - self.sync_cleanup(result) + pub fn stat(loop_: &Loop, path: &CString) -> Result { + execute(|req, cb| unsafe { + uvll::uv_fs_stat(loop_.native_handle(), req, path.with_ref(|p| p), + cb) + }).map(|req| req.mkstat()) } - pub fn lstat(mut self, loop_: &Loop, path: &CString, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - let ret = path.with_ref(|p| unsafe { - uvll::uv_fs_lstat(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }); - assert_eq!(ret, 0); + pub fn write(loop_: &Loop, fd: c_int, buf: &[u8], offset: i64) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_write(loop_.native_handle(), req, + fd, vec::raw::to_ptr(buf) as *c_void, + buf.len() as c_uint, offset, cb) + }) } - pub fn stat(mut self, loop_: &Loop, path: &CString, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - let ret = path.with_ref(|p| unsafe { - uvll::uv_fs_stat(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }); - assert_eq!(ret, 0); - } - - pub fn write(mut self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - let base_ptr = buf.base as *c_void; - let len = buf.len as uint; - let ret = unsafe { - uvll::uv_fs_write(loop_.native_handle(), self.native_handle(), - fd, base_ptr, - len as c_uint, offset, complete_cb_ptr) - }; - assert_eq!(ret, 0); - } - pub fn write_sync(mut self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64) - -> Result { - let complete_cb_ptr = self.req_boilerplate(None); - let base_ptr = buf.base as *c_void; - let len = buf.len as uint; - let result = unsafe { - uvll::uv_fs_write(loop_.native_handle(), self.native_handle(), - fd, base_ptr, - len as c_uint, offset, complete_cb_ptr) - }; - self.sync_cleanup(result) - } - - pub fn read(mut self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - let buf_ptr = buf.base as *c_void; - let len = buf.len as uint; - let ret = unsafe { - uvll::uv_fs_read(loop_.native_handle(), self.native_handle(), - fd, buf_ptr, - len as c_uint, offset, complete_cb_ptr) - }; - assert_eq!(ret, 0); - } - pub fn read_sync(mut self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64) - -> Result { - let complete_cb_ptr = self.req_boilerplate(None); - let buf_ptr = buf.base as *c_void; - let len = buf.len as uint; - let result = unsafe { - uvll::uv_fs_read(loop_.native_handle(), self.native_handle(), - fd, buf_ptr, - len as c_uint, offset, complete_cb_ptr) - }; - self.sync_cleanup(result) - } - - pub fn close(mut self, loop_: &Loop, fd: c_int, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_close(loop_.native_handle(), self.native_handle(), - fd, complete_cb_ptr) - }, 0); - } - pub fn close_sync(mut self, loop_: &Loop, - fd: c_int) -> Result { - let complete_cb_ptr = self.req_boilerplate(None); - let result = unsafe { - uvll::uv_fs_close(loop_.native_handle(), self.native_handle(), - fd, complete_cb_ptr) - }; - self.sync_cleanup(result) - } - - pub fn mkdir(mut self, loop_: &Loop, path: &CString, mode: c_int, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(path.with_ref(|p| unsafe { - uvll::uv_fs_mkdir(loop_.native_handle(), - self.native_handle(), p, mode, complete_cb_ptr) - }), 0); - } - - pub fn rmdir(mut self, loop_: &Loop, path: &CString, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(path.with_ref(|p| unsafe { - uvll::uv_fs_rmdir(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }), 0); - } - - pub fn rename(mut self, loop_: &Loop, path: &CString, to: &CString, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_rename(loop_.native_handle(), - self.native_handle(), - path.with_ref(|p| p), - to.with_ref(|p| p), - complete_cb_ptr) - }, 0); - } - - pub fn chmod(mut self, loop_: &Loop, path: &CString, mode: c_int, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(path.with_ref(|p| unsafe { - uvll::uv_fs_chmod(loop_.native_handle(), self.native_handle(), p, - mode, complete_cb_ptr) - }), 0); - } - - pub fn readdir(mut self, loop_: &Loop, path: &CString, - flags: c_int, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(path.with_ref(|p| unsafe { - uvll::uv_fs_readdir(loop_.native_handle(), - self.native_handle(), p, flags, complete_cb_ptr) - }), 0); - } - - pub fn readlink(mut self, loop_: &Loop, path: &CString, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(path.with_ref(|p| unsafe { - uvll::uv_fs_readlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }), 0); - } - - pub fn chown(mut self, loop_: &Loop, path: &CString, uid: int, gid: int, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(path.with_ref(|p| unsafe { - uvll::uv_fs_chown(loop_.native_handle(), - self.native_handle(), p, - uid as uvll::uv_uid_t, - gid as uvll::uv_gid_t, - complete_cb_ptr) - }), 0); - } - - pub fn truncate(mut self, loop_: &Loop, file: c_int, offset: i64, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_ftruncate(loop_.native_handle(), - self.native_handle(), file, offset, - complete_cb_ptr) - }, 0); - } - - pub fn link(mut self, loop_: &Loop, src: &CString, dst: &CString, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_link(loop_.native_handle(), self.native_handle(), - src.with_ref(|p| p), - dst.with_ref(|p| p), - complete_cb_ptr) - }, 0); - } - - pub fn symlink(mut self, loop_: &Loop, src: &CString, dst: &CString, - cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_symlink(loop_.native_handle(), self.native_handle(), - src.with_ref(|p| p), - dst.with_ref(|p| p), - 0, - complete_cb_ptr) - }, 0); - } - - pub fn fsync(mut self, loop_: &Loop, fd: c_int, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_fsync(loop_.native_handle(), self.native_handle(), fd, - complete_cb_ptr) - }, 0); - } - - pub fn datasync(mut self, loop_: &Loop, fd: c_int, cb: FsCallback) { - let complete_cb_ptr = self.req_boilerplate(Some(cb)); - assert_eq!(unsafe { - uvll::uv_fs_fdatasync(loop_.native_handle(), self.native_handle(), fd, - complete_cb_ptr) - }, 0); - } - - // accessors/utility funcs - fn sync_cleanup(self, result: c_int) - -> Result { - self.cleanup_and_delete(); - match status_to_maybe_uv_error(result as i32) { - Some(err) => Err(err), - None => Ok(result) - } - } - fn req_boilerplate(&mut self, cb: Option) -> uvll::uv_fs_cb { - let result = match cb { - Some(_) => compl_cb, - None => 0 as uvll::uv_fs_cb - }; - self.install_req_data(cb); - result - } - pub fn install_req_data(&mut self, cb: Option) { - let fs_req = (self.native_handle()) as *uvll::uv_write_t; - let data = ~RequestData { - complete_cb: cb - }; - unsafe { - let data = transmute::<~RequestData, *c_void>(data); - uvll::set_data_for_req(fs_req, data); + pub fn read(loop_: &Loop, fd: c_int, buf: &mut [u8], offset: i64) + -> Result + { + do execute(|req, cb| unsafe { + uvll::uv_fs_read(loop_.native_handle(), req, + fd, vec::raw::to_ptr(buf) as *c_void, + buf.len() as c_uint, offset, cb) + }).map |req| { + req.get_result() as int } } - fn get_req_data<'r>(&'r mut self) -> &'r mut RequestData { - unsafe { - let data = uvll::get_data_for_req((self.native_handle())); - let data = transmute::<&*c_void, &mut ~RequestData>(&data); - &mut **data - } - } + pub fn close(loop_: &Loop, fd: c_int, sync: bool) -> Result<(), UvError> { + if sync { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_close(loop_.native_handle(), req, fd, cb) + }) + } else { + unsafe { + let req = uvll::malloc_req(uvll::UV_FS); + uvll::uv_fs_close(loop_.native_handle(), req, fd, close_cb); + return Ok(()); + } - pub fn get_path(&self) -> *c_char { - unsafe { uvll::get_path_from_fs_req(self.native_handle()) } - } - - pub fn get_result(&self) -> c_int { - unsafe { uvll::get_result_from_fs_req(self.native_handle()) } - } - - pub fn get_loop(&self) -> Loop { - unsafe { Loop{handle:uvll::get_loop_from_fs_req(self.native_handle())} } - } - - pub fn get_stat(&self) -> uv_stat_t { - let stat = uv_stat_t::new(); - unsafe { uvll::populate_stat(self.native_handle(), &stat); } - stat - } - - pub fn get_ptr(&self) -> *libc::c_void { - unsafe { - uvll::get_ptr_from_fs_req(self.native_handle()) - } - } - - pub fn each_path(&mut self, f: &fn(&CString)) { - let ptr = self.get_ptr(); - match self.get_result() { - n if (n <= 0) => {} - n => { - let n_len = n as uint; - // we pass in the len that uv tells us is there - // for the entries and we don't continue past that.. - // it appears that sometimes the multistring isn't - // correctly delimited and we stray into garbage memory? - // in any case, passing Some(n_len) fixes it and ensures - // good results + extern fn close_cb(req: *uvll::uv_fs_t) { unsafe { - c_str::from_c_multistring(ptr as *libc::c_char, - Some(n_len), f); + uvll::uv_fs_req_cleanup(req); + uvll::free_req(req); } } } } - fn cleanup_and_delete(self) { - unsafe { - let data = uvll::get_data_for_req(self.native_handle()); - let _data = transmute::<*c_void, ~RequestData>(data); - uvll::set_data_for_req(self.native_handle(), null::<()>()); - uvll::uv_fs_req_cleanup(self.native_handle()); - free_req(self.native_handle() as *c_void) + pub fn mkdir(loop_: &Loop, path: &CString, mode: c_int) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_mkdir(loop_.native_handle(), req, path.with_ref(|p| p), + mode, cb) + }) + } + + pub fn rmdir(loop_: &Loop, path: &CString) -> Result<(), UvError> { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_rmdir(loop_.native_handle(), req, path.with_ref(|p| p), + cb) + }) + } + + pub fn rename(loop_: &Loop, path: &CString, to: &CString) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_rename(loop_.native_handle(), + req, + path.with_ref(|p| p), + to.with_ref(|p| p), + cb) + }) + } + + pub fn chmod(loop_: &Loop, path: &CString, mode: c_int) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_chmod(loop_.native_handle(), req, path.with_ref(|p| p), + mode, cb) + }) + } + + pub fn readdir(loop_: &Loop, path: &CString, flags: c_int) + -> Result<~[Path], UvError> + { + execute(|req, cb| unsafe { + uvll::uv_fs_readdir(loop_.native_handle(), + req, path.with_ref(|p| p), flags, cb) + }).map(|req| unsafe { + let mut paths = ~[]; + let path = CString::new(path.with_ref(|p| p), false); + let parent = Path::new(path); + do c_str::from_c_multistring(req.get_ptr() as *libc::c_char, + Some(req.get_result() as uint)) |rel| { + let p = rel.as_bytes(); + paths.push(parent.join(p.slice_to(rel.len()))); + }; + paths + }) + } + + pub fn readlink(loop_: &Loop, path: &CString) -> Result { + do execute(|req, cb| unsafe { + uvll::uv_fs_readlink(loop_.native_handle(), req, + path.with_ref(|p| p), cb) + }).map |req| { + Path::new(unsafe { + CString::new(req.get_ptr() as *libc::c_char, false) + }) + } + } + + pub fn chown(loop_: &Loop, path: &CString, uid: int, gid: int) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_chown(loop_.native_handle(), + req, path.with_ref(|p| p), + uid as uvll::uv_uid_t, + gid as uvll::uv_gid_t, + cb) + }) + } + + pub fn truncate(loop_: &Loop, file: c_int, offset: i64) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_ftruncate(loop_.native_handle(), req, file, offset, cb) + }) + } + + pub fn link(loop_: &Loop, src: &CString, dst: &CString) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_link(loop_.native_handle(), req, + src.with_ref(|p| p), + dst.with_ref(|p| p), + cb) + }) + } + + pub fn symlink(loop_: &Loop, src: &CString, dst: &CString) + -> Result<(), UvError> + { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_symlink(loop_.native_handle(), req, + src.with_ref(|p| p), + dst.with_ref(|p| p), + 0, cb) + }) + } + + pub fn fsync(loop_: &Loop, fd: c_int) -> Result<(), UvError> { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_fsync(loop_.native_handle(), req, fd, cb) + }) + } + + pub fn datasync(loop_: &Loop, fd: c_int) -> Result<(), UvError> { + execute_nop(|req, cb| unsafe { + uvll::uv_fs_fdatasync(loop_.native_handle(), req, fd, cb) + }) + } + + pub fn get_result(&self) -> c_int { + unsafe { uvll::get_result_from_fs_req(self.req) } + } + + pub fn get_stat(&self) -> uvll::uv_stat_t { + let stat = uvll::uv_stat_t::new(); + unsafe { uvll::populate_stat(self.req, &stat); } + stat + } + + pub fn get_ptr(&self) -> *libc::c_void { + unsafe { uvll::get_ptr_from_fs_req(self.req) } + } + + pub fn mkstat(&self) -> FileStat { + let path = unsafe { uvll::get_path_from_fs_req(self.req) }; + let path = unsafe { Path::new(CString::new(path, false)) }; + let stat = self.get_stat(); + fn to_msec(stat: uvll::uv_timespec_t) -> u64 { + (stat.tv_sec * 1000 + stat.tv_nsec / 1000000) as u64 + } + let kind = match (stat.st_mode as c_int) & libc::S_IFMT { + libc::S_IFREG => io::TypeFile, + libc::S_IFDIR => io::TypeDirectory, + libc::S_IFIFO => io::TypeNamedPipe, + libc::S_IFBLK => io::TypeBlockSpecial, + libc::S_IFLNK => io::TypeSymlink, + _ => io::TypeUnknown, + }; + FileStat { + path: path, + size: stat.st_size as u64, + kind: kind, + perm: (stat.st_mode as io::FilePermission) & io::AllPermissions, + created: to_msec(stat.st_birthtim), + modified: to_msec(stat.st_mtim), + accessed: to_msec(stat.st_atim), + unstable: io::UnstableFileStat { + device: stat.st_dev as u64, + inode: stat.st_ino as u64, + rdev: stat.st_rdev as u64, + nlink: stat.st_nlink as u64, + uid: stat.st_uid as u64, + gid: stat.st_gid as u64, + blksize: stat.st_blksize as u64, + blocks: stat.st_blocks as u64, + flags: stat.st_flags as u64, + gen: stat.st_gen as u64, + } } } } -impl NativeHandle<*uvll::uv_fs_t> for FsRequest { - fn from_native_handle(handle: *uvll:: uv_fs_t) -> FsRequest { - FsRequest(handle) - } - fn native_handle(&self) -> *uvll::uv_fs_t { - match self { &FsRequest(ptr) => ptr } +impl Drop for FsRequest { + fn drop(&mut self) { + unsafe { + if self.fired { + uvll::uv_fs_req_cleanup(self.req); + } + uvll::free_req(self.req); + } } } -fn sync_cleanup(result: int) - -> Result { - match status_to_maybe_uv_error(result as i32) { - Some(err) => Err(err), - None => Ok(result) - } -} - -extern fn compl_cb(req: *uv_fs_t) { - let mut req: FsRequest = NativeHandle::from_native_handle(req); - // pull the user cb out of the req data - let cb = { - let data = req.get_req_data(); - assert!(data.complete_cb.is_some()); - // option dance, option dance. oooooh yeah. - data.complete_cb.take_unwrap() +fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int) + -> Result +{ + let mut req = FsRequest { + fired: false, + req: unsafe { uvll::malloc_req(uvll::UV_FS) } }; - // in uv_fs_open calls, the result will be the fd in the - // case of success, otherwise it's -1 indicating an error - let result = req.get_result(); - let status = status_to_maybe_uv_error(result); - // we have a req and status, call the user cb.. - // only giving the user a ref to the FsRequest, as we - // have to clean it up, afterwards (and they aren't really - // reusable, anyways - cb(&mut req, status); - // clean up the req (and its data!) after calling the user cb - req.cleanup_and_delete(); + return match f(req.req, fs_cb) { + 0 => { + req.fired = true; + let mut slot = None; + unsafe { uvll::set_data_for_req(req.req, &slot) } + let sched: ~Scheduler = Local::take(); + do sched.deschedule_running_task_and_then |_, task| { + slot = Some(task); + } + match req.get_result() { + n if n < 0 => Err(UvError(n)), + _ => Ok(req), + } + } + n => Err(UvError(n)) + + }; + + extern fn fs_cb(req: *uvll::uv_fs_t) { + let slot: &mut Option = unsafe { + cast::transmute(uvll::get_data_for_req(req)) + }; + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(slot.take_unwrap()); + } +} + +fn execute_nop(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int) + -> Result<(), UvError> +{ + execute(f).map(|_| {}) +} + +impl HomingIO for FileWatcher { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl FileWatcher { + pub fn new(loop_: Loop, fd: c_int, close: rtio::CloseBehavior) -> FileWatcher { + FileWatcher { + loop_: loop_, + fd: fd, + close: close, + home: get_handle_to_current_scheduler!() + } + } + + fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result { + let _m = self.fire_missiles(); + let r = FsRequest::read(&self.loop_, self.fd, buf, offset); + r.map_err(uv_error_to_io_error) + } + fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> { + let _m = self.fire_missiles(); + let r = FsRequest::write(&self.loop_, self.fd, buf, offset); + r.map_err(uv_error_to_io_error) + } + fn seek_common(&mut self, pos: i64, whence: c_int) -> + Result{ + #[fixed_stack_segment]; #[inline(never)]; + unsafe { + match libc::lseek(self.fd, pos as libc::off_t, whence) { + -1 => { + Err(IoError { + kind: io::OtherIoError, + desc: "Failed to lseek.", + detail: None + }) + }, + n => Ok(n as u64) + } + } + } +} + +impl Drop for FileWatcher { + fn drop(&mut self) { + let _m = self.fire_missiles(); + match self.close { + rtio::DontClose => {} + rtio::CloseAsynchronously => { + FsRequest::close(&self.loop_, self.fd, false); + } + rtio::CloseSynchronously => { + FsRequest::close(&self.loop_, self.fd, true); + } + } + } +} + +impl rtio::RtioFileStream for FileWatcher { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.base_read(buf, -1) + } + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + self.base_write(buf, -1) + } + fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result { + self.base_read(buf, offset as i64) + } + fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> { + self.base_write(buf, offset as i64) + } + fn seek(&mut self, pos: i64, whence: io::SeekStyle) -> Result { + use std::libc::{SEEK_SET, SEEK_CUR, SEEK_END}; + let whence = match whence { + io::SeekSet => SEEK_SET, + io::SeekCur => SEEK_CUR, + io::SeekEnd => SEEK_END + }; + self.seek_common(pos, whence) + } + fn tell(&self) -> Result { + use std::libc::SEEK_CUR; + // this is temporary + let self_ = unsafe { cast::transmute_mut(self) }; + self_.seek_common(0, SEEK_CUR) + } + fn fsync(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + FsRequest::fsync(&self.loop_, self.fd).map_err(uv_error_to_io_error) + } + fn datasync(&mut self) -> Result<(), IoError> { + let _m = self.fire_missiles(); + FsRequest::datasync(&self.loop_, self.fd).map_err(uv_error_to_io_error) + } + fn truncate(&mut self, offset: i64) -> Result<(), IoError> { + let _m = self.fire_missiles(); + let r = FsRequest::truncate(&self.loop_, self.fd, offset); + r.map_err(uv_error_to_io_error) + } } #[cfg(test)] diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 1d6f2f0edb55..eb2da05506d8 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -60,7 +60,7 @@ use std::rt::io::IoError; //#[cfg(test)] use unstable::run_in_bare_thread; -pub use self::file::{FsRequest}; +pub use self::file::{FsRequest, FileWatcher}; pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; @@ -219,7 +219,6 @@ pub type AllocCallback = ~fn(uint) -> Buf; pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); pub type NullCallback = ~fn(); pub type ConnectionCallback = ~fn(StreamWatcher, Option); -pub type FsCallback = ~fn(&mut FsRequest, Option); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); pub type UdpSendCallback = ~fn(UdpWatcher, Option); @@ -263,7 +262,6 @@ impl> WatcherInterop for W { connect_cb: None, close_cb: None, alloc_cb: None, - async_cb: None, udp_recv_cb: None, udp_send_cb: None, }; diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index f1936635a183..a857308a81b3 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -158,7 +158,7 @@ impl PipeListener { Ok(p.install()) } n => { - unsafe { uvll::free_handle(pipe) } + unsafe { uvll::uv_close(pipe, pipe_close_cb) } Err(UvError(n)) } } diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 5bf3a82e972b..1732e84be4e7 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::cell::Cell; -use std::comm::{oneshot, stream, PortOne, ChanOne}; +use std::comm::{oneshot, stream, PortOne, ChanOne, SendDeferred}; use std::libc::c_int; use std::rt::BlockedTask; use std::rt::local::Local; @@ -106,9 +106,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) { let sched: ~Scheduler = Local::take(); sched.resume_blocked_task_immediately(task); } - SendOnce(chan) => chan.send(()), + SendOnce(chan) => chan.send_deferred(()), SendMany(chan) => { - chan.send(()); + chan.send_deferred(()); timer.action = Some(SendMany(chan)); } } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 592a23297cc3..e06f8b5430ec 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -21,20 +21,18 @@ use std::str; use std::rt::io; use std::rt::io::IoError; use std::rt::io::net::ip::{SocketAddr, IpAddr}; -use std::rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, - SeekEnd}; +use std::rt::io::{standard_error, OtherIoError}; use std::rt::io::process::ProcessConfig; use std::rt::local::Local; use std::rt::rtio::*; use std::rt::sched::{Scheduler, SchedHandle}; use std::rt::tube::Tube; use std::rt::task::Task; -use std::path::{GenericPath, Path}; -use std::libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, - O_WRONLY, S_IRUSR, S_IWUSR}; -use std::rt::io::{FileMode, FileAccess, Open, - Append, Truncate, Read, Write, ReadWrite, - FileStat}; +use std::path::Path; +use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, + S_IRUSR, S_IWUSR}; +use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, + ReadWrite, FileStat}; use std::rt::io::signal::Signum; use std::task; use ai = std::rt::io::net::addrinfo; @@ -249,76 +247,6 @@ impl UvIoFactory { } } -/// Helper for a variety of simple uv_fs_* functions that have no ret val. This -/// function takes the loop that it will act on, and then invokes the specified -/// callback in a situation where the task wil be immediately blocked -/// afterwards. The `FsCallback` yielded must be invoked to reschedule the task -/// (once the result of the operation is known). -fn uv_fs_helper(loop_: &mut Loop, - retfn: extern "Rust" fn(&mut FsRequest) -> T, - cb: &fn(&mut FsRequest, &mut Loop, FsCallback)) - -> Result { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - let mut new_req = FsRequest::new(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do cb(&mut new_req, loop_) |req, err| { - let res = match err { - None => Ok(retfn(req)), - Some(err) => Err(uv_error_to_io_error(err)) - }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - } - } - assert!(!result_cell.is_empty()); - return result_cell.take(); -} - -fn unit(_: &mut FsRequest) {} - -fn fs_mkstat(f: &mut FsRequest) -> FileStat { - let path = unsafe { Path::new(CString::new(f.get_path(), false)) }; - let stat = f.get_stat(); - fn to_msec(stat: uvll::uv_timespec_t) -> u64 { - (stat.tv_sec * 1000 + stat.tv_nsec / 1000000) as u64 - } - let kind = match (stat.st_mode as c_int) & libc::S_IFMT { - libc::S_IFREG => io::TypeFile, - libc::S_IFDIR => io::TypeDirectory, - libc::S_IFIFO => io::TypeNamedPipe, - libc::S_IFBLK => io::TypeBlockSpecial, - libc::S_IFLNK => io::TypeSymlink, - _ => io::TypeUnknown, - }; - FileStat { - path: path, - size: stat.st_size as u64, - kind: kind, - perm: (stat.st_mode as io::FilePermission) & io::AllPermissions, - created: to_msec(stat.st_birthtim), - modified: to_msec(stat.st_mtim), - accessed: to_msec(stat.st_atim), - unstable: io::UnstableFileStat { - device: stat.st_dev as u64, - inode: stat.st_ino as u64, - rdev: stat.st_rdev as u64, - nlink: stat.st_nlink as u64, - uid: stat.st_uid as u64, - gid: stat.st_gid as u64, - blksize: stat.st_blksize as u64, - blocks: stat.st_blocks as u64, - flags: stat.st_flags as u64, - gen: stat.st_gen as u64, - } - } -} - impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. @@ -456,10 +384,10 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn fs_from_raw_fd(&mut self, fd: c_int, close: CloseBehavior) -> ~RtioFileStream { + fn fs_from_raw_fd(&mut self, fd: c_int, + close: CloseBehavior) -> ~RtioFileStream { let loop_ = Loop {handle: self.uv_loop().native_handle()}; - let home = get_handle_to_current_scheduler!(); - ~UvFileStream::new(loop_, fd, close, home) as ~RtioFileStream + ~FileWatcher::new(loop_, fd, close) as ~RtioFileStream } fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) @@ -477,138 +405,64 @@ impl IoFactory for UvIoFactory { io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT, libc::S_IRUSR | libc::S_IWUSR), }; - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - let open_req = file::FsRequest::new(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do open_req.open(self.uv_loop(), path, flags as int, mode as int) - |req,err| { - if err.is_none() { - let loop_ = Loop {handle: req.get_loop().native_handle()}; - let home = get_handle_to_current_scheduler!(); - let fd = req.get_result() as c_int; - let fs = ~UvFileStream::new( - loop_, fd, CloseSynchronously, home) as ~RtioFileStream; - let res = Ok(fs); - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } else { - let res = Err(uv_error_to_io_error(err.unwrap())); - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - }; - }; - }; - assert!(!result_cell.is_empty()); - return result_cell.take(); + + match FsRequest::open(self.uv_loop(), path, flags as int, mode as int) { + Ok(fs) => Ok(~fs as ~RtioFileStream), + Err(e) => Err(uv_error_to_io_error(e)) + } } fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.unlink(l, path, cb) - } + let r = FsRequest::unlink(self.uv_loop(), path); + r.map_err(uv_error_to_io_error) } fn fs_lstat(&mut self, path: &CString) -> Result { - do uv_fs_helper(self.uv_loop(), fs_mkstat) |req, l, cb| { - req.lstat(l, path, cb) - } + let r = FsRequest::lstat(self.uv_loop(), path); + r.map_err(uv_error_to_io_error) } fn fs_stat(&mut self, path: &CString) -> Result { - do uv_fs_helper(self.uv_loop(), fs_mkstat) |req, l, cb| { - req.stat(l, path, cb) - } + let r = FsRequest::stat(self.uv_loop(), path); + r.map_err(uv_error_to_io_error) } fn fs_mkdir(&mut self, path: &CString, perm: io::FilePermission) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.mkdir(l, path, perm as c_int, cb) - } + let r = FsRequest::mkdir(self.uv_loop(), path, perm as c_int); + r.map_err(uv_error_to_io_error) } fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.rmdir(l, path, cb) - } + let r = FsRequest::rmdir(self.uv_loop(), path); + r.map_err(uv_error_to_io_error) } fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.rename(l, path, to, cb) - } + let r = FsRequest::rename(self.uv_loop(), path, to); + r.map_err(uv_error_to_io_error) } fn fs_chmod(&mut self, path: &CString, perm: io::FilePermission) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.chmod(l, path, perm as c_int, cb) - } + let r = FsRequest::chmod(self.uv_loop(), path, perm as c_int); + r.map_err(uv_error_to_io_error) } - fn fs_readdir(&mut self, path: &CString, flags: c_int) -> - Result<~[Path], IoError> { - use str::StrSlice; - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let path_cell = Cell::new(path); - do task::unkillable { // FIXME(#8674) - let scheduler: ~Scheduler = Local::take(); - let stat_req = file::FsRequest::new(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let path = path_cell.take(); - // Don't pick up the null byte - let slice = path.as_bytes().slice(0, path.len()); - let path_parent = Cell::new(Path::new(slice)); - do stat_req.readdir(self.uv_loop(), path, flags) |req,err| { - let parent = path_parent.take(); - let res = match err { - None => { - let mut paths = ~[]; - do req.each_path |rel_path| { - let p = rel_path.as_bytes(); - paths.push(parent.join(p.slice_to(rel_path.len()))); - } - Ok(paths) - }, - Some(e) => { - Err(uv_error_to_io_error(e)) - } - }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; - }; - assert!(!result_cell.is_empty()); - return result_cell.take(); + fn fs_readdir(&mut self, path: &CString, flags: c_int) + -> Result<~[Path], IoError> + { + let r = FsRequest::readdir(self.uv_loop(), path, flags); + r.map_err(uv_error_to_io_error) } fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.link(l, src, dst, cb) - } + let r = FsRequest::link(self.uv_loop(), src, dst); + r.map_err(uv_error_to_io_error) } fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.symlink(l, src, dst, cb) - } + let r = FsRequest::symlink(self.uv_loop(), src, dst); + r.map_err(uv_error_to_io_error) } fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> { - do uv_fs_helper(self.uv_loop(), unit) |req, l, cb| { - req.chown(l, path, uid, gid, cb) - } + let r = FsRequest::chown(self.uv_loop(), path, uid, gid); + r.map_err(uv_error_to_io_error) } fn fs_readlink(&mut self, path: &CString) -> Result { - fn getlink(f: &mut FsRequest) -> Path { - Path::new(unsafe { CString::new(f.get_ptr() as *libc::c_char, false) }) - } - do uv_fs_helper(self.uv_loop(), getlink) |req, l, cb| { - req.readlink(l, path, cb) - } + let r = FsRequest::readlink(self.uv_loop(), path); + r.map_err(uv_error_to_io_error) } fn spawn(&mut self, config: ProcessConfig) @@ -1072,159 +926,6 @@ impl RtioUdpSocket for UvUdpSocket { } } -pub struct UvFileStream { - priv loop_: Loop, - priv fd: c_int, - priv close: CloseBehavior, - priv home: SchedHandle, -} - -impl HomingIO for UvFileStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl UvFileStream { - fn new(loop_: Loop, fd: c_int, close: CloseBehavior, - home: SchedHandle) -> UvFileStream { - UvFileStream { - loop_: loop_, - fd: fd, - close: close, - home: home, - } - } - fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let buf_ptr: *&mut [u8] = &buf; - let (_m, scheduler) = self.fire_homing_missile_sched(); - do scheduler.deschedule_running_task_and_then |_, task| { - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let task_cell = Cell::new(task); - let read_req = file::FsRequest::new(); - do read_req.read(&self.loop_, self.fd, buf, offset) |req, uverr| { - let res = match uverr { - None => Ok(req.get_result() as int), - Some(err) => Err(uv_error_to_io_error(err)) - }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - result_cell.take() - } - fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> { - do self.nop_req |self_, req, cb| { - req.write(&self_.loop_, self_.fd, slice_to_uv_buf(buf), offset, cb) - } - } - fn seek_common(&mut self, pos: i64, whence: c_int) -> - Result{ - #[fixed_stack_segment]; #[inline(never)]; - unsafe { - match lseek(self.fd, pos as off_t, whence) { - -1 => { - Err(IoError { - kind: OtherIoError, - desc: "Failed to lseek.", - detail: None - }) - }, - n => Ok(n as u64) - } - } - } - fn nop_req(&mut self, f: &fn(&mut UvFileStream, file::FsRequest, FsCallback)) - -> Result<(), IoError> { - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let (_m, sched) = self.fire_homing_missile_sched(); - do sched.deschedule_running_task_and_then |_, task| { - let task = Cell::new(task); - let req = file::FsRequest::new(); - do f(self, req) |_, uverr| { - let res = match uverr { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)) - }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task.take()); - } - } - result_cell.take() - } -} - -impl Drop for UvFileStream { - fn drop(&mut self) { - match self.close { - DontClose => {} - CloseAsynchronously => { - let close_req = file::FsRequest::new(); - do close_req.close(&self.loop_, self.fd) |_,_| {} - } - CloseSynchronously => { - let (_m, scheduler) = self.fire_homing_missile_sched(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let close_req = file::FsRequest::new(); - do close_req.close(&self.loop_, self.fd) |_,_| { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } - } -} - -impl RtioFileStream for UvFileStream { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.base_read(buf, -1) - } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - self.base_write(buf, -1) - } - fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result { - self.base_read(buf, offset as i64) - } - fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> { - self.base_write(buf, offset as i64) - } - fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result { - use std::libc::{SEEK_SET, SEEK_CUR, SEEK_END}; - let whence = match whence { - SeekSet => SEEK_SET, - SeekCur => SEEK_CUR, - SeekEnd => SEEK_END - }; - self.seek_common(pos, whence) - } - fn tell(&self) -> Result { - use std::libc::SEEK_CUR; - // this is temporary - let self_ = unsafe { cast::transmute_mut(self) }; - self_.seek_common(0, SEEK_CUR) - } - fn fsync(&mut self) -> Result<(), IoError> { - do self.nop_req |self_, req, cb| { - req.fsync(&self_.loop_, self_.fd, cb) - } - } - fn datasync(&mut self) -> Result<(), IoError> { - do self.nop_req |self_, req, cb| { - req.datasync(&self_.loop_, self_.fd, cb) - } - } - fn truncate(&mut self, offset: i64) -> Result<(), IoError> { - do self.nop_req |self_, req, cb| { - req.truncate(&self_.loop_, self_.fd, offset, cb) - } - } -} - // this function is full of lies unsafe fn local_io() -> &'static mut IoFactory { do Local::borrow |sched: &mut Scheduler| {