Appearance
Graph Pipelines
The graph system lets you build complex workflows by combining agents and other nodes. It provides patterns for sequential processing, parallel execution, decision-making, and planning.
Pipeline Basics
A Pipeline chains nodes together, passing output from one to the next:
typescript
import { AgentGraph } from '@agentionai/agents';
import { ClaudeAgent } from '@agentionai/agents';
const researcher = new ClaudeAgent({
id: 'researcher',
name: 'Researcher',
description: 'Research the topic and list key facts.',
model: 'claude-sonnet-4-5',
});
const writer = new ClaudeAgent({
id: 'writer',
name: 'Writer',
description: 'Write a blog post from the research provided.',
model: 'claude-sonnet-4-5',
});
const pipeline = AgentGraph.pipeline(researcher, writer);
const result = await pipeline.execute('Artificial Intelligence in Healthcare');Executor Types
SequentialExecutor
Chains agents in sequence. Each agent receives the previous agent's output:
typescript
const chain = AgentGraph.sequential(researcher, writer, editor);
const result = await chain.execute('Topic to write about');
// researcher → writer → editorParallelExecutor
Runs multiple agents simultaneously on the same input:
typescript
const parallel = AgentGraph.parallel({}, optimist, pessimist, realist);
const results = await parallel.execute('Analyze this business proposal');
// Returns array of all agent responsesMapExecutor
Applies an agent to each item in an array:
typescript
const summarizer = new ClaudeAgent({
id: 'summarizer',
name: 'Summarizer',
description: 'Summarize this article in 2 sentences.',
model: 'claude-sonnet-4-5',
});
const mapper = AgentGraph.map(summarizer);
const articles = ['Article 1 text...', 'Article 2 text...', 'Article 3 text...'];
const summaries = await mapper.execute(articles);
// Returns array of summariesVotingSystem
Multiple agents propose solutions, then a judge selects the best:
typescript
const voting = AgentGraph.votingSystem(techLead);
const result = await voting.execute({
originalInput: 'Implement a caching solution',
solutions: [juniorSolution, seniorSolution, architectSolution],
});
// Judge evaluates all proposals and picks the bestRouterExecutor
Routes input to different agents based on content:
typescript
const router = AgentGraph.router(classifierAgent, [
{ name: 'billing', handler: billingAgent, description: 'Billing questions' },
{ name: 'technical', handler: techAgent, description: 'Technical issues' },
{ name: 'general', handler: generalAgent, description: 'General inquiries' },
]);
const result = await router.execute('I need help with my invoice');
// Routes to billingAgentPlanExecutor
Orchestrates plan-based execution with separate planning and worker agents. The planning agent creates a plan, then worker agents execute each step. Supports concurrent step execution and enforces step limits.
typescript
const planStore = AgentGraph.createPlanStore();
const contextStore = AgentGraph.createContextStore();
// Planning agent creates the plan
const planningAgent = new ClaudeAgent({
id: 'planner',
name: 'Planning Agent',
description: 'Create a detailed execution plan with clear steps.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createPlanningTools(planStore),
});
// Worker agent executes individual steps
const workerAgent = new ClaudeAgent({
id: 'worker',
name: 'Worker Agent',
description: 'Execute a single step and store results in context.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createContextTools(contextStore),
});
const executor = AgentGraph.planExecutor(planStore, planningAgent, workerAgent, {
maxSteps: 10, // Maximum steps to execute
concurrency: 1, // Run steps sequentially (1) or in parallel (>1)
onPlanCreated: (goal, steps) => {
console.log(`Plan: ${goal} with ${steps.length} steps`);
},
onStepComplete: (step, result, num) => {
console.log(`Completed step ${num}: ${step.description}`);
},
});
const finalOutput = await executor.execute('Research and summarize quantum computing');
// Returns clean finalOutput string for chaining
// Get detailed results
const details = executor.getLastResult();
console.log(details.completedSteps, details.totalSteps);Key Features:
- Separation of Concerns: Planning and execution are separate phases
- Concurrency Support: Execute multiple independent steps in parallel
- Step Limits:
maxStepsconstrains both planning and execution - Graph Chaining: Returns
finalOutputstring for pipeline integration - Detailed Results: Use
getLastResult()to access full execution details
Use in Pipelines:
typescript
const planner = AgentGraph.planExecutor(planStore, planningAgent, workerAgent, {
maxSteps: 5,
});
const summarizer = new ClaudeAgent({
description: 'Create a summary from the plan results.',
model: 'claude-sonnet-4-20250514',
});
// Chain PlanExecutor with other nodes
const pipeline = AgentGraph.pipeline(
planner,
AgentGraph.sequential(summarizer)
);
const result = await pipeline.execute('Build a feature roadmap');Shared Context
Share data between agents using a key-value context store accessed via tools:
typescript
const contextStore = AgentGraph.createContextStore({ userId: '123' });
const researcher = new ClaudeAgent({
id: 'researcher',
name: 'Researcher',
description: 'Research the topic and store findings in context.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createContextTools(contextStore),
});
const summarizer = new ClaudeAgent({
id: 'summarizer',
name: 'Summarizer',
description: 'Get findings from context and create a summary.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createContextTools(contextStore),
});
// Researcher stores findings
await researcher.execute('Research quantum computing');
// Context now has: { research_findings: '...' }
// Summarizer retrieves and uses findings
await summarizer.execute('Summarize the research findings from context');Context Tools:
context_get- Get a value by keycontext_set- Store a value with a descriptive keylist_context_keys- List all available keyscontext_delete- Delete a key
Planning Tools
Enable agents to create and manage execution plans:
typescript
const planStore = AgentGraph.createPlanStore();
const agent = new ClaudeAgent({
id: 'planner',
name: 'Planning Agent',
description: 'Create a plan, then work through each step.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createPlanningTools(planStore),
});
await agent.execute('Create a plan to build a REST API');
// Agent creates plan with steps, works through them, marks completePlanning Tools:
create_plan- Create a plan with a goal and stepsview_plan- View current plan statusget_next_step- Get the next pending stepupdate_step- Mark a step as completed/failedadd_step- Add a new step to the plan
Combining Context and Planning
Use both context and planning for sophisticated workflows:
typescript
const contextStore = AgentGraph.createContextStore();
const planStore = AgentGraph.createPlanStore();
const planner = new ClaudeAgent({
id: 'planner',
description: 'Create a research plan.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createPlanningTools(planStore),
});
const researcher = new ClaudeAgent({
id: 'researcher',
description: 'Execute research steps and store findings in context.',
model: 'claude-sonnet-4-20250514',
tools: AgentGraph.createContextTools(contextStore),
});
const executor = AgentGraph.planExecutor(planStore, planner, researcher, {
maxSteps: 10,
onStepComplete: (step, result) => {
console.log(`Completed: ${step.description}`);
},
});
await executor.execute('Research AI safety and compile a report');
// Access results from context
const findings = contextStore.get('research_findings');
const report = contextStore.get('final_report');Agents with Tools in Pipelines
Each agent in a pipeline can have its own set of tools:
typescript
// Research agent with search tools
const researcher = new OpenAiAgent({
id: 'researcher',
name: 'Researcher',
description: 'Research the topic thoroughly using available tools.',
model: 'gpt-4o',
tools: [webSearchTool, pubmedSearchTool, arxivSearchTool],
});
// Analysis agent with data tools
const analyst = new ClaudeAgent({
id: 'analyst',
name: 'Analyst',
description: 'Analyze the research and extract insights.',
model: 'claude-sonnet-4-20250514',
tools: [calculatorTool, chartGeneratorTool],
});
// Writer agent with no tools - pure text generation
const writer = new ClaudeAgent({
id: 'writer',
name: 'Writer',
description: 'Write a compelling report based on the analysis.',
model: 'claude-sonnet-4-20250514',
});
const pipeline = AgentGraph.pipeline(researcher, analyst, writer);Why This Matters:
- Specialized capabilities - Each agent has exactly the tools it needs
- Tool isolation - Agents can't accidentally use tools meant for other stages
- Cost optimization - Use cheaper models for data gathering, expensive models for synthesis
Combining Executors
Executors are nodes themselves, so you can nest them:
typescript
const researchPhase = AgentGraph.parallel({}, webSearcher, documentAnalyzer, expertConsult);
const synthesisPhase = AgentGraph.sequential(summarizer, factChecker);
const pipeline = AgentGraph.pipeline(researchPhase, synthesisPhase);Metrics & Observability
Track execution metrics across your pipeline:
typescript
import { createMetricsCollector } from '@agentionai/agents';
const metrics = createMetricsCollector();
const pipeline = AgentGraph.pipeline(researcher, writer).withMetrics(metrics);
const result = await pipeline.execute('Input');
console.log(metrics.getAggregateMetrics());
// {
// totalDurationMs: 4523,
// totalTokens: { inputTokens: 1200, outputTokens: 3400, totalTokens: 4600 },
// nodeCount: 2,
// successCount: 2,
// failureCount: 0,
// stages: [...]
// }Custom Nodes
Create custom nodes by implementing GraphNode:
typescript
import { GraphNode } from '@agentionai/agents';
const dataFetcher: GraphNode<string, object> = {
name: 'data-fetcher',
nodeType: 'custom',
async execute(url: string): Promise<object> {
const response = await fetch(url);
return response.json();
},
};
// Use in pipelines
const pipeline = AgentGraph.pipeline(dataFetcher, analyzer);Factory Methods Summary
| Method | Description |
|---|---|
AgentGraph.sequential(...agents) | Chain agents in sequence |
AgentGraph.parallel(options, ...agents) | Run agents in parallel |
AgentGraph.pipeline(...nodes) | Chain any graph nodes |
AgentGraph.map(processor, options) | Apply processor to array items |
AgentGraph.votingSystem(judge, options) | Judge selects best solution |
AgentGraph.router(router, routes, options) | Route to handlers |
AgentGraph.planExecutor(store, planner, worker, options) | Orchestrate plan-based execution |
AgentGraph.createContextStore(initial) | Create shared context |
AgentGraph.createContextTools(store) | Get context tools |
AgentGraph.createPlanStore() | Create plan store |
AgentGraph.createPlanningTools(store) | Get planning tools |