Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
(* ******************************************************************* *)
(* **                                                               ** *)
(* **  Local_channel                                                ** *)
(* **                                                               ** *)
(* ******************************************************************* *)

(* Module Local_channel provides simple typed asynchronous local
   channels.

   Function
     send : forall t. t name -> t -> unit
   sends a message on the specified name, returning immediately.

   Function
     recv : forall t. t name -> (t -> unit) -> unit
   registers a receiver on the specified name, returning immediately.

   As soon as there is both a message and a receiver for a name the
   receiver is applied to the message. The receiver is then removed.

   The interface uses (T name) as the type of channels carrying values
   of type T. Exposing the fact that this is a name type allows clients
   to use any of the methods for constructing shared typed names that
   Acute provides.

   One might instead think of using ML-style references as channel `names'.
   For a local implementation that would be fine, but when one marshalled
   values mentioning channels the whole channel data structure would be
   copied, which is not our desired semantics.

   Internally, the pending messages and receivers on the channels are
   stored in a list of existential packages, of type
     (exists t. t name * (t list ref * (t->unit) list ref)) list
   with the Acute namecase operation used in lookups.

   This is a hash! module. There is module state: the handle h consists
   of a mutex name and a pointer to the channel data structure. (Here h
   is exposed, abstractly, in the interface, purely to work around the
   current lack of width subsignaturing.) Nonetheless, rebinding to local
   instances of Local_channel.send and Local_channel.recv should just
   work, so we use the hash! mode to give an exact-hash version (and, as
   part of that workaround, to make the abstract type of h
   hash-generated).

   One might think of passing the handle explicitly as an argument to
   send and recv, doing without module state.
   That again would lead to the wrong semantics for marshalling values
   that use this library. *)

(* NB: fields marked by (*A*) will be removed from the interface *)
(* when width subsignaturing is added *)

module hash! Local_channel :
sig
  type handle (*A*)
  val h : handle (*A*)
  val send : forall t. t name -> t -> unit
  val recv : forall t. t name -> (t -> unit) -> unit
end
=
struct
  (* Module state: a mutex name paired with a mutable list of
     per-channel records, each packed existentially over the type of
     value the channel carries. *)
  type handle = mutex name * (exists t. t name * (t list ref * (t->unit) list ref)) list ref

  (* The single handle for this module instance: a fresh name on which a
     mutex is created, together with an initially empty channel list. *)
  let h = (let n = fresh in create_mutex n; n, ref [] )

  (* A handle consists of a mutex and a reference to a list of channel
     structures. Each channel structure is an existential package
     containing a name, a reference to a list of pending messages and a
     reference to a list of pending receptors. We maintain the invariant
     that at most one of those two is nonempty. Channel structures are
     added to the list as necessary. At present they are never removed;
     we could remove them when they become empty. The pending messages
     and pending receptors are kept with the oldest at the heads of the
     lists. *)

  (* Note the use of namecase below *)

  (* send cn v: deliver v on channel cn.
     The channel list is scanned inside Utils.locked_by_stmt m
     (presumably this runs the thunk while holding mutex m; Utils is not
     visible here, TODO confirm).
     - No record for cn: a new record with v as its only pending message
       is pushed on the front of the channel list.
     - Record found, no pending receivers: v is appended to the end of
       the pending messages, so the oldest message stays at the head.
     - Record found with pending receivers: the oldest receiver is
       removed and handed, together with v, to create_thread. *)
  let send = Function t -> fun (cn: t name) (v: t) ->
    let (m,csr) = h in
    Utils.locked_by_stmt m (function () ->
      let rec lookup cs' = match cs' with
        (* end of list reached: cn has no record yet, so create one *)
        [] -> csr := ( {t,(cn,(ref (v::[]),ref []))} as exists t. t name * (t list ref * (t->unit) list ref) ) :: !csr
      | (c: exists t'. t' name * (t' list ref * (t'->unit) list ref))::cs0 ->
          (* unpack c and test whether its name is cn *)
          namecase c with
            {t',(cn',xyz)} when cn'=cn ->
              let ((msgs: t list ref),rcvrs)=xyz in
              match !rcvrs with
                (* in this branch the typechecker needs to know t=t' *)
                [] -> msgs := (!msgs @ (v::[]))
              | rcvr::rcvrs0 -> ( rcvrs:=rcvrs0;
                  (* could remove this whole channel if it's become empty*)
                  create_thread fresh rcvr v)
            (* a different channel: keep scanning the rest of the list *)
            otherwise -> lookup cs0
      in lookup !csr )

  (* recv cn f: register f as a receiver on channel cn.
     Mirror image of send, scanned under the same mutex.
     - No record for cn: a new record with f as its only pending
       receiver is pushed on the front of the channel list.
     - Record found, no pending messages: f is appended to the end of
       the pending receivers.
     - Record found with pending messages: the oldest message is removed
       and handed, together with f, to create_thread.
     Unlike send, the namecase pattern here rebinds the type variable t
     instead of introducing a second one.
     NOTE(review): create_thread is given %[t] here but not in send;
     this looks like an explicit type argument. Confirm whether the
     difference is intentional. *)
  let recv = Function t -> fun (cn: t name) (f: t -> unit) ->
    let (m,csr) = h in
    Utils.locked_by_stmt m (function () ->
      let rec lookup cs' = match cs' with
        (* end of list reached: cn has no record yet, so create one *)
        [] -> csr := ({t, (cn,(ref [],ref (f::[])))} as exists t. t name * (t list ref * (t->unit) list ref)) :: !csr
      | (c: exists t'. t' name * (t' list ref * (t'->unit) list ref))::cs0 ->
          (* unpack c and test whether its name is cn *)
          namecase c with
            {t,(cn',x)} when cn'=cn ->
              let ((msgs: t list ref),rcvrs)=x in
              match !msgs with
                [] -> rcvrs := !rcvrs @ (f::[])
              | v::vs0 -> ( msgs:=vs0; create_thread %[t] fresh f v)
            (* a different channel: keep scanning the rest of the list *)
            otherwise -> lookup cs0
      in lookup !csr )
end
mark "LChan"

(* TODO: Extend with replicated input and with blocking receive. *)