(* [extraction residue neutralised: a PHP each-is-deprecated warning emitted by the web proxy that served this file; it is not part of tcp.ac] *)
(* tcp.ac *)

(* This file contains the Tcp_padded, Tcp_connection_management and
   Tcp_string_messaging modules. *)
(* These use the Sockets API and local concurrency - threads and mutexes. *)
(* All three are hash modules; Tcp_connection_management and
   Tcp_string_messaging provide abstract types of handles. *)

includesource "util.ac"

(* ******************************************************************* *)
(* **                                                               ** *)
(* **                          Tcp_padded                           ** *)
(* **                                                               ** *)
(* ******************************************************************* *)

(* The Tcp_padded module implements a wire-format send and receive for
   arbitrary strings.

   The wire format encoding of a string consists of 21 bytes containing
   an ASCII pretty-print of its length (right-padded with spaces),
   followed by the string itself.  This is not efficient(!) but is
   conveniently human-readable.

   send fd ippo data  writes the 21-byte header followed by data to fd,
                      passing ippo through to Tcp.send unchanged.
   recv fd            reads one header and then that many payload bytes
                      from fd and returns the payload.  If a read yields
                      no bytes, fd is closed and Failure is raised.
*)

module hash Tcp_padded :
sig
  val send : Tcp.fd -> ((Tcp.ip * Tcp.port) option) -> string -> unit
  val recv : Tcp.fd -> string
