Intelligence
Artifacts
Browse the repository, read documents, and manage the governance folders. Source, runtime, and infrastructure are read-only.
Repository
repositories/aaf-holdings/hq01/lib/missions/assignments.ts
14.4 KB
import fs from "node:fs";
import path from "node:path";
import { CONTENT_ROOT } from "@/lib/content/config";
import { dispatchExecutive } from "@/lib/executives/briefing";
import { getSession } from "@/lib/sessions/manager";
import { ORGANIZATION } from "./config";
import { missionDir, readMission } from "./store";
import { getObjective } from "./objectives";
import { readWorkOrder } from "./work-orders";
import { recordDispatchStarted } from "./manager";
import { EXECUTABLE_STATES, isExecutable } from "./lifecycle";
import { generateReportForAssignment } from "./reports";
import { getTemplate } from "@/lib/workers/templates";
import { createInstance, terminateInstance } from "@/lib/workers/instances";
import type {
Assignment,
CreateAssignmentInput,
ExecChainEvent,
ExecChainEventType,
ExecutionContext,
AssignmentView,
} from "./types";
/**
 * Assignments + Execution Context (PASS M3). An assignment delegates one work
 * order; on creation it generates an immutable execution context. Dispatch is
 * assignment-based: the assignment is the input to the existing execution
 * engine. Direct dispatch (dispatchAssignment) uses only the chain and a
 * session reference; since PASS M7 an assignment may instead be dispatched
 * through an ephemeral worker instance created from a worker template
 * (instantiateWorker).
 */
/**
 * Error thrown by the assignment operations in this module.
 *
 * Carries a numeric `status` next to the message. This file uses 400, 404 and
 * 409 (presumably HTTP status codes — confirm against the route handlers).
 */
export class AssignmentError extends Error {
  /** Status code associated with the failure; 400 unless given explicitly. */
  readonly status: number;

  constructor(message: string, status = 400) {
    super(message);
    this.name = "AssignmentError";
    this.status = status;
  }
}
// Assignment ids are the literal prefix "ASSIGNMENT-" followed by exactly six digits.
const AID_RE = /^ASSIGNMENT-\d{6}$/;
// Root under CONTENT_ROOT beneath which createAssignment makes one workspace
// directory per assignment.
const RUNTIME_ASSIGNMENTS_ROOT = path.join(CONTENT_ROOT, "runtime", "assignments");
/** Current time as an ISO-8601 string (the output of `Date#toISOString`). */
function nowIso(): string {
  const now = new Date();
  return now.toISOString();
}
/** Directory that holds every assignment folder of mission `mid`. */
function assignmentsDir(mid: string): string {
  const missionRoot = missionDir(mid);
  return path.join(missionRoot, "assignments");
}
/** Folder of a single assignment `aid` inside mission `mid`. */
function aDir(mid: string, aid: string): string {
  const parent = assignmentsDir(mid);
  return path.join(parent, aid);
}
/** Path of the assignment record (`assignment.json`) for `aid`. */
function aJson(mid: string, aid: string): string {
  const folder = aDir(mid, aid);
  return path.join(folder, "assignment.json");
}
/** Path of the execution context file (`execution-context.json`) for `aid`. */
function execContextPath(mid: string, aid: string): string {
  const folder = aDir(mid, aid);
  return path.join(folder, "execution-context.json");
}
/** Path of the append-only event log (`history/log.jsonl`) for `aid`. */
function aHistory(mid: string, aid: string): string {
  const folder = aDir(mid, aid);
  return path.join(folder, "history", "log.jsonl");
}
/**
 * Write `data` to `file` as pretty-printed JSON (2-space indent, trailing
 * newline) by writing a sibling `<file>.tmp` first and renaming it into place,
 * so a reader never sees a half-written `file` (assuming the rename is atomic
 * on the underlying filesystem).
 *
 * The parent directory is created if needed. If writing or renaming fails, the
 * temp file is removed (best effort) and the original error is rethrown.
 *
 * @param file Destination path.
 * @param data Value to serialise with `JSON.stringify`.
 * @throws Whatever `fs.writeFileSync` / `fs.renameSync` throw.
 */
function writeJsonAtomic(file: string, data: unknown): void {
  fs.mkdirSync(path.dirname(file), { recursive: true });
  const tmp = `${file}.tmp`;
  try {
    fs.writeFileSync(tmp, JSON.stringify(data, null, 2) + "\n", "utf8");
    fs.renameSync(tmp, file);
  } catch (err) {
    // Do not leave a stray temp file behind when the write or rename fails.
    try {
      fs.unlinkSync(tmp);
    } catch {
      /* temp file was never created or is already gone */
    }
    throw err;
  }
}
/**
 * Append one event to the assignment's history log as a single JSON line.
 *
 * The event is `{ type, at: <now> }` with `extra` spread last, so fields in
 * `extra` take precedence. The log's directory is created if missing.
 */
