(*********************************************************************)
(* Title:	Threads of Control - Implementation		     *)
(* LastEdit:	"Tue Feb  2 11:57:09 1988" by D.McAuley 	     *)
(* Author: 	Mick Jordan					     *)
(* 		Copyright (C) 1984 by Acorn Research Centre	     *)
(*********************************************************************)

(* An implementation in terms of coroutines *)

IMPLEMENTATION MODULE Threads;

IMPORT Select, SysStreams, WriteF;

FROM SYSTEM IMPORT NEWPROCESS, PROCESS, WORD, ADDRESS, SIZE, TRANSFER;

FROM Storage IMPORT ALLOCATE, DEALLOCATE;

FROM ThreadsExtra IMPORT
  TopPriority, DefPriority, LastPriority, Priority, 
  currentThread, currentPriority, preEmpt, thePProc;

CONST
    PreEmptLimit = 64;

TYPE
    CoRoutine = PROCESS;
    Stack = POINTER TO ARRAY [0..0FFFH] OF WORD; (* Stack 32 kbytes *)

    ThreadStatus = (Alive, Finished, Dead);
    Thread = POINTER TO ThreadState;
    ThreadState = RECORD
            link: Thread;            (* to next thread in list *)
            joinQueue: Thread;       (* list of processes waiting to Join *)
            state: CoRoutine;
            realForkee: Forkee;      (* user supplied procedure *)
            status: ThreadStatus;
            stack: Stack;
            argOrResult: WORD;
            priority:Priority;
        END (* record *);

    Mutex = POINTER TO RECORD
            isLocked: BOOLEAN;
            queue: Thread;
        END (* record *);

    Condition = POINTER TO RECORD
            wakeupWaiting: BOOLEAN;
            queue: Thread;
        END (* record *);

VAR readyQueue: ARRAY Priority OF Thread;
VAR inSelect: BOOLEAN; 			(* TRUE if polling IO or Timers *)
VAR preEmptCount:INTEGER;

PROCEDURE PriorSetGet(thread:Thread; VAR p:Priority; set:BOOLEAN);
  BEGIN
    IF set THEN
      thread^.priority := p
    ELSE
      p := thread^.priority
    END
  END PriorSetGet;
  
PROCEDURE InitMutex(VAR mutex: Mutex);
    BEGIN
        NEW(mutex);
        WITH mutex^ DO
            isLocked := FALSE;
            queue := NIL;
        END (* with *);
    END InitMutex;

PROCEDURE InitCondition(VAR cv: Condition);
    BEGIN
        NEW(cv);
        WITH cv^ DO
            wakeupWaiting := FALSE;
            queue := NIL;
        END (* with *);
    END InitCondition;

PROCEDURE Acquire(VAR mutex: Mutex);
    BEGIN
        IF mutex^.isLocked THEN
            Queue.InsertAtTail(mutex^.queue, currentThread);
            RunNext();
        ELSE
            mutex^.isLocked := TRUE;
            IF preEmpt THEN
              Queue.InsertAtTail(readyQueue[currentThread^.priority],
                                 currentThread);
              RunNext()
            END;
        END (* if *);
    END Acquire;

PROCEDURE Release(VAR mutex: Mutex);
    VAR
        thread: Thread;
    BEGIN
        IF mutex^.queue # NIL THEN
            thread := Queue.RemoveFromHead(mutex^.queue);
            Queue.InsertAtTail(readyQueue[thread^.priority], thread);
        ELSE
            mutex^.isLocked := FALSE;
        END (* if *);
        DEC(preEmptCount);
        preEmpt := (preEmptCount = 0);
        IF preEmpt THEN
          Queue.InsertAtTail(readyQueue[currentThread^.priority],
                             currentThread);
          RunNext()
        END;
    END Release;

PROCEDURE Wait(VAR mutex: Mutex; VAR condition: Condition);
    VAR
        thread: Thread;
    BEGIN
        Queue.InsertAtTail(condition^.queue, currentThread);
        (* release mutex *)
        IF mutex^.queue # NIL THEN
            thread := Queue.RemoveFromHead(mutex^.queue);
            Queue.InsertAtTail(readyQueue[thread^.priority], thread);
        ELSE
            mutex^.isLocked := FALSE;
        END (* if *);
        RunNext();
        Acquire(mutex);
    END Wait;

PROCEDURE Signal(VAR cv: Condition);
    VAR
        thread: Thread;
    BEGIN
        IF cv^.queue # NIL THEN
            thread := Queue.RemoveFromHead(cv^.queue);
            Queue.InsertAtTail(readyQueue[thread^.priority], thread);
            preEmpt := preEmpt OR (thread^.priority > currentPriority)
        END (* if *);
        IF (NOT inSelect) AND preEmpt THEN
          Queue.InsertAtTail(readyQueue[currentThread^.priority],
                             currentThread);
          RunNext();
        END;
    END Signal;

