The Problem
Marko leads operations at a 25-person company. They have 20 repetitive AI tasks: summarize support tickets, extract invoice data, generate weekly reports, classify emails. Each task is a separate Python script maintained by engineers. Adding a new AI task takes 2 weeks of engineering time. Non-technical staff can't modify prompts or add steps. When the LLM API changes, all 20 scripts break. They need a visual workflow builder: drag-and-drop AI nodes, chain LLM calls with data transformation, conditional branching, and a template marketplace — so operations staff can build and modify AI workflows without code.
Step 1: Build the Workflow Engine
// src/workflows/builder.ts — Visual AI workflow engine with LLM chains and conditional logic
import { pool } from "../db";
import { Redis } from "ioredis";
import { randomBytes } from "node:crypto";
const redis = new Redis(process.env.REDIS_URL!);
interface Workflow {
id: string;
name: string;
description: string;
nodes: WorkflowNode[];
edges: WorkflowEdge[];
variables: Record<string, any>;
trigger: { type: "manual" | "webhook" | "schedule" | "event"; config: Record<string, any> };
status: "draft" | "active" | "paused";
createdBy: string;
version: number;
}
type WorkflowNode = {
id: string;
type: "llm" | "transform" | "condition" | "http" | "database" | "email" | "code" | "input" | "output";
position: { x: number; y: number };
config: Record<string, any>;
label: string;
};
interface WorkflowEdge {
id: string;
source: string;
target: string;
condition?: string; // for conditional branches: "output.sentiment === 'negative'"
label?: string;
}
interface ExecutionContext {
runId: string;
workflowId: string;
variables: Record<string, any>;
nodeOutputs: Record<string, any>;
status: "running" | "completed" | "failed" | "paused";
currentNode: string;
startedAt: string;
logs: Array<{ nodeId: string; level: string; message: string; timestamp: string }>;
}
// Execute a workflow
export async function executeWorkflow(
workflowId: string,
input: Record<string, any>
): Promise<ExecutionContext> {
const workflow = await getWorkflow(workflowId);
if (!workflow) throw new Error("Workflow not found");
const runId = `run-${randomBytes(8).toString("hex")}`;
const ctx: ExecutionContext = {
runId, workflowId,
variables: { ...workflow.variables, ...input },
nodeOutputs: {},
status: "running",
currentNode: "",
startedAt: new Date().toISOString(),
logs: [],
};
// Find start node (input node or node with no incoming edges)
const targetNodes = new Set(workflow.edges.map((e) => e.target));
const startNode = workflow.nodes.find((n) => n.type === "input") ||
workflow.nodes.find((n) => !targetNodes.has(n.id));
if (!startNode) throw new Error("No start node found");
try {
await executeNode(startNode, workflow, ctx);
ctx.status = "completed";
} catch (error: any) {
ctx.status = "failed";
ctx.logs.push({ nodeId: ctx.currentNode, level: "error", message: error.message, timestamp: new Date().toISOString() });
}
// Save execution
await pool.query(
`INSERT INTO workflow_runs (id, workflow_id, status, input, node_outputs, logs, started_at, completed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())`,
[runId, workflowId, ctx.status, JSON.stringify(input),
JSON.stringify(ctx.nodeOutputs), JSON.stringify(ctx.logs), ctx.startedAt]
);
return ctx;
}
async function executeNode(
node: WorkflowNode,
workflow: Workflow,
ctx: ExecutionContext
): Promise<void> {
ctx.currentNode = node.id;
ctx.logs.push({ nodeId: node.id, level: "info", message: `Executing: ${node.label}`, timestamp: new Date().toISOString() });
let output: any;
switch (node.type) {
case "input":
output = ctx.variables;
break;
case "llm": {
const prompt = interpolate(node.config.prompt, ctx);
const systemPrompt = node.config.systemPrompt ? interpolate(node.config.systemPrompt, ctx) : undefined;
// In production: call LLM API
output = { text: `LLM response for: ${prompt.slice(0, 100)}`, model: node.config.model || "claude-sonnet-4-20250514" };
break;
}
case "transform": {
const inputData = resolveInput(node.config.input, ctx);
switch (node.config.operation) {
case "jsonParse": output = JSON.parse(inputData); break;
case "split": output = inputData.split(node.config.delimiter || "\n"); break;
case "merge": output = Object.assign({}, ...Object.values(ctx.nodeOutputs)); break;
case "filter": output = Array.isArray(inputData) ? inputData.filter((i: any) => evaluateCondition(node.config.filterCondition, { item: i })) : inputData; break;
case "template": output = interpolate(node.config.template, ctx); break;
default: output = inputData;
}
break;
}
case "condition": {
const conditionResult = evaluateCondition(node.config.condition, ctx);
output = { result: conditionResult };
// Only follow matching edges
const edges = workflow.edges.filter((e) => e.source === node.id);
for (const edge of edges) {
const shouldFollow = edge.condition
? evaluateCondition(edge.condition, ctx) === conditionResult
: true;
if (shouldFollow) {
const nextNode = workflow.nodes.find((n) => n.id === edge.target);
if (nextNode) {
ctx.nodeOutputs[node.id] = output;
await executeNode(nextNode, workflow, ctx);
}
}
}
return; // Already handled next nodes
}
case "http": {
const url = interpolate(node.config.url, ctx);
const response = await fetch(url, {
method: node.config.method || "GET",
headers: node.config.headers || {},
body: node.config.body ? interpolate(node.config.body, ctx) : undefined,
});
output = { status: response.status, body: await response.text() };
break;
}
case "database": {
const sql = interpolate(node.config.query, ctx);
const { rows } = await pool.query(sql);
output = rows;
break;
}
case "code": {
// Sandboxed code execution
const fn = new Function("ctx", "input", node.config.code);
const inputData = resolveInput(node.config.input, ctx);
output = await fn(ctx, inputData);
break;
}
case "output":
output = resolveInput(node.config.input || "{{previous}}", ctx);
break;
default:
output = null;
}
ctx.nodeOutputs[node.id] = output;
// Execute next nodes
const nextEdges = workflow.edges.filter((e) => e.source === node.id);
for (const edge of nextEdges) {
if (edge.condition && !evaluateCondition(edge.condition, ctx)) continue;
const nextNode = workflow.nodes.find((n) => n.id === edge.target);
if (nextNode) await executeNode(nextNode, workflow, ctx);
}
}
function interpolate(template: string, ctx: ExecutionContext): string {
return template.replace(/\{\{([^}]+)\}\}/g, (_, path) => {
const parts = path.trim().split(".");
let value: any = { ...ctx.variables, ...ctx.nodeOutputs };
for (const part of parts) value = value?.[part];
return value !== undefined ? String(value) : `{{${path}}}`;
});
}
function resolveInput(input: string, ctx: ExecutionContext): any {
if (input === "{{previous}}") {
const outputs = Object.values(ctx.nodeOutputs);
return outputs[outputs.length - 1];
}
return interpolate(input, ctx);
}
function evaluateCondition(condition: string, ctx: any): boolean {
try {
const fn = new Function("ctx", `return ${condition}`);
return !!fn(ctx);
} catch { return false; }
}
async function getWorkflow(id: string): Promise<Workflow | null> {
const { rows: [row] } = await pool.query("SELECT * FROM workflows WHERE id = $1", [id]);
if (!row) return null;
return { ...row, nodes: JSON.parse(row.nodes), edges: JSON.parse(row.edges), variables: JSON.parse(row.variables), trigger: JSON.parse(row.trigger) };
}
// Template marketplace
export async function publishTemplate(workflowId: string, params: { name: string; description: string; category: string; price: number }): Promise<void> {
await pool.query(
`INSERT INTO workflow_templates (workflow_id, name, description, category, price, installs, created_at)
VALUES ($1, $2, $3, $4, $5, 0, NOW())`,
[workflowId, params.name, params.description, params.category, params.price]
);
}
Results
- New AI task: 2 weeks → 30 minutes — operations staff drag-and-drop LLM node, add prompt, connect to email trigger; no engineering ticket needed
- 20 scripts → 20 visual workflows — all migrated; each editable by non-technical staff; prompt changes take effect immediately
- Conditional branching — "if sentiment is negative → escalate to manager; if positive → auto-reply" built as visual flow; business logic visible, not buried in code
- Template marketplace — "Invoice Extraction" workflow shared internally; other teams clone and customize; company builds library of reusable AI workflows
- LLM API changes isolated — model swap from GPT-4 to Claude happens in one config change per node; no code changes; workflows keep running