function append(
  mid: string,
  aid: string,
  type: ExecChainEventType,
  extra: Partial<ExecChainEvent> = {},
): void {
  const logFile = aHistory(mid, aid);
  const logDir = path.dirname(logFile);
  fs.mkdirSync(logDir, { recursive: true });

  const event: ExecChainEvent = { type, at: nowIso(), ...extra };
  const line = `${JSON.stringify(event)}\n`;
  fs.appendFileSync(logFile, line, "utf8");
}
/**
 * Compute the next assignment id for mission `mid`: one more than the highest
 * numeric suffix among existing `ASSIGNMENT-NNNNNN` directories, zero-padded
 * to six digits. A missing or unreadable assignments directory counts as empty.
 */
function nextId(mid: string): string {
  const prefix = "ASSIGNMENT-";

  let dirents: fs.Dirent[];
  try {
    dirents = fs.readdirSync(assignmentsDir(mid), { withFileTypes: true });
  } catch {
    // No assignments directory yet: start numbering from 1.
    dirents = [];
  }

  const highest = dirents
    .filter((d) => d.isDirectory() && AID_RE.test(d.name))
    .map((d) => Number(d.name.slice(prefix.length)))
    .reduce((best, n) => (Number.isFinite(n) && n > best ? n : best), 0);

  return `${prefix}${String(highest + 1).padStart(6, "0")}`;
}
/**
 * Load the assignment record for `aid` in mission `mid`.
 *
 * @returns The parsed record, or `null` when `aid` is not a well-formed
 *   assignment id or the file cannot be read or parsed.
 */
export function readAssignment(mid: string, aid: string): Assignment | null {
  if (AID_RE.test(aid)) {
    try {
      const raw = fs.readFileSync(aJson(mid, aid), "utf8");
      return JSON.parse(raw) as Assignment;
    } catch {
      // Unreadable or malformed record: reported as "not found" below.
    }
  }
  return null;
}
/**
 * Load the execution context written for assignment `aid` of mission `mid`.
 *
 * `aid` is validated against the assignment-id pattern before it is used to
 * build a filesystem path, matching `readAssignment`; a malformed id (for
 * example one containing path separators) is never joined into a path.
 *
 * @returns The parsed context, or `null` when `aid` is malformed or the file
 *   cannot be read or parsed.
 */
export function readExecutionContext(
  mid: string,
  aid: string,
): ExecutionContext | null {
  if (!AID_RE.test(aid)) return null;
  try {
    return JSON.parse(
      fs.readFileSync(execContextPath(mid, aid), "utf8"),
    ) as ExecutionContext;
  } catch {
    return null;
  }
}
/**
 * List every readable assignment of mission `mid`, ordered by id.
 *
 * Only directories whose name is a well-formed assignment id are considered;
 * entries whose record cannot be read are skipped. Returns an empty list when
 * the assignments directory cannot be read.
 */
export function listAssignments(mid: string): Assignment[] {
  let dirents: fs.Dirent[];
  try {
    dirents = fs.readdirSync(assignmentsDir(mid), { withFileTypes: true });
  } catch {
    return [];
  }

  const found: Assignment[] = [];
  for (const d of dirents) {
    if (!d.isDirectory() || !AID_RE.test(d.name)) continue;
    const record = readAssignment(mid, d.name);
    if (record) found.push(record);
  }
  found.sort((x, y) => x.id.localeCompare(y.id));
  return found;
}
/**
 * Read the assignment's history log (one JSON event per line).
 *
 * Blank lines and lines that fail to parse (or parse to a falsy value) are
 * dropped. Returns an empty list when the log cannot be read.
 */
function readHistory(mid: string, aid: string): ExecChainEvent[] {
  let text: string;
  try {
    text = fs.readFileSync(aHistory(mid, aid), "utf8");
  } catch {
    return [];
  }

  const events: ExecChainEvent[] = [];
  for (const line of text.split("\n")) {
    if (!line) continue;
    let parsed: ExecChainEvent | null;
    try {
      parsed = JSON.parse(line) as ExecChainEvent;
    } catch {
      // Tolerate a corrupt line rather than losing the whole history.
      parsed = null;
    }
    if (parsed) events.push(parsed);
  }
  return events;
}
/**
 * Build the read model for one assignment: the record, its execution context
 * and its history with the newest event first.
 *
 * Reading has a side effect: finished sessions of the mission are first
 * reconciled into assignment status/history via `collectAssignmentReports`.
 *
 * @returns The view, or `null` when the assignment cannot be read.
 */
