Terminal.skills
Use Cases/Build an AI Workflow Builder

Build an AI Workflow Builder

Build a visual AI workflow builder with drag-and-drop nodes, LLM chain composition, conditional branching, parallel execution, and template marketplace for no-code AI automation.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

Marko leads operations at a 25-person company. They have 20 repetitive AI tasks: summarize support tickets, extract invoice data, generate weekly reports, classify emails. Each task is a separate Python script maintained by engineers. Adding a new AI task takes 2 weeks of engineering time. Non-technical staff can't modify prompts or add steps. When the LLM API changes, all 20 scripts break. They need a visual workflow builder: drag-and-drop AI nodes, chain LLM calls with data transformation, conditional branching, and a template marketplace — so operations staff can build and modify AI workflows without code.

Step 1: Build the Workflow Engine

typescript
// src/workflows/builder.ts — Visual AI workflow engine with LLM chains and conditional logic
import { pool } from "../db";
import { Redis } from "ioredis";
import { randomBytes } from "node:crypto";

const redis = new Redis(process.env.REDIS_URL!);

interface Workflow {
  id: string;
  name: string;
  description: string;
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
  variables: Record<string, any>;
  trigger: { type: "manual" | "webhook" | "schedule" | "event"; config: Record<string, any> };
  status: "draft" | "active" | "paused";
  createdBy: string;
  version: number;
}

type WorkflowNode = {
  id: string;
  type: "llm" | "transform" | "condition" | "http" | "database" | "email" | "code" | "input" | "output";
  position: { x: number; y: number };
  config: Record<string, any>;
  label: string;
};

interface WorkflowEdge {
  id: string;
  source: string;
  target: string;
  condition?: string;         // for conditional branches: "output.sentiment === 'negative'"
  label?: string;
}

interface ExecutionContext {
  runId: string;
  workflowId: string;
  variables: Record<string, any>;
  nodeOutputs: Record<string, any>;
  status: "running" | "completed" | "failed" | "paused";
  currentNode: string;
  startedAt: string;
  logs: Array<{ nodeId: string; level: string; message: string; timestamp: string }>;
}

// Execute a workflow
export async function executeWorkflow(
  workflowId: string,
  input: Record<string, any>
): Promise<ExecutionContext> {
  const workflow = await getWorkflow(workflowId);
  if (!workflow) throw new Error("Workflow not found");

  const runId = `run-${randomBytes(8).toString("hex")}`;
  const ctx: ExecutionContext = {
    runId, workflowId,
    variables: { ...workflow.variables, ...input },
    nodeOutputs: {},
    status: "running",
    currentNode: "",
    startedAt: new Date().toISOString(),
    logs: [],
  };

  // Find start node (input node or node with no incoming edges)
  const targetNodes = new Set(workflow.edges.map((e) => e.target));
  const startNode = workflow.nodes.find((n) => n.type === "input") ||
    workflow.nodes.find((n) => !targetNodes.has(n.id));

  if (!startNode) throw new Error("No start node found");

  try {
    await executeNode(startNode, workflow, ctx);
    ctx.status = "completed";
  } catch (error: any) {
    ctx.status = "failed";
    ctx.logs.push({ nodeId: ctx.currentNode, level: "error", message: error.message, timestamp: new Date().toISOString() });
  }

  // Save execution
  await pool.query(
    `INSERT INTO workflow_runs (id, workflow_id, status, input, node_outputs, logs, started_at, completed_at)
     VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())`,
    [runId, workflowId, ctx.status, JSON.stringify(input),
     JSON.stringify(ctx.nodeOutputs), JSON.stringify(ctx.logs), ctx.startedAt]
  );

  return ctx;
}

