Terminal.skills
Use Cases/Build an AI Agent Orchestration Loop

Build an AI Agent Orchestration Loop

Build an autonomous AI agent loop that breaks down complex tasks, executes steps with tool calling, handles errors with self-correction, and runs until completion with human-in-the-loop checkpoints.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

Oscar leads AI engineering at a 20-person dev tools company. They want to build autonomous agents that can complete multi-step tasks: "deploy this PR to staging, run tests, report results." Current implementation is a single LLM call that can't handle errors or multi-step reasoning. When a step fails, the agent gives up instead of retrying with a different approach. There's no way to pause for human approval on sensitive actions. They need an agent loop: plan → execute → observe → adjust, with tool calling, memory, error recovery, and configurable checkpoints.

Step 1: Build the Agent Loop Engine

typescript
// src/agents/loop.ts — Autonomous agent loop with planning, tool execution, and checkpoints
import { pool } from "../db";
import { Redis } from "ioredis";
import { randomBytes } from "node:crypto";

const redis = new Redis(process.env.REDIS_URL!);

interface AgentRun {
  id: string;
  task: string;
  status: "running" | "paused" | "completed" | "failed" | "cancelled";
  plan: PlanStep[];
  currentStep: number;
  memory: AgentMemory;
  config: AgentConfig;
  iterations: number;
  maxIterations: number;
  startedAt: string;
  completedAt: string | null;
}

interface PlanStep {
  id: string;
  description: string;
  tool: string;
  args: Record<string, any>;
  status: "pending" | "running" | "completed" | "failed" | "skipped";
  result: any;
  error: string | null;
  requiresApproval: boolean;
  retries: number;
}

interface AgentMemory {
  observations: Array<{ step: number; content: string; timestamp: string }>;
  decisions: Array<{ step: number; reasoning: string; action: string }>;
  errors: Array<{ step: number; error: string; recovery: string }>;
  context: Record<string, any>;
}

interface AgentConfig {
  model: string;
  temperature: number;
  maxIterations: number;
  approvalRequired: string[];  // tool names that need human approval
  retryPolicy: { maxRetries: number; backoffMs: number };
  timeout: number;             // max total runtime in seconds
}

interface Tool {
  name: string;
  description: string;
  parameters: Record<string, { type: string; description: string; required?: boolean }>;
  execute: (args: Record<string, any>, context: Record<string, any>) => Promise<any>;
}

const tools = new Map<string, Tool>();

// Register available tools
export function registerTool(tool: Tool): void {
  tools.set(tool.name, tool);
}

// Start an autonomous agent run
export async function startRun(task: string, config?: Partial<AgentConfig>): Promise<AgentRun> {
  const id = `run-${randomBytes(8).toString("hex")}`;
  const fullConfig: AgentConfig = {
    model: "claude-sonnet-4-20250514",
    temperature: 0.3,
    maxIterations: 50,
    approvalRequired: ["deploy", "delete", "send_email"],
    retryPolicy: { maxRetries: 3, backoffMs: 2000 },
    timeout: 600,
    ...config,
  };

  const run: AgentRun = {
    id, task, status: "running",
    plan: [], currentStep: 0,
    memory: { observations: [], decisions: [], errors: [], context: {} },
    config: fullConfig,
    iterations: 0,
    maxIterations: fullConfig.maxIterations,
    startedAt: new Date().toISOString(),
    completedAt: null,
  };

  await pool.query(
    `INSERT INTO agent_runs (id, task, status, plan, memory, config, started_at)
     VALUES ($1, $2, 'running', $3, $4, $5, NOW())`,
    [id, task, JSON.stringify(run.plan), JSON.stringify(run.memory), JSON.stringify(fullConfig)]
  );

  // Start the loop asynchronously
  runLoop(run).catch(async (err) => {
    await pool.query("UPDATE agent_runs SET status = 'failed' WHERE id = $1", [id]);
  });

  return run;
}

