includesource "tcp.ac"
(* ******************************************************************* *)
(* ** ** *)
(* ** Npi, consisting of Npi1 and Npi2 ** *)
(* ** ** *)
(* ******************************************************************* *)
(* The Npi module manages groups of threads in a single Acute process,
implementing the key primitives of the Nomadic Pict language.
A thread can either be registered with the Npi module or not.
If it is registered, it belongs to exactly one group throughout its
execution.
Local communication within a group and inter-group communication
via typed channels are both supported.
Furthermore, there is a "migrate_group" command which, when called by
one member of a group, migrates the whole group to a new Tcp address.
For this to work, the other end also needs to have an initialised
Npi module running.
The correct operation of this module depends on client code not using
any low-level primitives directly (thread operations, thunkify, etc.).
Most important functions:
init : (Tcp.ip option * Tcp.port option) -> unit
initialise the group infrastructure to handle inter-group communication
and group migrations.
create_group : forall t. (t -> unit) -> t -> unit
create a new group containing one (new) thread.
create_gthread : forall t. (t -> unit) -> t -> unit
add a new thread to the current group.
recv_local : forall t. t name -> t
receive information from a named typed channel (of the current group),
blocking until a value is available
send_local : forall t. t name -> t -> unit
send information to a named typed channel (of current group)
send_remote : forall t. string -> (Tcp.addr * group name * t name) -> t -> unit
send information to a named typed channel of another group at a
known Tcp address.
migrate_group : Tcp.addr -> unit
migrate current group to a new Tcp address.
As in Local_channel and Distributed_channel, (T name)s are used for
channels carrying values of type T, allowing any of the Acute
methods for establishing shared typed names to be used.
Internally, migration uses thunkify. Migration and send_remote
both use marshal, with a wire format of marshalled values of type
(group name * (exists t. t name * t)) + migration
for the message and migration cases, where
type migration = group name
* group
* (thunkkey list -> unit)
Both recv_local and send_local use namecase (as in Local_channel).
Marshalling of migrations is with respect to the mark "Npi_end" set
below; marshalling for send_remote is with respect to the supplied
mark, which should usually be below "Npi_end". There is some
delicate use of local concurrency with mutexes and cvars.
*)
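(* A minimal usage sketch (illustrative only, not part of the module).
The channel name c, the port my_port, the remote address remote_addr, the
remote group name remote_gn and the mark "App_mark" are all assumptions;
in a real program they would be established by the name-sharing and mark
mechanisms mentioned above, with "App_mark" set below "Npi_end":
Npi.init (None, Some my_port); (* start the local npi daemon *)
Npi.create_group %[unit]
(function () ->
Npi.create_gthread %[unit]
(function () -> Npi.send_local %[int] c 42) ();
let x = Npi.recv_local %[int] c in (* blocks until the 42 arrives *)
Npi.send_remote %[int] "App_mark" (remote_addr, remote_gn, c) x;
Npi.migrate_group remote_addr (* move the whole group to remote_addr *)
) ()
*)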
(* NB: fields marked by (*A*) will be removed from the interface *)
(* when width subsignaturing is added *)
(* Note the use of hash! (instead of fresh), as we need to rebind to
this interface on migration, with the type "group" remaining compatible *)
module hash! Npi1 :
sig
type tf = (Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit
type tho = Tcp_string_messaging.handle option ref
type channel = (exists t. t name * (t list ref * cvar name))
type group = thread name list ref (* threads in group *)
* mutex name list ref (* mutexes in group *)
* cvar name list ref (* cvars in group *)
* channel list ref (* local channels *)
type migration = group name
* group
* (thunkkey list -> unit)
val groups_mutex : mutex name
val groups : (group name * group) list ref
val threadmap : (thread name * group name) list ref
val ho: tho
end
=
struct
type tf = (Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit
type tho = Tcp_string_messaging.handle option ref
type channel = (exists t. t name * (t list ref * cvar name))
type group = thread name list ref (* threads in group *)
* mutex name list ref (* mutexes in group *)
* cvar name list ref (* cvars in group *)
* channel list ref (* local channels *)
(* The group data structure is more generous than its usage:
it also allows mutexes and condition variables to be associated
with a group (and to be migrated properly).
At the moment there is no create_gmutex/create_gcvar, although
their implementation would be trivial (a sketch follows in the
comment below).
*)
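(* A sketch of what a create_gmutex could look like (illustrative only, not
part of the module); it mirrors create_gthread and relies on find_my_group
and Utils.locked_by_stmt, which are defined further below / in Utils:
let create_gmutex () =
let (gn, (_, mtxs, _, _)) = find_my_group () in (* locate caller's group *)
let mn = fresh %[mutex] in
Utils.locked_by_stmt groups_mutex
(function () ->
create_mutex mn; (* create the mutex... *)
mtxs := mn :: !mtxs); (* ...and record it in the group *)
mn
*)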
type migration = group name
* group
* (thunkkey list -> unit)
let groups_mutex = hash(mutex, "Npi global mutex") %[mutex] (* fresh *)(* global mutex *)
(* Locking strategy:
- There is a global mutex ("groups_mutex") at each running acute process.
- Functions acting on the group data structures are all protected by this
global lock.
- When a thread wants to receive a message and there are none in the
channel, the thread waits on the channel's condition variable.
- When a new message is sent on an empty channel, its condition variable is
signalled so that a waiting receiver is unblocked.
NB: This does not in principle guarantee FIFO delivery order, but it will in
fact be FIFO with the current version of Acute, as threads waiting on a
condition variable are queued in FIFO order.
The locking strategy is quite coarse; a more fine-grained scheme would be
possible where, besides the global lock, a lock per group is also kept
(a sketch follows in the comment below).
*)
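(* A rough sketch of the finer-grained scheme mentioned above (illustrative
only, not part of the module). Each group would carry its own mutex; the
global lock would be held only long enough to locate the group, and all
channel manipulation would then happen under that group's lock:
type group' = mutex name (* per-group lock *)
* thread name list ref * mutex name list ref
* cvar name list ref * channel list ref
(* hypothetical helper (assuming groups below then maps group names to
group' values): run body with the group's own lock held *)
let with_group gn (body: group' -> unit) =
let g = Utils.locked_by_stmt2 %[] groups_mutex
(function () -> List.assoc %[] %[] gn !groups) in
let (gm,_,_,_,_) = g in
Utils.locked_by_stmt gm (function () -> body g)
*)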
let groups = ref [] (* group name -> group *)
let threadmap = ref [] (* thread name -> group name *)
(* threadmap exists to find which group a thread belongs to.
These maps are simply implemented as linked lists, but a production
implementation would use a hashtable instead.
Similarly, the list of channels should really be a hashtable.
*)
let ho = ref None
end
mark "Npi1"
module hash! Npi2 :
sig
val find_my_group : unit -> Npi1.group name * Npi1.group
val gthread_wrapper : forall t. (t->unit) -> t -> unit
val create_group : forall t. (t -> unit) -> t -> unit
val create_gthread : forall t. (t->unit) -> t -> unit
val recv_local : forall t. t name -> t
val my_send_local : forall t. Npi1.group -> t name -> t -> unit
val send_local : forall t. t name -> t -> unit
val f : Npi1.tf
val init : (Tcp.ip option * Tcp.port option) -> unit
val send_remote : forall t. string -> (Tcp.addr*Npi1.group name*t name) -> t -> unit
val migrate_group : Tcp.addr -> unit
val local_addr : unit -> Tcp.ip option * Tcp.port
end
=
struct
(* returns which group the calling thread belongs to *)
let find_my_group () = Utils.locked_by_stmt2 %[] Npi1.groups_mutex
(function () ->
Pervasives.print_endline "In find_my_group lock...";
let gn =
try List.assoc %[] %[] (self ()) !Npi1.threadmap
with Not_found ->
raise (Failure "find_my_group:assoc")
in
let group_info =
try List.assoc %[] %[] gn !Npi1.groups
with Not_found -> raise (Failure "find_my_group:assoc[2]")
in
(gn, group_info)
)
(* Wrapper for threads running in a group. The intention is that a thread
* unregisters itself from the group data structure on exit; that logic is
* currently disabled (see the commented-out code below).
*)
let gthread_wrapper = Function t -> fun (f: t -> unit) (v: t) ->
f v
(* let unregister_my_gthread () =
let tn = self() in
let rec remove_me xs = match xs with
[] -> raise Not_found
| (x::xs) -> if x = tn then xs
else x :: remove_me xs in
let (gn, (ths, _, _, _)) = find_my_group () in
Utils.locked_by_stmt Npi1.groups_mutex
(function () ->
Npi1.threadmap := List.remove_assoc %[] %[] tn !Npi1.threadmap;
ths := remove_me !ths
)
in
(try f v
with e -> (try unregister_my_gthread () with _ -> ()); raise e);
unregister_my_gthread ()
*)
(* create a new group *)
let create_group = Function t -> fun (f: t -> unit) (v : t) ->
let gn = fresh %[Npi1.group] in
let tn = fresh %[thread] in
Utils.locked_by_stmt Npi1.groups_mutex
(function () ->
let group_info = (ref (tn::[]), ref [], ref [], ref []) in
Npi1.groups := (gn, group_info) :: !Npi1.groups;
Npi1.threadmap := (tn, gn) :: !Npi1.threadmap;
create_thread tn (gthread_wrapper %[t] f) v )
(* create a new thread in the current group *)
let create_gthread = Function t -> fun (f: t -> unit) (v: t) ->
let (gn, (ths, _, _, _)) = find_my_group () in
let tn = fresh %[thread] in
Utils.locked_by_stmt Npi1.groups_mutex
(function () ->
Npi1.threadmap := (tn, gn) :: !Npi1.threadmap;
ths := tn :: !ths;
create_thread %[t] tn (gthread_wrapper %[t] f) v
)
(* receive a value from a local channel, blocking if there is none *)
let recv_local = Function t -> fun (cn: t name) ->
let (gn, group_info) = find_my_group () in
let (_,_,_,csr) = group_info in
Utils.locked_by_stmt2 %[t] Npi1.groups_mutex
(function () ->
let rec lookup cs' = match cs' with
[] -> let my_cvar = fresh %[cvar] in
create_cvar my_cvar;
csr := ({t, (cn, (ref [], my_cvar))} as Npi1.channel) :: !csr;
wait my_cvar Npi1.groups_mutex;
lookup !csr
| (c: Npi1.channel)::cs0 ->
namecase c with
{t,(cn',x)} when cn'=cn ->
let ((msgs: t list ref), my_cvar) = x in
let rec ww () =
match !msgs with
[] -> wait my_cvar Npi1.groups_mutex; ww ()
| v::vs -> msgs := vs; v
in
ww ()
otherwise ->
lookup cs0
in lookup !csr
)
let my_send_local = Function t -> fun group_info (cn: t name) (v: t) ->
let (_,_,_,csr) = group_info in
Utils.locked_by_stmt Npi1.groups_mutex
(function () ->
let rec lookup cs' = match cs' with
[] -> let my_cvar = fresh %[cvar] in
create_cvar my_cvar;
csr := ({t,(cn,(ref(v::[]),my_cvar))} as Npi1.channel) :: !csr
| (c: Npi1.channel)::cs0 ->
namecase c with
{t,(cn',x)} when cn'=cn ->
let ((msgs: t list ref), my_cvar) = x in
(match !msgs with
[] -> msgs := v :: !msgs; signal my_cvar
| _ -> msgs := v :: !msgs)
otherwise ->
lookup cs0
in lookup !csr
)
let send_local = Function t -> fun (cn: t name) (v: t) ->
let (gn, group_info) = find_my_group () in
my_send_local %[t] group_info cn v
(* We have a single per-site daemon listening for messages and migrations.
- for messages, it uses the group name to look up the group data structure,
finds the appropriate (local) channel, and uses that to deliver the message.
- for migrations, it unthunkifies the received threads and extends the group
data structure.
*)
let f ipop_local addr_remote data =
Utils.locked_by_stmt Npi1.groups_mutex
(function () ->
Pervasives.print_endline "npi daemon received something";
try
match (unmarshal data) with
inj 1 %[(Npi1.group name * (exists t. t name * t)) + Npi1.migration] (gn, channel)
-> (* a normal value *)
Pervasives.print_endline "npi daemon received a value";
let group_info = try List.assoc %[] %[] gn !Npi1.groups
with Not_found -> raise (Failure
"Received a value for a group not present at this TCP address")
in
let {t, x} = channel in
let (cn, v) = x in
my_send_local %[t] group_info cn v
| inj 2 %[(Npi1.group name*(exists t. t name*t))+Npi1.migration] (gn,groupinfo,unthunk)
-> (* a migration *)
Pervasives.print_endline "npi daemon received a migration";
let (ths, mtxs, cvs, csr) = groupinfo in
if List.mem_assoc %[] %[] gn !Npi1.groups then
(* NB: this should never occur, as group names are only created with
fresh %[group] and the only operation involving group names
is migration, which is linear.
This check guards against one kind of maliciously forged migration.
*)
raise (Failure "A group with this same name is already present at this site")
else (
Npi1.groups := (gn, groupinfo) :: !Npi1.groups;
List.iter %[] (fun tn -> Npi1.threadmap := (tn, gn) :: !Npi1.threadmap) !ths;
let tks = List.map %[] %[] (fun n -> Thread (n, Blocking)) !ths
@ List.map %[] %[] (fun n -> Mutex n) !mtxs
@ List.map %[] %[] (fun n -> CVar n) !cvs
@ List.map %[] %[] (fun (p: Npi1.channel) ->
let {t,x} = p in let (_,(_,n)) = x in CVar n) !csr
in
unthunk tks;
Pervasives.print_endline("unthunked")
)
with e -> Pervasives.print_endline "An exception was raised in the npi daemon";
raise e
)
let init (ipo,po) =
create_mutex Npi1.groups_mutex;
Pervasives.print_endline ("Created NPI mutex " ^ name_to_string Npi1.groups_mutex);
match !Npi1.ho with
Some _ -> raise (Failure "Npi already initialised")
| None -> Npi1.ho := Some (Tcp_string_messaging.daemon (ipo,po) f)
let send_remote = Function t -> fun mk (addr,gn,cn) (v: t) ->
let h = Utils.the %[] !Npi1.ho in
let (ip, port) = addr in
if (Some ip, port) = Tcp_string_messaging.local_addr h then
(* note this local-send optimisation will only take effect if the
IP was set explicitly *)
let group_info = Utils.locked_by_stmt2 %[] Npi1.groups_mutex (function () ->
try List.assoc %[] %[] gn !Npi1.groups
with Not_found -> raise (Failure "send_remote:List.assoc")
) in
my_send_local %[t] group_info cn v
else
let channel = {t, (cn, v)} as exists t'.t' name * t' in
let data = inj 1 %[(Npi1.group name*(exists t.t name*t))+Npi1.migration] (gn, channel) in
let mar_data = marshal mk data in
Tcp_string_messaging.send h addr mar_data
(* Migrate the current group to a new Tcp address.
All threads except the calling thread are thunkified in Blocking mode.
The calling thread is blocked on a mutex/cvar; as it is thunkified in
Interrupting mode, it is woken up at the other end with a Thunkify_EINTR
exception.
*)
let migrate_group = fun addr ->
Pervasives.print_endline("migrate_group: started");
let (gn, group_info) = find_my_group () in
Pervasives.print_endline("migrate_group: found my group");
let (ths, mtxs, cvs, csr) = group_info in
let my_cv = fresh in
create_cvar my_cv;
lock Npi1.groups_mutex;
(* First remove the group and its threads from the global data structures *)
Npi1.groups := List.remove_assoc %[] %[] gn !Npi1.groups; (* remove gn -> group_info mapping *)
List.iter %[]
(fun tn -> Npi1.threadmap := List.remove_assoc %[] %[] tn !Npi1.threadmap)
!ths; (* remove tn -> gn mapping *)
Pervasives.print_endline("migrate_group: removed gn,tn data");
let initiating_thread_name = self() in
(* make new thread to perform thunkify, otherwise will thunkify self *)
create_thread fresh
(function () ->
Pervasives.print_endline("migrate_group: thunkify thread started");
Utils.locked_by_stmt Npi1.groups_mutex
(function () ->
Pervasives.print_endline("migrate_group: thunkify thread got lock");
let get_tmode tn =
if compare_name tn initiating_thread_name = 0 then
Interrupting
else Blocking in
let tks = List.map %[] %[] (fun n -> Thread (n, get_tmode n)) !ths
@ List.map %[] %[] (fun n -> Mutex n) !mtxs
@ List.map %[] %[] (fun n -> CVar n) !cvs
@ List.map %[] %[] (fun (p: Npi1.channel) ->
let {t,x} = p in let (_,(_,n)) = x in CVar n) !csr
in
Pervasives.print_endline("migrate_group: thunkify thread going to thunkify");
let thunked = thunkify tks in
Pervasives.print_endline("migrate_group: thunkify thread done thunkify");
let data = inj 2 %[(Npi1.group name * (exists t. t name *
t)) + Npi1.migration] (gn, group_info, thunked) in
let mar_data = marshal "Npi_end" data in
Pervasives.print_endline("migrate_group: going to send marshalled: ... "
(* ^ mar_data *) );
let h = Utils.the %[] !Npi1.ho in
Tcp_string_messaging.send h addr mar_data
)
) ();
(* must block the thread initiating the migration until thunkify has completed *)
try
wait my_cv Npi1.groups_mutex (* Block here - thunkify will cause Thunkify_EINTR *)
with Thunkify_EINTR -> () (* Migration completed -- we can now continue execution *)
let local_addr () = Tcp_string_messaging.local_addr (Utils.the %[] !Npi1.ho)
end
mark "Npi2"
module hash! Npi :
sig
type group
val create_group : forall t. (t -> unit) -> t -> unit
val create_gthread : forall t. (t->unit) -> t -> unit
val recv_local : forall t. t name -> t
val send_local : forall t. t name -> t -> unit
val init : (Tcp.ip option * Tcp.port option) -> unit
val send_remote : forall t. string -> (Tcp.addr * group name * t name) -> t -> unit
val migrate_group : Tcp.addr -> unit
val local_addr : unit -> Tcp.ip option * Tcp.port
end
=
struct
type group = Npi1.group
let create_group = Npi2.create_group
let create_gthread = Npi2.create_gthread
let recv_local = Npi2.recv_local
let send_local = Npi2.send_local
let init = Npi2.init
let send_remote = Npi2.send_remote
let migrate_group = Npi2.migrate_group
let local_addr = Npi2.local_addr
end
mark "Npi_end"