export function getAssignmentView(mid: string, aid: string): AssignmentView | null {
  collectAssignmentReports(mid);

  const assignment = readAssignment(mid, aid);
  if (!assignment) return null;

  const execution_context = readExecutionContext(mid, aid);
  const history = readHistory(mid, aid);
  // Newest first: descending by the ISO timestamp string.
  history.sort((x, y) => y.at.localeCompare(x.at));

  return { assignment, execution_context, history };
}
/**
 * Create an assignment for a work order of mission `mid` and generate its
 * execution context.
 *
 * All validation (mission, work order, objective, executive) happens before
 * anything is written. On success this creates the runtime workspace (with an
 * `outputs/` folder), writes `assignment.json` and `execution-context.json`,
 * and logs `assignment_created` and `execution_context_created`.
 *
 * @param mid Mission id.
 * @param input Work order id, executive, and an optional repository override
 *   (the mission's repository is used when the override is empty).
 * @returns The new assignment and its execution context.
 * @throws AssignmentError 404 when the mission cannot be read; 400 when the
 *   work order, its objective, or the executive is missing.
 */
export function createAssignment(
  mid: string,
  input: CreateAssignmentInput,
): { assignment: Assignment; execution_context: ExecutionContext } {
  const mission = readMission(mid);
  if (!mission) {
    throw new AssignmentError("Mission not found.", 404);
  }

  const workOrder = input.work_order_id
    ? readWorkOrder(mid, input.work_order_id)
    : null;
  if (!workOrder) {
    throw new AssignmentError("A valid work order is required.", 400);
  }

  const objective = getObjective(mid, workOrder.objective_id);
  if (!objective) {
    throw new AssignmentError("Work order's objective is missing.", 400);
  }

  const executive = input.executive?.trim();
  if (!executive) {
    throw new AssignmentError("Executive is required.", 400);
  }

  const id = nextId(mid);
  const timestamp = nowIso();
  const repository = input.repository?.trim() || mission.repository;
  const workspace = path.join(RUNTIME_ASSIGNMENTS_ROOT, `${mid}-${id}`);

  // Prepare the runtime workspace and the assignment's history folder.
  fs.mkdirSync(path.join(workspace, "outputs"), { recursive: true });
  fs.mkdirSync(path.join(aDir(mid, id), "history"), { recursive: true });

  // Identifiers shared by the assignment record and the execution context.
  const chain = {
    mission_id: mid,
    objective_id: objective.id,
    work_order_id: workOrder.id,
  };

  const assignment: Assignment = {
    id,
    ...chain,
    executive,
    worker_template: null,
    worker_instance_id: null,
    repository,
    workspace,
    status: "Pending",
    priority: workOrder.priority,
    created_at: timestamp,
    updated_at: timestamp,
    session: null,
  };
  writeJsonAtomic(aJson(mid, id), assignment);
  append(mid, id, "assignment_created", {
    detail: `${workOrder.id} → ${executive}`,
  });

  const execution_context: ExecutionContext = {
    organization: mission.organization || ORGANIZATION,
    company: mission.company,
    product: mission.product,
    repository,
    ...chain,
    assignment_id: id,
    executive,
    workspace,
    allowed_paths: [workspace],
    expected_outputs: ["report.md", "outputs/"],
    success_criteria: objective.success_criteria,
    report_location: path.join(workspace, "report.md"),
    runtime_root: workspace,
    created_at: timestamp,
  };
  // Treated as immutable: written once here; nothing in this file rewrites it.
  writeJsonAtomic(execContextPath(mid, id), execution_context);
  append(mid, id, "execution_context_created", { detail: workspace });

  return { assignment, execution_context };
}
/**
 * Dispatch an assignment directly (no worker) through the existing execution
 * engine.
 *
 * Only `Pending` or `Failed` assignments of an executable mission can be
 * dispatched. On success the assignment becomes `Dispatched` with a running
 * session reference, a `dispatch_started` event is logged, and
 * `recordDispatchStarted` is called for the mission.
 *
 * @param mid Mission id.
 * @param aid Assignment id.
 * @returns The id of the session started for this dispatch.
 * @throws AssignmentError 404 when the assignment or mission cannot be read;
 *   409 when the assignment status or mission state does not allow dispatch;
 *   400 when the execution context, work order or objective is missing.
 */
