From f7e3a4e036cbe84677a924548c2ecbb49b551265 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 5 Jun 2012 06:40:39 -0700 Subject: [PATCH] std: EADDRINUSE and EACCES err tests for tcp server + more cleanup .. confounded resolve! --- src/libstd/net_tcp.rs | 600 +++++++++++++++++++----------------------- 1 file changed, 278 insertions(+), 322 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 66f287683bba..b51058b24998 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -23,8 +23,7 @@ export listen, accept; // tcp client stuff export connect; // helper methods -import methods = net_tcp_methods; -export methods; +export methods_tcp_socket; #[nolink] native mod rustrt { @@ -57,6 +56,38 @@ type tcp_err_data = { err_name: str, err_msg: str }; +#[doc=" +Details returned as part of a `result::err` result from `tcp::listen` +"] +enum tcp_listen_err_data { + #[doc=" + Some unplanned-for error. The first and second fields correspond + to libuv's `err_name` and `err_msg` fields, respectively. + "] + generic_listen_err(str, str), + #[doc=" + Failed to bind to the requested IP/Port, because it is already in use. + + # Possible Causes + + * Attempting to bind to a port already bound to another listener + "] + address_in_use, + #[doc=" + Request to bind to an IP/Port was denied by the system. + + # Possible Causes + + * Attemping to binding to an IP/Port as a non-Administrator + on Windows Vista+ + * Attempting to bind, as a non-priv'd + user, to 'privileged' ports (< 1024) on *nix + "] + access_denied +} +#[doc=" +Details returned as part of a `result::err` result from `tcp::connect` +"] enum tcp_connect_err_data { #[doc=" Some unplanned-for error. The first and second fields correspond @@ -333,185 +364,6 @@ fn read_future(sock: tcp_socket, timeout_msecs: uint) } #[doc=" -<<<<<<< HEAD -Bind to a given IP/port and listen for new connections - -# Arguments - -* `host_ip` - a `net::ip::ip_addr` representing a unique IP -(versions 4 or 6) -* `port` - a uint representing the port to listen on -* `backlog` - a uint representing the number of incoming connections -to cache in memory -* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on - -# Returns - -A `result` instance containing either a `tcp_conn_port` which can used -to listen for, and accept, new connections, or a `tcp_err_data` if -failure to create the tcp listener occurs -"] -fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint, - iotask: iotask) - -> result::result unsafe { - let stream_closed_po = comm::port::<()>(); - let stream_closed_ch = comm::chan(stream_closed_po); - let new_conn_po = comm::port::>(); - let new_conn_ch = comm::chan(new_conn_po); - // FIXME (#2656): This shared box should not be captured in the i/o - // task Make it a unique pointer. - let server_data: @tcp_conn_port_data = @{ - server_stream: uv::ll::tcp_t(), - stream_closed_po: stream_closed_po, - stream_closed_ch: stream_closed_ch, - iotask: iotask, - new_conn_po: new_conn_po, - new_conn_ch: new_conn_ch - }; - let server_data_ptr = ptr::addr_of(*server_data); - let server_stream_ptr = ptr::addr_of((*server_data_ptr) - .server_stream); - - let setup_po = comm::port::>(); - let setup_ch = comm::chan(setup_po); - iotask::interact(iotask) {|loop_ptr| - let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip, - port); - alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) { - 0i32 { - alt uv::ll::tcp_bind(server_stream_ptr, - ptr::addr_of(tcp_addr)) { - 0i32 { - alt uv::ll::listen(server_stream_ptr, - backlog as libc::c_int, - tcp_nl_on_connection_cb) { - 0i32 { - uv::ll::set_data_for_uv_handle( - server_stream_ptr, - server_data_ptr); - comm::send(setup_ch, none); - } - _ { - log(debug, "failure to uv_listen()"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(setup_ch, some(err_data)); - } - } - } - _ { - log(debug, "failure to uv_tcp_bind"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(setup_ch, some(err_data)); - } - } - } - _ { - log(debug, "failure to uv_tcp_init"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(setup_ch, some(err_data)); - } - } - }; - alt comm::recv(setup_po) { - some(err_data) { - // we failed to bind/list w/ libuv - result::err(err_data.to_tcp_err()) - } - none { - result::ok(tcp_conn_port(server_data)) - } - } -} - -#[doc=" -Block on a `net::tcp::tcp_conn_port` until a new connection arrives - -This function behaves similarly to `comm::recv()` - -# Arguments - -* server_port -- a `net::tcp::tcp_conn_port` that you wish to listen -on for an incoming connection - -# Returns - -A `result` object containing a `net::tcp::tcp_socket`, ready for immediate -use, as the `ok` varient, or a `net::tcp::tcp_err_data` for the `err` -variant -"] -fn conn_recv(server_port: tcp_conn_port) - -> result::result { - let new_conn_po = (*(server_port.conn_data)).new_conn_po; - let iotask = (*(server_port.conn_data)).iotask; - let new_conn_result = comm::recv(new_conn_po); - alt new_conn_result { - ok(client_stream_ptr) { - conn_port_new_tcp_socket(client_stream_ptr, iotask) - } - err(err_data) { - result::err(err_data) - } - } -} - -#[doc=" -Identical to `net::tcp::conn_recv`, but ran on a new task - -The recv'd tcp_socket is created with a new task on the current scheduler, -and given as a parameter to the provided callback - -# Arguments - -* `server_port` -- a `net::tcp::tcp_conn_port` that you wish to listen -on for an incoming connection -* `cb` -- a callback that will be ran, in a new task on the current scheduler, -once a new connection is recv'd. Its parameter: - * A `result` object containing a `net::tcp::tcp_socket`, ready for immediate - use, as the `ok` varient, or a `net::tcp::tcp_err_data` for the `err` - variant -"] -fn conn_recv_spawn(server_port: tcp_conn_port, - +cb: fn~(result::result)) { - let new_conn_po = (*(server_port.conn_data)).new_conn_po; - let iotask = (*(server_port.conn_data)).iotask; - let new_conn_result = comm::recv(new_conn_po); - task::spawn {|| - let sock_create_result = alt new_conn_result { - ok(client_stream_ptr) { - conn_port_new_tcp_socket(client_stream_ptr, iotask) - } - err(err_data) { - result::err(err_data) - } - }; - cb(sock_create_result); - }; -} - -#[doc=" -Check if a `net::tcp::tcp_conn_port` has one-or-more pending, new connections - -This function behaves similarly to `comm::peek()` - -# Arguments - -* `server_port` -- a `net::tcp::tcp_conn_port` representing a server -connection - -# Returns - -`true` if there are one-or-more pending connections, `false` if there are -none. -"] -fn conn_peek(server_port: tcp_conn_port) -> bool { - let new_conn_po = (*(server_port.conn_data)).new_conn_po; - comm::peek(new_conn_po) -} - -#[doc=" -======= ->>>>>>> std: dump the tcp::new_listener server API Bind an incoming client connection to a `net::tcp::tcp_socket` # Notes @@ -681,7 +533,7 @@ fn listen(host_ip: ip::ip_addr, port: uint, backlog: uint, on_establish_cb: fn~(comm::chan>), +new_connect_cb: fn~(tcp_new_connection, comm::chan>)) - -> result::result<(), tcp_err_data> unsafe { + -> result::result<(), tcp_listen_err_data> unsafe { listen_common(host_ip, port, backlog, iotask, on_establish_cb) // on_connect_cb {|handle| @@ -697,7 +549,7 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint, iotask: iotask, on_establish_cb: fn~(comm::chan>), -on_connect_cb: fn~(*uv::ll::uv_tcp_t)) - -> result::result<(), tcp_err_data> unsafe { + -> result::result<(), tcp_listen_err_data> unsafe { let stream_closed_po = comm::port::<()>(); let kill_po = comm::port::>(); let kill_ch = comm::chan(kill_po); @@ -719,6 +571,9 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint, port); alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) { 0i32 { + uv::ll::set_data_for_uv_handle( + server_stream_ptr, + server_data_ptr); alt uv::ll::tcp_bind(server_stream_ptr, ptr::addr_of(tcp_addr)) { 0i32 { @@ -726,9 +581,6 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint, backlog as libc::c_int, tcp_lfc_on_connection_cb) { 0i32 { - uv::ll::set_data_for_uv_handle( - server_stream_ptr, - server_data_ptr); comm::send(setup_ch, none); } _ { @@ -756,8 +608,29 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint, }; alt setup_result { some(err_data) { - // we failed to bind/list w/ libuv - result::err(err_data.to_tcp_err()) + iotask::interact(iotask) {|loop_ptr| + log(debug, #fmt("tcp::listen post-kill recv hl interact %?", + loop_ptr)); + (*server_data_ptr).active = false; + uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + }; + stream_closed_po.recv(); + alt err_data.err_name { + "EACCES" { + log(debug, "Got EACCES error"); + result::err(access_denied) + } + "EADDRINUSE" { + log(debug, "Got EADDRINUSE error"); + result::err(address_in_use) + } + _ { + log(debug, #fmt("Got '%s' '%s' libuv error", + err_data.err_name, err_data.err_msg)); + result::err( + generic_listen_err(err_data.err_name, err_data.err_msg)) + } + } } none { on_establish_cb(kill_ch); @@ -768,11 +641,12 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint, (*server_data_ptr).active = false; uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); }; - comm::recv(stream_closed_po); + stream_closed_po.recv(); alt kill_result { // some failure post bind/listen some(err_data) { - result::err(err_data) + result::err(generic_listen_err(err_data.err_name, + err_data.err_msg)) } // clean exit none { @@ -783,35 +657,33 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint, } } -mod net_tcp_methods { - #[doc=" - Convenience methods extending `net::tcp::tcp_socket` - "] - impl methods_tcp_socket for tcp_socket { - fn read_start() -> result::result>, tcp_err_data> { - read_start(self) - } - fn read_stop() -> - result::result<(), tcp_err_data> { - read_stop(self) - } - fn read(timeout_msecs: uint) -> - result::result<[u8]/~, tcp_err_data> { - read(self, timeout_msecs) - } - fn read_future(timeout_msecs: uint) -> - future::future> { - read_future(self, timeout_msecs) - } - fn write(raw_write_data: [u8]/~) - -> result::result<(), tcp_err_data> { - write(self, raw_write_data) - } - fn write_future(raw_write_data: [u8]/~) - -> future::future> { - write_future(self, raw_write_data) - } +#[doc=" +Convenience methods extending `net::tcp::tcp_socket` +"] +impl methods_tcp_socket for tcp_socket { + fn read_start() -> result::result>, tcp_err_data> { + read_start(self) + } + fn read_stop() -> + result::result<(), tcp_err_data> { + read_stop(self) + } + fn read(timeout_msecs: uint) -> + result::result<[u8]/~, tcp_err_data> { + read(self, timeout_msecs) + } + fn read_future(timeout_msecs: uint) -> + future::future> { + read_future(self, timeout_msecs) + } + fn write(raw_write_data: [u8]/~) + -> result::result<(), tcp_err_data> { + write(self, raw_write_data) + } + fn write_future(raw_write_data: [u8]/~) + -> future::future> { + write_future(self, raw_write_data) } } // INTERNAL API @@ -984,38 +856,6 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data, } } -<<<<<<< HEAD -// various recv_* can use a tcp_conn_port can re-use this.. -fn conn_port_new_tcp_socket( - stream_handle_ptr: *uv::ll::uv_tcp_t, - iotask: iotask) - -> result::result unsafe { - // tcp_nl_on_connection_cb - let reader_po = comm::port::>(); - let client_socket_data = @{ - reader_po : reader_po, - reader_ch : comm::chan(reader_po), - stream_handle_ptr : stream_handle_ptr, - connect_req : uv::ll::connect_t(), - write_req : uv::ll::write_t(), - iotask : iotask - }; - let client_socket_data_ptr = ptr::addr_of(*client_socket_data); - comm::listen {|cont_ch| - iotask::interact(iotask) {|loop_ptr| - log(debug, #fmt("in interact cb 4 conn_port_new_tcp.. loop %?", - loop_ptr)); - uv::ll::set_data_for_uv_handle(stream_handle_ptr, - client_socket_data_ptr); - cont_ch.send(()); - }; - cont_ch.recv() - }; - result::ok(tcp_socket(client_socket_data)) -} - -======= ->>>>>>> std: dump the tcp::new_listener server API enum tcp_new_connection { new_tcp_conn(*uv::ll::uv_tcp_t) } @@ -1260,6 +1100,18 @@ mod test { fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe { impl_gl_tcp_ipv4_client_error_connection_refused(); } + #[test] + fn test_gl_tcp_server_address_in_use() unsafe { + impl_gl_tcp_ipv4_server_address_in_use(); + } + #[test] + // FIXME: this probably needs to be ignored on windows. + // ... need to verify (someday we'll have 64bit windows! :) + //#[ignore(cfg(target_os = "win32"))] + fn test_gl_tcp_server_access_denied() unsafe { + impl_gl_tcp_ipv4_server_access_denied(); + } + } #[cfg(target_arch="x86")] mod impl32 { @@ -1273,6 +1125,19 @@ mod test { fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe { impl_gl_tcp_ipv4_client_error_connection_refused(); } + #[test] + #[ignore(cfg(target_os = "linux"))] + fn test_gl_tcp_server_address_in_use() unsafe { + impl_gl_tcp_ipv4_server_address_in_use(); + } + #[test] + #[ignore(cfg(target_os = "linux"))] + // FIXME: this probably needs to be ignored on windows. + // ... need to verify + //#[ignore(cfg(target_os = "win32"))] + fn test_gl_tcp_server_access_denied() unsafe { + impl_gl_tcp_ipv4_server_access_denied(); + } } } fn impl_gl_tcp_ipv4_server_and_client() { @@ -1324,7 +1189,7 @@ mod test { fn impl_gl_tcp_ipv4_client_error_connection_refused() { let hl_loop = uv::global_loop::get(); let server_ip = "127.0.0.1"; - let server_port = 8890u; + let server_port = 8889u; let expected_req = "ping"; // client log(debug, "firing up client.."); @@ -1344,85 +1209,181 @@ mod test { } } } + fn impl_gl_tcp_ipv4_server_address_in_use() { + let hl_loop = uv::global_loop::get(); + let server_ip = "127.0.0.1"; + let server_port = 8890u; + let expected_req = "ping"; + let expected_resp = "pong"; + + let server_result_po = comm::port::(); + let server_result_ch = comm::chan(server_result_po); + + let cont_po = comm::port::<()>(); + let cont_ch = comm::chan(cont_po); + // server + task::spawn_sched(task::manual_threads(1u)) {|| + let actual_req = comm::listen {|server_ch| + run_tcp_test_server( + server_ip, + server_port, + expected_resp, + server_ch, + cont_ch, + hl_loop) + }; + server_result_ch.send(actual_req); + }; + comm::recv(cont_po); + // this one should fail.. + let listen_err = run_tcp_test_server_fail( + server_ip, + server_port, + hl_loop); + // client.. just doing this so that the first server tears down + log(debug, "server started, firing up client.."); + comm::listen {|client_ch| + run_tcp_test_client( + server_ip, + server_port, + expected_req, + client_ch, + hl_loop) + }; + alt listen_err { + address_in_use { + assert true; + } + _ { + fail "expected address_in_use listen error,"+ + "but got a different error varient. check logs."; + } + } + } + fn impl_gl_tcp_ipv4_server_access_denied() { + let hl_loop = uv::global_loop::get(); + let server_ip = "127.0.0.1"; + let server_port = 80u; + // this one should fail.. + let listen_err = run_tcp_test_server_fail( + server_ip, + server_port, + hl_loop); + alt listen_err { + access_denied { + assert true; + } + _ { + fail "expected address_in_use listen error,"+ + "but got a different error varient. check logs."; + } + } + } fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str, server_ch: comm::chan, cont_ch: comm::chan<()>, iotask: iotask) -> str { + let server_ip_addr = ip::v4::parse_addr(server_ip); + let listen_result = listen(server_ip_addr, server_port, 128u, iotask, + // on_establish_cb -- called when listener is set up + {|kill_ch| + log(debug, #fmt("establish_cb %?", + kill_ch)); + comm::send(cont_ch, ()); + }, + // risky to run this on the loop, but some users + // will want the POWER + {|new_conn, kill_ch| + log(debug, "SERVER: new connection!"); + comm::listen {|cont_ch| + task::spawn_sched(task::manual_threads(1u)) {|| + log(debug, "SERVER: starting worker for new req"); - task::spawn_sched(task::manual_threads(1u)) {|| - let server_ip_addr = ip::v4::parse_addr(server_ip); - let listen_result = - listen(server_ip_addr, server_port, 128u, - iotask, - // on_establish_cb -- called when listener is set up - {|kill_ch| - log(debug, #fmt("establish_cb %?", - kill_ch)); - comm::send(cont_ch, ()); - }, - // risky to run this on the loop, but some users - // will want the POWER - {|new_conn, kill_ch| - log(debug, "SERVER: new connection!"); - comm::listen {|cont_ch| - task::spawn_sched(task::manual_threads(1u)) {|| - log(debug, "SERVER: starting worker for new req"); - - let accept_result = accept(new_conn); - log(debug, "SERVER: after accept()"); - if result::is_err(accept_result) { - log(debug, "SERVER: error accept connection"); - let err_data = result::get_err(accept_result); - comm::send(kill_ch, some(err_data)); - log(debug, - "SERVER/WORKER: send on err cont ch"); - cont_ch.send(()); - } - else { - log(debug, - "SERVER/WORKER: send on cont ch"); - cont_ch.send(()); - let sock = result::unwrap(accept_result); - log(debug, "SERVER: successfully accepted"+ - "connection!"); - let received_req_bytes = read(sock, 0u); - alt received_req_bytes { - result::ok(data) { - server_ch.send( - str::from_bytes(data)); - log(debug, "SERVER: before write"); - tcp_write_single(sock, str::bytes(resp)); - log(debug, "SERVER: after write.. die"); - comm::send(kill_ch, none); - } - result::err(err_data) { - log(debug, #fmt("SERVER: error recvd: %s %s", - err_data.err_name, err_data.err_msg)); - comm::send(kill_ch, some(err_data)); - server_ch.send(""); - } - } - log(debug, "SERVER: worker spinning down"); - } + let accept_result = accept(new_conn); + log(debug, "SERVER: after accept()"); + if result::is_err(accept_result) { + log(debug, "SERVER: error accept connection"); + let err_data = result::get_err(accept_result); + comm::send(kill_ch, some(err_data)); + log(debug, + "SERVER/WORKER: send on err cont ch"); + cont_ch.send(()); } - log(debug, "SERVER: waiting to recv on cont_ch"); - cont_ch.recv() - }; - log(debug, "SERVER: recv'd on cont_ch..leaving listen cb"); - }); - // err check on listen_result - if result::is_err(listen_result) { - let err_data = result::get_err(listen_result); - log(debug, #fmt("SERVER: exited abnormally name %s msg %s", + else { + log(debug, + "SERVER/WORKER: send on cont ch"); + cont_ch.send(()); + let sock = result::unwrap(accept_result); + log(debug, "SERVER: successfully accepted"+ + "connection!"); + let received_req_bytes = read(sock, 0u); + alt received_req_bytes { + result::ok(data) { + server_ch.send( + str::from_bytes(data)); + log(debug, "SERVER: before write"); + tcp_write_single(sock, str::bytes(resp)); + log(debug, "SERVER: after write.. die"); + comm::send(kill_ch, none); + } + result::err(err_data) { + log(debug, #fmt("SERVER: error recvd: %s %s", err_data.err_name, err_data.err_msg)); + comm::send(kill_ch, some(err_data)); + server_ch.send(""); + } + } + log(debug, "SERVER: worker spinning down"); + } + } + log(debug, "SERVER: waiting to recv on cont_ch"); + cont_ch.recv() + }; + log(debug, "SERVER: recv'd on cont_ch..leaving listen cb"); + }); + // err check on listen_result + if result::is_err(listen_result) { + alt result::get_err(listen_result) { + generic_listen_err(name, msg) { + fail #fmt("SERVER: exited abnormally name %s msg %s", + name, msg); + } + access_denied { + fail "SERVER: exited abnormally, got access denied.."; + } + address_in_use { + fail "SERVER: exited abnormally, got address in use..."; + } } - }; + } let ret_val = server_ch.recv(); log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val)); ret_val } + fn run_tcp_test_server_fail(server_ip: str, server_port: uint, + iotask: iotask) -> tcp_listen_err_data { + let server_ip_addr = ip::v4::parse_addr(server_ip); + let listen_result = listen(server_ip_addr, server_port, 128u, iotask, + // on_establish_cb -- called when listener is set up + {|kill_ch| + log(debug, #fmt("establish_cb %?", + kill_ch)); + }, + {|new_conn, kill_ch| + fail #fmt("SERVER: shouldn't be called.. %? %?", + new_conn, kill_ch); + }); + // err check on listen_result + if result::is_failure(listen_result) { + result::get_err(listen_result) + } + else { + fail "SERVER: did not fail as expected" + } + } + fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str, client_ch: comm::chan, iotask: iotask) -> result::result>>>>>> std: mod cleanup, impl/test for conn. refused err + mem leak fix let write_result = write_result_future.get(); if result::is_err(write_result) { log(debug, "tcp_write_single: write failed!");