Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
includesource "tcp.ac"

(* ******************************************************************* *)
(* **                                                               ** *)
(* **  Npi, consisting of Npi1 and Npi2                             ** *)
(* **                                                               ** *)
(* ******************************************************************* *)

(* The Npi module manages groups of threads in a single acute process,
   implementing the key primitives of the Nomadic Pict language.

   A thread can either be registered with the Npi module or not.  If it
   is registered, it belongs to exactly one group throughout its
   execution.  Local communication within a group and inter-group
   communication via typed channels is supported.  Furthermore, there is
   a "migrate_group" command, which when called by one member of the
   group, migrates the whole group to a new Tcp address.  For this to
   work, the other end also needs to have an initialised Npi module
   running.

   The correct operation of this module depends on the client code not
   using any low-level primitives - thread operations, thunkify, etc.

   Most important functions:

     init : (Tcp.ip option * Tcp.port option) -> unit
       initialise group infrastructure to handle inter-group
       communication and group migrations.

     create_group : forall t. (t -> unit) -> t -> unit
       create a new group containing one (new) thread.

     create_gthread : forall t. (t -> unit) -> t -> unit
       add a new thread to the current group.

     recv_local : forall t. t name -> t
       receive information from a named typed channel

     send_local : forall t. t name -> t -> unit
       send information to a named typed channel (of current group)

     send_remote : forall t. string -> (Tcp.addr * group name * t name) -> t -> unit
       send information to a named typed channel of another group at a
       known Tcp address.

     migrate_group : Tcp.addr -> unit
       migrate current group to a new Tcp address.

   As in Local_channel and Distributed_channel, (T name)s are used for
   channels carrying values of type T, allowing any of the Acute methods
   for establishing shared typed names to be used.

   Internally, migration uses thunkify.  Migration and send_remote both
   use marshal, with a wire format of marshalled values of type

     (group name * (exists t. t name * t)) + migration

   for the message and migration cases, where

     type migration = group name * group * (thunkkey list -> unit)

   (this is the type declared in Npi1 below).

   The recv_local and send_local use namecase (as in Local_channel).

   Marshalling of migrations is with respect to the mark "Npi_end" set
   below; marshalling for send_remote is with respect to the supplied
   mark, which should usually be below "Npi_end".

   There is some delicate use of local concurrency with mutexes and
   cvars.
*)

(* NB: fields marked by (*A*) will be removed from the interface *)
(* when width subsignaturing is added *)

(* Note the use of hash! (instead of fresh), as we need to rebind to
   this interface on migration with type "group" being compatible *)

