Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
(* tcp.ac *)
(* This file contains Tcp_padded, Tcp_connection_management and Tcp_string_messaging modules *)
(* These use the Sockets API and local concurrency - threads and mutexes. *)
(* Both are hash modules, providing abstract types of handles. *)
includesource "util.ac"
(* ******************************************************************* *)
(* ** ** *)
(* ** Tcp_padded ** *)
(* ** ** *)
(* ******************************************************************* *)
(* The Tcp_padded module implements a wire-format send and receive for
arbitrary strings.
The wire format encoding of a string consists of 21 bytes
containing an ASCII pretty-print of its length followed by the
string itself. This is not efficient(!) but is conveniently
human-readable.
*)
module hash Tcp_padded :
sig
val send : Tcp.fd -> ((Tcp.ip * Tcp.port) option) -> string -> unit
val recv : Tcp.fd -> string
end =
struct
let send fd ippo data =
let pad data n =
let padding =
String.make ( n - (String.length data)) ' ' in
(data ^ padding) in
let data_length = String.length data in
let data_length_string =
pad (Pervasives.string_of_int data_length) 21 in
let rec send_all s =
let no_options = [] in
let s' = (Tcp.send fd ippo s no_options) in
if 0 = (String.length s') then () else send_all s'
in
send_all (data_length_string ^ data)
let recv fd =
let rec recv_n_bytes = function n ->
let no_options = [] in
let (s,_) = Tcp.recv fd n no_options in
let _ = IO.print_string ("Tcp_padded.recv got " ^ Pervasives.string_of_int (String.length s)
^" bytes; expecting " ^ Pervasives.string_of_int (n - String.length s) ^ " more\n") in
(* let _ = IO.print_string ("in particular, Tcp_padded.recv got ---" ^ s ^ "--- \n") in*)
let l = String.length s in
if l = 0 then (Tcp.close fd; raise (Failure "socket closed by the other party") )
else if l >= n then s else s ^ (recv_n_bytes (n-l)) in
let data_length_string = recv_n_bytes 21 in
let first_space = String.index data_length_string ' ' in
let data_length_string' = String.sub data_length_string 0 first_space in
let data_length = Pervasives.int_of_string data_length_string' in
recv_n_bytes data_length
end
(* ******************************************************************* *)
(* ** ** *)
(* ** Tcp_connection_management ** *)
(* ** ** *)
(* ******************************************************************* *)
(* The Tcp_connection_management module manages collections of TCP
connections.
daemon takes a local address (an Tcp.ip option * Tcp.port option)
and an incoming-connection-handler function and creates a listening
socket on that address, spawning a thread that invokes the supplied
function for any incoming connection and then adds the connection
to a list. daemon returns a handle which must be passed in to the
other functions. (Using handles rather than module state allows a
single runtime to have multiple instances with different local
addresses.)
establish_to takes a handle and remote address. If there is
already a connection to that address it returns its file
descriptor, otherwise it tries to establish one (and returns the
new file descriptor).
disestablish_to takes a handle and remote address, closing and
removing a connection to that address if one exists.
connection_failed takes a handle and remote address (one for which
a connection has failed) and removes it from the stored list.
shutdown closes and removes all connections and closes the
listening socket.
local_addr takes a handle and returns the local address.
*)
(* TODO: Deal more sensibly with TCP errors and the REUSEADDR semantics, here and in the clients *)
(* TODO: Think about efficiency *)
(* TODO: Have shutdown cleanly terminate the associated thread *)
module hash Tcp_connection_management :
sig
type fd = Tcp.fd
type handle
val daemon : Tcp.ip option * Tcp.port option ->
((Tcp.ip option * Tcp.port)->Tcp.addr->fd -> unit) -> handle
val establish_to : handle -> Tcp.addr -> fd
val disestablish_to : handle -> Tcp.addr -> unit
val shutdown : handle -> unit
val connection_failed : handle -> Tcp.addr -> unit
val local_addr : handle -> Tcp.ip option * Tcp.port
end =
struct
type fd = Tcp.fd
type handle =
(Tcp.ip option * Tcp.port) (* local address *)
* fd (* listening socket *)
* ((Tcp.ip option*Tcp.port)->Tcp.addr->fd->unit) (* incoming conn handler *)
* mutex name (* current connections mutex *)
* (Tcp.addr * fd) list ref (* current connections *)
let daemon (ipo,po) f =
let conn_mutex = fresh in
create_mutex conn_mutex;
Pervasives.print_endline ("Created TCP mutex " ^ name_to_string conn_mutex);
let conn = ref [] in
let fd = Tcp.tcp_socket () in
let _ = Tcp.bind fd ipo po in
let (ipo,p) = match Tcp.getsockname fd with
(Some ip, Some p) -> (Some ip, p)
| (None, Some p) -> (None,p)
| _ -> raise (Failure "no local port after bind()") in
let _ = let backlog = 5 in Tcp.listen fd backlog in
(while true do
let (fd',(ip',p')) = Tcp.accept fd in
let p'' = (unmarshal (Tcp_padded.recv fd') as Tcp.port) in
f (ipo,p) (ip',p'') fd' ; (* note that f terminates before adding this to conn *)
Utils.locked_by_stmt conn_mutex
(function () ->
conn := ((ip',p''),fd') :: !conn)
done |||
(((ipo,p),fd,f,conn_mutex,conn)
))
let establish_to h (ip',p') =
let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
Utils.locked_by_stmt2 %[fd] conn_mutex (function ()->
try
List.assoc %[Tcp.addr] %[] (ip',p') !conn
with
Not_found ->
let fd = Tcp.tcp_socket () in
Tcp.bind fd ipo None;
Pervasives.print_endline ("Establish connecting to p' = " ^
Pervasives.string_of_int(Tcp.int_of_port p') );
Tcp.connect fd ip' Some p';
let d = (marshal "StdLib" p : Tcp.port) in
Pervasives.print_endline ("Establish p = " ^ Pervasives.string_of_int(Tcp.int_of_port p));
(* Pervasives.print_endline ("Establish string = " ^ d ); *)
Tcp_padded.send fd None d;
f (ipo,p) (ip',p') fd;
conn := ((ip',p'),fd) :: !conn;
fd
)
let disestablish_to h (ip',p') =
let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
Utils.locked_by_stmt conn_mutex (function ()->
try
let fd = List.assoc %[] %[] (ip',p') !conn in
conn := List.remove_assoc %[] %[] (ip',p') !conn;
Tcp.close fd
with
Not_found -> ()
)
let shutdown h =
let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
Utils.locked_by_stmt conn_mutex (function ()->
List.iter %[] (function ((ip,p),fd) -> Tcp.close fd) !conn;
conn := [];
Tcp.close fd_listen)
let connection_failed h (ip',p') =
let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
Utils.locked_by_stmt conn_mutex (function () ->
conn := List.remove_assoc %[] %[] (ip',p') !conn )
let local_addr h =
let ((ipo,p),fd_listen,f,conn_mutex,conn) = h in
(ipo,p)
end
(* ******************************************************************* *)
(* ** ** *)
(* ** Tcp_string_messaging ** *)
(* ** ** *)
(* ******************************************************************* *)
(* The Tcp_string_messaging module provides asynchronous messaging of
strings to TCP addresses, using Tcp_connection_management.
daemon takes a local address (an Tcp.ip option * Tcp.port option) and
a function to handle incoming strings, of type
(Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit
and creates a Tcp_connection_management.daemon, returning a handle.
send takes a handle, a remote TCP address and a string, uses
Tcp_connection_management.establish_to to ensure there is a
connection, and sends the string (encapsulated in a wire format).
shutdown takes a handle and shuts down (calling
Tcp_connection_management.shutdown).
local_addr takes a handle and returns the local TCP address.
The wire format is implemented by Tcp_padded.
*)
(* TODO: handle send/recv errors and call connection_failed as required *)
(* TODO: need more locking to stop different send/recvs interleaving *)
(* TODO: one might want to pass the handle as another argument to the
function argument to daemon *)
module hash Tcp_string_messaging :
sig
type handle
val daemon : Tcp.ip option * Tcp.port option ->
((Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit) -> handle
val send : handle -> Tcp.addr -> string -> unit
val shutdown : handle -> unit
val local_addr : handle -> Tcp.ip option * Tcp.port
end =
struct
type handle = Tcp_connection_management.handle
let daemon (ipo,po) f =
let g ipop addr' fd =
create_thread fresh (function () ->
while true do
let data = Tcp_padded.recv fd in
f ipop addr' data
done
) ()
in
Tcp_connection_management.daemon (ipo,po) g
let send h (ip,p) data =
let fd = Tcp_connection_management.establish_to h (ip,p) in
Tcp_padded.send fd (Some(ip,p)) data
let shutdown h = Tcp_connection_management.shutdown h
let local_addr h = Tcp_connection_management.local_addr h
end