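AI Agent Development Best Practices for 2026
Master the core design patterns, error-handling strategies, testing methods, and performance optimization techniques for building production-grade AI agents. A complete guide from prototype to production.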
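Building reliable, scalable AI agents takes more than calling an LLM API. This guide covers battle-tested patterns and practices that help you turn a prototype into a production-grade system.
An AI agent should maintain explicit state transitions so that its behavior stays predictable: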
```typescript
enum AgentState {
  IDLE = 'idle',
  PLANNING = 'planning',
  EXECUTING = 'executing',
  VERIFYING = 'verifying',
  ERROR = 'error',
  COMPLETED = 'completed'
}

class AIAgent {
  private state: AgentState = AgentState.IDLE
  private stateHistory: AgentState[] = []

  async transition(newState: AgentState): Promise<void> {
    // Allowed target states for each current state
    const validTransitions: Record<AgentState, AgentState[]> = {
      [AgentState.IDLE]: [AgentState.PLANNING],
      [AgentState.PLANNING]: [AgentState.EXECUTING, AgentState.ERROR],
      [AgentState.EXECUTING]: [AgentState.VERIFYING, AgentState.ERROR],
      [AgentState.VERIFYING]: [AgentState.COMPLETED, AgentState.EXECUTING, AgentState.ERROR],
      [AgentState.ERROR]: [AgentState.PLANNING, AgentState.IDLE],
      [AgentState.COMPLETED]: [AgentState.IDLE]
    }

    if (!validTransitions[this.state].includes(newState)) {
      throw new Error(`Invalid state transition: from ${this.state} to ${newState}`)
    }

    // Record the state being left so the history doubles as an audit trail
    this.stateHistory.push(this.state)
    this.state = newState
    await this.onStateChange(newState)
  }

  private async onStateChange(state: AgentState): Promise<void> {
    console.log(`Agent state changed to: ${state}`)
  }
}
```
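Why it matters: a state machine keeps the agent out of invalid states, and the recorded history gives you a clear audit trail that makes debugging easier.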
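To make the audit-trail point concrete, here is a minimal sketch that replays a recorded sequence of states against the same transition table and reports the first illegal step. The `findInvalidStep` helper and the two sample paths are illustrative assumptions, not part of the agent class above.

```typescript
enum AgentState {
  IDLE = 'idle',
  PLANNING = 'planning',
  EXECUTING = 'executing',
  VERIFYING = 'verifying',
  ERROR = 'error',
  COMPLETED = 'completed'
}

// Same transition table as in the agent class above
const validTransitions: Record<AgentState, AgentState[]> = {
  [AgentState.IDLE]: [AgentState.PLANNING],
  [AgentState.PLANNING]: [AgentState.EXECUTING, AgentState.ERROR],
  [AgentState.EXECUTING]: [AgentState.VERIFYING, AgentState.ERROR],
  [AgentState.VERIFYING]: [AgentState.COMPLETED, AgentState.EXECUTING, AgentState.ERROR],
  [AgentState.ERROR]: [AgentState.PLANNING, AgentState.IDLE],
  [AgentState.COMPLETED]: [AgentState.IDLE]
}

// Hypothetical helper: returns the index of the first illegal step in a
// recorded path, or -1 when every consecutive pair is an allowed transition.
function findInvalidStep(path: AgentState[]): number {
  for (let i = 1; i < path.length; i++) {
    if (!validTransitions[path[i - 1]].includes(path[i])) return i
  }
  return -1
}

const happyPath = [
  AgentState.IDLE,
  AgentState.PLANNING,
  AgentState.EXECUTING,
  AgentState.VERIFYING,
  AgentState.COMPLETED
]
const skipsPlanning = [AgentState.IDLE, AgentState.EXECUTING]

console.log(findInvalidStep(happyPath))     // -1
console.log(findInvalidStep(skipsPlanning)) // 1
```

A check like this can run over stored histories after an incident to confirm that the agent never took a forbidden shortcut.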
Manage tools in one central registry to improve discoverability and validation:
```typescript
interface Tool {
  name: string
  description: string
  parameters: Record<string, any>
  execute: (params: any) => Promise<any>
  validate?: (params: any) => boolean
}

class ToolRegistry {
  private tools = new Map<string, Tool>()

  register(tool: Tool): void {
    if (this.tools.has(tool.name)) {
      throw new Error(`Tool ${tool.name} is already registered`)
    }
    this.tools.set(tool.name, tool)
  }

  async execute(toolName: string, params: any): Promise<any> {
    const tool = this.tools.get(toolName)
    if (!tool) {
      throw new Error(`Tool ${toolName} not found`)
    }

    // Validate parameters before running the tool, when a validator exists
    if (tool.validate && !tool.validate(params)) {
      throw new Error(`Invalid parameters for tool ${toolName}`)
    }

    try {
      return await tool.execute(params)
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error)
      throw new Error(`Tool ${toolName} failed: ${message}`)
    }
  }

  // One "name: description" line per tool, suitable for inclusion in a prompt
  getToolDescriptions(): string {
    return Array.from(this.tools.values())
      .map(tool => `${tool.name}: ${tool.description}`)
      .join('\n')
  }
}
```
3. Context Window Management Pattern
Manage the token budget and context efficiently:
```typescript
class ContextManager {
  private maxTokens: number
  private messages: Array<{ role: string, content: string, tokens: number }> = []

  constructor(maxTokens: number = 100000) {
    this.maxTokens = maxTokens
  }

  addMessage(role: string, content: string): void {
    const tokens = this.estimateTokens(content)
    this.messages.push({ role, content, tokens })

    // Trim older messages when over budget. messages[0] is always kept,
    // and trimming stops if messages[1] is a system message.
    while (this.getTotalTokens() > this.maxTokens) {
      if (this.messages.length > 2 && this.messages[1].role !== 'system') {
        this.messages.splice(1, 1)
      } else {
        break
      }
    }
  }

  private estimateTokens(text: string): number {
    // Rough estimate: 1 token ≈ 4 characters
    return Math.ceil(text.length / 4)
  }

  private getTotalTokens(): number {
    return this.messages.reduce((sum, msg) => sum + msg.tokens, 0)
  }

  getMessages() {
    return this.messages.map(({ role, content }) => ({ role, content }))
  }
}
```
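A small walk-through of the trimming rule may help. The sketch below restates it as a standalone function (`trimToBudget`, a hypothetical name) using the same 4-characters-per-token estimate, and shows that the first message survives while the oldest later messages are dropped. The sample conversation and the budget of 20 tokens are made up for illustration.

```typescript
interface ChatMessage {
  role: string
  content: string
}

// Rough estimate carried over from the class above: 1 token ≈ 4 characters
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4)
}

// Hypothetical standalone version of the trimming loop: keeps messages[0],
// then removes the oldest remaining message until the total fits the budget.
function trimToBudget(messages: ChatMessage[], maxTokens: number): ChatMessage[] {
  const kept = [...messages]
  const total = () => kept.reduce((sum, m) => sum + estimateTokens(m.content), 0)
  while (total() > maxTokens && kept.length > 2 && kept[1].role !== 'system') {
    kept.splice(1, 1)
  }
  return kept
}

const conversation: ChatMessage[] = [
  { role: 'system', content: 'You are a helpful agent.' }, // 24 chars → 6 tokens
  { role: 'user', content: 'a'.repeat(40) },               // 10 tokens
  { role: 'assistant', content: 'b'.repeat(40) },          // 10 tokens
  { role: 'user', content: 'c'.repeat(40) }                // 10 tokens
]

// 36 tokens in total; two messages must go to fit a budget of 20
const trimmed = trimToBudget(conversation, 20)
console.log(trimmed.map(m => m.role)) // [ 'system', 'user' ]
```

Only the system prompt and the most recent user message remain, which is the behavior the loop is designed for: the instructions stay pinned while history is sacrificed first.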
Error Handling Strategies
Never let your agent crash outright:
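```typescript
// Task, execute(), and the remaining strategies (simplifyPrompt, useFallbackModel,
// returnPartialResult) are assumed to be defined elsewhere. `Result` is a
// placeholder for the agent's result type, which this excerpt does not name.
class ResilientAgent {
  // Strategies are tried in order until one succeeds
  private fallbackStrategies = [
    this.retryWithExponentialBackoff,
    this.simplifyPrompt,
    this.useFallbackModel,
    this.returnPartialResult
  ]

  async executeWithFallback(task: Task): Promise<Result> {
    for (const strategy of this.fallbackStrategies) {
      try {
        return await strategy.call(this, task)
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error)
        console.warn(`Strategy failed: ${message}`)
      }
    }
    throw new Error('All fallback strategies exhausted')
  }

  private async retryWithExponentialBackoff(task: Task, maxRetries = 3): Promise<Result> {
    for (let i = 0; i < maxRetries; i++) {
      try {
        return await this.execute(task)
      } catch (error) {
        // The last attempt rethrows so the next strategy can take over
        if (i === maxRetries - 1) throw error
        // Wait 1 s, 2 s, 4 s, ... before the next attempt
        await this.sleep(Math.pow(2, i) * 1000)
      }
    }
    throw new Error('maxRetries must be at least 1')
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}
```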
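The wait times produced by the retry loop are easy to check in isolation. This sketch uses a hypothetical `backoffDelays` helper to list the delays for a given `maxRetries`, assuming the same 1000 ms base and doubling factor as the code above. Note that no wait follows the final attempt.

```typescript
// Hypothetical helper: delays (in ms) slept between attempts when every
// attempt fails. The last attempt rethrows instead of sleeping.
function backoffDelays(maxRetries: number, baseMs = 1000): number[] {
  const delays: number[] = []
  for (let i = 0; i < maxRetries - 1; i++) {
    delays.push(Math.pow(2, i) * baseMs)
  }
  return delays
}

const delays = backoffDelays(3)
const totalWait = delays.reduce((sum, d) => sum + d, 0)

console.log(delays)    // [ 1000, 2000 ]
console.log(totalWait) // 3000
```

With the default of three attempts, a task that keeps failing spends about three seconds waiting before the next fallback strategy runs. A common refinement, not present in the code above, is to add random jitter to each delay so that many clients do not retry at the same instant.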
Prevent cascading failures when an external service goes down:
```typescript
class CircuitBreaker {
  private failureCount = 0
  private lastFailureTime: number | null = null
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED'

  constructor(
    private threshold: number = 5,     // consecutive failures before opening
    private timeout: number = 60000    // ms to wait before allowing a probe call
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime! > this.timeout) {
        // Cool-down elapsed: let one call through to probe the service
        this.state = 'HALF_OPEN'
      } else {
        throw new Error('Circuit breaker is open')
      }
    }

    try {
      const result = await fn()
      this.onSuccess()
      return result
    } catch (error) {
      this.onFailure()
      throw error
    }
  }

  private onSuccess(): void {
    this.failureCount = 0
    this.state = 'CLOSED'
  }

  private onFailure(): void {
    this.failureCount++
    this.lastFailureTime = Date.now()
    if (this.failureCount >= this.threshold) {
      this.state = 'OPEN'
    }
  }
}
```
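The breaker's behavior over time is easier to follow as a trace. The sketch below restates its bookkeeping as plain synchronous functions driven by explicit timestamps instead of `Date.now()`. The function names, the threshold of 3, and the timestamps are assumptions chosen to keep the example short.

```typescript
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN'

interface Breaker {
  state: BreakerState
  failureCount: number
  lastFailureTime: number | null
}

const THRESHOLD = 3      // lower than the default of 5, to keep the trace short
const TIMEOUT_MS = 60000 // same cool-down as the default above

// Returns true when a call may proceed at time `now`;
// moves OPEN to HALF_OPEN once the cool-down has elapsed.
function allowCall(b: Breaker, now: number): boolean {
  if (b.state !== 'OPEN') return true
  if (b.lastFailureTime !== null && now - b.lastFailureTime > TIMEOUT_MS) {
    b.state = 'HALF_OPEN'
    return true
  }
  return false
}

function recordFailure(b: Breaker, now: number): void {
  b.failureCount++
  b.lastFailureTime = now
  if (b.failureCount >= THRESHOLD) b.state = 'OPEN'
}

function recordSuccess(b: Breaker): void {
  b.failureCount = 0
  b.state = 'CLOSED'
}

const breaker: Breaker = { state: 'CLOSED', failureCount: 0, lastFailureTime: null }
const trace: string[] = []

// Three failing calls in a row trip the breaker
for (const t of [0, 1000, 2000]) {
  if (allowCall(breaker, t)) recordFailure(breaker, t)
  trace.push(`t=${t}: ${breaker.state}`)
}
// While open, calls are rejected without touching the service
trace.push(`t=30000: allowed=${allowCall(breaker, 30000)}`)
// After the cool-down, one probe call is let through
trace.push(`t=62001: allowed=${allowCall(breaker, 62001)}, state=${breaker.state}`)
recordSuccess(breaker)
trace.push(`after success: ${breaker.state}`)

console.log(trace.join('\n'))
// t=0: CLOSED
// t=1000: CLOSED
// t=2000: OPEN
// t=30000: allowed=false
// t=62001: allowed=true, state=HALF_OPEN
// after success: CLOSED
```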
```typescript
import { describe, it, expect, vi } from 'vitest'

// AIAgent is assumed to be imported from the project's own module
describe('AIAgent', () => {
  it('should handle tool execution correctly', async () => {
    // Mock the LLM so the test is fast and deterministic
    const mockLLM = vi.fn().mockResolvedValue({
      tool_calls: [{
        name: 'search',
        parameters: { query: 'test' }
      }]
    })

    const agent = new AIAgent({ llm: mockLLM })
    const result = await agent.execute('Find information about test')

    // The user's request must reach the LLM as one of the messages
    expect(mockLLM).toHaveBeenCalledWith(
      expect.objectContaining({
        messages: expect.arrayContaining([
          expect.objectContaining({ content: 'Find information about test' })
        ])
      })
    )
  })
})
```
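```typescript
import { describe, it, expect } from 'vitest'

// AIAgent and OpenAIClient are assumed to be the project's own classes
describe('AIAgent integration tests', () => {
  it('should complete a real end-to-end task', async () => {
    const agent = new AIAgent({
      llm: new OpenAIClient({ apiKey: process.env.OPENAI_API_KEY })
    })

    const result = await agent.execute({
      task: 'Calculate a 15% tip on $45.50',
      maxSteps: 5
    })

    expect(result.status).toBe('completed')
    // 45.50 × 0.15 = 6.825, so the answer should contain "6.82"
    expect(result.answer).toContain('6.82')
  }, 30000) // 30 s timeout, since this calls a real API
})
```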
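```typescript
// ToolCall, ToolResult, executeTool(), and hasDependency() are assumed to be
// defined elsewhere. This version assumes hasDependency(a, b) returns true
// when call `a` needs the result of call `b`.
class ParallelAgent {
  async executeTools(toolCalls: ToolCall[]): Promise<ToolResult[]> {
    const independentGroups = this.groupIndependentTools(toolCalls)
    const results: ToolResult[] = []

    // Groups run one after another; calls inside a group run concurrently
    for (const group of independentGroups) {
      const groupResults = await Promise.all(
        group.map(tool => this.executeTool(tool))
      )
      results.push(...groupResults)
    }
    return results
  }

  private groupIndependentTools(tools: ToolCall[]): ToolCall[][] {
    const groups: ToolCall[][] = []
    let pending = [...tools]

    while (pending.length > 0) {
      // A call is ready once nothing it depends on is still pending
      const ready = pending.filter(t =>
        !pending.some(other => other !== t && this.hasDependency(t, other))
      )
      if (ready.length === 0) {
        throw new Error('Circular dependency between tool calls')
      }
      groups.push(ready)
      pending = pending.filter(t => !ready.includes(t))
    }
    return groups
  }
}
```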
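The grouping step can be pictured as sorting calls into dependency levels. The sketch below is one way to do that, assuming each call carries a hypothetical `dependsOn` list of ids. The `PlannedCall` shape, the `groupByLevel` name, and the sample plan are illustrative and do not come from the class above.

```typescript
// Hypothetical shape: each call lists the ids of calls whose output it needs
interface PlannedCall {
  id: string
  dependsOn: string[]
}

// Splits calls into groups; every call in a group depends only on calls
// from earlier groups, so the members of one group can run concurrently.
function groupByLevel(calls: PlannedCall[]): PlannedCall[][] {
  const groups: PlannedCall[][] = []
  const done = new Set<string>()
  let pending = [...calls]

  while (pending.length > 0) {
    const ready = pending.filter(c => c.dependsOn.every(id => done.has(id)))
    if (ready.length === 0) {
      throw new Error('Circular or unknown dependency between tool calls')
    }
    groups.push(ready)
    ready.forEach(c => done.add(c.id))
    pending = pending.filter(c => !done.has(c.id))
  }
  return groups
}

const plan: PlannedCall[] = [
  { id: 'summarize', dependsOn: ['search', 'fetch'] },
  { id: 'search', dependsOn: [] },
  { id: 'fetch', dependsOn: [] },
  { id: 'translate', dependsOn: ['summarize'] }
]

const levels = groupByLevel(plan).map(group => group.map(c => c.id))
console.log(levels) // [ [ 'search', 'fetch' ], [ 'summarize' ], [ 'translate' ] ]
```

Here `search` and `fetch` can run in parallel, while `summarize` and `translate` each wait for the level before them.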
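```typescript
// `this.llm` is assumed to be an injected client whose streamCompletion()
// returns an async iterable of chunks with a `content` string.
class StreamingAgent {
  async *executeStreaming(task: string): AsyncGenerator<string> {
    const stream = await this.llm.streamCompletion({
      messages: [{ role: 'user', content: task }]
    })

    let buffer = ''
    for await (const chunk of stream) {
      buffer += chunk.content

      // Emit every complete sentence; terminators are 。 ! and ?
      const sentences = buffer.match(/[^。!?]+[。!?]+/g) || []
      for (const sentence of sentences) {
        yield sentence
        buffer = buffer.replace(sentence, '')
      }
    }

    // Flush whatever remains once the stream ends
    if (buffer.trim()) {
      yield buffer
    }
  }
}
```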
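The buffering matters because stream chunks rarely end on sentence boundaries. As a rough illustration, the sketch below applies the same regular expression synchronously to a few made-up chunks. The `splitSentences` helper and the sample chunks are assumptions for demonstration only.

```typescript
// Hypothetical synchronous helper mirroring the buffering logic above:
// returns the complete sentences found in `text` and the unfinished tail.
function splitSentences(text: string): { sentences: string[]; rest: string } {
  const sentences = text.match(/[^。!?]+[。!?]+/g) || []
  let rest = text
  for (const sentence of sentences) {
    rest = rest.replace(sentence, '')
  }
  return { sentences, rest }
}

// Simulated stream chunks that cut sentences in half
const chunks = ['Hello the', 're! How are', ' you? Fine']
const emitted: string[] = []
let buffer = ''

for (const chunk of chunks) {
  const { sentences, rest } = splitSentences(buffer + chunk)
  emitted.push(...sentences)
  buffer = rest
}
// Flush the tail once the stream ends, as the generator does
if (buffer.trim()) emitted.push(buffer)

console.log(emitted) // [ 'Hello there!', ' How are you?', ' Fine' ]
```

Nothing is emitted for the first chunk because it contains no terminator yet. Leading spaces are preserved, so a consumer that displays sentences one by one may want to trim them.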
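```typescript
import { createHash } from 'node:crypto'

// performExecution() is assumed to be defined elsewhere. `Result` is a
// placeholder for the agent's result type, which this excerpt does not name.
class CachedAgent {
  private cache = new Map<string, { result: Result, timestamp: number }>()
  private cacheTTL = 3600000 // 1 hour

  async execute(task: string): Promise<Result> {
    const cacheKey = this.generateCacheKey(task)
    const cached = this.cache.get(cacheKey)

    // Serve from cache while the entry is younger than the TTL
    if (cached && Date.now() - cached.timestamp < this.cacheTTL) {
      return cached.result
    }

    const result = await this.performExecution(task)
    this.cache.set(cacheKey, { result, timestamp: Date.now() })
    return result
  }

  private generateCacheKey(task: string): string {
    return createHash('sha256').update(task).digest('hex')
  }
}
```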
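```typescript
// Logger, Task, and performExecution() are assumed to be defined elsewhere.
// `Result` is a placeholder for the agent's result type.
class ObservableAgent {
  constructor(private logger: Logger) {}

  async execute(task: Task): Promise<Result> {
    // One id per run ties the start, completion, and failure logs together
    const executionId = crypto.randomUUID()

    this.logger.info('Agent execution started', {
      executionId,
      task: task.description,
      timestamp: new Date().toISOString()
    })

    try {
      const result = await this.performExecution(task)
      this.logger.info('Agent execution completed', {
        executionId,
        duration: result.duration,
        tokensUsed: result.tokensUsed,
        toolsCalled: result.toolsCalled.length
      })
      return result
    } catch (error) {
      this.logger.error('Agent execution failed', {
        executionId,
        error: error instanceof Error ? error.message : String(error),
        stack: error instanceof Error ? error.stack : undefined
      })
      throw error
    }
  }
}
```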
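Building production-grade AI agents requires careful attention to architecture, error handling, testing, and performance. Follow these best practices and you will end up with agent systems that are reliable, maintainable, and ready for production.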