(* -*- Mode: Modula-3 -*-
 *
 * For information about this program, contact Blair MacIntyre
 * (bm@cs.columbia.edu) or Steven Feiner (feiner@cs.columbia.edu)
 * at the Computer Science Dept., Columbia University,
 * 1214 Amsterdam Ave. Mailstop 0401, New York, NY, 10027.
 *
 * Copyright (C) 1995, 1996 by The Trustees of Columbia University in the
 * City of New York. Blair MacIntyre, Computer Science Department.
 * See file COPYRIGHT-COLUMBIA for details.
 *
 * Author : Blair MacIntyre
 * Created On : Fri Jun 2 15:44:44 1995
 * Last Modified By: Blair MacIntyre
 * Last Modified On: Fri Oct 24 11:49:40 1997
 * Update Count : 139
 *
 * $Source: /opt/cvs/cm3/doc/help/gen_html/events/src/EventPort.m3.html,v $
 * $Date: 2010-04-29 17:18:13 $
 * $Author: wagner $
 * $Revision: 1.5 $
 *
 * $Log: not supported by cvs2svn $
 * Revision 1.4.2.1 2010-04-15 20:58:50 wagner
 * update generated HTML doc to RC5
 *
 * Revision 1.2 2001/12/02 00:20:37 wagner
 * add copyright notes, fix overrides for cm3, and make everything compile
 *
 * added: events/COPYRIGHT-COLUMBIA
 * added: events/src/COPYRIGHT-COLUMBIA
 * modified: events/src/Event.i3
 * modified: events/src/Event.m3
 * modified: events/src/EventConn.i3
 * modified: events/src/EventConn.m3
 * modified: events/src/EventCounter.i3
 * modified: events/src/EventCounter.m3
 * modified: events/src/EventHandle.i3
 * modified: events/src/EventIO.i3
 * modified: events/src/EventNumber.i3
 * modified: events/src/EventNumber.m3
 * modified: events/src/EventNumberF.i3
 * modified: events/src/EventPort.i3
 * modified: events/src/EventPort.m3
 * modified: events/src/EventProtocol.i3
 * modified: events/src/EventRd.i3
 * modified: events/src/EventRd.m3
 * modified: events/src/EventSpaceID.i3
 * modified: events/src/EventSpaceID.m3
 * modified: events/src/EventStubLib.i3
 * modified: events/src/EventStubLib.m3
 * modified: events/src/EventWireRep.i3
 * modified: events/src/EventWireRep.m3
 * modified: events/src/EventWr.i3
 * modified: events/src/EventWr.m3
 * modified: events/src/EventWrF.i3
 * modified: events/src/HostInfo.i3
 * modified: events/src/HostInfo.m3
 * modified: events/src/RdWrMutex.i3
 * modified: events/src/RdWrMutex.m3
 * modified: events/src/Work.i3
 * modified: events/src/WorkerPool.i3
 * modified: events/src/WorkerPool.m3
 * modified: events/src/Zombie.i3
 * modified: events/src/m3makefile
 * modified: events/src/m3overrides
 *
 * Revision 1.1.1.1 2001/12/02 00:06:45 wagner
 * Blair MacIntyre's events library
 *
 * Revision 1.6 1998/09/25 15:22:26 bm
 * small bug in Event package
 * make binaries dynamically linked
 *
 * Revision 1.5 1997/10/24 19:31:32 bm
 * Added the ability to flush the readers and worker pool.
 *
 * Revision 1.4 1997/08/04 20:15:10 bm
 * Fixed BRANDs
 *
 * Revision 1.3 1997/01/23 15:26:38 bm
 * Lots of little bug fixes.
 *
 *
 * HISTORY
 *)
UNSAFE MODULE EventPort;
IMPORT Rd, MsgWr, Wr, HostInfo, HostInfoTbl, Atom;
IMPORT EventSpaceID, EventProtocol, AtomList, Event, Thread, Work,
WorkerPool, RdClass, WrClass, EventIO, EventConn, EventSeq,
EventConnList, Fingerprint;
FROM EventProtocol IMPORT ID, StubProtocol;
FROM Event IMPORT Error;
IMPORT IO, Fmt;
REVEAL RdClass.Private <: MUTEX;
REVEAL WrClass.Private <: MUTEX;
REVEAL
(* Concrete representation of an event port.  A port owns one reader and
   one writer thread per connected space (see Connect) and a worker pool
   on which received events and connection problems are dispatched. *)