async function runLoop(run: AgentRun): Promise<void> {
  while (run.status === "running" && run.iterations < run.maxIterations) {
    run.iterations++;

    // Phase 1: Plan (or re-plan if needed)
    if (run.plan.length === 0 || run.currentStep >= run.plan.length) {
      const plan = await generatePlan(run);
      run.plan = plan;
      run.currentStep = 0;
    }

    const step = run.plan[run.currentStep];
    if (!step) break;

    // Phase 2: Check if approval needed
    if (step.requiresApproval) {
      run.status = "paused";
      await saveRun(run);
      await redis.publish("agent:approval", JSON.stringify({
        runId: run.id, step: step.description, tool: step.tool,
      }));
      return;  // Will resume when approved
    }

    // Phase 3: Execute
    step.status = "running";
    try {
      const tool = tools.get(step.tool);
      if (!tool) throw new Error(`Unknown tool: ${step.tool}`);

      const result = await tool.execute(step.args, run.memory.context);
      step.result = result;
      step.status = "completed";

      // Phase 4: Observe and update memory
      run.memory.observations.push({
        step: run.currentStep,
        content: JSON.stringify(result).slice(0, 2000),
        timestamp: new Date().toISOString(),
      });

      run.currentStep++;
    } catch (error: any) {
      step.error = error.message;
      step.retries++;

      // Phase 5: Error recovery
      if (step.retries <= run.config.retryPolicy.maxRetries) {
        run.memory.errors.push({
          step: run.currentStep,
          error: error.message,
          recovery: `Retry ${step.retries}/${run.config.retryPolicy.maxRetries}`,
        });
        await sleep(run.config.retryPolicy.backoffMs * step.retries);
        continue;  // Retry same step
      }

      // Max retries exceeded — try to re-plan around the failure
      step.status = "failed";
      const canRecover = await attemptRecovery(run, step, error.message);
      if (!canRecover) {
        run.status = "failed";
        break;
      }
    }

    // Check if all steps complete
    if (run.plan.every((s) => s.status === "completed" || s.status === "skipped")) {
      run.status = "completed";
      run.completedAt = new Date().toISOString();
    }

    await saveRun(run);
  }

  if (run.iterations >= run.maxIterations) {
    run.status = "failed";
    run.memory.errors.push({ step: run.currentStep, error: "Max iterations exceeded", recovery: "none" });
  }

  await saveRun(run);
}

async function generatePlan(run: AgentRun): Promise<PlanStep[]> {
  // Call LLM to generate execution plan based on task + memory
  const toolDescriptions = Array.from(tools.values()).map((t) => ({
    name: t.name, description: t.description, parameters: t.parameters,
  }));

  const prompt = `Task: ${run.task}\n\nAvailable tools: ${JSON.stringify(toolDescriptions)}\n\nMemory: ${JSON.stringify(run.memory.observations.slice(-10))}\n\nGenerate a step-by-step plan.`;

  // Simplified — in production this calls the LLM API
  const steps: PlanStep[] = [
    { id: `s-${randomBytes(3).toString("hex")}`, description: "Analyze task", tool: "analyze", args: { task: run.task }, status: "pending", result: null, error: null, requiresApproval: false, retries: 0 },
  ];

  return steps;
}

async function attemptRecovery(run: AgentRun, failedStep: PlanStep, error: string): Promise<boolean> {
  run.memory.decisions.push({
    step: run.currentStep,
    reasoning: `Step "${failedStep.description}" failed: ${error}. Attempting to re-plan.`,
    action: "re-plan",
  });

  // Clear remaining steps and re-plan
  run.plan = run.plan.filter((s) => s.status === "completed");
  return true;  // Will re-plan on next iteration
}

// Approve a paused step
export async function approveStep(runId: string): Promise<void> {
  const run = await loadRun(runId);
  if (!run || run.status !== "paused") return;

  const step = run.plan[run.currentStep];
  step.requiresApproval = false;
  run.status = "running";
  await saveRun(run);
  runLoop(run).catch(() => {});
}

// Get run status with full execution history
export async function getRunStatus(runId: string): Promise<AgentRun | null> {
  return loadRun(runId);
}

async function saveRun(run: AgentRun): Promise<void> {
  await pool.query(
    "UPDATE agent_runs SET status=$2, plan=$3, memory=$4, current_step=$5, iterations=$6, completed_at=$7 WHERE id=$1",
    [run.id, run.status, JSON.stringify(run.plan), JSON.stringify(run.memory), run.currentStep, run.iterations, run.completedAt]
  );
}

async function loadRun(runId: string): Promise<AgentRun | null> {
  const { rows: [row] } = await pool.query("SELECT * FROM agent_runs WHERE id = $1", [runId]);
  if (!row) return null;
  return { ...row, plan: JSON.parse(row.plan), memory: JSON.parse(row.memory), config: JSON.parse(row.config) };
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

Results

  • Complex tasks completed autonomously — "deploy PR, run tests, report" executes as 5-step plan with self-correction; developer gets Slack notification when done
  • Self-healing on errors — deploy step fails due to missing env var → agent observes error → re-plans to set env var first → retries deploy → succeeds
  • Human-in-the-loop for sensitive actions — deploy and delete tools require approval; agent pauses, sends notification, resumes on approval; no runaway deletions
  • Full execution memory — every observation, decision, and error logged; debugging is reading the agent's "thought process" not grep-ing logs
  • Configurable retry with backoff — transient API failures handled automatically; exponential backoff prevents hammering; 3 retries before escalating to re-plan