export function dispatchAssignment(
  mid: string,
  aid: string,
): { session_id: string } {
  const assignment = readAssignment(mid, aid);
  if (!assignment) throw new AssignmentError("Assignment not found.", 404);
  if (assignment.status !== "Pending" && assignment.status !== "Failed") {
    throw new AssignmentError(
      `Assignment is ${assignment.status}; only Pending/Failed assignments dispatch.`,
      409,
    );
  }
  // CEO approval gate (PASS M1) still applies — execution needs an executable mission.
  // Fail closed: a mission record that cannot be read must not bypass the gate.
  const mission = readMission(mid);
  if (!mission) throw new AssignmentError("Mission not found.", 404);
  if (!isExecutable(mission.status)) {
    throw new AssignmentError(
      `Mission is not executable. Current State: ${mission.status}. ` +
        `Allowed: ${EXECUTABLE_STATES.join(", ")}.`,
      409,
    );
  }
  const ctx = readExecutionContext(mid, aid);
  if (!ctx) throw new AssignmentError("Execution context missing.", 400);
  const wo = readWorkOrder(mid, assignment.work_order_id);
  const objective = getObjective(mid, assignment.objective_id);
  if (!wo || !objective) throw new AssignmentError("Broken execution chain.", 400);
  const criteria = objective.success_criteria.length
    ? objective.success_criteria.map((c) => `- ${c}`).join("\n")
    : "- (none specified)";
  const instruction =
    `Work Order ${wo.id}: ${wo.title}\n\n${wo.description}\n\n` +
    `Objective ${objective.id}: ${objective.title} — ${objective.description}\n` +
    `Success criteria:\n${criteria}\n\n` +
    `Write your final report to ./report.md in the required format.`;
  const { session } = dispatchExecutive({
    executiveId: assignment.executive,
    instruction,
    missionId: mid,
    routingReason: `Assignment ${aid} of ${wo.id} (objective ${objective.id})`,
    workspace: assignment.workspace,
  });
  const next: Assignment = {
    ...assignment,
    status: "Dispatched",
    // A direct dispatch runs without a worker. Clear any worker reference left
    // by an earlier failed worker dispatch so that reconciliation does not
    // terminate that old instance again with this session's report.
    worker_template: null,
    worker_instance_id: null,
    updated_at: nowIso(),
    session: {
      session_id: session.id,
      assignment_id: aid,
      executive: assignment.executive,
      status: "running",
      report_path: ctx.report_location,
    },
  };
  writeJsonAtomic(aJson(mid, aid), next);
  append(mid, aid, "dispatch_started", {
    session_id: session.id,
    detail: assignment.executive,
  });
  recordDispatchStarted(mid, {
    sessionId: session.id,
    executiveId: assignment.executive,
    instruction: `[${aid}] ${wo.title}`,
  });
  return { session_id: session.id };
}
/**
 * Instantiate a worker from a template and dispatch the assignment through it
 * (PASS M7). The existing Claude engine is unchanged; the worker instance is the
 * ephemeral record (template → instance → session → report → terminate).
 *
 * Only `Pending` or `Failed` assignments of an executable mission can be
 * dispatched. On success the assignment becomes `Dispatched`, records the
 * template and instance ids plus a running session reference, a
 * `dispatch_started` event is logged, and `recordDispatchStarted` is called
 * for the mission.
 *
 * @param mid Mission id.
 * @param aid Assignment id.
 * @param templateId Id of the worker template to instantiate.
 * @returns The started session id and the created worker instance id.
 * @throws AssignmentError 404 when the assignment, mission or template cannot
 *   be found; 409 when the assignment status or mission state does not allow
 *   dispatch; 400 when the execution context, work order or objective is missing.
 */