T = Public BRANDED "EventPort.T" OBJECT
(* Number copied from "portNumber" by Init; DebugMsg prints it as part of
   the message prefix. *)
num : CARDINAL;
(* When TRUE, DebugMsg writes its messages via IO.Put. *)
debug : BOOLEAN;
(* Allocated by Init; locked by Connect, Disconnect, Register and Send
   around their accesses to "ports" and "cb". *)
mu : Thread.Mutex := NIL;
(* Table from "conn.space" to the HostInfo.T created for that connection
   by Connect. *)
ports : HostInfoTbl.T;
(* Worker pool that receives the EventWork and ProblemWork items. *)
wp : WorkerPool.T;
(* One callback slot per event ID; Init fills every slot with
   DefaultDispatcher and Register replaces individual slots. *)
cb : ARRAY [FIRST(ID) .. LAST(ID)] OF EventCallBack;
METHODS
(* Report a connection problem; the default, DispatchProblem, queues a
   ProblemWork item on "wp". *)
problem(conn: EventConn.T; at: AtomList.T) := DispatchProblem;
OVERRIDES
init := Init;
connect := Connect;
disconnect := Disconnect;
send := Send;
mcast := MCast;
register := Register;
stealWorker := StealWorker;
flushReader := FlushReader;
flushWork := FlushWork;
END;
(* Number given to the next port initialised by Init, which copies it into
   "T.num" and then increments it.  Init does this without taking any
   lock. *)
VAR portNumber: CARDINAL := 0;
PROCEDURE DebugMsg (self: T; msg: TEXT) =
  (* Write "msg" to standard output, prefixed with "EventPort<num>: " and
     followed by a newline, but only when the port has debugging enabled. *)
  BEGIN
    IF NOT self.debug THEN RETURN END;
    WITH prefix = "EventPort" & Fmt.Int(self.num) & ": " DO
      IO.Put(prefix & msg & "\n");
    END;
  END DebugMsg;