PROCEDURE Broadcast(VAR cv: Condition);
    VAR
        thread: Thread;
    BEGIN
        IF cv^.queue # NIL THEN
            thread := Queue.RemoveFromHead(cv^.queue);
            WHILE thread # NIL DO
                Queue.InsertAtTail(readyQueue[thread^.priority], thread);
                thread := Queue.RemoveFromHead(cv^.queue);
                preEmpt := preEmpt OR (thread^.priority > currentPriority)
            END (* while *);
        END (* if *);
        IF (NOT inSelect) AND preEmpt THEN
          Queue.InsertAtTail(readyQueue[currentThread^.priority],
                             currentThread);
          RunNext()
        END;
    END Broadcast;

PROCEDURE Fork(forkee: Forkee; forkeeArg: ForkeeArg): Thread;
    VAR
        thread: Thread;
    BEGIN
        NEW(thread);
        WITH thread^ DO
          NEW(stack);
          NEWPROCESS(RunForkee, ADDRESS(stack), SIZE(stack^), state);
          argOrResult := forkeeArg;
          realForkee := forkee;
          joinQueue := NIL;
          status := Alive;
          priority := DefPriority
        END;
        Queue.InsertAtTail(readyQueue[thread^.priority], thread);
        IF preEmpt THEN
          Queue.InsertAtTail(readyQueue[currentThread^.priority],
                             currentThread);
          RunNext()
        END;
        RETURN thread
    END Fork;

PROCEDURE RunForkee;
(* This procedure is the place that the first TRANSFER
takes place to.  Now we can call the 'forkee' with the
argument passed in 'Fork'
*)
  VAR
    thread: Thread;
  BEGIN
    WITH currentThread^ DO
      argOrResult := realForkee(argOrResult);
      (* another thread may have tried to Join *)
      IF joinQueue # NIL THEN
        thread := Queue.RemoveFromHead(joinQueue);
        WHILE thread # NIL DO
          Queue.InsertAtTail(readyQueue[thread^.priority], thread);
          thread := Queue.RemoveFromHead(joinQueue);
        END;
      END;
      (* we can now safely destroy currentThread.
      Actually we cant dispose the ThreadState cos RunNext
      needs it to do the transfer.
      *)
      status := Finished;
      DISPOSE(stack);
      (* DISPOSE(currentThread); *)
    END;
    RunNext();
  END RunForkee;

PROCEDURE Join(thread: Thread): ForkeeReturn;
  BEGIN
    IF thread^.status = Alive THEN
        Queue.InsertAtTail(thread^.joinQueue, currentThread);
        RunNext();
    ELSE
        currentThread^.argOrResult := thread^.argOrResult;
    END;
    (* when control returns here 'thread' must have completed,
       and 'RunForkee' will have copied its result to
       (what is now) currentThread^.argOrResult 
    *)
    RETURN currentThread^.argOrResult
  END Join;

PROCEDURE RunNext();
    VAR
        p:Priority;
        oldThread:Thread;
    BEGIN
        oldThread := currentThread;
        preEmpt := FALSE;
        inSelect := TRUE;
        Select.Poll();
        LOOP
            inSelect := FALSE;
            p := TopPriority;
            LOOP
              currentThread := Queue.RemoveFromHead(readyQueue[p]);
              IF (currentThread # NIL)
              OR (p = LastPriority) THEN EXIT END;
              INC(p)
            END;
            IF currentThread = oldThread THEN 
                preEmptCount := -1;
                EXIT
            ELSIF currentThread # NIL THEN
                preEmptCount := PreEmptLimit;
                TRANSFER(oldThread^.state, currentThread^.state);
                EXIT
            END;
            inSelect := TRUE; 
            Select.LastThread()
        END;
        IF oldThread^.status = Finished THEN
          DISPOSE(currentThread)
        END;
        currentPriority := currentThread^.priority
    END RunNext;

(* ----------------------------------------------------------------------- *)

MODULE Queue;

IMPORT Thread;

EXPORT QUALIFIED InsertAtTail, RemoveFromHead;

PROCEDURE InsertAtTail(VAR q: Thread; p: Thread);
    BEGIN
        IF q = NIL THEN
            p^.link := p;
        ELSE
            p^.link := q^.link; (* connect to head *)
            q^.link := p;       (* connect old tail to new *)
        END (* if *);
        q := p;
    END InsertAtTail;

PROCEDURE RemoveFromHead(VAR q: Thread): Thread;
    VAR
        head: Thread;
    BEGIN
        IF q = NIL THEN
            head := NIL
        ELSE
            head := q^.link;
            q^.link := head^.link;
            IF head = q THEN
                q := NIL;
            END (* if *);
        END (* if *);
        RETURN head;
    END RemoveFromHead;
END Queue;


PROCEDURE Init();
  VAR
    p:Priority;
  BEGIN
    FOR p := TopPriority TO LastPriority DO readyQueue[p] := NIL END;
    (* create anonymous main thread *)
    NEW(currentThread);
    WITH currentThread^ DO
      joinQueue := NIL;
      status := Alive;
      priority := DefPriority
    END;
    currentPriority := DefPriority;
    preEmpt := FALSE;
    preEmptCount := -1;
    thePProc := PriorSetGet
  END Init;


BEGIN
    Init()
END Threads.
