UNSAFE MODULE----------------------------------------------------- types and globals ---ThreadPThread EXPORTSThread ,ThreadF ,RTThread ,Scheduler ,SchedulerPosix ,RTOS ,RTHooks ,ThreadPThread ; IMPORT Cerrno, FloatMode, MutexRep, RTCollectorSRC, RTError, RTHeapRep, RTIO, RTParams, RTPerfTool, RTProcess, Thread, ThreadEvent, Time, Unix, Utime, Word, Usched, Uerror, Uexec; FROM Compiler IMPORT ThisFile, ThisLine; FROM Ctypes IMPORT int; IMPORT RuntimeError AS RTE;
CONST
  (* Length of one short sleep taken while polling other threads:
     one million nanoseconds, one thousandth of a second. *)
  WAIT_UNIT = 1000000;

  (* Time to wait before re-sending a suspend/resume signal:
     10 million nanoseconds, one hundredth of a second. *)
  RETRY_INTERVAL = 10000000;

REVEAL
  Mutex = MutexRep.Public BRANDED "Mutex Pthread-1.0" OBJECT
    mutex: pthread_mutex_t := NIL;  (* NIL until InitMutex installs one *)
  OVERRIDES
    acquire := LockMutex;
    release := UnlockMutex;
  END;

  Condition = BRANDED "Thread.Condition Pthread-1.0" OBJECT
    mutex: pthread_mutex_t := NIL;  (* NIL until InitMutex installs one *)
    waiters: Activation := NIL;     (* LL = mutex *)
  END;

  T = BRANDED "Thread.T Pthread-1.6" OBJECT
    act: Activation := NIL;    (* live untraced thread data *)
    closure: Closure := NIL;   (* our work ... *)
    result: REFANY := NIL;     (* ... and its result *)
    join: Condition;           (* wait here to join; NIL when done *)
    joined: BOOLEAN := FALSE;  (* Is anyone waiting yet? *)
  END;

TYPE
  ActState = { Starting, Started, Stopping, Stopped };