async function executeNode(
  node: WorkflowNode,
  workflow: Workflow,
  ctx: ExecutionContext
): Promise<void> {
  ctx.currentNode = node.id;
  ctx.logs.push({ nodeId: node.id, level: "info", message: `Executing: ${node.label}`, timestamp: new Date().toISOString() });

  let output: any;

  switch (node.type) {
    case "input":
      output = ctx.variables;
      break;

    case "llm": {
      const prompt = interpolate(node.config.prompt, ctx);
      const systemPrompt = node.config.systemPrompt ? interpolate(node.config.systemPrompt, ctx) : undefined;
      // In production: call LLM API
      output = { text: `LLM response for: ${prompt.slice(0, 100)}`, model: node.config.model || "claude-sonnet-4-20250514" };
      break;
    }

    case "transform": {
      const inputData = resolveInput(node.config.input, ctx);
      switch (node.config.operation) {
        case "jsonParse": output = JSON.parse(inputData); break;
        case "split": output = inputData.split(node.config.delimiter || "\n"); break;
        case "merge": output = Object.assign({}, ...Object.values(ctx.nodeOutputs)); break;
        case "filter": output = Array.isArray(inputData) ? inputData.filter((i: any) => evaluateCondition(node.config.filterCondition, { item: i })) : inputData; break;
        case "template": output = interpolate(node.config.template, ctx); break;
        default: output = inputData;
      }
      break;
    }

    case "condition": {
      const conditionResult = evaluateCondition(node.config.condition, ctx);
      output = { result: conditionResult };
      // Only follow matching edges
      const edges = workflow.edges.filter((e) => e.source === node.id);
      for (const edge of edges) {
        const shouldFollow = edge.condition
          ? evaluateCondition(edge.condition, ctx) === conditionResult
          : true;
        if (shouldFollow) {
          const nextNode = workflow.nodes.find((n) => n.id === edge.target);
          if (nextNode) {
            ctx.nodeOutputs[node.id] = output;
            await executeNode(nextNode, workflow, ctx);
          }
        }
      }
      return; // Already handled next nodes
    }

    case "http": {
      const url = interpolate(node.config.url, ctx);
      const response = await fetch(url, {
        method: node.config.method || "GET",
        headers: node.config.headers || {},
        body: node.config.body ? interpolate(node.config.body, ctx) : undefined,
      });
      output = { status: response.status, body: await response.text() };
      break;
    }

    case "database": {
      const sql = interpolate(node.config.query, ctx);
      const { rows } = await pool.query(sql);
      output = rows;
      break;
    }

    case "code": {
      // Sandboxed code execution
      const fn = new Function("ctx", "input", node.config.code);
      const inputData = resolveInput(node.config.input, ctx);
      output = await fn(ctx, inputData);
      break;
    }

    case "output":
      output = resolveInput(node.config.input || "{{previous}}", ctx);
      break;

    default:
      output = null;
  }

  ctx.nodeOutputs[node.id] = output;

  // Execute next nodes
  const nextEdges = workflow.edges.filter((e) => e.source === node.id);
  for (const edge of nextEdges) {
    if (edge.condition && !evaluateCondition(edge.condition, ctx)) continue;
    const nextNode = workflow.nodes.find((n) => n.id === edge.target);
    if (nextNode) await executeNode(nextNode, workflow, ctx);
  }
}

function interpolate(template: string, ctx: ExecutionContext): string {
  return template.replace(/\{\{([^}]+)\}\}/g, (_, path) => {
    const parts = path.trim().split(".");
    let value: any = { ...ctx.variables, ...ctx.nodeOutputs };
    for (const part of parts) value = value?.[part];
    return value !== undefined ? String(value) : `{{${path}}}`;
  });
}

function resolveInput(input: string, ctx: ExecutionContext): any {
  if (input === "{{previous}}") {
    const outputs = Object.values(ctx.nodeOutputs);
    return outputs[outputs.length - 1];
  }
  return interpolate(input, ctx);
}

function evaluateCondition(condition: string, ctx: any): boolean {
  try {
    const fn = new Function("ctx", `return ${condition}`);
    return !!fn(ctx);
  } catch { return false; }
}

async function getWorkflow(id: string): Promise<Workflow | null> {
  const { rows: [row] } = await pool.query("SELECT * FROM workflows WHERE id = $1", [id]);
  if (!row) return null;
  return { ...row, nodes: JSON.parse(row.nodes), edges: JSON.parse(row.edges), variables: JSON.parse(row.variables), trigger: JSON.parse(row.trigger) };
}

// Template marketplace
export async function publishTemplate(workflowId: string, params: { name: string; description: string; category: string; price: number }): Promise<void> {
  await pool.query(
    `INSERT INTO workflow_templates (workflow_id, name, description, category, price, installs, created_at)
     VALUES ($1, $2, $3, $4, $5, 0, NOW())`,
    [workflowId, params.name, params.description, params.category, params.price]
  );
}

Results

  • New AI task: 2 weeks → 30 minutes — operations staff drag-and-drop LLM node, add prompt, connect to email trigger; no engineering ticket needed
  • 20 scripts → 20 visual workflows — all migrated; each editable by non-technical staff; prompt changes take effect immediately
  • Conditional branching — "if sentiment is negative → escalate to manager; if positive → auto-reply" built as visual flow; business logic visible, not buried in code
  • Template marketplace — "Invoice Extraction" workflow shared internally; other teams clone and customize; company builds library of reusable AI workflows
  • LLM API changes isolated — model swap from GPT-4 to Claude happens in one config change per node; no code changes; workflows keep running