end
=
struct

  let send fd ippo data =
    (* pad appends spaces to data until it is n characters long *)
    let pad data n =
      let padding = String.make ( n - (String.length data)) ' ' in
      (data ^ padding) in
    let data_length = String.length data in
    let data_length_string = pad (Pervasives.string_of_int data_length) 21 in
    (* The result of Tcp.send is treated here as the part of s that has
       not been sent yet: keep sending until nothing is left. *)
    let rec send_all s =
      let no_options = [] in
      let s' = (Tcp.send fd ippo s no_options) in
      if 0 = (String.length s') then () else send_all s' in
    send_all (data_length_string ^ data)

  let recv fd =
    (* recv_n_bytes n reads repeatedly until n bytes have been collected.
       A read that yields no bytes is taken to mean that the peer has
       closed the connection: fd is closed and Failure is raised. *)
    let rec recv_n_bytes = function n ->
      let no_options = [] in
      let (s,_) = Tcp.recv fd n no_options in
      let _ = IO.print_string ("Tcp_padded.recv got " ^ Pervasives.string_of_int (String.length s) ^" bytes; expecting " ^ Pervasives.string_of_int (n - String.length s) ^ " more\n") in
      (* let _ = IO.print_string ("in particular, Tcp_padded.recv got ---" ^ s ^ "--- \n") in*)
      let l = String.length s in
      if l = 0 then
        (Tcp.close fd; raise (Failure "socket closed by the other party") )
      else if l >= n then s
      else s ^ (recv_n_bytes (n-l)) in
    (* The header is the decimal length followed by padding spaces, so the
       digits are everything before the first space. *)
    let data_length_string = recv_n_bytes 21 in
    let first_space = String.index data_length_string ' ' in
    let data_length_string' = String.sub data_length_string 0 first_space in
    let data_length = Pervasives.int_of_string data_length_string' in
    (* An empty payload is a legitimate message (send produces a header of
       0 with no body).  Do not ask recv_n_bytes for zero bytes: a
       zero-length read result would take the peer-closed branch above,
       closing fd and raising Failure on a healthy connection. *)
    if data_length = 0 then "" else recv_n_bytes data_length

end


(* ******************************************************************* *)
(* **                                                               ** *)
(* **                   Tcp_connection_management                   ** *)
(* **                                                               ** *)
(* ******************************************************************* *)

(* The Tcp_connection_management module manages collections of TCP
   connections.

   daemon takes a local address (a Tcp.ip option * Tcp.port option) and
   an incoming-connection-handler function and creates a listening
   socket on that address, spawning a thread that invokes the supplied
   function for any incoming connection and then adds the connection to
   a list.  daemon returns a handle which must be passed in to the other
   functions.  (Using handles rather than module state allows a single
   runtime to have multiple instances with different local addresses.)

   establish_to takes a handle and remote address.  If there is already
   a connection to that address it returns its file descriptor,
   otherwise it tries to establish one (and returns the new file
   descriptor).

   disestablish_to takes a handle and remote address, closing and
   removing a connection to that address if one exists.

   connection_failed takes a handle and remote address (one for which a
   connection has failed) and removes it from the stored list.

   shutdown closes and removes all connections and closes the listening
   socket.

   local_addr takes a handle and returns the local address.
*)

(* TODO: Deal more sensibly with TCP errors and the REUSEADDR semantics,
   here and in the clients *)
(* TODO: Think about efficiency *)
(* TODO: Have shutdown cleanly terminate the associated thread *)

module hash Tcp_connection_management :
sig
  type fd = Tcp.fd
  type handle
  val daemon : Tcp.ip option * Tcp.port option -> ((Tcp.ip option * Tcp.port)->Tcp.addr->fd -> unit) -> handle
  val establish_to : handle -> Tcp.addr -> fd
  val disestablish_to : handle -> Tcp.addr -> unit
  val shutdown : handle -> unit
  val connection_failed : handle -> Tcp.addr -> unit
  val local_addr : handle -> Tcp.ip option * Tcp.port
end
=
struct

  type fd = Tcp.fd

  (* A handle bundles everything the other functions need; every function
     below destructures it in this field order. *)
  type handle =
      (Tcp.ip option * Tcp.port) (* local address *)
    * fd (* listening socket *)
    * ((Tcp.ip option*Tcp.port)->Tcp.addr->fd->unit) (* incoming conn handler *)
    * mutex name (* current connections mutex *)
    * (Tcp.addr * fd) list ref (* current connections *)

  (* daemon (ipo,po) f
     Creates the connection-list mutex and the (initially empty)
     connection list, opens a socket, binds it to (ipo,po) and starts
     listening with a backlog of 5.  The actual local address is then read
     back with Tcp.getsockname; Failure is raised if it reports no port.

     The accept loop below is combined with the handle expression using
     the ||| operator; per the module description this runs the loop as
     a separate thread while the handle is returned to the caller.

     For each accepted connection the loop first reads one Tcp_padded
     message and unmarshals it as a Tcp.port (establish_to sends its own
     local port as the first message on every new connection).  That port,
     not the port reported by Tcp.accept, is used together with the peer
     ip as the key under which the connection is stored.  The handler f is
     called with the local address, that peer address and the new fd, and
     only then is the connection added to the list under the mutex.

     NOTE(review): the handshake read and f both run inside the accept
     loop, so nothing else is accepted until they return; an exception
     raised there is not caught in this function. *)
  let daemon (ipo,po) f =
    let conn_mutex = fresh in
    create_mutex conn_mutex;
    Pervasives.print_endline ("Created TCP mutex " ^ name_to_string conn_mutex);
    let conn = ref [] in
    let fd = Tcp.tcp_socket () in
    let _ = Tcp.bind fd ipo po in
    (* from here on ipo and p are the address reported by getsockname,
       shadowing the requested ipo *)
    let (ipo,p) = match Tcp.getsockname fd with
        (Some ip, Some p) -> (Some ip, p)
      | (None, Some p) -> (None,p)
      | _ -> raise (Failure "no local port after bind()") in
    let _ = let backlog = 5 in Tcp.listen fd backlog in
    (while true do
       let (fd',(ip',p')) = Tcp.accept fd in
       (* first message on the connection: the port sent by the peer *)
       let p'' = (unmarshal (Tcp_padded.recv fd') as Tcp.port) in
       f (ipo,p) (ip',p'') fd' ; (* note that f terminates before adding this to conn *)
       Utils.locked_by_stmt conn_mutex (function () -> conn := ((ip',p''),fd') :: !conn)
     done
     |||
     (((ipo,p),fd,f,conn_mutex,conn) ))

  (* establish_to h addr
     Looks addr up in the connection list and returns the stored fd if
     there is one.  Otherwise it opens a new socket, binds it to the local
     ip with no port given, connects to addr, sends the marshalled local
     listening port p as the first Tcp_padded message (the accept loop in
     daemon reads this), calls the handler f on the new connection, records
     it in the list and returns the new fd.

     The whole lookup-or-connect step is the function passed to
     Utils.locked_by_stmt2 together with conn_mutex; presumably it runs
     with the mutex held and its result is returned - confirm in util.ac.

     NOTE(review): if connect, send or f raises, the new fd is neither
     closed nor recorded here.
     NOTE(review): the Some argument of Tcp.connect is not parenthesised;
     confirm it parses as a single port-option argument. *)
  let establish_to h (ip',p') =
    let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
    Utils.locked_by_stmt2 %[fd] conn_mutex
      (function ()->
         try
           List.assoc %[Tcp.addr] %[] (ip',p') !conn
         with
           Not_found ->
             let fd = Tcp.tcp_socket () in
             Tcp.bind fd ipo None;
             Pervasives.print_endline ("Establish connecting to p' = " ^
                                       Pervasives.string_of_int(Tcp.int_of_port p') );
             Tcp.connect fd ip' Some p';
             (* handshake payload: this side's listening port *)
             let d = (marshal "StdLib" p : Tcp.port) in
             Pervasives.print_endline ("Establish p = " ^ Pervasives.string_of_int(Tcp.int_of_port p));
             (* Pervasives.print_endline ("Establish string = " ^ d ); *)
             Tcp_padded.send fd None d;
             f (ipo,p) (ip',p') fd;
             conn := ((ip',p'),fd) :: !conn;
             fd
      )

  (* disestablish_to h addr
     If addr has an entry in the connection list, removes the entry and
     closes its fd; does nothing when there is no such entry.  The work is
     passed to Utils.locked_by_stmt with conn_mutex. *)
  let disestablish_to h (ip',p') =
    let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
    Utils.locked_by_stmt conn_mutex
      (function ()->
         try
           let fd = List.assoc %[] %[] (ip',p') !conn in
           conn := List.remove_assoc %[] %[] (ip',p') !conn;
           Tcp.close fd
         with
           Not_found -> ()
      )

  (* shutdown h
     Closes every fd in the connection list, empties the list and closes
     the listening socket.  The accept loop started by daemon is not
     stopped here (see the TODO above the module). *)
  let shutdown h =
    let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
    Utils.locked_by_stmt conn_mutex
      (function ()->
         List.iter %[] (function ((ip,p),fd) -> Tcp.close fd) !conn;
         conn := [];
         Tcp.close fd_listen)

  (* connection_failed h addr
     Removes the entry for addr from the connection list.  Unlike
     disestablish_to it does not close the fd. *)
  let connection_failed h (ip',p') =
    let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
    Utils.locked_by_stmt conn_mutex
      (function () ->
         conn := List.remove_assoc %[] %[] (ip',p') !conn
      )

  (* local_addr h
     Returns the local address stored in the handle, i.e. the one daemon
     obtained from Tcp.getsockname. *)
  let local_addr h =
    let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
    (ipo,p)

end


(* ******************************************************************* *)
(* **                                                               ** *)
(* **                     Tcp_string_messaging                      ** *)
(* **                                                               ** *)
(* ******************************************************************* *)

(* The Tcp_string_messaging module provides asynchronous messaging of
   strings to TCP addresses, using Tcp_connection_management.

   daemon takes a local address (a Tcp.ip option * Tcp.port option) and
   a function to handle incoming strings, of type
     (Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit
   and creates a Tcp_connection_management.daemon, returning a handle.

   send takes a handle, a remote TCP address and a string, uses
   Tcp_connection_management.establish_to to ensure there is a
   connection, and sends the string (encapsulated in a wire format).

   shutdown takes a handle and shuts down (calling
   Tcp_connection_management.shutdown).

local_addr takes a handle and returns the local TCP address. The wire format is implemented by Tcp_padded. *) (* TODO: handle send/recv errors and call connection_failed as required *) (* TODO: need more locking to stop different send/recvs interleaving *) (* TODO: one might want to pass the handle as another argument to the function argument to daemon *) module hash Tcp_string_messaging : sig type handle val daemon : Tcp.ip option * Tcp.port option -> ((Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit) -> handle val send : handle -> Tcp.addr -> string -> unit val shutdown : handle -> unit val local_addr : handle -> Tcp.ip option * Tcp.port end = struct type handle = Tcp_connection_management.handle let daemon (ipo,po) f = let g ipop addr' fd = create_thread fresh (function () -> while true do let data = Tcp_padded.recv fd in f ipop addr' data done ) () in Tcp_connection_management.daemon (ipo,po) g let send h (ip,p) data = let fd = Tcp_connection_management.establish_to h (ip,p) in Tcp_padded.send fd (Some(ip,p)) data let shutdown h = Tcp_connection_management.shutdown h let local_addr h = Tcp_connection_management.local_addr h end