REVEAL
  Activation = UNTRACED BRANDED REF RECORD
    frame: ADDRESS := NIL;
      (* exception handling support *)
    mutex: pthread_mutex_t := NIL;
      (* write-once in CreateT *)
    cond: pthread_cond_t := NIL;
      (* write-once in CreateT; a place to park while waiting *)
    alerted : BOOLEAN := FALSE;
      (* LL = mutex; the alert flag *)
    waitingOn: pthread_mutex_t := NIL;
      (* LL = mutex; The CV's mutex *)
    nextWaiter: Activation := NIL;
      (* LL = mutex; waiting thread queue *)
    next, prev: Activation := NIL;
      (* LL = activeMu; global doubly-linked, circular list of all
         active threads *)
    handle: pthread_t := NIL;
      (* LL = activeMu; thread handle *)
    stackbase: ADDRESS := NIL;
      (* LL = activeMu; stack base for GC *)
    context: ADDRESS := NIL;
      (* LL = activeMu *)
    state := ActState.Started;
      (* LL = activeMu *)
    slot: INTEGER;
      (* LL = slotsMu; index in slots *)
    floatState : FloatMode.ThreadState;
      (* per-thread floating point state *)
    heapState : RTHeapRep.ThreadState;
      (* per-thread heap state *)
  END;

PROCEDURE SetState (act: Activation; state: ActState) =
  (* Record "state" in "act.state"; when DEBUG is set, also print the
     state name and the activation's address through RTIO. *)
  CONST
    StateName = ARRAY ActState OF TEXT
                  { "Starting", "Started", "Stopping", "Stopped" };
  BEGIN
    act.state := state;
    IF NOT DEBUG THEN RETURN END;
    RTIO.PutText(StateName[state]);
    RTIO.PutText(" act=");
    RTIO.PutAddr(act);
    RTIO.PutText("\n");
    RTIO.Flush();
  END SetState;

(*---------------------------------------------------------------- Mutex ---*)
PROCEDURE Acquire (m: Mutex) =
  (* Lock "m" by dispatching to its "acquire" method. *)
  BEGIN
    m.acquire ();
  END Acquire;

PROCEDURE Release (m: Mutex) =
  (* Unlock "m" by dispatching to its "release" method. *)
  BEGIN
    m.release ();
  END Release;

PROCEDURE CleanMutex (r: REFANY) =
  (* Cleanup callback registered by InitMutex for a Mutex: delete the
     underlying pthread mutex and clear the field. *)
  VAR mu := NARROW(r, Mutex);
  BEGIN
    pthread_mutex_delete(mu.mutex);
    mu.mutex := NIL;
  END CleanMutex;

PROCEDURE InitMutex (VAR m: pthread_mutex_t; root: REFANY;
                     Clean: PROCEDURE(root: REFANY)) =
  (* Install a freshly created pthread mutex into "m" unless another
     thread has already done so.  The check-and-store is serialized by
     "initMu".  On success "Clean" is registered as the final cleanup
     for "root"; if no mutex could be created, OutOfMemory is raised. *)
  VAR fresh := pthread_mutex_new();
  BEGIN
    WITH r = pthread_mutex_lock(initMu) DO <*ASSERT r=0*> END;
    IF m # NIL THEN
      (* another thread beat us in the race, ok *)
      WITH r = pthread_mutex_unlock(initMu) DO <*ASSERT r=0*> END;
      pthread_mutex_delete(fresh);
    ELSIF fresh = NIL THEN
      (* We won the race, but we failed. *)
      WITH r = pthread_mutex_unlock(initMu) DO <*ASSERT r=0*> END;
      RTE.Raise (RTE.T.OutOfMemory);
    ELSE
      (* We won the race and succeeded. *)
      m := fresh;
      WITH r = pthread_mutex_unlock(initMu) DO <*ASSERT r=0*> END;
      RTHeapRep.RegisterFinalCleanup (root, Clean);
    END;
  END InitMutex;

PROCEDURE LockMutex (m: Mutex) =
  (* "acquire" method of Mutex: create the pthread mutex on first use,
     then lock it, reporting perf state changes when perfOn is set. *)
  VAR rc: INTEGER;
  BEGIN
    IF m.mutex = NIL THEN InitMutex(m.mutex, m, CleanMutex) END;
    IF perfOn THEN PerfChanged(State.locking) END;
    rc := pthread_mutex_lock(m.mutex);
    IF rc # 0 THEN DieI(ThisLine(), rc) END;
    IF perfOn THEN PerfRunning() END;
  END LockMutex;

PROCEDURE UnlockMutex (m: Mutex) =
  (* LL = m *)
  (* "release" method of Mutex: unlock the underlying pthread mutex. *)
  VAR rc: INTEGER;
  BEGIN
    IF m.mutex = NIL THEN InitMutex(m.mutex, m, CleanMutex) END;
    rc := pthread_mutex_unlock(m.mutex);
    IF rc # 0 THEN DieI(ThisLine(), rc) END;
  END UnlockMutex;

(*--------------------------------------- Condition variables and Alerts ---*)
PROCEDURE CleanCondition (r: REFANY) =
  (* Cleanup callback registered by InitMutex for a Condition: delete
     the underlying pthread mutex and clear the field. *)
  VAR cv := NARROW(r, Condition);
  BEGIN
    pthread_mutex_delete(cv.mutex);
    cv.mutex := NIL;
  END CleanCondition;

PROCEDURE XWait (self: Activation; m: Mutex; c: Condition;
                 alertable: BOOLEAN) RAISES {Alerted} =
  (* LL = m *)
  (* Push "self" onto "c.waiters", release "m", and park on "self.cond"
     until a signaller has cleared "self.waitingOn".  If "alertable"
     and the alert flag is set, unlink "self" from the queue instead
     and raise Alerted.  "m" is re-acquired on both exits. *)
  VAR cur, pred: Activation;
  BEGIN
    IF c.mutex = NIL THEN InitMutex(c.mutex, c, CleanCondition) END;

    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    <*ASSERT self.waitingOn = NIL*>
    <*ASSERT self.nextWaiter = NIL*>

    (* enqueue at the head of the condition's waiter list *)
    WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
    self.waitingOn := c.mutex;
    self.nextWaiter := c.waiters;
    c.waiters := self;
    WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;

    m.release();
    IF perfOn THEN PerfChanged(State.waiting) END;

    LOOP
      IF alertable AND self.alerted THEN
        self.alerted := FALSE;
        <*ASSERT self.waitingOn = c.mutex*>

        (* find ourselves in the waiter list and unlink *)
        WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
        cur := c.waiters;
        pred := NIL;
        WHILE cur # self DO
          <*ASSERT cur # NIL*>
          pred := cur;
          cur := cur.nextWaiter;
        END;
        IF pred # NIL THEN
          pred.nextWaiter := self.nextWaiter;
        ELSE
          c.waiters := self.nextWaiter;
        END;
        WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;

        self.nextWaiter := NIL;
        self.waitingOn := NIL;
        WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
        m.acquire();
        RAISE Alerted;
      END;

      WITH r = pthread_cond_wait(self.cond, self.mutex) DO
        <*ASSERT r=0*>
      END;

      (* waitingOn is cleared by DequeueHead; anything else is a
         wakeup we must re-check (alert or spurious) *)
      IF self.waitingOn = NIL THEN
        <*ASSERT self.nextWaiter = NIL*>
        WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
        m.acquire();
        RETURN;
      END;
    END;
  END XWait;

PROCEDURE AlertWait (m: Mutex; c: Condition) RAISES {Alerted} =
  (* LL = m *)
  VAR self := GetActivation();
  BEGIN
    XWait(self, m, c, alertable := TRUE);
  END AlertWait;

PROCEDURE Wait (m: Mutex; c: Condition) =
  <*FATAL Alerted*>
  (* LL = m *)
  BEGIN
    XWait(GetActivation(), m, c, alertable := FALSE);
  END Wait;

PROCEDURE DequeueHead (c: Condition) =
  (* LL = c *)
  (* Remove the first waiter from "c.waiters", clear its wait state
     and signal its private condition variable. *)
  VAR head := c.waiters;
  BEGIN
    WITH r = pthread_mutex_lock(head.mutex) DO <*ASSERT r=0*> END;
    c.waiters := head.nextWaiter;
    head.nextWaiter := NIL;
    head.waitingOn := NIL;
    WITH r = pthread_cond_signal(head.cond) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(head.mutex) DO <*ASSERT r=0*> END;
  END DequeueHead;

PROCEDURE Signal (c: Condition) =
  (* Wake at most one thread waiting on "c". *)
  BEGIN
    IF c.mutex = NIL THEN InitMutex(c.mutex, c, CleanCondition) END;
    WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
    IF c.waiters # NIL THEN DequeueHead(c) END;
    WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;
  END Signal;

PROCEDURE Broadcast (c: Condition) =
  (* Wake every thread waiting on "c". *)
  BEGIN
    IF c.mutex = NIL THEN InitMutex(c.mutex, c, CleanCondition) END;
    WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
    WHILE c.waiters # NIL DO DequeueHead(c) END;
    WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;
  END Broadcast;

PROCEDURE Alert (thread: T) =
  (* Set the alert flag of "thread" and signal its condition variable
     so that a parked thread re-examines the flag. *)
  VAR target := thread.act;
  BEGIN
    WITH r = pthread_mutex_lock(target.mutex) DO <*ASSERT r=0*> END;
    target.alerted := TRUE;
    WITH r = pthread_cond_signal(target.cond) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(target.mutex) DO <*ASSERT r=0*> END;
  END Alert;

PROCEDURE XTestAlert (self: Activation): BOOLEAN =
  (* Return the alert flag of "self" and clear it, under "self.mutex". *)
  VAR wasAlerted: BOOLEAN;
  BEGIN
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    wasAlerted := self.alerted;
    self.alerted := FALSE;
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
    RETURN wasAlerted;
  END XTestAlert;

PROCEDURE TestAlert (): BOOLEAN =
  BEGIN
    RETURN XTestAlert(GetActivation());
  END TestAlert;

(*----------------------------------------------------------------- Self ---*)
VAR (* LL = slotsMu *)
  n_slotted := 0;
  next_slot := 1;
  slots: REF ARRAY OF T;  (* NOTE: we don't use slots[0] *)

PROCEDURE InitActivations (): Activation =
  (* Create the activation for the calling thread, make it the only
     element of the circular "allThreads" list, and reset the slot
     table.  Returns the new activation. *)
  VAR me := NEW(Activation);
  BEGIN
    me.handle := pthread_self();
    me.next := me;
    me.prev := me;
    SetActivation(me);
    (* Explicitly (re)initialize to handle fork(). *)
    next_slot := 1;     (* no threads created yet *)
    slots := NIL;       (* no threads created yet *)
    n_slotted := 0;     (* no threads created yet *)
    allThreads := me;
    FloatMode.InitThread(me.floatState);
    RETURN me;
  END InitActivations;

PROCEDURE Self (): T =
  (* Return the T registered in the calling thread's slot.  If the
     caller has no activation (it is neither the initial thread nor one
     created by Fork), or its slot does not refer back to it, Die is
     called with a diagnostic. *)
  VAR me := GetActivation();  t: T;
  BEGIN
    IF me = NIL THEN
      Die(ThisLine(), "Thread primitive called from non-Modula-3 thread")
    END;
    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
    t := slots[me.slot];
    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
    IF (t.act # me) THEN Die(ThisLine(), "thread with bad slot!") END;
    RETURN t;
  END Self;

PROCEDURE AssignSlot (t: T) =
  (* LL = 0, cause we allocate stuff with NEW! *)
  (* Register "t" in a free entry of "slots" and record the index in
     "t.act.slot".  The table is created on first use and doubled when
     full.  "slotsMu" is dropped around every NEW, so after each
     allocation the shared state is re-examined under the lock before
     anything is installed. *)
  VAR n: CARDINAL;  new_slots: REF ARRAY OF T;
  BEGIN
    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;

    (* make sure we have room to register this guy *)
    IF (slots = NIL) THEN
      WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
      new_slots := NEW (REF ARRAY OF T, 20);
      WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
      (* Install our table only if nobody else created one while the
         lock was released; otherwise we would discard a table that
         may already hold registered threads. *)
      IF (slots = NIL) THEN slots := new_slots END;
    END;

    IF (n_slotted >= LAST (slots^)) THEN
      n := NUMBER (slots^);
      WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
      new_slots := NEW (REF ARRAY OF T, n+n);
      WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
      IF (n = NUMBER (slots^)) THEN
        (* we won any races that may have occurred. *)
        SUBARRAY (new_slots^, 0, n) := slots^;
        slots := new_slots;
      ELSIF (n_slotted < LAST (slots^)) THEN
        (* we lost a race while allocating a new slot table,
           and the new table has room for us. *)
      ELSE
        (* ouch, the new table is full too!  Bail out and retry *)
        WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
        AssignSlot (t);
        RETURN;
      END;
    END;

    (* look for an empty slot *)
    WHILE (slots [next_slot] # NIL) DO
      INC (next_slot);
      IF (next_slot >= NUMBER (slots^)) THEN next_slot := 1; END;
    END;

    INC (n_slotted);
    t.act.slot := next_slot;
    slots [next_slot] := t;

    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
  END AssignSlot;

PROCEDURE FreeSlot (t: T; act: Activation) =
  (* LL = 0 *)
  (* Clear the slot that "act" occupies (it must hold "t", otherwise
     Die is called) and reset "act.slot" to 0. *)
  BEGIN
    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;

    DEC (n_slotted);
    WITH z = slots [act.slot] DO
      IF z # t THEN Die (ThisLine(), "unslotted thread!"); END;
      z := NIL;
    END;
    t := NIL;  (* drop traced reference *)
    act.slot := 0;

    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
  END FreeSlot;

PROCEDURE DumpThread (t: Activation) =
  (* Print every field of activation "t" through RTIO, one per line. *)

  PROCEDURE AddrLine (label: TEXT; a: ADDRESS) =
    (* Emit "label", the address "a" and a newline. *)
    BEGIN
      RTIO.PutText(label);  RTIO.PutAddr(a);  RTIO.PutChar('\n');
    END AddrLine;

  BEGIN
    AddrLine("Activation: ", t);
    RTIO.PutText(" slot: ");  RTIO.PutInt(t.slot);  RTIO.PutChar('\n');
    AddrLine(" mutex: ", t.mutex);
    AddrLine(" cond: ", t.cond);
    RTIO.PutText(" alerted: ");  RTIO.PutInt(ORD(t.alerted));
    RTIO.PutChar('\n');
    AddrLine(" waitingOn: ", t.waitingOn);
    AddrLine(" nextWaiter: ", t.nextWaiter);
    AddrLine(" frame: ", t.frame);
    AddrLine(" next: ", t.next);
    AddrLine(" prev: ", t.prev);
    AddrLine(" handle: ", t.handle);
    AddrLine(" stackbase: ", t.stackbase);
    AddrLine(" context: ", t.context);
    RTIO.PutText(" state: ");
    CASE t.state OF
    | ActState.Started  => RTIO.PutText("Started\n");
    | ActState.Stopped  => RTIO.PutText("Stopped\n");
    | ActState.Starting => RTIO.PutText("Starting\n");
    | ActState.Stopping => RTIO.PutText("Stopping\n");
    END;
    RTIO.Flush();
  END DumpThread;

PROCEDURE DumpThreads () =
  (* Dump every activation on the circular "allThreads" list. *)
  VAR t := allThreads;
  BEGIN
    REPEAT
      DumpThread(t);
      t := t.next
    UNTIL t = allThreads;
  END DumpThreads;

(*----------------------------------------------------------- Fork, Join ---*)
VAR (* LL=activeMu *)
  allThreads: Activation := NIL;  (* global list of active threads *)

PROCEDURE CleanThread (r: REFANY) =
  (* Cleanup callback registered by CreateT: delete the activation's
     pthread mutex and condition variable, then free the activation. *)
  VAR thread := NARROW(r, T);
  BEGIN
    pthread_mutex_delete(thread.act.mutex);
    pthread_cond_delete(thread.act.cond);
    DISPOSE(thread.act);
  END CleanThread;

PROCEDURE CreateT (act: Activation): T =
  (* LL = 0, because allocating a traced reference may cause
     the allocator to start a collection which will call "SuspendOthers"
     which will try to acquire "activeMu". *)
  (* Build the traced T for "act": give "act" its mutex and condition
     variable, register CleanThread as final cleanup, create the join
     condition and assign a slot.  Raises OutOfMemory if either
     pthread object could not be created. *)
  VAR
    t  := NEW(T, act := act);
    mu := pthread_mutex_new();
    cv := pthread_cond_new();
  BEGIN
    IF (mu = NIL) OR (cv = NIL) THEN
      (* release whichever of the two was obtained *)
      pthread_mutex_delete(mu);
      pthread_cond_delete(cv);
      RTE.Raise(RTE.T.OutOfMemory);
    END;
    act.mutex := mu;
    act.cond := cv;
    RTHeapRep.RegisterFinalCleanup (t, CleanThread);
    t.join := NEW(Condition);
    AssignSlot (t);
    RETURN t;
  END CreateT;

(* ThreadBase calls RunThread after finding (approximately) where
   its stack begins.  This dance ensures that all of ThreadMain's
   traced references are within the stack scanned by the collector. *)
PROCEDURE ThreadBase (param: ADDRESS): ADDRESS =
  (* Entry point handed to thread_create by Fork.  "param" is the new
     thread's Activation.  Links the activation into "allThreads",
     runs the thread via RunThread, then unlinks it and detaches the
     pthread.  Always returns NIL. *)
  VAR me: Activation := param;
  BEGIN
    SetActivation(me);
    me.stackbase := ADR(me);  (* enable GC scanning of this stack *)
    me.handle := pthread_self();

    (* add to the list of active threads *)
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;
    me.next := allThreads;
    me.prev := allThreads.prev;
    allThreads.prev.next := me;
    allThreads.prev := me;
    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;

    FloatMode.InitThread (me.floatState);

    RunThread(me);

    me.stackbase := NIL;  (* disable GC scanning of my stack *)

    (* remove from the list of active threads *)
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;
    <*ASSERT allThreads # me*>
    me.next.prev := me.prev;
    me.prev.next := me.next;
    WITH r = pthread_detach_self() DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;

    me.next := NIL;
    me.prev := NIL;
    RETURN NIL;
  END ThreadBase;

PROCEDURE RunThread (me: Activation) =
  (* Look up the T slotted for "me", apply its closure and store the
     result, wake any joiners, flush the per-thread heap state and
     release the slot. *)
  VAR thread: T;
  BEGIN
    IF perfOn THEN PerfChanged(State.alive) END;

    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
    thread := slots [me.slot];
    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;

    IF perfOn THEN PerfRunning() END;

    (*** Run the user-level code. ***)
    thread.result := thread.closure.apply();

    IF perfOn THEN PerfChanged(State.dying) END;

    (* Join *)
    LOCK joinMu DO
      Broadcast(thread.join);
      thread.join := NIL;  (* mark me done *)
    END;

    IF perfOn THEN PerfChanged(State.dead) END;

    (* we're dying *)
    RTHeapRep.FlushThreadState(me.heapState);

    IF perfOn THEN PerfDeleted() END;
    FreeSlot(thread, me);
    (* Since we're no longer slotted, we cannot touch traced refs. *)
    thread := NIL;  (* drop traced reference *)
  END RunThread;

VAR joinMu: MUTEX;

PROCEDURE Fork (closure: Closure): T =
  (* Create a thread that applies "closure" and return its handle.
     The stack size is "defaultStackSize" unless "closure" is a
     SizedClosure, in which case its "stackSize" is used; the value is
     scaled by ADRSIZE(Word.T) before being passed to thread_create. *)
  VAR
    act   := NEW(Activation);
    t     := CreateT(act);
    words := defaultStackSize;
  BEGIN
    t.closure := closure;

    (* determine the initial size of the stack for this thread *)
    TYPECASE closure OF
    | SizedClosure (scl) => words := scl.stackSize;
    ELSE (*skip*)
    END;

    WITH r = thread_create(words * ADRSIZE(Word.T), ThreadBase, act) DO
      IF r # 0 THEN DieI(ThisLine(), r) END;
    END;
    RETURN t;
  END Fork;

PROCEDURE XJoin (self: Activation; t: T; alertable: BOOLEAN): REFANY
  RAISES {Alerted} =
  (* Wait under "joinMu" until "t.join" becomes NIL, then return
     "t.result".  Dies if "t" is already being joined.  If the wait is
     left while "t" is still running (i.e. by Alerted), "t.joined" is
     reset so that a later join is permitted. *)
  BEGIN
    LOCK joinMu DO
      IF t.joined THEN
        Die(ThisLine(), "attempt to join with thread twice")
      END;
      TRY
        t.joined := TRUE;
        WHILE t.join # NIL DO
          XWait(self, joinMu, t.join, alertable)
        END;
      FINALLY
        IF t.join # NIL THEN t.joined := FALSE END;
      END;
    END;
    RETURN t.result;
  END XJoin;

PROCEDURE Join (t: T): REFANY =
  <*FATAL Alerted*>
  BEGIN
    RETURN XJoin(GetActivation(), t, alertable := FALSE);
  END Join;

PROCEDURE AlertJoin (t: T): REFANY RAISES {Alerted} =
  BEGIN
    RETURN XJoin(GetActivation(), t, alertable := TRUE);
  END AlertJoin;

(*--------------------------------------------------- Scheduling support ---*)
PROCEDURE CommonSleep () =
  (* Sleep for WAIT_UNIT nanoseconds, resuming with the remaining time
     whenever Nanosleep returns non-zero. *)
  VAR wait, remaining: Utime.struct_timespec;
  BEGIN
    wait.tv_sec := 0;
    wait.tv_nsec := WAIT_UNIT;
    WHILE Nanosleep(wait, remaining) # 0 DO
      wait := remaining;
    END;
  END CommonSleep;

PROCEDURE ToNTime (n: LONGREAL; VAR ts: Utime.struct_timespec) =
  (* Convert "n" seconds into "ts" (whole seconds plus nanoseconds).
     Rounding the fractional part can produce exactly one full second;
     that case is carried into "tv_sec" so that "tv_nsec" always stays
     below 1000000000, as a valid timespec requires. *)
  CONST NanosPerSecond = 1000000000;
  BEGIN
    ts.tv_sec := TRUNC(n);
    ts.tv_nsec := ROUND((n - FLOAT(ts.tv_sec, LONGREAL)) * 1.0D9);
    IF ts.tv_nsec >= NanosPerSecond THEN
      INC(ts.tv_sec);
      DEC(ts.tv_nsec, NanosPerSecond);
    END;
  END ToNTime;

PROCEDURE XPause (self: Activation; n: LONGREAL; alertable: BOOLEAN)
  RAISES {Alerted} =
  (* Block the calling thread for "n" seconds by a timed wait on its
     private condition variable.  Returns at once if "n" <= 0.  If
     "alertable" and the alert flag is set, clears it and raises
     Alerted. *)
  VAR until: Utime.struct_timespec;
  BEGIN
    IF n <= 0.0d0 THEN RETURN END;
    ToNTime(Time.Now() + n, until);
    IF perfOn THEN PerfChanged(State.pausing) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    <*ASSERT self.waitingOn = NIL*>
    <*ASSERT self.nextWaiter = NIL*>

    LOOP
      IF alertable AND self.alerted THEN
        self.alerted := FALSE;
        WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
        IF perfOn THEN PerfRunning() END;
        RAISE Alerted;
      END;
      WITH r = pthread_cond_timedwait(self.cond, self.mutex, until) DO
        IF r = Uerror.ETIMEDOUT THEN
          WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
          IF perfOn THEN PerfRunning() END;
          RETURN;
        END;
        <*ASSERT r=0*>
      END;
    END;
  END XPause;

PROCEDURE Pause (n: LONGREAL) =
  <*FATAL Alerted*>
  VAR self := GetActivation();
  BEGIN
    XPause(self, n, alertable := FALSE);
  END Pause;

PROCEDURE AlertPause (n: LONGREAL) RAISES {Alerted} =
  VAR self := GetActivation();
  BEGIN
    XPause(self, n, alertable := TRUE);
  END AlertPause;

PROCEDURE Yield () =
  (* Give up the processor via Usched.yield; dies with errno on
     failure. *)
  BEGIN
    WITH r = Usched.yield() DO
      IF r # 0 THEN DieI(ThisLine(), Cerrno.GetErrno()) END;
    END;
  END Yield;

CONST FDSetSize = BITSIZE(INTEGER);

TYPE
  FDSet = SET OF [0 .. FDSetSize-1];
  FDS = REF ARRAY OF FDSet;

PROCEDURE IOWait (fd: CARDINAL; read: BOOLEAN;
                  timeoutInterval: LONGREAL := -1.0D0): WaitResult =
  <*FATAL Alerted*>
  (* Non-alertable wait for "fd"; see XIOWait. *)
  VAR self := GetActivation();
  BEGIN
    TRY
      IF perfOn THEN PerfChanged(State.blocking) END;
      RETURN XIOWait(self, fd, read, timeoutInterval, alertable := FALSE);
    FINALLY
      IF perfOn THEN PerfRunning() END;
    END;
  END IOWait;

PROCEDURE IOAlertWait (fd: CARDINAL; read: BOOLEAN;
                       timeoutInterval: LONGREAL := -1.0D0): WaitResult
  RAISES {Alerted} =
  (* Alertable wait for "fd"; see XIOWait. *)
  VAR self := GetActivation();
  BEGIN
    TRY
      IF perfOn THEN PerfChanged(State.blocking) END;
      RETURN XIOWait(self, fd, read, timeoutInterval, alertable := TRUE);
    FINALLY
      IF perfOn THEN PerfRunning() END;
    END;
  END IOAlertWait;

PROCEDURE XIOWait (self: Activation; fd: CARDINAL; read: BOOLEAN;
                   interval: LONGREAL; alertable: BOOLEAN): WaitResult
  RAISES {Alerted} =
  (* Wait with select() until "fd" is readable ("read" = TRUE) or
     writable, for at most "interval" seconds; a negative "interval"
     means no timeout.  An alertable wait is cut into sub-intervals of
     at most one second so the alert flag can be polled between calls
     to select. *)
  VAR
    res: INTEGER;
    fdindex := fd DIV FDSetSize;
    fdset := FDSet{fd MOD FDSetSize};
    gReadFDS, gWriteFDS, gExceptFDS: FDS := NEW(FDS, fdindex+1);
    subInterval: LONGREAL := 1.0d0;

  PROCEDURE TestFDS (index: CARDINAL; set: FDSet; read: BOOLEAN): WaitResult =
    (* Classify the sets left behind by a successful select. *)
    BEGIN
      IF (set * gExceptFDS[index]) # FDSet{} THEN
        IF read THEN
          IF (set * gReadFDS[index]) # FDSet{} THEN
            RETURN WaitResult.Ready;
          END;
          IF (set * gWriteFDS[index]) = FDSet{} THEN
            RETURN WaitResult.FDError;
          END;
        ELSE
          IF (set * gWriteFDS[index]) # FDSet{} THEN
            RETURN WaitResult.Ready;
          END;
          IF (set * gReadFDS[index]) = FDSet{} THEN
            RETURN WaitResult.FDError;
          END;
        END;
      END;
      RETURN WaitResult.Timeout;
    END TestFDS;

  PROCEDURE CallSelect (nfd: CARDINAL; timeout: UNTRACED REF UTime): INTEGER =
    (* Run select on the three sets.  The exception set is the union of
       the read and write sets going in; on success the read and write
       results are merged back into it. *)
    TYPE FDSPtr = UNTRACED REF Unix.FDSet;
    VAR res: INTEGER;
    BEGIN
      FOR i := 0 TO fdindex DO
        gExceptFDS[i] := gReadFDS[i] + gWriteFDS[i];
      END;
      res := Unix.select(nfd,
                         LOOPHOLE (ADR(gReadFDS[0]), FDSPtr),
                         LOOPHOLE (ADR(gWriteFDS[0]), FDSPtr),
                         LOOPHOLE (ADR(gExceptFDS[0]), FDSPtr),
                         timeout);
      IF res > 0 THEN
        FOR i := 0 TO fdindex DO
          gExceptFDS[i] := gExceptFDS[i] + gReadFDS[i] + gWriteFDS[i];
        END;
      END;
      RETURN res;
    END CallSelect;

  BEGIN
    IF NOT alertable THEN
      subInterval := interval;
    ELSIF interval < 0.0d0 THEN
      interval := LAST(LONGREAL);
    ELSIF interval < subInterval THEN
      subInterval := interval;
    END;

    IF alertable AND XTestAlert(self) THEN RAISE Alerted END;
    LOOP
      FOR i := 0 TO fdindex-1 DO
        gReadFDS[i] := FDSet{};
        gWriteFDS[i] := FDSet{};
      END;
      IF read THEN
        gReadFDS[fdindex] := fdset;
      ELSE
        gWriteFDS[fdindex] := fdset;
      END;

      IF subInterval >= 0.0D0 THEN
        VAR utimeout := UTimeFromTime(subInterval);
        BEGIN
          res := CallSelect(fd+1, ADR(utimeout));
        END;
      ELSE
        res := CallSelect(fd+1, NIL);
      END;

      IF alertable AND XTestAlert(self) THEN RAISE Alerted END;

      IF res > 0 THEN
        RETURN TestFDS(fdindex, fdset, read);
      ELSIF res = 0 THEN
        interval := interval - subInterval;
        IF interval <= 0.0d0 THEN RETURN WaitResult.Timeout END;
        IF interval < subInterval THEN
          subInterval := interval;
        END;
      ELSE
        IF Cerrno.GetErrno() = Uerror.EINTR THEN
          (* spurious wakeups are OK *)
        ELSE
          RETURN WaitResult.Error;
        END;
      END;
    END;
  END XIOWait;

TYPE UTime = Utime.struct_timeval;

PROCEDURE UTimeFromTime (time: Time.T): UTime =
  (* Split "time" (seconds) into whole seconds and microseconds. *)
  VAR floor := FLOOR(time);
  BEGIN
    RETURN UTime{floor, FLOOR(1.0D6 * (time - FLOAT(floor, LONGREAL)))};
  END UTimeFromTime;

PROCEDURE WaitProcess (pid: int; VAR status: int): int =
  (* ThreadPThread.m3 and ThreadPosix.m3 are very similar. *)
  (* Call waitpid on "pid", retrying while it fails with EINTR;
     returns waitpid's result. *)
  BEGIN
    LOOP
      WITH r = Uexec.waitpid(pid, ADR(status), 0) DO
        <*ASSERT r # 0*>
        IF r > 0 THEN RETURN r END;
        IF Cerrno.GetErrno() # Uerror.EINTR THEN RETURN r END;
      END;
    END;
  END WaitProcess;

(*-------------------------------------------------- Stack size controls ---*)
VAR defaultStackSize := 4096;
  (* Stack size used by Fork for closures that are not SizedClosures;
     Fork multiplies it by ADRSIZE(Word.T). *)

PROCEDURE GetDefaultStackSize (): CARDINAL =
  (* Return the current default stack size. *)
  BEGIN
    RETURN defaultStackSize;
  END GetDefaultStackSize;

PROCEDURE MinDefaultStackSize (size: CARDINAL) =
  (* Raise the default stack size to "size" if it is currently smaller. *)
  BEGIN
    IF size > defaultStackSize THEN defaultStackSize := size END;
  END MinDefaultStackSize;

PROCEDURE IncDefaultStackSize (inc: CARDINAL) =
  (* Grow the default stack size by "inc". *)
  BEGIN
    defaultStackSize := defaultStackSize + inc;
  END IncDefaultStackSize;

(*-------------------------------------------- Garbage collector support ---*)
(* NOTE: These routines are called indirectly by the low-level page fault
   handler of the garbage collector.  So, if they touched traced references,
   they could trigger indefinite invocations of the fault handler. *)
(* In versions of SuspendOthers prior to the addition of the incremental
   collector, it acquired 'cm' to guarantee that no suspended thread held it.
   That way when the collector tried to acquire a mutex or signal a
   condition, it wouldn't deadlock with the suspended thread that held 'cm'.

   With the VM-synchronized, incremental collector this design is inadequate.
   Here's a deadlock that occurred: Thread.Broadcast held 'cm', then it
   touched its condition argument, the page containing the condition was
   protected by the collector, another thread started running the page fault
   handler, the handler called SuspendOthers, and SuspendOthers tried to
   acquire 'cm'.

   So, SuspendOthers does not grab 'cm' before shutting down the other
   threads.  If the collector tries to use any of the thread functions that
   acquire 'cm', it'll be deadlocked. *)
VAR suspended: BOOLEAN := FALSE;  (* LL=activeMu *)

PROCEDURE SuspendOthers () =
  (* LL=0. Always bracketed with ResumeOthers which releases "activeMu" *)
  (* Acquire "activeMu" and stop every other thread.  The lock is
     still held on return. *)
  BEGIN
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;
    StopWorld();
    <*ASSERT NOT suspended*>
    suspended := TRUE;
  END SuspendOthers;

PROCEDURE ResumeOthers () =
  (* LL=activeMu.  Always preceded by SuspendOthers. *)
  (* Restart the threads stopped by SuspendOthers and release
     "activeMu". *)
  BEGIN
    <*ASSERT suspended*>
    suspended := FALSE;
    StartWorld();
    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;
  END ResumeOthers;

PROCEDURE ProcessStacks (p: PROCEDURE (start, limit: ADDRESS)) =
  (* LL=activeMu.  Only called within {SuspendOthers, ResumeOthers} *)
  (* Apply "p" to the caller's stack and then to the stack of every
     other (already stopped) thread on the active list. *)
  VAR me := GetActivation();  act: Activation;
  BEGIN
    ProcessMe(me, p);
    act := me.next;
    WHILE act # me DO
      ProcessOther(act, p);
      act := act.next;
    END;
  END ProcessStacks;

PROCEDURE ProcessEachStack (p: PROCEDURE (start, limit: ADDRESS)) =
  (* LL=0 *)
  (* Apply "p" to the caller's stack, then to every other thread's
     stack in turn: each thread is stopped, processed and restarted
     before the next one is touched.  When SIG_SUSPEND = 0 the thread
     is stopped/started directly through StopThread/StartThread;
     otherwise it is signalled and its acknowledgement is collected
     through the semaphore. *)
  VAR
    me := GetActivation();
    act: Activation;
    state: ActState;
    acks: int;
    nLive, nDead, newlySent: INTEGER;
    wait_nsecs := RETRY_INTERVAL;
  BEGIN
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;

    ProcessMe(me, p);

    act := me.next;
    WHILE act # me DO

      (* stop *)
      nLive := 0;  (* counts acknowledgements owed by this thread only;
                      must start from zero for every thread *)
      LOOP
        <*ASSERT act.state = ActState.Started*>
        SetState(act, ActState.Stopping);
        IF SIG_SUSPEND = 0 THEN
          IF StopThread(act) THEN
            SetState(act, ActState.Stopped);
            EXIT;
          ELSE
            SetState(act, ActState.Started);
          END;
        ELSE
          SignalThread(act);
          INC(nLive);
          EXIT;
        END;
        CommonSleep();
      END;
      WHILE nLive > 0 DO
        <*ASSERT SIG_SUSPEND # 0*>
        WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
        IF acks = nLive THEN EXIT END;
        <*ASSERT acks < nLive*>
        IF wait_nsecs <= 0 THEN
          newlySent := 0;
          state := act.state;
          <*ASSERT state # ActState.Starting*>
          IF state # ActState.Stopped THEN
            SignalThread(act);
            INC(newlySent);
          END;
          WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
          IF newlySent < nLive - acks THEN
            (* how did we manage to lose some? *)
            nLive := acks + newlySent;
          END;
          wait_nsecs := RETRY_INTERVAL;
        ELSE
          CommonSleep();
          DEC(wait_nsecs, WAIT_UNIT);
        END;
      END;
      (* drain semaphore *)
      FOR i := 0 TO nLive - 1 DO
        WHILE sem_wait() # 0 DO
          WITH r = Cerrno.GetErrno() DO
            IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
          END;
          (*retry*)
        END;
      END;

      (* process *)
      ProcessOther(act, p);

      (* start *)
      nDead := 0;
      LOOP
        <*ASSERT act.state = ActState.Stopped*>
        SetState(act, ActState.Starting);
        IF SIG_SUSPEND = 0 THEN
          IF StartThread(act) THEN
            SetState(act, ActState.Started);
            EXIT;
          ELSE
            SetState(act, ActState.Stopped);
          END;
        ELSE
          SignalThread(act);
          INC(nDead);
          EXIT;
        END;
        CommonSleep();
      END;
      WHILE nDead > 0 DO
        <*ASSERT SIG_SUSPEND # 0*>
        WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
        IF acks = nDead THEN EXIT END;
        <*ASSERT acks < nDead*>
        IF wait_nsecs <= 0 THEN
          newlySent := 0;
          state := act.state;
          <*ASSERT state # ActState.Stopping*>
          IF state # ActState.Started THEN
            SignalThread(act);
            INC(newlySent);
          END;
          WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
          IF newlySent < nDead - acks THEN
            (* how did we manage to lose some? *)
            nDead := acks + newlySent;
          END;
          wait_nsecs := RETRY_INTERVAL;
        ELSE
          CommonSleep();
          DEC(wait_nsecs, WAIT_UNIT);
        END;
      END;
      (* drain semaphore *)
      FOR i := 0 TO nDead - 1 DO
        WHILE sem_wait() # 0 DO
          WITH r = Cerrno.GetErrno() DO
            IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
          END;
          (*retry*)
        END;
      END;

      (* Move on to the next thread.  Without this step the loop would
         keep re-processing the first thread forever. *)
      act := act.next;
    END;

    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;
  END ProcessEachStack;

PROCEDURE ProcessMe (me: Activation; p: PROCEDURE (start, limit: ADDRESS)) =
  (* LL=activeMu *)
  (* Flush the caller's heap state and hand its live stack, starting at
     "me.stackbase", to ProcessLive. *)
  BEGIN
    <*ASSERT me.state # ActState.Stopped*>
    IF DEBUG THEN
      RTIO.PutText("Processing act=");
      RTIO.PutAddr(me);
      RTIO.PutText("\n");
      RTIO.Flush();
    END;
    RTHeapRep.FlushThreadState(me.heapState);
    ProcessLive(me.stackbase, p);
  END ProcessMe;

PROCEDURE ProcessOther (act: Activation; p: PROCEDURE (start, stop: ADDRESS)) =
  (* LL=activeMu *)
  (* Flush the heap state of the stopped thread "act" and hand its
     stack to ProcessStopped.  Does nothing for a thread whose
     "stackbase" is NIL (scanning not enabled for it). *)
  BEGIN
    <*ASSERT act.state = ActState.Stopped*>
    IF DEBUG THEN
      RTIO.PutText("Processing act=");
      RTIO.PutAddr(act);
      RTIO.PutText("\n");
      RTIO.Flush();
    END;
    IF act.stackbase = NIL THEN RETURN END;
    RTHeapRep.FlushThreadState(act.heapState);
    ProcessStopped(act.handle, act.stackbase, act.context, p);
  END ProcessOther;

(* Signal based suspend/resume *)
PROCEDURE SignalThread (act: Activation) =
  (* Send SIG_SUSPEND to "act", retrying for as long as pthread_kill
     reports EAGAIN; any other failure is fatal. *)
  VAR rc: INTEGER;
  BEGIN
    <*ASSERT SIG_SUSPEND # 0*>
    LOOP
      rc := pthread_kill(act.handle, SIG_SUSPEND);
      IF rc = 0 THEN EXIT END;
      IF rc # Uerror.EAGAIN THEN DieI(ThisLine(), rc) END;
      (* try it again... *)
    END;
  END SignalThread;

PROCEDURE StopThread (act: Activation): BOOLEAN =
  (* Direct (non-signal) suspension of "act".  Returns FALSE if the
     thread could not be suspended, or if it was suspended inside a
     heap critical section and therefore had to be restarted. *)
  BEGIN
    <*ASSERT act.state = ActState.Stopping*>
    <*ASSERT SIG_SUSPEND = 0*>
    IF NOT SuspendThread(act.handle) THEN RETURN FALSE END;
    IF act.heapState.inCritical = 0 THEN RETURN TRUE END;
    (* caught in a critical section: let it run and report failure *)
    IF NOT RestartThread(act.handle) THEN <*ASSERT FALSE*> END;
    RETURN FALSE;
  END StopThread;

PROCEDURE StartThread (act: Activation): BOOLEAN =
  (* Direct (non-signal) restart of "act"; returns RestartThread's
     result. *)
  BEGIN
    <*ASSERT act.state = ActState.Starting*>
    <*ASSERT SIG_SUSPEND = 0*>
    RETURN RestartThread(act.handle);
  END StartThread;

PROCEDURE StopWorld () =
  (* LL=activeMu *)
  (* Stop every thread except the caller.  Without SIG_SUSPEND each
     thread is suspended directly and the whole pass is repeated until
     none refuses.  With SIG_SUSPEND each thread is signalled, and the
     acknowledgements are counted on the semaphore, re-signalling
     whenever RETRY_INTERVAL has elapsed. *)
  VAR
    me := GetActivation();
    act: Activation;
    state: ActState;
    acks: int;
    nLive, newlySent: INTEGER;
    retry: BOOLEAN;
    wait_nsecs := RETRY_INTERVAL;
  BEGIN
    IF DEBUG THEN
      RTIO.PutText("Stopping from act=");
      RTIO.PutAddr(me);
      RTIO.PutText("\n");
      RTIO.Flush();
    END;

    nLive := 0;
    LOOP
      retry := FALSE;
      act := me.next;
      WHILE act # me DO
        <*ASSERT act.state # ActState.Starting*>
        IF act.state = ActState.Started THEN
          SetState(act, ActState.Stopping);
          IF SIG_SUSPEND = 0 THEN
            IF StopThread(act) THEN
              SetState(act, ActState.Stopped);
            ELSE
              SetState(act, ActState.Started);
              retry := TRUE;
            END;
          ELSE
            SignalThread(act);
            INC(nLive);
          END;
        END;
        act := act.next;
      END;
      IF NOT retry THEN EXIT END;
      CommonSleep();
    END;

    WHILE nLive > 0 DO
      <*ASSERT SIG_SUSPEND # 0*>
      WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
      IF acks = nLive THEN EXIT END;
      <*ASSERT acks < nLive*>
      IF wait_nsecs <= 0 THEN
        (* waited long enough: signal every thread not yet stopped *)
        newlySent := 0;
        act := me.next;
        WHILE act # me DO
          state := act.state;
          <*ASSERT state # ActState.Starting*>
          IF state # ActState.Stopped THEN
            SignalThread(act);
            INC(newlySent);
          END;
          act := act.next;
        END;
        WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
        IF newlySent < nLive - acks THEN
          (* how did we manage to lose some? *)
          nLive := acks + newlySent;
        END;
        wait_nsecs := RETRY_INTERVAL;
      ELSE
        CommonSleep();
        DEC(wait_nsecs, WAIT_UNIT);
      END;
    END;

    (* drain semaphore *)
    FOR i := 0 TO nLive-1 DO
      WHILE sem_wait() # 0 DO
        WITH r = Cerrno.GetErrno() DO
          IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
        END;
        (*retry*)
      END;
    END;

    IF DEBUG THEN
      RTIO.PutText("Stopped from act=");
      RTIO.PutAddr(me);
      RTIO.PutText("\n");
      RTIO.Flush();
      DumpThreads();
    END;
  END StopWorld;

PROCEDURE StartWorld () =
  (* LL=activeMu *)
  (* Counterpart of StopWorld: restart every thread that is not in
     state Started, using the same direct or signal-and-acknowledge
     protocol. *)
  VAR
    me := GetActivation();
    act: Activation;
    state: ActState;
    acks: int;
    nDead, newlySent: INTEGER;
    retry: BOOLEAN;
    wait_nsecs := RETRY_INTERVAL;
  BEGIN
    IF DEBUG THEN
      RTIO.PutText("Starting from act=");
      RTIO.PutAddr(me);
      RTIO.PutText("\n");
      RTIO.Flush();
    END;

    nDead := 0;
    LOOP
      retry := FALSE;
      act := me.next;
      WHILE act # me DO
        <*ASSERT act.state # ActState.Stopping*>
        IF act.state # ActState.Started THEN
          SetState(act, ActState.Starting);
          IF SIG_SUSPEND = 0 THEN
            IF StartThread(act) THEN
              SetState(act, ActState.Started);
            ELSE
              SetState(act, ActState.Stopped);
              retry := TRUE;
            END;
          ELSE
            SignalThread(act);
            INC(nDead);
          END;
        END;
        act := act.next;
      END;
      IF NOT retry THEN EXIT END;
      CommonSleep();
    END;

    WHILE nDead > 0 DO
      <*ASSERT SIG_SUSPEND # 0*>
      WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
      IF acks = nDead THEN EXIT END;
      <*ASSERT acks < nDead*>
      IF wait_nsecs <= 0 THEN
        (* waited long enough: signal every thread not yet started *)
        newlySent := 0;
        act := me.next;
        WHILE act # me DO
          state := act.state;
          <*ASSERT state # ActState.Stopping*>
          IF state # ActState.Started THEN
            SignalThread(act);
            INC(newlySent);
          END;
          act := act.next;
        END;
        WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
        IF newlySent < nDead - acks THEN
          (* how did we manage to lose some? *)
          nDead := acks + newlySent;
        END;
        wait_nsecs := RETRY_INTERVAL;
      ELSE
        CommonSleep();
        DEC(wait_nsecs, WAIT_UNIT);
      END;
    END;

    (* drain semaphore *)
    FOR i := 0 TO nDead-1 DO
      WHILE sem_wait() # 0 DO
        WITH r = Cerrno.GetErrno() DO
          IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
        END;
        (*retry*)
      END;
    END;

    IF DEBUG THEN
      RTIO.PutText("Started from act=");
      RTIO.PutAddr(me);
      RTIO.PutText("\n");
      RTIO.Flush();
      DumpThreads();
    END;
  END StartWorld;

PROCEDURE SignalHandler (sig: int;
                         <*UNUSED*>info: ADDRESS;
                         context: ADDRESS) =
  (* Handler for SIG_SUSPEND.  It acts only when the receiving thread
     is in state Stopping.  Inside a heap critical section it flips the
     state back to Started and returns.  Otherwise it publishes
     "context", marks the thread Stopped, posts the semaphore, and sits
     in sigsuspend until the state becomes Starting; it then clears the
     context, marks the thread Started and posts again.  errno is
     restored on the normal exit path. *)
  VAR
    errno := Cerrno.GetErrno();
    me := GetActivation();
  BEGIN
    <*ASSERT sig = SIG_SUSPEND*>
    IF me.state = ActState.Stopping THEN
      IF me.heapState.inCritical # 0 THEN
        me.state := ActState.Started;
        RETURN;
      END;
      me.state := ActState.Stopped;
      <*ASSERT me.context = NIL*>
      me.context := context;
      WITH r = sem_post() DO <*ASSERT r=0*> END;

      REPEAT sigsuspend() UNTIL me.state = ActState.Starting;

      me.context := NIL;
      me.state := ActState.Started;
      WITH r = sem_post() DO <*ASSERT r=0*> END;
    END;
    Cerrno.SetErrno(errno);
  END SignalHandler;

(*---------------------------------------------------------- misc. stuff ---*)
PROCEDURE MyId (): Id RAISES {} =
  (* Slot index of the calling thread, or 0 if it has no activation. *)
  VAR me := GetActivation();
  BEGIN
    IF me # NIL THEN RETURN me.slot END;
    RETURN 0;
  END MyId;

PROCEDURE MyFPState (): UNTRACED REF FloatMode.ThreadState =
  (* Address of the calling thread's floating point state. *)
  VAR me := GetActivation();
  BEGIN
    RETURN ADR(me.floatState);
  END MyFPState;

PROCEDURE MyHeapState (): UNTRACED REF RTHeapRep.ThreadState =
  (* Address of the calling thread's heap state. *)
  VAR me := GetActivation();
  BEGIN
    RETURN ADR(me.heapState);
  END MyHeapState;

PROCEDURE DisableSwitching () =
  BEGIN
    (* no user-level thread switching *)
  END DisableSwitching;

PROCEDURE EnableSwitching () =
  BEGIN
    (* no user-level thread switching *)
  END EnableSwitching;

(*--------------------------------------------------------------- errors ---*)
PROCEDURE Die (lineno: INTEGER; msg: TEXT) =
  (* Report a thread client error with text "msg", attributed to this
     file at line "lineno", through RTError.Msg. *)
  BEGIN
    RTError.Msg (ThisFile(), lineno, "Thread client error: ", msg);
  END Die;

PROCEDURE DieI (lineno: INTEGER; i: INTEGER) =
  (* Like Die, but the detail is the integer "i" (typically an error
     code), reported through RTError.MsgI. *)
  BEGIN
    RTError.MsgI (ThisFile(), lineno, "Thread client error: ", i);
  END DieI;

(*----------------------------------------------------- ShowThread hooks ---*)
VAR
  perfW : RTPerfTool.Handle;
  perfOn: BOOLEAN := FALSE;             (* LL = perfMu *)

(* Try to start the "showthread" perf tool.  On success, turn perfOn on
   and register PerfStop to run at process exit. *)
PROCEDURE PerfStart () =
  BEGIN
    IF RTPerfTool.Start ("showthread", perfW) THEN
      perfOn := TRUE;
      RTProcess.RegisterExitor (PerfStop);
    END;
  END PerfStart;

(* Close the perf tool handle.  Deliberately takes no lock. *)
PROCEDURE PerfStop () =
  BEGIN
    (* UNSAFE, but needed to prevent deadlock if we're crashing! *)
    RTPerfTool.Close (perfW);
  END PerfStop;

CONST
  (* size of a ThreadEvent.T in CHAR units, rounded up *)
  EventSize = (BITSIZE(ThreadEvent.T) + BITSIZE(CHAR) - 1) DIV BITSIZE(CHAR);

TYPE
  TE = ThreadEvent.Kind;

(* Send a "Changed" event for the calling thread with new state "s".
   The send is done while holding perfMu, and perfOn is updated with the
   result of RTPerfTool.Send. *)
PROCEDURE PerfChanged (s: State) =
  VAR e := ThreadEvent.T {kind := TE.Changed, id := MyId(), state := s};
  BEGIN
    WITH r = pthread_mutex_lock(perfMu) DO <*ASSERT r=0*> END;
      perfOn := RTPerfTool.Send (perfW, ADR (e), EventSize);
    WITH r = pthread_mutex_unlock(perfMu) DO <*ASSERT r=0*> END;
  END PerfChanged;

(* Send a "Deleted" event for the calling thread; same locking and
   perfOn update as PerfChanged. *)
PROCEDURE PerfDeleted () =
  VAR e := ThreadEvent.T {kind := TE.Deleted, id := MyId()};
  BEGIN
    WITH r = pthread_mutex_lock(perfMu) DO <*ASSERT r=0*> END;
      perfOn := RTPerfTool.Send (perfW, ADR (e), EventSize);
    WITH r = pthread_mutex_unlock(perfMu) DO <*ASSERT r=0*> END;
  END PerfDeleted;

(* Send a "Running" event for the calling thread; same locking and
   perfOn update as PerfChanged. *)
PROCEDURE PerfRunning () =
  VAR e := ThreadEvent.T {kind := TE.Running, id := MyId()};
  BEGIN
    WITH r = pthread_mutex_lock(perfMu) DO <*ASSERT r=0*> END;
      perfOn := RTPerfTool.Send (perfW, ADR (e), EventSize);
    WITH r = pthread_mutex_unlock(perfMu) DO <*ASSERT r=0*> END;
  END PerfRunning;

(*-------------------------------------------------------- Initialization ---*)
(* Initialise the thread runtime for the calling thread, whose stack base
   is "stackbase": run InitC, create the activation list, record the
   stack base on the caller's activation, create its T, allocate joinMu,
   start the perf tool, and start background and/or foreground
   collection when the corresponding runtime parameter is present. *)
PROCEDURE InitWithStackBase (stackbase: ADDRESS) =
  VAR
    self: T;
    me: Activation;
  BEGIN
    InitC(stackbase);

    me := InitActivations();
    me.stackbase := stackbase;
    self := CreateT(me);

    joinMu := NEW(MUTEX);

    PerfStart();
    IF perfOn THEN PerfRunning() END;

    IF RTParams.IsPresent("backgroundgc") THEN
      RTCollectorSRC.StartBackgroundCollection();
    END;
    IF RTParams.IsPresent("foregroundgc") THEN
      RTCollectorSRC.StartForegroundCollection();
    END;
  END InitWithStackBase;

(* Register the three fork handlers, dying with the returned code if
   registration fails, then initialise using the address of a local
   variable as the stack base. *)
PROCEDURE Init () =
  VAR r: INTEGER;
  BEGIN
    r := RTProcess.RegisterForkHandlers(AtForkPrepare,
                                        AtForkParent,
                                        AtForkChild);
    IF r # 0 THEN DieI(ThisLine(), r) END;
    InitWithStackBase(ADR(r)); (* not quite accurate but hopefully ok *)
  END Init;

(* Global mutexes taken, in this order, around fork; AtForkParent
   releases them in the reverse order. *)
VAR locks := ARRAY [0..3] OF pthread_mutex_t{
                  activeMu, slotsMu, initMu, perfMu};

(* Lock "mutex" unless it is NIL; on a non-zero result report it with
   DieI against source line "line". *)
PROCEDURE PThreadLockMutex (mutex: pthread_mutex_t; line: INTEGER) =
  VAR r: INTEGER;
  BEGIN
    IF mutex # NIL THEN
      r := pthread_mutex_lock(mutex);
      IF r # 0 THEN DieI(line, r) END;
    END;
  END PThreadLockMutex;

(* Unlock "mutex" unless it is NIL; on a non-zero result report it with
   DieI against source line "line". *)
PROCEDURE PThreadUnlockMutex (mutex: pthread_mutex_t; line: INTEGER) =
  VAR r: INTEGER;
  BEGIN
    IF mutex # NIL THEN
      r := pthread_mutex_unlock(mutex);
      IF r # 0 THEN DieI(line, r) END;
    END;
  END PThreadUnlockMutex;

(* Fork "prepare" handler (registered in Init).  Acquires, in order:
   joinMu, the heap lock, every mutex in "locks", and then, for each
   activation on the circular list starting at the caller, that
   activation's mutex and the mutex of its thread's join condition when
   that condition is non-NIL. *)
PROCEDURE AtForkPrepare () =
  VAR
    me := GetActivation();
    act: Activation;
    cond: Condition;
  BEGIN
    Thread.Acquire(joinMu);
    LockHeap();
    FOR i := FIRST(locks) TO LAST(locks) DO
      PThreadLockMutex(locks[i], ThisLine());
    END;
    (* Walk activations and lock all threads, conditions.
     * NOTE: We have initMu, activeMu, so slots
     * won't change, conditions and mutexes
     * won't be initialized on-demand.
     *)
    act := me;
    REPEAT
      PThreadLockMutex(act.mutex, ThisLine());
      (*PThreadLockMutex(act.waitingOn, ThisLine());*)
      cond := slots[act.slot].join;
      IF cond # NIL THEN PThreadLockMutex(cond.mutex, ThisLine()); END;
      act := act.next;
    UNTIL act = me;
  END AtForkPrepare;

(* Fork "parent" handler (registered in Init).  Releases what
   AtForkPrepare acquired: per activation the join-condition mutex and
   then the activation mutex, then the entries of "locks" from last to
   first, then the heap lock, then joinMu. *)
PROCEDURE AtForkParent () =
  VAR
    me := GetActivation();
    act: Activation;
    cond: Condition;
  BEGIN
    (* Walk activations and unlock all threads, conditions. *)
    act := me;
    REPEAT
      cond := slots[act.slot].join;
      IF cond # NIL THEN PThreadUnlockMutex(cond.mutex, ThisLine()); END;
      (*PThreadUnlockMutex(act.waitingOn, ThisLine());*)
      PThreadUnlockMutex(act.mutex, ThisLine());
      act := act.next;
    UNTIL act = me;
    FOR i := LAST(locks) TO FIRST(locks) BY -1 DO
      PThreadUnlockMutex(locks[i], ThisLine());
    END;
    UnlockHeap();
    Thread.Release(joinMu);
  END AtForkParent;

(* Fork "child" handler (registered in Init).  Performs the same
   releases as AtForkParent, then re-runs InitWithStackBase using the
   stack base recorded on the calling thread's activation. *)
PROCEDURE AtForkChild () =
  BEGIN
    AtForkParent();
    InitWithStackBase(GetActivation().stackbase);
  END AtForkChild;

(*------------------------------------------------------------- collector ---*)
(* These procedures provide synchronization primitives for the allocator
   and collector. *)
VAR
  holder: pthread_t;    (* thread recorded as owning heapMu; set to NIL
                           by UnlockHeap just before it unlocks *)
  inCritical := 0;      (* nesting depth of LockHeap calls by the holder *)

(* Enter the heap critical section.  heapMu is locked only when the
   calling thread is not already recorded as the holder; the nesting
   count is incremented in either case. *)
PROCEDURE LockHeap () =
  VAR self := pthread_self();
  BEGIN
    IF pthread_equal(holder, self) = 0 THEN
      WITH r = pthread_mutex_lock(heapMu) DO <*ASSERT r=0*> END;
      holder := self;
    END;
    INC(inCritical);
  END LockHeap;

(* Leave one level of the heap critical section.  The caller must be the
   recorded holder (asserted).  When the nesting count reaches zero the
   holder is cleared and heapMu is unlocked. *)
PROCEDURE UnlockHeap () =
  BEGIN
    <*ASSERT pthread_equal(holder, pthread_self()) # 0*>
    DEC(inCritical);
    IF inCritical = 0 THEN
      holder := NIL;
      WITH r = pthread_mutex_unlock(heapMu) DO <*ASSERT r=0*> END;
    END;
  END UnlockHeap;

(* Wait on heapCond with heapMu.  The caller must be the recorded holder
   with a nesting count of exactly one (both asserted).  The count is
   dropped to zero for the duration of the wait; afterwards the caller
   records itself as holder again and the count goes back to one. *)
PROCEDURE WaitHeap () =
  VAR self := pthread_self();
  BEGIN
    <*ASSERT pthread_equal(holder, self) # 0*>
    DEC(inCritical);
    <*ASSERT inCritical = 0*>
    WITH r = pthread_cond_wait(heapCond, heapMu) DO <*ASSERT r=0*> END;
    holder := self;
    <*ASSERT inCritical = 0*>
    INC(inCritical);
  END WaitHeap;

(* Broadcast on heapCond. *)
PROCEDURE BroadcastHeap () =
  BEGIN
    WITH r = pthread_cond_broadcast(heapCond) DO <*ASSERT r=0*> END;
  END BroadcastHeap;

(*--------------------------------------------- exception handling support --*)
(* Return the "frame" field of the calling thread's activation. *)
PROCEDURE GetCurrentHandlers (): ADDRESS =
  VAR me := GetActivation();
  BEGIN
    RETURN me.frame;
  END GetCurrentHandlers;

(* Store "h" into the "frame" field of the calling thread's activation. *)
PROCEDURE SetCurrentHandlers (h: ADDRESS) =
  VAR me := GetActivation();
  BEGIN
    me.frame := h;
  END SetCurrentHandlers;

(*RTHooks.PushEFrame*)
(* Push "frame" onto the calling thread's frame chain: the first word of
   the frame (viewed as a record whose first field is "next") is set to
   the activation's current frame, and the activation's frame is then
   set to "frame". *)
PROCEDURE PushEFrame (frame: ADDRESS) =
  TYPE Frame = UNTRACED REF RECORD next: ADDRESS END;
  VAR
    me := GetActivation();
    f: Frame := frame;
  BEGIN
    f.next := me.frame;
    me.frame := f;
  END PushEFrame;

(*RTHooks.PopEFrame*)
(* Set the calling thread's activation frame to "frame". *)
PROCEDURE PopEFrame (frame: ADDRESS) =
  VAR me := GetActivation();
  BEGIN
    me.frame := frame;
  END PopEFrame;

(* TRUE when the "debugthreads" runtime parameter is present. *)
VAR DEBUG := RTParams.IsPresent("debugthreads");

BEGIN
END ThreadPThread.