(* Npi1: the shared types and the global (per-process) state. *)
module hash! Npi1 :
sig
  type tf = (Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit
  type tho = Tcp_string_messaging.handle option ref
  type channel = (exists t. t name * (t list ref * cvar name))
  type group = thread name list ref (* threads in group *)
             * mutex name list ref  (* mutexes in group *)
             * cvar name list ref   (* cvars in group *)
             * channel list ref     (* local channels *)
  type migration = group name * group * (thunkkey list -> unit)
  val groups_mutex : mutex name
  val groups : (group name * group) list ref
  val threadmap : (thread name * group name) list ref
  val ho: tho
end = struct
  (* Type of the handler given to Tcp_string_messaging.daemon (see init). *)
  type tf = (Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit

  (* The messaging handle; None until init has run. *)
  type tho = Tcp_string_messaging.handle option ref

  (* A local channel: its name, packed with the list of pending values
     and the condition variable receivers wait on. *)
  type channel = (exists t. t name * (t list ref * cvar name))

  type group = thread name list ref (* threads in group *)
             * mutex name list ref  (* mutexes in group *)
             * cvar name list ref   (* cvars in group *)
             * channel list ref     (* local channels *)

  (* The group data structure is more generous than its usage: it allows
     also mutexes and condition variables to be associated with a group
     (and be migrated properly).  At the moment there is no
     create_gmutex/create_gcvar, although their implementation would be
     trivial. *)

  (* What is sent for a migration: the group name, the group record, and
     the function produced by thunkify (applied to a thunkkey list on
     arrival, see Npi2.f). *)
  type migration = group name * group * (thunkkey list -> unit)

  let groups_mutex = hash(mutex, "Npi global mutex") %[mutex] (* fresh *)(* global mutex *)

  (* Locking strategy:

     - There is a global mutex ("groups_mutex") at each running acute
       process.

     - Functions acting on the group data structures are all protected
       by this global lock.

     - When a thread wants to receive a message and there are none in
       the channel, the thread waits on the channel's condition
       variable.

     - When a new message is sent on empty channel, its condition
       variable is signalled so that a waiting receiver is unblocked.

     NB: This does not in principle guarantee a FIFO delivery order,
     but will in fact have a FIFO ordering with the current version of
     Acute as threads in a condition variable are stored in a FIFO
     queue.

     The locking strategy is quite coarse; a more fine-grained scheme
     would be possible, where besides the global lock, a lock per group
     is also kept. *)

  let groups = ref []     (* group name -> group *)
  let threadmap = ref []  (* thread name -> group name *)

  (* threadmap exists to find in which group a thread belongs to.

     These maps are simply implemented as linked lists, but a production
     implementation would use a hashtable instead.  Similarly the list
     of channels should really be a hashtable. *)

  let ho = ref None
end
mark "Npi1"

(* Npi2: the implementation.  Everything is exported here; the Npi
   module at the end re-exports the client-facing subset. *)
module hash! Npi2 :
sig
  val find_my_group : unit -> Npi1.group name * Npi1.group
  val gthread_wrapper : forall t. (t->unit) -> t -> unit
  val create_group : forall t. (t -> unit) -> t -> unit
  val create_gthread : forall t. (t->unit) -> t -> unit
  val recv_local : forall t. t name -> t
  val my_send_local : forall t. Npi1.group -> t name -> t -> unit
  val send_local : forall t. t name -> t -> unit
  val f : Npi1.tf
  val init : (Tcp.ip option * Tcp.port option) -> unit
  val send_remote : forall t. string -> (Tcp.addr*Npi1.group name*t name) -> t -> unit
  val migrate_group : Tcp.addr -> unit
  val local_addr : unit -> Tcp.ip option * Tcp.port
end = struct

  (* Returns which group the calling thread belongs to, as a pair of the
     group name and the group record.

     The lookup goes through Utils.locked_by_stmt2 on groups_mutex, so
     callers must not already hold groups_mutex when calling this
     (every caller in this file calls it before taking the lock).

     Raises Failure "find_my_group:assoc" when self () has no entry in
     threadmap, and Failure "find_my_group:assoc[2]" when the group name
     found there has no entry in groups. *)
  let find_my_group () =
    Utils.locked_by_stmt2 %[] Npi1.groups_mutex (function () ->
      Pervasives.print_endline "In find_my_group lock...";
      let gn =
        try List.assoc %[] %[] (self ()) !Npi1.threadmap
        with Not_found -> raise (Failure "find_my_group:assoc") in
      let group_info =
        try List.assoc %[] %[] gn !Npi1.groups
        with Not_found -> raise (Failure "find_my_group:assoc[2]") in
      (gn, group_info)
    )

  (* Wrapper around the body of every group thread.

     Intended purpose: ensure that the thread exits gracefully by
     unregistering itself from the group data structure.  That logic is
     currently disabled (it is kept in the comment below), so at present
     the wrapper simply applies f to v and a finished thread stays
     listed in threadmap and in its group. *)
  let gthread_wrapper = Function t -> fun (f: t -> unit) (v: t) ->
    f v
    (*
    let unregister_my_gthread () =
      let tn = self() in
      let rec remove_me xs = match xs with
          [] -> raise Not_found
        | (x::xs) -> if x = tn then xs else x :: remove_me xs in
      let (gn, (ths, _, _, _)) = find_my_group () in
      Utils.locked_by_stmt Npi1.groups_mutex (function () ->
        Npi1.threadmap := List.remove_assoc %[] %[] tn !Npi1.threadmap;
        ths := remove_me !ths
      ) in
    (try f v with e -> (try unregister_my_gthread () with _ -> ()); raise e);
    unregister_my_gthread ()
    *)

  (* Create a new group containing one new thread that runs f v.

     A fresh group name and a fresh thread name are generated; under
     groups_mutex the group (with empty mutex, cvar and channel lists)
     is added to groups, the thread is added to threadmap, and the
     thread is created. *)
  let create_group = Function t -> fun (f: t -> unit) (v : t) ->
    let gn = fresh %[Npi1.group] in
    let tn = fresh %[thread] in
    Utils.locked_by_stmt Npi1.groups_mutex (function () ->
      let group_info = (ref (tn::[]), ref [], ref [], ref []) in
      Npi1.groups := (gn, group_info) :: !Npi1.groups;
      Npi1.threadmap := (tn, gn) :: !Npi1.threadmap;
      create_thread tn (gthread_wrapper %[t] f) v
    )

  (* Create a new thread, running f v, in the group of the calling
     thread.

     The group is found first (find_my_group takes and releases the
     lock itself); then, under groups_mutex, the new thread is recorded
     in threadmap and in the thread list of the group, and created.
     Fails as find_my_group does when the caller is not in a group. *)
  let create_gthread = Function t -> fun (f: t -> unit) (v: t) ->
    let (gn, (ths, _, _, _)) = find_my_group () in
    let tn = fresh %[thread] in
    Utils.locked_by_stmt Npi1.groups_mutex (function () ->
      Npi1.threadmap := (tn, gn) :: !Npi1.threadmap;
      ths := tn :: !ths;
      create_thread %[t] tn (gthread_wrapper %[t] f) v
    )

  (* Receive a value from local channel cn of the calling thread's
     group, blocking if there is none.

     Under groups_mutex the channel list of the group is searched with
     namecase for an entry named cn:
     - no such entry: an empty channel with a fresh cvar is added, the
       caller waits on that cvar, and the search restarts from the
       current channel list;
     - entry found: the caller waits on the channel cvar for as long as
       the pending list is empty, then removes and returns the head of
       the list.

     Since my_send_local adds values at the head of the same list,
     values that are queued up are handed out most-recent-first. *)
  let recv_local = Function t -> fun (cn: t name) ->
    let (gn, group_info) = find_my_group () in
    let (_,_,_,csr) = group_info in
    Utils.locked_by_stmt2 %[t] Npi1.groups_mutex (function () ->
      let rec lookup cs' = match cs' with
          [] ->
            let my_cvar = fresh %[cvar] in
            create_cvar my_cvar;
            csr := ({t, (cn, (ref [], my_cvar))} as Npi1.channel) :: !csr;
            wait my_cvar Npi1.groups_mutex;
            lookup !csr
        | (c: Npi1.channel)::cs0 ->
            namecase c with
              {t,(cn',x)} when cn'=cn ->
                let ((msgs: t list ref), my_cvar) = x in
                let rec ww () = match !msgs with
                    [] -> wait my_cvar Npi1.groups_mutex; ww ()
                  | v::vs -> msgs := vs; v
                in ww ()
              otherwise -> lookup cs0
      in lookup !csr
    )

  (* Deliver v on channel cn of the given group (which need not be the
     group of the calling thread).

     Under groups_mutex the channel list of the group is searched for an
     entry named cn:
     - no such entry: a channel holding just v, with a fresh cvar, is
       added (nothing is signalled, as nobody can be waiting on it);
     - entry found: v is added at the head of the pending list, and the
       channel cvar is signalled if the list was empty beforehand.

     The caller must not hold groups_mutex.

     NOTE(review): the cvar is signalled only on the empty to non-empty
     transition.  With several receivers blocked on one channel and two
     sends arriving before the first receiver runs, it looks as if the
     second receiver stays blocked although a value is pending -
     confirm against the Acute cvar semantics. *)
  let my_send_local = Function t -> fun group_info (cn: t name) (v: t) ->
    let (_,_,_,csr) = group_info in
    Utils.locked_by_stmt Npi1.groups_mutex (function () ->
      let rec lookup cs' = match cs' with
          [] ->
            let my_cvar = fresh %[cvar] in
            create_cvar my_cvar;
            csr := ({t,(cn,(ref(v::[]),my_cvar))} as Npi1.channel) :: !csr
        | (c: Npi1.channel)::cs0 ->
            namecase c with
              {t,(cn',x)} when cn'=cn ->
                let ((msgs: t list ref), my_cvar) = x in
                (match !msgs with
                   [] -> msgs := v :: !msgs; signal my_cvar
                 | _ -> msgs := v :: !msgs)
              otherwise -> lookup cs0
      in lookup !csr
    )

  (* Send v on channel cn of the calling thread's own group.
     Fails as find_my_group does when the caller is not in a group. *)
  let send_local = Function t -> fun (cn: t name) (v: t) ->
    let (gn, group_info) = find_my_group () in
    my_send_local %[t] group_info cn v

  (* We have a single site daemon listen for messages and migrating
     things.
     - for messages, it uses the group name to look up in the group
       data structure to find the addressed group, then delivers the
       value on the named channel of that group.
     - for migrating things, it'll unthunkify and extend the group data
       structure.

     f is the handler that init passes to Tcp_string_messaging.daemon.
     Only data (the marshalled payload) is used; ipop_local and
     addr_remote are ignored.

     Locking:
     - message case: the group is looked up under groups_mutex, the
       lock is released, and my_send_local (which takes groups_mutex
       itself) performs the delivery.  This is the same shape as the
       local branch of send_remote.
     - migration case: runs entirely under groups_mutex.

     Fix relative to the previous version: the message case used to
     call send_local, which resolves the group of the *calling* thread
     through find_my_group.  The handler presumably runs on a thread
     owned by Tcp_string_messaging.daemon, which nothing in this file
     registers in threadmap, so the value could not reach group gn (and
     the group_info looked up for gn was never used).  That call also
     re-entered groups_mutex while the handler was holding it, which
     every other function in this file avoids.

     Raises Failure when a value arrives for a group that is not present
     here, or when a migration arrives for a group name that is already
     present.  Any exception is reported with print_endline and then
     re-raised. *)
  let f ipop_local addr_remote data =
    Pervasives.print_endline "npi daemon received something";
    try
      match (unmarshal data) with
        inj 1 %[(Npi1.group name * (exists t. t name * t)) + Npi1.migration] (gn, channel) ->
          (* a normal value *)
          Pervasives.print_endline "npi daemon received a value";
          let group_info =
            Utils.locked_by_stmt2 %[] Npi1.groups_mutex (function () ->
              try List.assoc %[] %[] gn !Npi1.groups
              with Not_found -> raise (Failure "Received a value for a group not present at this TCP address")
            ) in
          let {t, x} = channel in
          let (cn, v) = x in
          (* deliver to the addressed group, not to the group of the
             daemon thread *)
          my_send_local %[t] group_info cn v
      | inj 2 %[(Npi1.group name*(exists t. t name*t))+Npi1.migration] (gn,groupinfo,unthunk) ->
          (* a migration *)
          Pervasives.print_endline "npi daemon received a migration";
          Utils.locked_by_stmt Npi1.groups_mutex (function () ->
            let (ths, mtxs, cvs, csr) = groupinfo in
            if List.mem_assoc %[] %[] gn !Npi1.groups then
              (* NB: this should never occur as group names are only
                 created with fresh %[group] and the only operation
                 involving group names is migration which is linear.
                 This check prevents a type of maliciously forged
                 migrations. *)
              raise (Failure "A group with this same name is already present at this site")
            else (
              (* register the group and all of its threads locally *)
              Npi1.groups := (gn, groupinfo) :: !Npi1.groups;
              List.iter %[] (fun tn -> Npi1.threadmap := (tn, gn) :: !Npi1.threadmap) !ths;
              (* The key list is built in the same order as in
                 migrate_group: threads, mutexes, cvars, then one cvar
                 per channel.  Here every thread is given Blocking. *)
              let tks = List.map %[] %[] (fun n -> Thread (n, Blocking)) !ths
                      @ List.map %[] %[] (fun n -> Mutex n) !mtxs
                      @ List.map %[] %[] (fun n -> CVar n) !cvs
                      @ List.map %[] %[] (fun (p: Npi1.channel) -> let {t,x} = p in let (_,(_,n)) = x in CVar n) !csr in
              unthunk tks;
              Pervasives.print_endline("unthunked")
            )
          )
    with e ->
      Pervasives.print_endline "An exception was raised in the npi daemon";
      raise e

  (* Initialise Npi in this process: create the global mutex and start
     the messaging daemon on (ipo,po) with f as its handler, storing the
     resulting handle in Npi1.ho.

     Raises Failure "Npi already initialised" when a handle is already
     stored.

     NOTE(review): create_mutex runs before that check, so a second call
     may fail inside create_mutex rather than with the Failure above -
     confirm what create_mutex does for an existing name. *)
  let init (ipo,po) =
    create_mutex Npi1.groups_mutex;
    Pervasives.print_endline ("Created NPI mutex " ^ name_to_string Npi1.groups_mutex);
    match !Npi1.ho with
      Some _ -> raise (Failure "Npi already initialised")
    | None -> Npi1.ho := Some (Tcp_string_messaging.daemon (ipo,po) f)

  (* Send v to channel cn of group gn at Tcp address addr.  mk is the
     mark with respect to which the value is marshalled.

     When addr equals the local address of the messaging handle, the
     value is delivered directly with my_send_local (Failure
     "send_remote:List.assoc" when gn is not present here).  Otherwise
     the value is packed, injected into the message case of the wire
     type, marshalled and sent.

     Uses Utils.the on Npi1.ho, so init must have been called first. *)
  let send_remote = Function t -> fun mk (addr,gn,cn) (v: t) ->
    let h = Utils.the %[] !Npi1.ho in
    let (ip, port) = addr in
    if (Some ip, port) = Tcp_string_messaging.local_addr h then
      (* note this local-send optimisation will only take effect if
         the IP was set explicitly *)
      let group_info =
        Utils.locked_by_stmt2 %[] Npi1.groups_mutex (function () ->
          try List.assoc %[] %[] gn !Npi1.groups
          with Not_found -> raise (Failure "send_remote:List.assoc")
        ) in
      my_send_local %[t] group_info cn v
    else
      let channel = {t, (cn, v)} as exists t'.t' name * t' in
      let data = inj 1 %[(Npi1.group name*(exists t.t name*t))+Npi1.migration] (gn, channel) in
      let mar_data = marshal mk data in
      Tcp_string_messaging.send h addr mar_data

  (* Migrate the current group to a new Tcp address.

     All threads except for the calling thread are thunkified with
     Blocking mode.  The calling thread is blocked with a mutex/cvar.
     As it is thunkified with Interrupting mode, it is woken up at the
     other end with a Thunkify_EINTR exception.

     Sequence:
     1. find the group of the caller (before taking the lock);
     2. create a private cvar and lock groups_mutex;
     3. remove the group and its threads from groups and threadmap;
     4. start a helper thread which, once it obtains groups_mutex,
        thunkifies the threads, mutexes, cvars and channel cvars of the
        group, marshals the migration with respect to "Npi_end" and
        sends it to addr;
     5. wait on the private cvar with groups_mutex, which is what lets
        the helper thread obtain the lock.

     The statement order here is significant; do not reorder. *)
  let migrate_group = fun addr ->
    Pervasives.print_endline("migrate_group: started");
    let (gn, group_info) = find_my_group () in
    Pervasives.print_endline("migrate_group: found my group");
    let (ths, mtxs, cvs, csr) = group_info in
    let my_cv = fresh in
    create_cvar my_cv;
    lock Npi1.groups_mutex;
    (* First remove the group and its threads from the global data
       structures *)
    Npi1.groups := List.remove_assoc %[] %[] gn !Npi1.groups; (* remove gn -> group_info mapping *)
    List.iter %[] (fun tn -> Npi1.threadmap := List.remove_assoc %[] %[] tn !Npi1.threadmap) !ths; (* remove tn -> gn mapping *)
    Pervasives.print_endline("migrate_group: removed gn,tn data");
    let initiating_thread_name = self() in
    (* make new thread to perform thunkify, otherwise will thunkify
       self *)
    create_thread fresh (function () ->
      Pervasives.print_endline("migrate_group: thunkify thread started");
      Utils.locked_by_stmt Npi1.groups_mutex (function () ->
        Pervasives.print_endline("migrate_group: thunkify thread got lock");
        (* only the thread that asked for the migration is interrupted *)
        let get_tmode tn =
          if compare_name tn initiating_thread_name = 0 then Interrupting else Blocking in
        let tks = List.map %[] %[] (fun n -> Thread (n, get_tmode n)) !ths
                @ List.map %[] %[] (fun n -> Mutex n) !mtxs
                @ List.map %[] %[] (fun n -> CVar n) !cvs
                @ List.map %[] %[] (fun (p: Npi1.channel) -> let {t,x} = p in let (_,(_,n)) = x in CVar n) !csr in
        Pervasives.print_endline("migrate_group: thunkify thread going to thunkify");
        let thunked = thunkify tks in
        Pervasives.print_endline("migrate_group: thunkify thread done thunkify");
        let data = inj 2 %[(Npi1.group name * (exists t. t name * t)) + Npi1.migration] (gn, group_info, thunked) in
        let mar_data = marshal "Npi_end" data in
        Pervasives.print_endline("migrate_group: going to send marshalled: ... " (* ^ mar_data *) );
        let h = Utils.the %[] !Npi1.ho in
        Tcp_string_messaging.send h addr mar_data
      )
    ) ();
    (* must block thread initiating migration, until thunkify has
       completed *)
    try
      wait my_cv Npi1.groups_mutex (* Block here - thunkify will cause Thunkify_EINTR *)
    with Thunkify_EINTR -> () (* Migration completed -- we can now continue execution *)

  (* The local address of the messaging handle.  Uses Utils.the on
     Npi1.ho, so init must have been called first. *)
  let local_addr () = Tcp_string_messaging.local_addr (Utils.the %[] !Npi1.ho)
end
mark "Npi2"

(* Npi: the client-facing interface.  It keeps type group abstract and
   re-exports the Npi2 operations that clients are meant to use. *)
module hash! Npi :
sig
  type group
  val create_group : forall t. (t -> unit) -> t -> unit
  val create_gthread : forall t. (t->unit) -> t -> unit
  val recv_local : forall t. t name -> t
  val send_local : forall t. t name -> t -> unit
  val init : (Tcp.ip option * Tcp.port option) -> unit
  val send_remote : forall t. string -> (Tcp.addr * group name * t name) -> t -> unit
  val migrate_group : Tcp.addr -> unit
  val local_addr : unit -> Tcp.ip option * Tcp.port
end = struct
  type group = Npi1.group
  let create_group = Npi2.create_group
  let create_gthread = Npi2.create_gthread
  let recv_local = Npi2.recv_local
  let send_local = Npi2.send_local
  let init = Npi2.init
  let send_remote = Npi2.send_remote
  let migrate_group = Npi2.migrate_group
  let local_addr = Npi2.local_addr
end
mark "Npi_end"