Intelligence
Artifacts
Browse the repository, read documents, and manage the governance folders. Source, runtime, and infrastructure are read-only.
Repository
repositories/aaf-holdings/hq01/lib/sessions/manager.ts
8.2 KB
import fs from "node:fs";
import path from "node:path";
import { spawnDetached } from "@/lib/process/spawn";
import { isAlive, terminate } from "@/lib/process/signal";
import { DEFAULT_COMMAND, LOG_TAIL_BYTES, STOP_GRACE_MS } from "./config";
import {
ensureSessionDir,
isValidId,
listSessionIds,
logModifiedAt,
readMetadata,
readSession,
stderrPath,
stdoutPath,
tailFile,
writeMetadata,
writeSession,
} from "./store";
import type {
Session,
SessionLogs,
SessionMetadata,
StartSessionInput,
} from "./types";
/**
* The session manager — the one place that knows how to start, observe and stop
* Claude CLI sessions. It composes the process primitives (`lib/process`) with
* the filesystem store (`lib/sessions/store`). Everything here runs server-side
* only (it touches the filesystem and spawns processes).
*/
export class SessionError extends Error {
constructor(
message: string,
readonly status = 400,
) {
super(message);
this.name = "SessionError";
}
}
function nowIso(): string {
return new Date().toISOString();
}
function slugify(name: string): string {
return (
name
.toLowerCase()
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-+|-+$/g, "")
.slice(0, 40) || "session"
);
}
/** A collision-resistant id: <slug>-<base36 timestamp>. */
function generateId(name: string): string {
return `${slugify(name)}-${Date.now().toString(36)}`;
}
/**
* Default arguments for a session. For the Claude CLI we run in print mode with
* the mission/prompt as input, which streams output to the logs and returns —
* never requiring a TTY. Callers may override `args` entirely for other tools.
*/
function buildArgs(command: string, prompt: string | null): string[] {
const isClaude = command === "claude" || command.endsWith("/claude");
if (isClaude && prompt && prompt.trim()) {
return ["--print", "--verbose", prompt.trim()];
}
return [];
}
/**
* Detect whether a report-expecting session produced its report.md, in its
* runtime workspace or an `outputs/` subfolder of it.
*/
function reportExists(session: Session): boolean {
const roots = [session.working_directory, session.runtime_write_root].filter(
(r): r is string => Boolean(r),
);
for (const root of roots) {
for (const rel of ["report.md", path.join("outputs", "report.md")]) {
try {
if (fs.statSync(path.join(root, rel)).isFile()) return true;
} catch {
/* not there */
}
}
}
return false;
}
/**
* Reconcile a session's recorded state with reality, persisting any change.
*
* The manager runs in a different (and shorter-lived) process than the detached
* sessions, so a `session.json` that says "running" might describe a process
* that has since exited. We check liveness on every read and downgrade the
* status accordingly, and we refresh `last_activity` from the stdout log mtime.
*
* For sessions that are expected to produce a report (dispatched executive
* sessions), the terminal status reflects whether report.md was written:
* `completed` if present, `completed_missing_report` if not. Sessions with no
* report expectation keep the plain `exited` terminal status.
*/
function reconcile(session: Session): Session {
let changed = false;
const next: Session = { ...session };
const alive = isAlive(next.pid);
if ((next.status === "running" || next.status === "starting") && !alive) {
if (!next.stopped_at) next.stopped_at = nowIso();
next.status = next.expects_report
? reportExists(next)
? "completed"
: "completed_missing_report"
: "exited";
changed = true;
} else if (next.status === "starting" && alive) {
next.status = "running";
changed = true;
}
const activity = logModifiedAt(next.id);
if (activity && activity !== next.last_activity) {
next.last_activity = activity;
changed = true;
}
if (changed) writeSession(next);
return next;
}
/** Start a new session: create runtime state, spawn the process, return record. */
export function startSession(input: StartSessionInput): Session {
const name = input.name?.trim();
if (!name) throw new SessionError("A session name is required.");
const cwd = input.working_directory?.trim();
if (!cwd) throw new SessionError("A working_directory is required.");
let stat: fs.Stats;
try {
stat = fs.statSync(cwd);
} catch {
throw new SessionError(`working_directory does not exist: ${cwd}`);
}
if (!stat.isDirectory()) {
throw new SessionError(`working_directory is not a directory: ${cwd}`);
}
const id = generateId(name);
if (!isValidId(id)) throw new SessionError("Could not derive a valid id.");
const command = input.command?.trim() || DEFAULT_COMMAND;
const prompt = input.prompt?.trim() || null;
const args = input.args ?? buildArgs(command, prompt);
const created = nowIso();
ensureSessionDir(id);
const metadata: SessionMetadata = {
id,
name,
executive: input.executive?.trim() || "Unassigned",
mission_id: input.mission_id?.trim() || null,
assignment_id: input.assignment_id?.trim() || null,
working_directory: cwd,
branch: input.branch?.trim() || null,
command,
args,
prompt,
created_at: created,
created_by: input.created_by?.trim() || "HQ01 operator",
};
writeMetadata(metadata);
const session: Session = {
id,
name,
executive: metadata.executive,
status: "starting",
pid: null,
started_at: created,
stopped_at: null,
exit_code: null,
working_directory: cwd,
branch: metadata.branch,
mission_id: metadata.mission_id,
assignment_id: metadata.assignment_id,
log_path: stdoutPath(id),
stderr_path: stderrPath(id),
last_activity: null,
command,
args,
permission_mode: input.permission_mode ?? null,
allowed_tools: input.allowed_tools ?? [],
runtime_write_root: input.runtime_write_root ?? null,
repository_write_allowed: input.repository_write_allowed ?? true,
expects_report: input.expects_report ?? false,
};
writeSession(session);
try {
const { pid } = spawnDetached({
command,
args,
cwd,
stdoutPath: stdoutPath(id),
stderrPath: stderrPath(id),
});
session.pid = pid;
session.status = "running";
} catch (err) {
session.status = "failed";
session.stopped_at = nowIso();
writeSession(session);
const reason = err instanceof Error ? err.message : String(err);
throw new SessionError(`Failed to start session: ${reason}`, 500);
}
writeSession(session);
return session;
}
/** Get one session, reconciled against the running process. */
export function getSession(id: string): Session | null {
const session = readSession(id);
if (!session) return null;
return reconcile(session);
}
/** Get the immutable creation metadata for a session. */
export function getMetadata(id: string): SessionMetadata | null {
return readMetadata(id);
}
/** List all sessions, most recently started first. */
export function listSessions(): Session[] {
const sessions = listSessionIds()
.map((id) => getSession(id))
.filter((s): s is Session => Boolean(s));
return sessions.sort((a, b) => {
const ta = a.started_at ? Date.parse(a.started_at) : 0;
const tb = b.started_at ? Date.parse(b.started_at) : 0;
return tb - ta;
});
}
/** Gracefully stop a session, updating its recorded state. */
export async function stopSession(id: string): Promise<Session> {
const session = readSession(id);
if (!session) throw new SessionError("Session not found.", 404);
if (isAlive(session.pid)) {
await terminate(session.pid, { graceMs: STOP_GRACE_MS });
}
session.status = session.expects_report ? "terminated" : "stopped";
session.stopped_at = nowIso();
const activity = logModifiedAt(id);
if (activity) session.last_activity = activity;
writeSession(session);
return session;
}
/** Read the tail of a session's stdout and stderr logs. */
export function readLogs(id: string, maxBytes = LOG_TAIL_BYTES): SessionLogs {
const session = getSession(id);
if (!session) throw new SessionError("Session not found.", 404);
return {
id,
status: session.status,
last_activity: session.last_activity,
stdout: tailFile(stdoutPath(id), maxBytes),
stderr: tailFile(stderrPath(id), maxBytes),
};
}
root · /srv/aaf