export function instantiateWorker(
  mid: string,
  aid: string,
  templateId: string,
): { session_id: string; worker_instance_id: string } {
  const assignment = readAssignment(mid, aid);
  if (!assignment) throw new AssignmentError("Assignment not found.", 404);
  if (assignment.status !== "Pending" && assignment.status !== "Failed") {
    throw new AssignmentError(
      `Assignment is ${assignment.status}; only Pending/Failed assignments dispatch.`,
      409,
    );
  }
  // Approval gate: fail closed. A mission record that cannot be read must not
  // let the dispatch through unchecked.
  const mission = readMission(mid);
  if (!mission) throw new AssignmentError("Mission not found.", 404);
  if (!isExecutable(mission.status)) {
    throw new AssignmentError(
      `Mission is not executable. Current State: ${mission.status}. ` +
        `Allowed: ${EXECUTABLE_STATES.join(", ")}.`,
      409,
    );
  }
  const template = getTemplate(templateId);
  if (!template) throw new AssignmentError("Worker template not found.", 404);
  const ctx = readExecutionContext(mid, aid);
  if (!ctx) throw new AssignmentError("Execution context missing.", 400);
  const wo = readWorkOrder(mid, assignment.work_order_id);
  const objective = getObjective(mid, assignment.objective_id);
  if (!wo || !objective) throw new AssignmentError("Broken execution chain.", 400);
  const criteria = objective.success_criteria.length
    ? objective.success_criteria.map((c) => `- ${c}`).join("\n")
    : "- (none specified)";
  const instruction =
    `You are a ${template.name} (template ${template.id}, v${template.version}) — a temporary worker. ` +
    `${template.description}\n\n` +
    `Work Order ${wo.id}: ${wo.title}\n\n${wo.description}\n\n` +
    `Objective ${objective.id}: ${objective.title} — ${objective.description}\n` +
    `Success criteria:\n${criteria}\n\n` +
    `Write your final report to ./report.md in the required format.`;
  const { session } = dispatchExecutive({
    executiveId: assignment.executive,
    instruction,
    missionId: mid,
    routingReason: `Worker ${template.name} for assignment ${aid} of ${wo.id}`,
    workspace: assignment.workspace,
  });
  // The instance record is created after dispatch because it stores the
  // session's id and start time.
  const instance = createInstance({
    template_id: template.id,
    mission_id: mid,
    objective_id: objective.id,
    work_order_id: wo.id,
    assignment_id: aid,
    executive: assignment.executive,
    workspace: assignment.workspace,
    model: template.default_model,
    version: template.version,
    session_id: session.id,
    started_at: session.started_at,
  });
  const next: Assignment = {
    ...assignment,
    status: "Dispatched",
    worker_template: template.id,
    worker_instance_id: instance.id,
    updated_at: nowIso(),
    session: {
      session_id: session.id,
      assignment_id: aid,
      executive: assignment.executive,
      status: "running",
      report_path: ctx.report_location,
    },
  };
  writeJsonAtomic(aJson(mid, aid), next);
  append(mid, aid, "dispatch_started", {
    session_id: session.id,
    detail: `${template.name} (${instance.id})`,
  });
  recordDispatchStarted(mid, {
    sessionId: session.id,
    executiveId: assignment.executive,
    instruction: `[${aid}] ${template.name}: ${wo.title}`,
  });
  return { session_id: session.id, worker_instance_id: instance.id };
}
/**
 * Reconcile the dispatched assignments of mission `mid` with their sessions.
 *
 * For each `Dispatched` assignment whose session status is `completed` or
 * `completed_missing_report`: generate the report, terminate the worker
 * instance if one is attached, mark the assignment `Completed` (session
 * `completed`) or `Failed` (report missing), and log `dispatch_completed`.
 * Assignments that are no longer `Dispatched` are skipped, so ones already
 * reconciled are not processed again. (Mission-level report storage is handled
 * by the mission manager's collectReports.)
 *
 * @returns The number of assignments updated by this call.
 */
export function collectAssignmentReports(mid: string): number {
  let reconciled = 0;

  for (const a of listAssignments(mid)) {
    const ref = a.session;
    if (a.status !== "Dispatched" || !ref) continue;

    const session = getSession(ref.session_id);
    if (!session) continue;
    if (
      session.status !== "completed" &&
      session.status !== "completed_missing_report"
    ) {
      continue;
    }
    const succeeded = session.status === "completed";

    // Generate the immutable structured report for this execution.
    const rid = generateReportForAssignment(mid, a.id);

    // Terminate the worker instance — no worker survives execution (PASS M7).
    if (a.worker_instance_id) {
      terminateInstance(a.worker_instance_id, {
        status: succeeded ? "Terminated" : "Failed",
        report_id: rid,
        completed_at: session.stopped_at ?? nowIso(),
      });
    }

    const next: Assignment = {
      ...a,
      status: succeeded ? "Completed" : "Failed",
      updated_at: nowIso(),
      session: { ...ref, status: session.status },
    };
    writeJsonAtomic(aJson(mid, a.id), next);

    const detail = rid ? `${session.status} · ${rid}` : session.status;
    append(mid, a.id, "dispatch_completed", {
      session_id: ref.session_id,
      detail,
    });

    reconciled += 1;
  }

  return reconciled;
}
root · /srv/aaf