PROCEDURE StealWorker (self: T): BOOLEAN =
  (* Forward the request to the port's worker pool and return its answer. *)
  BEGIN
    WITH pool = self.wp DO
      RETURN pool.stealWorker();
    END;
  END StealWorker;
PROCEDURE EventPortReadFlush (hinfo: HostInfo.T) RAISES {Thread.Alerted} =
  (* Return once the reader thread of "hinfo" has caught up with its input.
     If the reader is already marked as blocking there is nothing to wait
     for; otherwise wait (alertably) for one wake-up on "hinfo.rdcv", which
     the reader broadcasts when it finds no more characters ready. *)
  BEGIN
    LOCK hinfo.rdmu DO
      IF NOT hinfo.blocking^ THEN
        Thread.AlertWait(hinfo.rdmu, hinfo.rdcv);
      END;
    END;
  END EventPortReadFlush;
PROCEDURE FlushReader(self: T) RAISES {Thread.Alerted} =
  (* Wait until the reader thread of every currently connected space has
     caught up with its input.  Raises Thread.Alerted if the calling thread
     is alerted while waiting, or has an alert pending once all readers
     have been flushed. *)
  VAR key: Fingerprint.T;
      val: HostInfo.T;
      hosts: REF ARRAY OF HostInfo.T;
      n: CARDINAL := 0;
  BEGIN
    (* "self.ports" is modified by Connect and Disconnect under "self.mu",
       and the table is not safe to iterate while it is being changed.
       Take a snapshot of the entries under the lock; the potentially long
       waits are then done without holding it. *)
    LOCK self.mu DO
      hosts := NEW(REF ARRAY OF HostInfo.T, self.ports.size());
      WITH it = self.ports.iterate() DO
        WHILE it.next(key, val) DO
          hosts[n] := val;
          INC(n);
        END;
      END;
    END;

    (* we must first flush all the incoming readers; the caller then waits
       for the worker pool to finish its work (see FlushWork) *)
    FOR i := 0 TO n - 1 DO
      EventPortReadFlush(hosts[i]);
    END;
    IF Thread.TestAlert() THEN RAISE Thread.Alerted END;
  END FlushReader;
PROCEDURE FlushWork(self: T) RAISES {Thread.Alerted} =
  (* Flush the port's worker pool, then raise Thread.Alerted if the calling
     thread has an alert pending. *)
  BEGIN
    WITH pool = self.wp DO
      pool.flush();
    END;
    IF Thread.TestAlert() THEN
      RAISE Thread.Alerted;
    END;
  END FlushWork;
PROCEDURE Init (self: T; debug := FALSE): T =
  (* Initialise a freshly allocated port and return it: record the debug
     flag, take the next port number, create the (empty) connection table,
     the worker pool and the port mutex, and point every callback slot at
     DefaultDispatcher. *)
  CONST
    PoolThreads = 2;
    PoolStack   = 200000;
  BEGIN
    self.debug := debug;

    self.num := portNumber;
    INC(portNumber);

    self.ports := NEW(HostInfoTbl.Default).init();

    (* By default, we want extra stack space, since we are doing a lot
       before getting to the user's code. *)
    self.wp := NEW(WorkerPool.T).init(maxThreads := PoolThreads,
                                      maxIdleThreads := PoolThreads,
                                      stackSize := PoolStack);
    self.mu := NEW(Thread.Mutex);

    FOR id := FIRST(ID) TO LAST(ID) DO
      WITH slot = self.cb[id] DO
        slot.proc := DefaultDispatcher;
        slot.data := NIL;
      END;
    END;

    (* DebugMsg itself checks the debug flag. *)
    DebugMsg(self, "initialized.");
    RETURN self;
  END Init;
PROCEDURE DefaultDispatcher (<*UNUSED*> ev: Event.T;
                             <*UNUSED*> data: REFANY) =
  (* Dispatcher installed in every callback slot by Init: the event is
     silently discarded. *)
  BEGIN
  END DefaultDispatcher;
PROCEDURE Connect (self: T; conn: EventConn.T) RAISES {Error} =
  (* Register "conn" with the port.  Raises Error("Duplicate space") if a
     connection for "conn.space" is already in the table.  Otherwise a
     HostInfo.T is created for the connection, a reader and a writer
     thread are forked for it, and it is entered in the table.  Everything
     happens while holding the port mutex. *)
  VAR
    known, fresh: HostInfo.T;
  BEGIN
    LOCK self.mu DO
      IF self.debug THEN
        DebugMsg(self,
                 "connect called for " & EventSpaceID.ToText(conn.space));
      END;

      IF self.ports.get(conn.space, known) THEN
        RAISE Error(AtomList.List1(Atom.FromText("Duplicate space")));
      END;

      fresh := NEW(HostInfo.T,
                   conn := conn,
                   es := NEW(EventSeq.T).init(),
                   mu := NEW(Thread.Mutex),
                   cv := NEW(Thread.Condition),
                   rdmu := NEW(Thread.Mutex),
                   rdcv := NEW(Thread.Condition),
                   blocking := NEW(REF BOOLEAN));

      (* The reader shares the read-side mutex, condition and "blocking"
         flag with the host record. *)
      WITH rdJob = NEW(RdConnectionClosure,
                       ep := self, wp := self.wp, conn := conn,
                       mu := fresh.rdmu, cv := fresh.rdcv,
                       blocking := fresh.blocking) DO
        fresh.reader := Thread.Fork(rdJob);
      END;

      (* The writer shares the outgoing event queue and its mutex and
         condition with the host record. *)
      WITH wrJob = NEW(WrConnectionClosure,
                       ep := self, conn := conn, es := fresh.es,
                       mu := fresh.mu, cv := fresh.cv) DO
        fresh.writer := Thread.Fork(wrJob);
      END;

      EVAL self.ports.put(conn.space, fresh);

      (* DebugMsg itself checks the debug flag. *)
      DebugMsg(self, "connect succeeded.");
    END;
  END Connect;
PROCEDURE Disconnect (self: T; conn: EventConn.T): EventSeq.T
  RAISES {Error} =
  (* Remove the connection for "conn.space" from the port.  Raises
     Error("Unknown space") if there is no such entry.  Otherwise the
     connection's reader and writer threads are alerted and joined, and
     the queue of events that were never written is returned, or NIL if
     that queue is empty. *)
  VAR
    hinfo: HostInfo.T;
    unsent: EventSeq.T := NIL;
  BEGIN
    LOCK self.mu DO
      IF self.debug THEN
        DebugMsg(self, "disconnect from " & EventSpaceID.ToText(conn.space));
      END;
      IF NOT self.ports.delete(conn.space, hinfo) THEN
        RAISE Error(AtomList.List1(Atom.FromText("Unknown space")));
      END;
    END;

    LOCK hinfo DO
      (* Alert both threads before joining either of them. *)
      Thread.Alert(hinfo.reader);
      Thread.Alert(hinfo.writer);
      EVAL Thread.Join(hinfo.reader);
      EVAL Thread.Join(hinfo.writer);
      hinfo.reader := NIL;
      hinfo.writer := NIL;

      (* DebugMsg itself checks the debug flag. *)
      DebugMsg(self, "disconnect succeeded.");

      IF hinfo.es.size() > 0 THEN
        unsent := hinfo.es;
      END;
    END;
    RETURN unsent;
  END Disconnect;
(* TODO: use prot! *)
PROCEDURE Register (           self: T;
                               id  : ID;
                    <*UNUSED*> prot: StubProtocol;
                               disp: Dispatcher;
                               data: REFANY) RAISES {Error} =
  (* Install "disp" (with its "data") as the callback for events with the
     given "id".  A slot that already holds something other than
     DefaultDispatcher can only be overwritten with DefaultDispatcher;
     any other attempt raises Error("Dispatcher already registered"). *)
  BEGIN
    LOCK self.mu DO
      WITH slot = self.cb[id] DO
        IF NOT (disp = DefaultDispatcher
                  OR slot.proc = DefaultDispatcher) THEN
          RAISE Error(AtomList.List1(
                        Atom.FromText("Dispatcher already registered")));
        END;
        slot.proc := disp;
        slot.data := data;
      END;
    END;
  END Register;
(*-----------------------sending events-------------------------------*)
PROCEDURE Send (self: T; conn: EventConn.T; ev: Event.T) RAISES {Error} =
  (* Queue "ev" for transmission on "conn".  Raises
     Error("Unregistered space") unless the table entry for "conn.space"
     exists and belongs to exactly this connection.  The event gets an
     extra reference while it sits in the queue, and the writer thread is
     signalled when the queue goes from empty to non-empty. *)
  VAR
    hinfo: HostInfo.T;
    wasEmpty: BOOLEAN;
  BEGIN
    IF self.debug THEN
      DebugMsg(self, "Send: enqueuing event " & Event.ToText(ev) &
        " for " & EventSpaceID.ToText(conn.space));
    END;

    LOCK self.mu DO
      IF NOT (self.ports.get(conn.space, hinfo) AND hinfo.conn = conn) THEN
        RAISE Error(AtomList.List1(Atom.FromText("Unregistered space")));
      END;
    END;

    LOCK hinfo DO
      LOCK hinfo.mu DO
        wasEmpty := hinfo.es.size() = 0;
        ev.addRef();
        hinfo.es.addhi(ev);
      END;
      IF wasEmpty THEN
        Thread.Signal(hinfo.cv);
      END;
    END;

    (* DebugMsg itself checks the debug flag. *)
    DebugMsg(self, "Send: enqueued event");
  END Send;
(* We guarantee to call send on every connection. *)
PROCEDURE MCast (self: T; cs: EventConnList.T; ev: Event.T)
  RAISES {Error} =
  (* Send "ev" on every connection in "cs".  A failing send does not stop
     the loop: its error list, prefixed with an atom naming the space, is
     accumulated, and one combined Error is raised after all connections
     have been tried. *)
  VAR
    failures: AtomList.T := NIL;
    rest: EventConnList.T := cs;
  BEGIN
    (* DebugMsg itself checks the debug flag. *)
    DebugMsg(self, "mcast called.");

    WHILE rest # NIL DO
      TRY
        self.send(rest.head, ev);
      EXCEPT
      | Error (al) =>
        WITH note = Atom.FromText(
                      "Send to " & EventSpaceID.ToText(rest.head.space)
                      & " raised error (next atom)") DO
          (* The newest failure goes in front of the earlier ones. *)
          failures := AtomList.AppendD(AtomList.Cons(note, al), failures);
        END;
      END;
      rest := rest.tail;
    END;

    (* Now, reraise the exception. *)
    IF failures # NIL THEN
      RAISE Error(failures);
    END;

    DebugMsg(self, "mcast done.");
  END MCast;
TYPE
(* Closure for the per-connection writer thread forked by Connect; its
   "apply" method is EventPortWriteApply. *)
WrConnectionClosure = Thread.Closure OBJECT
(* Queue of events waiting to be written; Connect passes the host
   record's "es", which Send appends to. *)
es: EventSeq.T;
(* Mutex held while "es" is examined or changed (the host record's "mu"). *)
mu: Thread.Mutex;
(* Condition the writer waits on while "es" is empty; Send signals it. *)
cv: Thread.Condition;
(* Connection whose "wr" writer the events are written to. *)
conn: EventConn.T;
(* Owning port, used for debug output and problem reporting. *)
ep : T;
OVERRIDES
apply := EventPortWriteApply;
END;
PROCEDURE EventPortWriteApply (self: WrConnectionClosure): REFANY =
  (* Body of the per-connection writer thread.  Repeatedly waits for the
     outgoing queue to become non-empty, removes the oldest event and
     writes it as one message on the connection's writer.  The loop ends
     when the thread is alerted, when the connection no longer has a
     writer, or when writing fails; failures are reported through the
     port's "problem" method.  Always returns NIL.

     The reference that Send added when it queued the event is released
     on every path once the event has been taken off the queue, including
     the early exit and the failure paths. *)
  VAR ev: Event.T;
      wr: MsgWr.T;
  BEGIN
    TRY
      LOOP
        IF self.ep.debug THEN
          DebugMsg(self.ep, "WrConnectionClosure writing next event.");
        END;
        LOCK self.mu DO
          WHILE self.es.size() = 0 DO Thread.AlertWait(self.mu, self.cv) END;
          ev := self.es.remlo();
          wr := self.conn.wr;
        END;

        (* If the writer is NIL, exit.  The event just removed from the
           queue is dropped, so give back the queue's reference to it. *)
        IF wr = NIL THEN
          ev.dropRef();
          EXIT;
        END;

        TRY
          (* Only build the (potentially expensive) message text when it
             will actually be printed. *)
          IF self.ep.debug THEN
            DebugMsg(self.ep, "WrConnectionClosure: sending event " &
              Event.ToText(ev) & " to " & EventSpaceID.ToText(self.conn.space));
          END;
          EventIO.Write(wr, ev);
          wr.nextMsg();
          wr.flush();
        FINALLY
          (* Release the queue's reference whether or not the write
             succeeded. *)
          ev.dropRef();
        END;
      END;
    EXCEPT
    | Thread.Alerted =>
      (* Normal shutdown request from Disconnect. *)
    | Wr.Failure (al) =>
      self.ep.problem(self.conn, al);
    | Rd.Failure (al) =>
      self.ep.problem(self.conn, al);
    END;

    (* Drop all references held by the closure before the thread ends. *)
    self.mu := NIL;
    self.cv := NIL;
    self.es := NIL;
    self.conn := NIL;
    IF self.ep.debug THEN
      DebugMsg(self.ep, "WrConnectionClosure exiting.");
    END;
    self.ep := NIL;
    RETURN NIL;
  END EventPortWriteApply;
(*-----------------------reading events---------------*)
TYPE
(* One entry of a port's callback table: the dispatcher to call for an
   event ID and the client data passed to it as second argument (see
   EventJobber). *)
EventCallBack = RECORD
proc: Dispatcher;
data: REFANY;
END;
TYPE
(* Work item that delivers one received event to the registered
   dispatcher; handled by EventJobber.  Finished items are kept on the
   "idlework" free list and reused by NewEventWork. *)
EventWork = Work.T OBJECT
(* Link to the next item while this one is on the "idlework" list. *)
next: EventWork := NIL;
(* The event to dispatch. *)
event: Event.T;
(* The port whose callback table is consulted. *)
ep: T;
OVERRIDES
handle := EventJobber;
END;
(* Work item that reports a connection problem to the connection;
   handled by ProblemJobber. *)
ProblemWork = Work.T OBJECT
(* Connection on which the problem occurred. *)
conn: EventConn.T;
(* Error description passed to "conn.problem". *)
al: AtomList.T;
(* The port that queued the item (used for debug output). *)
ep: T;
OVERRIDES
handle := ProblemJobber;
END;
(* "idlework" is the head of a free list of EventWork items, linked through
   their "next" fields; EventJobber pushes finished items onto it and
   NewEventWork pops them.  "mu" is allocated in the module body and is
   held around every access to the list. *)
VAR idlework: EventWork := NIL;
mu: MUTEX := NIL;
PROCEDURE NewEventWork(ep: T; event: Event.T): EventWork =
  (* Return an EventWork item for delivering "event" on port "ep".  An
     item is taken from the "idlework" free list when one is available;
     otherwise a new one is allocated. *)
  VAR item: EventWork;
  BEGIN
    LOCK mu DO
      item := idlework;
      IF item = NIL THEN
        item := NEW(EventWork, ep := ep, event := event);
      ELSE
        (* Unlink the recycled item from the free list and refill it. *)
        idlework := item.next;
        item.next := NIL;
        item.ep := ep;
        item.event := event;
      END;
    END;
    RETURN item;
  END NewEventWork;
PROCEDURE EventJobber (work: EventWork)
  RAISES {Thread.Alerted} =
  (* Worker-pool handler for EventWork: call the dispatcher registered for
     the event's ID with the event and the registered client data, then
     release the event reference and put the work item back on the
     "idlework" free list. *)
  VAR
    port := work.ep;
    ev := work.event;
  BEGIN
    (* DebugMsg itself checks the port's debug flag. *)
    DebugMsg(port, "Jobber received EventWork.");

    WITH slot = port.cb[ev.hdr.rep.id] DO
      slot.proc(ev, slot.data);
    END;
    DebugMsg(port, "Jobber handed off EventWork.");

    (* Free and cache the Event and EventWork *)
    LOCK mu DO
      ev.dropRef();
      work.event := NIL;
      work.ep := NIL;
      work.next := idlework;
      idlework := work;
    END;
  END EventJobber;
PROCEDURE ProblemJobber (work: ProblemWork) =
  (* Worker-pool handler for ProblemWork: pass the error list to the
     connection's "problem" method, then clear the connection's reader
     and writer. *)
  VAR
    port := work.ep;
    link := work.conn;
  BEGIN
    (* DebugMsg itself checks the port's debug flag. *)
    DebugMsg(port, "Jobber received ProblemWork.");

    link.problem(work.al);
    link.rd := NIL;
    link.wr := NIL;

    DebugMsg(port, "Jobber notified client of problem.");
  END ProblemJobber;
TYPE
(* Closure for the per-connection reader thread forked by Connect; its
   "apply" method is EventPortReadApply. *)
RdConnectionClosure = Thread.Closure OBJECT
(* Connection whose "rd" reader the events are read from. *)
conn: EventConn.T;
(* Worker pool that received events are queued on. *)
wp : WorkerPool.T;
(* Owning port, used for debug output and problem reporting. *)
ep : T;
(* Connect passes the host record's "rdmu" and "rdcv" here; the reader
   broadcasts "cv" under "mu" when it finds no more input ready, which is
   what EventPortReadFlush waits for. *)
mu: Thread.Mutex;
cv: Thread.Condition;
(* Shared with the host record: set to TRUE when the reader has found no
   more input ready, and back to FALSE once a new message has arrived. *)
blocking: REF BOOLEAN;
OVERRIDES
apply := EventPortReadApply;
END;
PROCEDURE EventPortReadApply (self: RdConnectionClosure): REFANY =
  (* Body of the per-connection reader thread.  Repeatedly waits for the
     next message on the connection's reader, reads one event from it and
     queues the event on the worker pool.  Whenever no more characters are
     ready, the "blocking" flag is set and "cv" is broadcast so that
     threads flushing the reader can continue.  The loop ends when the
     thread is alerted, when "nextMsg()" returns FALSE, or when reading
     fails; the latter two are reported through the port's "problem"
     method.  Always returns NIL.

     The "blocking" flag is only ever touched while holding "mu", and on
     exit the flag is set and "cv" broadcast one last time, so that a
     thread waiting for this reader to catch up is not left waiting for a
     reader that no longer exists. *)
  BEGIN
    TRY
      LOOP
        IF self.ep.debug THEN
          DebugMsg(self.ep, "RdConnectionClosure waiting for next event.");
        END;
        (* If there are no more characters, we have caught up for now *)
        LOCK self.mu DO
          IF Rd.CharsReady(self.conn.rd) = 0 THEN
            Thread.Broadcast(self.cv);
            self.blocking^ := TRUE;
          END;
        END;
        IF NOT self.conn.rd.nextMsg() THEN
          EXIT;
        END;
        (* The flag is read under "mu" by flushing threads, so it must be
           written under "mu" as well. *)
        LOCK self.mu DO
          self.blocking^ := FALSE;
        END;
        WITH ev = EventIO.Read(self.conn.rd) DO
          ev.sender := self.conn;
          self.wp.add(NewEventWork(self.ep, ev));
          (* NOTE(review): "ev" is read here after it has been handed to
             the worker pool -- confirm that a worker cannot release it
             before this message is built. *)
          IF self.ep.debug THEN
            DebugMsg(
              self.ep, "RdConnectionClosure received event "
                         & Fmt.Int(ev.hdr.rep.id) & "/" & Fmt.Int(ev.prot)
                         & "/" & Fmt.Int(Rd.Length(self.conn.rd))
                         & " and added to work queue.");
          END;
        END;
      END;
      (* "nextMsg()" failed. *)
      self.ep.problem(self.conn, AtomList.List1(
        Atom.FromText("EventPort.rd.nextMsg() failure")));
    EXCEPT
    | Thread.Alerted =>
      (* Normal shutdown request from Disconnect. *)
    | Rd.Failure (al) =>
      self.ep.problem(self.conn, al);
    | Event.Error (al) =>
      self.ep.problem(self.conn, al);
    END;

    (* Nothing more will ever be read on this connection: release any
       thread that is waiting for the reader to catch up. *)
    LOCK self.mu DO
      self.blocking^ := TRUE;
      Thread.Broadcast(self.cv);
    END;

    (* Drop the references held by the closure before the thread ends. *)
    self.wp := NIL;
    self.conn := NIL;
    IF self.ep.debug THEN
      DebugMsg(self.ep, "RdConnectionClosure exiting.");
    END;
    self.ep := NIL;
    RETURN NIL;
  END EventPortReadApply;
PROCEDURE DispatchProblem(self: T; conn: EventConn.T; al: AtomList.T) =
  (* Default "problem" method: wrap the connection and its error list in a
     ProblemWork item and queue it on the port's worker pool. *)
  VAR
    job := NEW(ProblemWork, ep := self, conn := conn, al := al);
  BEGIN
    self.wp.add(job);
  END DispatchProblem;
(* Module initialisation: allocate the mutex that NewEventWork and
   EventJobber hold around the "idlework" free list. *)
BEGIN
mu := NEW(MUTEX);
END EventPort.