Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
(* ******************************************************************* *)
(* ** ** *)
(* ** Local_channel ** *)
(* ** ** *)
(* ******************************************************************* *)
(* Module Local_channel provides simple typed asynchronous local
channels.
Function send : forall t. t name -> t -> unit sends a message
on the specified name, returning immediately.
Function recv : forall t. t name -> (t -> unit) -> unit
registers a receiver on the specified name, returning immediately.
As soon as there is both a message and a receiver for a name the
receiver is applied to the message. The receiver is then removed.
The interface uses (T name) as the type of channels carrying values
of type T. Exposing the fact that this is a name type allows
clients to use any of the methods for constructing shared typed
names that Acute provides.
One might instead think of using ML-style references as channel
`names'. For a local implementation that would be fine, but one
one marshalled values mentioning channels the whole channel data
structure would be copied, which is not our desired semantics.
Internally, the pending messages and receivers on the channels are
stored in a list of existential packages, of type
(exists t. t name * (t list ref * (t->unit) list ref)) list
with the Acute namecase operation used in lookups.
This is a hash! module. There is module state: the handle h
consists of a mutex name and a pointer to the channel data
structure. (Here h is exposed, abstractly, in the interface,
purely to work around the current lack of width subsignaturing.)
Nonetheless, rebinding to local instances of Local_channel.send and
Local_channel.recv should just work, so we use the hash! mode to
give an exact-hash version (and, as part of that workaround, to
make the abstract type of h hash-generated).
One might think of passing the handle explicitly as an argument to
send and recv, doing without module state. That again would lead
to the wrong semantics for marshalling values that use this
library.
*)
(* NB: fields marked by (*A*) will be removed from the interface *)
(* when width subsignaturing is added *)
module hash! Local_channel :
sig
type handle (*A*)
val h : handle (*A*)
val send : forall t. t name -> t -> unit
val recv : forall t. t name -> (t -> unit) -> unit
end
=
struct
type handle = mutex name * (exists t. t name * (t list ref * (t->unit) list ref)) list ref
let h = (let n = fresh in create_mutex n; n, ref [] )
(* A handle consists of a mutex and a reference to a list of
channel structures. Each channel structure is an existential
package containing a name, a reference to a list of pending
messages and a reference to a list of pending receptors. We
maintain the invariant that at most one of those two is
nonempty.
Channel structures are added to the list as necessary. At
present they are never removed; we could remove them when they
become empty.
The pending messages and pending receptors are kept with the
oldest at the heads of the lists.
*)
(* Note the use of namecase below *)
let send = Function t -> fun (cn: t name) (v: t) ->
let (m,csr) = h in
Utils.locked_by_stmt m
(function () ->
let rec lookup cs' = match cs' with
[] -> csr :=
( {t,(cn,(ref (v::[]),ref []))} as exists t. t name * (t list ref * (t->unit) list ref) )
:: !csr
| (c: exists t'. t' name * (t' list ref * (t'->unit) list ref))::cs0 ->
namecase c with
{t',(cn',xyz)} when cn'=cn ->
let ((msgs: t list ref),rcvrs)=xyz in
match !rcvrs with (* in this branch the typechecker needs to know t=t' *)
[] -> msgs := (!msgs @ (v::[]))
| rcvr::rcvrs0 -> (
rcvrs:=rcvrs0; (* could remove this whole channel if it's become empty*)
create_thread fresh rcvr v)
otherwise ->
lookup cs0
in lookup !csr
)
let recv = Function t -> fun (cn: t name) (f: t -> unit) ->
let (m,csr) = h in
Utils.locked_by_stmt m
(function () ->
let rec lookup cs' = match cs' with
[] -> csr := ({t, (cn,(ref [],ref (f::[])))} as
exists t. t name * (t list ref * (t->unit) list ref)) :: !csr
| (c: exists t'. t' name * (t' list ref * (t'->unit) list ref))::cs0 ->
namecase c with
{t,(cn',x)} when cn'=cn ->
let ((msgs: t list ref),rcvrs)=x in
match !msgs with
[] -> rcvrs := !rcvrs @ (f::[])
| v::vs0 -> (
msgs:=vs0;
create_thread %[t] fresh f v)
otherwise ->
lookup cs0
in lookup !csr
)
end
mark "LChan"
(* TODO: Extend with replicated input and with blocking receive. *)