smalldb/src/SmallDB.m3


 Copyright (C) 1992, Digital Equipment Corporation                         
 All rights reserved.                                                      
 Implement the stable storage module. Michael B. Jones. 18-Jul-86 
 Last modified on Wed Feb  1 09:30:09 PST 1995 by kalsow 
      modified on Fri Apr 22 12:16:19 PDT 1994 by wobber 
      modified on Mon Sep 17 13:54:22 PDT 1990 by birrell 
      modified on Tue Dec 15 17:09:31 1987 by chan 
      modified on Fri Sep  5 06:20:32 1986 by mbj 

MODULE SmallDB;

IMPORT FS, FileRd, FileWr, OSError, OSSupport;
IMPORT Atom, AtomList, Lex, FloatMode, Fmt, FmtTime, Rd, Text,
       TextRd, Thread, Time, Wr, Word;

REVEAL
  T = Public BRANDED OBJECT
    dirname: TEXT; (* Base directory name (no trailing '/' *)
    version: INTEGER := 0; (* Current snapshot and log version # *)
    logName: TEXT := NIL;
    logWriter: OSSupport.T := NIL; (* Writer to the current logfile *)
    (* statistics *)
    snapshotByteCnt, logByteCnt: CARDINAL := 0;
    logEntries: CARDINAL := 0;
    lastSnapshot, lastLog: Time.T;
    pad: BOOLEAN; (* if TRUE, pad each update to a DiskPageSize boundary *)
    cl: Closure;
  OVERRIDES
    recover := Recover;
    update := Update;
    snapshot := Snapshot;
    close := Close;
    snapshotBytes := SnapshotBytes;
    logBytes := LogBytes;
    status := Status;
  END;

CONST
  DirSeparator = "/";
  SnapshotPrefix = "Snapshot.";
  LogfilePrefix = "Logfile.";
  VersionFile = "Version_Number";
  NewVersionFile = "New_Version_Number";

VAR BadUpdateLen, MalformedUpdate: AtomList.T;

<* FATAL Thread.Alerted *>
exported procedures

PROCEDURE New(dir: TEXT; cl: Closure; pad: BOOLEAN := TRUE): T
     RAISES {OSError.E, Failed} =
  (* Returns a "T" for the data that is maintained in files in the directory
     "dir". Raises an exception if the directory doesn't exist or is
     inaccessible. If the directory exists but contains no backing files,
     creates files corresponding to a NIL object with no updates. *)
  VAR t: T;
      ok: BOOLEAN := FALSE;
  BEGIN
    TRY
      ok := FS.Status(dir).type = FS.DirectoryFileType;
    EXCEPT
    | OSError.E =>
    END;
    IF NOT ok THEN FS.CreateDirectory(dir); END;
    t := NEW(T, dirname := dir, pad := pad, cl := cl);
    t.lastSnapshot := 0.0D0;
    t.lastLog := 0.0D0;
    GetVersion(t);
    IF t.version = 0 THEN
       (* Commit to version 1, so that we have the log file initialized
	correctly (and atomically).  We can't just create the log file,
	since we'd need special care to make the creation atomic - it's
         easier to just use the mechanisms already present in Snapshot. *)
      Snapshot(t, cl.new());
    END;
    RETURN t;
  END New;
private procedures

PROCEDURE FName(t: T; n1: TEXT): TEXT =
  (* * Build a file name in a stable storage directory. *)
  BEGIN RETURN t.dirname & DirSeparator & n1; END FName;

PROCEDURE VersionName(t: T; n1: TEXT; thisversion: INTEGER := 0): TEXT =
  (* * Build a file name in a stable storage directory. *)
  VAR version: INTEGER;
  BEGIN
    IF thisversion = 0 THEN
      version := t.version;
    ELSE
      version := thisversion;
    END;
    RETURN t.dirname & DirSeparator & n1 & Fmt.Int(version);
  END VersionName;

PROCEDURE WriteVersionFile(t: T; new: BOOLEAN) RAISES {OSError.E} =
  (* * Write a Version number file for t. *)
  VAR versionWriter: Wr.T; fname: TEXT;
  BEGIN
    TRY
      IF new THEN
        fname := FName(t, NewVersionFile);
      ELSE
        fname := FName(t, VersionFile);
      END;
      versionWriter := FileWr.Open(fname);
      Wr.PutText(versionWriter, Fmt.Int(t.version) & "\n");
      Wr.Close(versionWriter);
    EXCEPT
    | Wr.Failure(e) => RAISE OSError.E(e);
    END;
  END WriteVersionFile;

PROCEDURE DeleteNewVersionFile(t: T) RAISES {OSError.E} =
  (* * Delete a NewVersion number file for t. *)
  VAR fname: TEXT;
  BEGIN
    fname := FName(t, NewVersionFile);
    FS.DeleteFile(fname);
  END DeleteNewVersionFile;

PROCEDURE DeleteSnapshot(t: T; version: INTEGER) RAISES {OSError.E} =
  (* * Delete a NewVersion number file for t. *)
  VAR fname: TEXT;
  BEGIN
    IF version = 0 THEN RETURN; END; (* Never a version 0 *)
    fname := VersionName(t, SnapshotPrefix, version);
    FS.DeleteFile(fname);
  END DeleteSnapshot;

PROCEDURE DeleteLogfile(t: T; version: INTEGER) RAISES {OSError.E} =
  (* * Delete a log file for t. *)
  VAR fname: TEXT;
  BEGIN
    IF version = 0 THEN RETURN; END; (* Never a version 0 *)
    fname := VersionName(t, LogfilePrefix, version);
    FS.DeleteFile(fname);
  END DeleteLogfile;

PROCEDURE CloseLogfile(t: T) RAISES {OSError.E} =
  (* * Close the log file for t. *)
  BEGIN
    IF t.logWriter = NIL THEN RETURN; END;
    TRY
      Wr.Close(t.logWriter);
    EXCEPT
    | Wr.Failure(e) =>
        t.logWriter := NIL;
        RAISE OSError.E(e);
    END;
    t.logWriter := NIL;
  END CloseLogfile;

PROCEDURE OpenLogfile(t: T; truncate: BOOLEAN) RAISES {OSError.E} =
  (* * Open the log file for writing at byte number byte. *)
  VAR fname: TEXT;
  BEGIN
    TRY
      CloseLogfile(t); (* Close any open log file *)
    EXCEPT
    | OSError.E => (* Assume can proceed ok *)
    END;

    fname := VersionName(t, LogfilePrefix);
    t.logName := fname;
    t.logWriter := NEW(OSSupport.T).init(FS.OpenFile(fname, truncate));
    (* force changes to disk ??? *)
  END OpenLogfile;

PROCEDURE CreateFirstVersion(t: T) RAISES {OSError.E} =
  (* * Initialize a stable storage directory. *)
  BEGIN
    t.version := 0;
    (* No snapshot file for version 0 *)
    WriteVersionFile(t, FALSE);
  END CreateFirstVersion;

PROCEDURE IncrVersion(t: T) =
  (* * Increment the directory version number. *)
  BEGIN
    REPEAT t.version := t.version + 1; UNTIL t.version # 0;
    (* No snapshot file for version 0 *)
  END IncrVersion;

PROCEDURE CommitToNewVersion(t: T) RAISES {OSError.E} =
  (* * Take us from the state where we have a NewVersion file to where we
     don't. *)
  BEGIN
    WriteVersionFile(t, FALSE);
    DeleteNewVersionFile(t);
  END CommitToNewVersion;

PROCEDURE GetVersion(t: T) RAISES {OSError.E} =
  (* * Determines the current version number for the directory t. *)
  VAR versionReader: Rd.T;
  VAR version: INTEGER;
  BEGIN
    TRY
      versionReader := FileRd.Open(FName(t, NewVersionFile));
      TRY
        version := Lex.Int(versionReader);
        IF version <= 0 THEN
          RAISE Lex.Error; (* No number! *)
        END;
      FINALLY
        Rd.Close(versionReader);
      END;
      t.version := version;
      CommitToNewVersion(t);
    EXCEPT
    | OSError.E, Lex.Error, Rd.Failure, FloatMode.Trap =>

        TRY
          DeleteNewVersionFile(t); (* Make SURE it's not there *)
        EXCEPT
        | OSError.E =>
        END;

        TRY
          versionReader := FileRd.Open(FName(t, VersionFile));
          TRY
            version := Lex.Int(versionReader);
            IF version <= 0 THEN
              RAISE Lex.Error; (* No number! *)
            END;
          FINALLY
            Rd.Close(versionReader);
          END;
          t.version := version;
        EXCEPT
        | OSError.E, FloatMode.Trap, Lex.Error, Rd.Failure =>
            CreateFirstVersion(t);
        END;
    END;
  END GetVersion;

PROCEDURE Snapshot(t: T; value: REFANY) RAISES {OSError.E} =
  (* Records this value as the current snapshot, and empties the log. *)
  VAR
    snapshotWriter: Wr.T;
    oldversion: INTEGER;
    fname: TEXT;
  BEGIN
    oldversion := t.version;
    IncrVersion(t);
    TRY
      fname := VersionName(t, SnapshotPrefix);
      snapshotWriter := FileWr.Open(fname);
      TRY
        t.cl.snapshot(snapshotWriter, value);
        t.snapshotByteCnt := Wr.Length(snapshotWriter);
        t.lastSnapshot := Time.Now();
      FINALLY
        Wr.Close(snapshotWriter);
      END;
    EXCEPT
    | Wr.Failure(e) => RAISE OSError.E(e);
    END;

    OpenLogfile(t, TRUE);
    t.logByteCnt := 0;
    t.logEntries := 0;
    WriteVersionFile(t, TRUE);
    CommitToNewVersion(t);
    DeleteSnapshot(t, oldversion);
    DeleteLogfile(t, oldversion);
  END Snapshot;

PROCEDURE Recover(t: T) : REFANY RAISES {OSError.E, Failed} =
  (* Returns a REFANY which is the value recorded in the current snapshot. *)
  VAR
    snapshotReader: Rd.T;
    snapshot: REFANY;
    fname: TEXT;
  BEGIN
    IF t.version = 0 THEN RETURN NIL END;
    TRY
      fname := VersionName(t, SnapshotPrefix);
      snapshotReader := FileRd.Open(fname);
      TRY
        snapshot := t.cl.recover(snapshotReader);
        t.snapshotByteCnt := Rd.Length(snapshotReader);
      FINALLY
        Rd.Close(snapshotReader);
      END;
    EXCEPT
    | Rd.Failure(e) => RAISE OSError.E(e);
    END;
    RETURN RecoverUpdates(t, snapshot);
  END Recover;

CONST
  DiskPageSize = 512;
  IntBytes = 4;   (* writing only 32 bits of update length *)
  ByteBits = BITSIZE(CHAR);

PROCEDURE Update(t: T; update: REFANY; forceToDisk: BOOLEAN := TRUE )
    RAISES {OSError.E} =
  (* Records this update in the log file for "t" *)
  VAR
    updateLen: Word.T;
    entryStart, entryEnd: CARDINAL;
  BEGIN

    <* ASSERT t.logWriter # NIL *>
    TRY
      entryStart := Wr.Index(t.logWriter);
      FOR i := 0 TO IntBytes - 1 DO (* Write zero update len *)
        Wr.PutChar(t.logWriter, VAL(0, CHAR)); (* Chars of update len *)
      END;

      t.cl.logUpdate(t.logWriter, update);

      entryEnd := Wr.Index(t.logWriter);
      updateLen := (entryEnd - entryStart) - IntBytes;

      IF forceToDisk THEN (* force update contents before updating length *)
        Wr.Flush(t.logWriter);
        OSSupport.Sync(t.logWriter);
      END;
      Wr.Seek(t.logWriter, entryStart);
      FOR i := 0 TO IntBytes-1 DO (* Write real update len *)
        Wr.PutChar(t.logWriter,
           VAL(Word.Extract(updateLen, i*ByteBits, ByteBits), CHAR));
      END;
      Wr.Seek(t.logWriter, entryEnd);

      (* don't start an entry at the end of page *)
      IF t.pad OR (entryEnd MOD DiskPageSize > DiskPageSize-IntBytes) THEN
        WHILE (Wr.Index(t.logWriter) MOD DiskPageSize) # 0 DO
          Wr.PutChar(t.logWriter, VAL(0, CHAR)); (* Fill to page boundary *)
        END;
      END;

      IF forceToDisk THEN
        Wr.Flush(t.logWriter);
        OSSupport.Sync(t.logWriter);
        (* force changes to disk *)
      END;
      t.logByteCnt := Wr.Index(t.logWriter);
      t.lastLog := Time.Now();
      INC(t.logEntries);
    EXCEPT
    | Wr.Failure(e) => RAISE OSError.E(e);
    END;
  END Update;

PROCEDURE RecoverUpdates(t: T; state: REFANY): REFANY
    RAISES {OSError.E, Failed} =
  VAR
    logReader: Rd.T;
    updateLen: Word.T;
    fname: TEXT;
    updateText: TEXT;
    pos: CARDINAL;
  BEGIN
    IF t.version = 0 THEN RETURN state; END;
    TRY
      fname := VersionName(t, LogfilePrefix);
      logReader := FileRd.Open(fname);
      TRY
        LOOP
          IF Rd.EOF(logReader) THEN EXIT END;

          updateLen := 0;
          FOR i := 0 TO IntBytes - 1 DO
            TRY
              updateLen := Word.Insert(
                updateLen, ORD(Rd.GetChar(logReader)), i*ByteBits, ByteBits);
            EXCEPT
            | Rd.EndOfFile => RAISE Failed(MalformedUpdate);
            END;
          END;

          IF updateLen = 0 THEN
            EXIT (* crashed while writing last log entry *)
          END;

          IF (updateLen < 0) THEN RAISE Failed(BadUpdateLen); END;

          updateText := Rd.GetText(logReader, updateLen);
          IF Text.Length(updateText) # updateLen THEN
            RAISE Failed(BadUpdateLen);
          END;

          state := t.cl.readUpdate(TextRd.New(updateText), state);

          t.logByteCnt := Rd.Index(logReader);
          pos := ((t.logByteCnt+DiskPageSize-1) DIV DiskPageSize) * DiskPageSize;
          IF t.pad OR ((pos-t.logByteCnt) < IntBytes) THEN
            Rd.Seek(logReader, pos); (* Seek to page boundary *)
            t.logByteCnt := pos;
          END;
          INC(t.logEntries);
        END;
      FINALLY
        Rd.Close(logReader);
      END;
    EXCEPT
    | Rd.Failure(e) => RAISE OSError.E(e);
    END;

    (* now reopen the file at the end *)
    OpenLogfile(t, FALSE); (* Prepare for possible appending *)
      (* this code truncates the log file, in case the
         last entry was incompletely written *)

    TRY
      Wr.Seek(t.logWriter, t.logByteCnt);
      OSSupport.Truncate(t.logWriter);
      IF t.pad OR (t.logByteCnt MOD DiskPageSize > DiskPageSize-IntBytes) THEN
        WHILE (Wr.Index(t.logWriter) MOD DiskPageSize) # 0 DO
          Wr.PutChar(t.logWriter, VAL(0, CHAR)); (* Fill to page boundary *)
        END;
      END;
    EXCEPT
    | Wr.Failure(e) => RAISE OSError.E(e);
    END;
    RETURN state;
  END RecoverUpdates;

PROCEDURE Close(t: T) RAISES {OSError.E} =
  (* Close a stable storage directory in an orderly manner *)
  BEGIN CloseLogfile(t); END Close;

PROCEDURE SnapshotBytes(t: T): CARDINAL =
  BEGIN RETURN t.snapshotByteCnt; END SnapshotBytes;

PROCEDURE LogBytes(t: T): CARDINAL = BEGIN RETURN t.logByteCnt; END LogBytes;

PROCEDURE Status(t: T) : TEXT =
  VAR out: TEXT;
  BEGIN
    out := "   Stable storage status for directory \"" & t.dirname & "\"\n";
    out := out & "      Snapshot version " & Fmt.Int(t.version)
              & ", size " & Fmt.Int(t.snapshotByteCnt) & " bytes.\n";
    IF t.lastSnapshot # 0.0D0 THEN
      out := out & "      Last snapshot was written at " &
         FmtTime.Long(t.lastSnapshot) & "\n";
    ELSE
      out := out & "      No snapshot has been made in this run.\n";
    END;
    out := out & "      Log has " & Fmt.Int(t.logEntries)
               & " entries, total size "
               & Fmt.Int(t.logByteCnt) & " bytes.\n";
    IF t.lastLog # 0.0D0 THEN
      out := out & "      Last log entry was written at " &
         FmtTime.Long(t.lastLog) & "\n";
    ELSE
      out := out & "      No log entries have been written in this run.\n";
    END;
    RETURN out;
  END Status;

BEGIN
  BadUpdateLen := AtomList.List1(Atom.FromText("SmallDB.BadUpdateLen"));
  MalformedUpdate := AtomList.List1(Atom.FromText("SmallDB.MalformedUpdate"));
END SmallDB.