0%

AI Agent架构模式——从AutoGen到多智能体协同

最近研究AI Agent架构,发现多智能体协同就像交响乐团一样,每个智能体都有自己的角色和专长,通过协调产生美妙的”智能交响曲”…

介绍

  AI Agent架构正经历着从单一智能体到多智能体协同的重大演进。AutoGen框架开启了智能体编排的新篇章,而现代的多智能体系统则进一步实现了复杂的协同决策和任务分解。本文将全面分析AI Agent架构的发展历程、核心技术以及在2025年的最新趋势。

AutoGen框架深度解析

AutoGen核心架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# AutoGen核心架构实现示例
import asyncio
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class AgentRole(Enum):
ASSISTANT = "assistant"
USER_PROXY = "user_proxy"
GROUP_CHAT_MANAGER = "group_chat_manager"
CODER = "coder"
REVIEWER = "reviewer"

@dataclass
class Message:
"""消息数据结构"""
content: str
sender_id: str
receiver_id: str
timestamp: float
message_type: str = "text"
metadata: Optional[Dict[str, Any]] = None

class BaseAgent:
"""基础智能体类"""
def __init__(self, name: str, role: AgentRole, system_message: str = ""):
self.name = name
self.role = role
self.system_message = system_message
self.conversation_history = []
self.skills = []

async def process_message(self, message: Message) -> Optional[Message]:
"""处理消息的基类方法"""
self.conversation_history.append(message)
return None

def add_skill(self, skill_func):
"""添加技能"""
self.skills.append(skill_func)

def can_handle_task(self, task: str) -> bool:
"""判断是否能处理特定任务"""
return any(task.lower() in skill.__name__.lower() for skill in self.skills)

class AutoGenAssistant(BaseAgent):
"""AutoGen助理智能体"""
def __init__(self, name: str, llm_config: Dict[str, Any]):
super().__init__(name, AgentRole.ASSISTANT)
self.llm_config = llm_config

async def process_message(self, message: Message) -> Optional[Message]:
"""处理消息并生成回复"""
# 这里应该是实际的LLM调用逻辑
# 为了演示,使用模拟实现
response_content = f"作为{self.name},我理解您的消息:{message.content}"

response = Message(
content=response_content,
sender_id=self.name,
receiver_id=message.sender_id,
timestamp=asyncio.get_event_loop().time()
)

self.conversation_history.append(response)
return response

class UserProxyAgent(BaseAgent):
"""用户代理智能体"""
def __init__(self, name: str, human_input_mode: str = "NEVER"):
super().__init__(name, AgentRole.USER_PROXY)
self.human_input_mode = human_input_mode
self.executed_commands = []

async def execute_command(self, command: str) -> str:
"""执行命令"""
# 安全的命令执行(仅用于示例)
if command.startswith("print"):
# 安全执行print命令
try:
# 仅允许执行打印操作
exec_result = eval(command.replace("print", "str"))
self.executed_commands.append({
"command": command,
"result": exec_result,
"timestamp": asyncio.get_event_loop().time()
})
return exec_result
except Exception as e:
return f"Command execution error: {str(e)}"
else:
return f"Command not allowed: {command}"

class GroupChatManager(BaseAgent):
"""群聊管理器"""
def __init__(self, name: str, agents: List[BaseAgent], max_round: int = 10):
super().__init__(name, AgentRole.GROUP_CHAT_MANAGER)
self.agents = {agent.name: agent for agent in agents}
self.max_round = max_round
self.current_round = 0
self.participants = agents

async def initiate_group_chat(self, initial_message: Message) -> List[Message]:
"""启动群聊"""
conversation = [initial_message]
current_message = initial_message

while self.current_round < self.max_round:
# 轮流让每个智能体发言
for agent in self.participants:
if agent.name != current_message.sender_id:
response = await agent.process_message(current_message)

if response:
conversation.append(response)
current_message = response

# 检查是否达到终止条件
if self.should_terminate(response.content):
return conversation

self.current_round += 1

return conversation

def should_terminate(self, content: str) -> bool:
"""判断是否应该终止对话"""
termination_keywords = ["TERMINATE", "STOP", "DONE", "COMPLETE"]
return any(keyword.lower() in content.lower() for keyword in termination_keywords)

# AutoGen工厂类
class AutoGenFactory:
"""AutoGen智能体工厂"""
@staticmethod
def create_assistant(name: str, llm_config: Dict[str, Any]) -> AutoGenAssistant:
return AutoGenAssistant(name, llm_config)

@staticmethod
def create_user_proxy(name: str, human_input_mode: str = "NEVER") -> UserProxyAgent:
return UserProxyAgent(name, human_input_mode)

@staticmethod
def create_group_manager(name: str, agents: List[BaseAgent]) -> GroupChatManager:
return GroupChatManager(name, agents)

AutoGen高级特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# AutoGen高级特性实现
class ToolAgent(BaseAgent):
"""工具智能体"""
def __init__(self, name: str, tools: List[callable]):
super().__init__(name, AgentRole.ASSISTANT)
self.tools = {tool.__name__: tool for tool in tools}

async def process_message(self, message: Message) -> Optional[Message]:
"""处理消息并执行工具调用"""
# 解析工具调用请求
tool_request = self.parse_tool_request(message.content)

if tool_request:
tool_name = tool_request.get("name")
tool_args = tool_request.get("arguments", {})

if tool_name in self.tools:
try:
result = self.tools[tool_name](**tool_args)
response_content = f"Tool {tool_name} executed successfully: {result}"
except Exception as e:
response_content = f"Tool {tool_name} execution failed: {str(e)}"
else:
response_content = f"Tool {tool_name} not found"
else:
response_content = f"No tool request found in message: {message.content}"

response = Message(
content=response_content,
sender_id=self.name,
receiver_id=message.sender_id,
timestamp=asyncio.get_event_loop().time()
)

self.conversation_history.append(response)
return response

def parse_tool_request(self, content: str) -> Optional[Dict[str, Any]]:
"""解析工具请求"""
import re
# 简单的工具请求解析(实际应用中需要更复杂的解析)
tool_pattern = r'Tool\(([^)]+)\)'
match = re.search(tool_pattern, content)

if match:
try:
# 假设工具请求是JSON格式
tool_info = json.loads(match.group(1))
return tool_info
except json.JSONDecodeError:
return None

return None

# 工具函数示例
def search_web(query: str, max_results: int = 3) -> List[Dict[str, str]]:
"""网络搜索工具"""
# 模拟网络搜索
return [
{"title": f"Search result for {query}", "url": f"https://example.com/{query}", "summary": f"This is a mock result for {query}"}
for _ in range(max_results)
]

def execute_code(code: str, language: str = "python") -> str:
"""代码执行工具"""
# 安全的代码执行环境(实际应用中需要沙箱)
if language.lower() == "python":
try:
# 安全的Python执行(仅用于演示)
# 在实际应用中,应使用沙箱环境
import io
import sys
from contextlib import redirect_stdout, redirect_stderr

output_buffer = io.StringIO()
error_buffer = io.StringIO()

try:
with redirect_stdout(output_buffer), redirect_stderr(error_buffer):
exec(code, {"__builtins__": {"print": print}})

result = output_buffer.getvalue()
if error_buffer.getvalue():
result += f"\nErrors: {error_buffer.getvalue()}"
return result
except Exception as e:
return f"Execution error: {str(e)}"
except Exception as e:
return f"Code execution environment error: {str(e)}"

return "Unsupported language"

def calculate(expression: str) -> str:
"""计算工具"""
# 安全的数学表达式计算
allowed_chars = set("0123456789+-*/(). ")

if not all(c in allowed_chars for c in expression):
return "Invalid characters in expression"

try:
# 使用更安全的计算方法
import ast
import operator

# 定义安全的操作符
operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.USub: operator.neg,
}

def eval_node(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
left = eval_node(node.left)
right = eval_node(node.right)
return operators[type(node.op)](left, right)
elif isinstance(node, ast.UnaryOp):
operand = eval_node(node.operand)
return operators[type(node.op)](operand)
else:
raise TypeError(f"Unsupported operation: {type(node)}")

node = ast.parse(expression, mode='eval').body
result = eval_node(node)
return str(result)
except Exception as e:
return f"Calculation error: {str(e)}"

# AutoGen增强功能
class EnhancedAutoGen:
"""增强版AutoGen功能"""
def __init__(self):
self.agents = {}
self.communication_layer = CommunicationLayer()
self.task_scheduler = TaskScheduler()

def register_agent(self, agent: BaseAgent):
"""注册智能体"""
self.agents[agent.name] = agent

async def route_message(self, message: Message) -> List[Message]:
"""智能路由消息到合适的智能体"""
# 根据消息内容和智能体能力进行路由
suitable_agents = []

for agent in self.agents.values():
if agent.can_handle_task(message.content) or agent.role != AgentRole.USER_PROXY:
suitable_agents.append(agent)

if not suitable_agents:
# 如果没有找到合适的智能体,返回错误
error_response = Message(
content="No suitable agent found for this request",
sender_id="system",
receiver_id=message.sender_id,
timestamp=asyncio.get_event_loop().time()
)
return [error_response]

# 将消息转发给第一个合适的智能体
response = await suitable_agents[0].process_message(message)
return [response] if response else []

class CommunicationLayer:
"""通信层"""
def __init__(self):
self.message_queue = asyncio.Queue()
self.subscribers = {}

async def publish_message(self, message: Message):
"""发布消息"""
# 添加到消息队列
await self.message_queue.put(message)

# 通知订阅者
if message.receiver_id in self.subscribers:
await self.subscribers[message.receiver_id].handle_message(message)

def subscribe(self, agent_id: str, callback):
"""订阅消息"""
self.subscribers[agent_id] = callback

class TaskScheduler:
"""任务调度器"""
def __init__(self):
self.pending_tasks = []
self.running_tasks = {}

async def schedule_task(self, task_func, *args, **kwargs):
"""调度任务"""
task_id = f"task_{len(self.pending_tasks)}_{asyncio.get_event_loop().time()}"
scheduled_task = {
"id": task_id,
"function": task_func,
"args": args,
"kwargs": kwargs,
"status": "scheduled",
"timestamp": asyncio.get_event_loop().time()
}

self.pending_tasks.append(scheduled_task)
return task_id

async def execute_pending_tasks(self):
"""执行待处理任务"""
completed_tasks = []

for task in self.pending_tasks[:]: # 复制列表以避免修改时迭代问题
if task["status"] == "scheduled":
try:
result = await task["function"](*task["args"], **task["kwargs"])
task["status"] = "completed"
task["result"] = result
completed_tasks.append(task)
self.pending_tasks.remove(task)
except Exception as e:
task["status"] = "failed"
task["error"] = str(e)
self.pending_tasks.remove(task)

return completed_tasks

多智能体协同架构

协同模式设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# 多智能体协同架构实现
class MultiAgentSystem:
"""多智能体系统"""
def __init__(self, system_name: str):
self.system_name = system_name
self.agents = {}
self.agent_relationships = {}
self.coordinator = CoordinatorAgent("coordinator")
self.communication_hub = CommunicationHub()

def add_agent(self, agent: BaseAgent):
"""添加智能体到系统"""
self.agents[agent.name] = agent
self.communication_hub.subscribe(agent.name, agent.process_message)

def establish_relationship(self, agent_a: str, agent_b: str, relationship_type: str):
"""建立智能体关系"""
if agent_a not in self.agent_relationships:
self.agent_relationships[agent_a] = {}
self.agent_relationships[agent_a][agent_b] = relationship_type

async def coordinate_task(self, task_description: str) -> Dict[str, Any]:
"""协调任务分配"""
# 分析任务需求
task_requirements = self.analyze_task_requirements(task_description)

# 分配任务给合适的智能体
assigned_agents = []
for agent_name, agent in self.agents.items():
if self.agent_can_handle_task(agent, task_requirements):
assigned_agents.append(agent_name)

# 协调执行
coordination_result = await self.coordinator.coordinate(
task_description,
assigned_agents,
self.agents
)

return coordination_result

def analyze_task_requirements(self, task_description: str) -> Dict[str, Any]:
"""分析任务需求"""
# 简单的任务分析(实际应用中需要更复杂的NLP分析)
requirements = {
"complexity": len(task_description.split()) > 20,
"requires_code": any(word in task_description.lower() for word in ["code", "program", "function"]),
"requires_search": any(word in task_description.lower() for word in ["search", "find", "research"]),
"requires_calculation": any(word in task_description.lower() for word in ["calculate", "compute", "math"]),
"requires_review": any(word in task_description.lower() for word in ["review", "check", "validate"])
}
return requirements

def agent_can_handle_task(self, agent: BaseAgent, requirements: Dict[str, Any]) -> bool:
"""判断智能体是否能处理任务"""
if requirements["requires_code"] and "code" in [skill.__name__ for skill in agent.skills]:
return True
if requirements["requires_search"] and "search_web" in [skill.__name__ for skill in agent.skills]:
return True
if requirements["requires_calculation"] and "calculate" in [skill.__name__ for skill in agent.skills]:
return True
if requirements["requires_review"] and "review" in [skill.__name__ for skill in agent.skills]:
return True

# 默认情况下,助理智能体可以处理大部分任务
return agent.role == AgentRole.ASSISTANT

class CoordinatorAgent(BaseAgent):
"""协调员智能体"""
def __init__(self, name: str):
super().__init__(name, AgentRole.ASSISTANT, "I coordinate tasks among multiple agents.")
self.knowledge_base = KnowledgeBase()

async def coordinate(self, task: str, agents: List[str], agent_dict: Dict[str, BaseAgent]) -> Dict[str, Any]:
"""协调任务"""
# 记录协调过程
coordination_log = {
"task": task,
"agents_involved": agents,
"coordination_steps": [],
"results": {}
}

# 为每个智能体分配子任务
for agent_name in agents:
agent = agent_dict[agent_name]
sub_task = self.generate_subtask(task, agent)

if sub_task:
# 执行子任务
sub_result = await agent.process_message(Message(
content=sub_task,
sender_id=self.name,
receiver_id=agent_name,
timestamp=asyncio.get_event_loop().time()
))

coordination_log["results"][agent_name] = {
"sub_task": sub_task,
"result": sub_result.content if sub_result else "No result",
"timestamp": asyncio.get_event_loop().time()
}

return coordination_log

def generate_subtask(self, main_task: str, agent: BaseAgent) -> Optional[str]:
"""为智能体生成子任务"""
if agent.role == AgentRole.ASSISTANT:
return f"As an assistant, please help with: {main_task}"
elif agent.role == AgentRole.USER_PROXY:
return f"Please execute or validate this task: {main_task}"
elif agent.role == AgentRole.CODER:
return f"Please write code to solve: {main_task}"
elif agent.role == AgentRole.REVIEWER:
return f"Please review and provide feedback on: {main_task}"

return f"Please contribute to: {main_task}"

class CommunicationHub:
"""通信中心"""
def __init__(self):
self.channels = {}
self.message_history = []

def subscribe(self, agent_id: str, message_handler):
"""订阅消息"""
if agent_id not in self.channels:
self.channels[agent_id] = []
self.channels[agent_id].append(message_handler)

async def broadcast_message(self, message: Message, exclude_sender: bool = True):
"""广播消息"""
targets = list(self.channels.keys())
if exclude_sender and message.sender_id in targets:
targets.remove(message.sender_id)

results = []
for target in targets:
for handler in self.channels[target]:
try:
result = await handler(message)
results.append(result)
except Exception as e:
print(f"Error sending message to {target}: {e}")

return results

def get_conversation_history(self, agent_id: str) -> List[Message]:
"""获取智能体对话历史"""
return [msg for msg in self.message_history if msg.sender_id == agent_id or msg.receiver_id == agent_id]

class KnowledgeBase:
"""知识库"""
def __init__(self):
self.facts = {}
self.procedures = {}
self.context_memory = {}

def store_fact(self, key: str, value: Any):
"""存储事实"""
self.facts[key] = value

def retrieve_fact(self, key: str) -> Any:
"""检索事实"""
return self.facts.get(key)

def store_procedure(self, name: str, procedure: callable):
"""存储程序"""
self.procedures[name] = procedure

def execute_procedure(self, name: str, *args, **kwargs) -> Any:
"""执行程序"""
if name in self.procedures:
return self.procedures[name](*args, **kwargs)
raise ValueError(f"Procedure {name} not found")

def update_context(self, context_id: str, data: Dict[str, Any]):
"""更新上下文"""
if context_id not in self.context_memory:
self.context_memory[context_id] = []
self.context_memory[context_id].append(data)

def get_context(self, context_id: str) -> List[Dict[str, Any]]:
"""获取上下文"""
return self.context_memory.get(context_id, [])

高级协同模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# 高级多智能体协同模式
class SwarmIntelligence:
"""群体智能模式"""
def __init__(self):
self.agents = []
self.swarm_state = {}
self.emergent_behaviors = []

def add_agent(self, agent):
"""添加智能体到群体"""
self.agents.append(agent)
self.initialize_agent_state(agent)

def initialize_agent_state(self, agent):
"""初始化智能体状态"""
self.swarm_state[agent.name] = {
"position": [0, 0], # 对于物理模拟
"energy": 100,
"knowledge": [],
"relationships": {}
}

def evolve_behavior(self, iterations: int = 100):
"""演化行为模式"""
for iteration in range(iterations):
# 每个智能体根据环境和其他智能体的行为调整自身行为
for agent in self.agents:
self.update_agent_behavior(agent)

# 检测涌现行为
emergent_behavior = self.detect_emergent_behavior()
if emergent_behavior:
self.emergent_behaviors.append(emergent_behavior)

def update_agent_behavior(self, agent):
"""更新智能体行为"""
# 基于其他智能体的行为和环境状态更新
neighbors = self.get_neighbors(agent)
environment_state = self.get_environment_state()

# 应用群体智能规则
self.apply_swarm_rules(agent, neighbors, environment_state)

def get_neighbors(self, agent, radius: float = 5.0):
"""获取邻居智能体"""
neighbors = []
agent_pos = self.swarm_state[agent.name]["position"]

for other_agent in self.agents:
if other_agent.name != agent.name:
other_pos = self.swarm_state[other_agent.name]["position"]
distance = self.calculate_distance(agent_pos, other_pos)
if distance <= radius:
neighbors.append((other_agent, distance))

return neighbors

def calculate_distance(self, pos1, pos2):
"""计算距离"""
import math
return math.sqrt(sum((a - b) ** 2 for a, b in zip(pos1, pos2)))

def get_environment_state(self):
"""获取环境状态"""
return {
"resources": len([a for a in self.agents if self.swarm_state[a.name]["energy"] > 50]),
"tasks_available": len([a for a in self.agents if len(self.swarm_state[a.name]["knowledge"]) < 10])
}

def apply_swarm_rules(self, agent, neighbors, environment_state):
"""应用群体规则"""
# 简单的群体行为规则
if neighbors:
# 向邻居聚拢
avg_position = [sum(n[0].swarm_state[n[0].name]["position"][i] for n in neighbors) / len(neighbors)
for i in range(len(neighbors[0][0].swarm_state[neighbors[0][0].name]["position"]))]

# 更新位置(模拟移动)
agent_pos = self.swarm_state[agent.name]["position"]
new_position = [(p + ap) / 2 for p, ap in zip(agent_pos, avg_position)]
self.swarm_state[agent.name]["position"] = new_position

# 消耗能量
self.swarm_state[agent.name]["energy"] -= 0.1

# 获取知识
if environment_state["tasks_available"] > 0:
self.swarm_state[agent.name]["knowledge"].append(f"Knowledge gained at iteration {len(self.swarm_state[agent.name]['knowledge'])}")

class HierarchicalAgentSystem:
"""层次化智能体系统"""
def __init__(self):
self.levels = {}
self.command_structure = {}

def add_level(self, level_name: str, agents: List[BaseAgent]):
"""添加层级"""
self.levels[level_name] = agents

# 建立指挥结构
if len(agents) > 1:
commander = agents[0] # 第一个智能体作为指挥官
subordinates = agents[1:] # 其余作为下属

self.command_structure[commander.name] = {
"level": level_name,
"subordinates": [s.name for s in subordinates],
"authority": len(subordinates)
}

for subordinate in subordinates:
self.command_structure[subordinate.name] = {
"level": level_name,
"commander": commander.name,
"responsibility": "execution"
}

async def hierarchical_execute(self, command: str, level_filter: Optional[str] = None):
"""分层执行命令"""
results = {}

for level_name, agents in self.levels.items():
if level_filter and level_name != level_filter:
continue

if level_name in self.command_structure:
commander_name = agents[0].name if agents else None
if commander_name and commander_name in self.command_structure:
# 指挥官接收命令
commander = next((a for a in agents if a.name == commander_name), None)
if commander:
command_result = await commander.process_message(Message(
content=f"Hierarchical command: {command}",
sender_id="system",
receiver_id=commander_name,
timestamp=asyncio.get_event_loop().time()
))

results[commander_name] = command_result

# 向下属分配任务
subordinate_results = await self.delegate_to_subordinates(
commander, command, level_name
)
results.update(subordinate_results)

return results

async def delegate_to_subordinates(self, commander, command, level_name):
"""向下属分配任务"""
results = {}

for agent_name, struct in self.command_structure.items():
if struct.get("commander") == commander.name and struct["level"] == level_name:
agent = next((a for a in self.levels[level_name] if a.name == agent_name), None)
if agent:
delegated_command = f"Delegated task from {commander.name}: {command}"
result = await agent.process_message(Message(
content=delegated_command,
sender_id=commander.name,
receiver_id=agent_name,
timestamp=asyncio.get_event_loop().time()
))
results[agent_name] = result

return results

class ConsensusMechanism:
"""共识机制"""
def __init__(self, consensus_threshold: float = 0.6):
self.threshold = consensus_threshold
self.proposals = {}
self.votes = {}

async def propose_action(self, proposal_id: str, action: str, proposer: str, agents: List[BaseAgent]):
"""提出行动方案"""
self.proposals[proposal_id] = {
"action": action,
"proposer": proposer,
"timestamp": asyncio.get_event_loop().time(),
"status": "voting"
}

self.votes[proposal_id] = {}

# 向所有智能体征求意见
vote_requests = []
for agent in agents:
vote_request = Message(
content=f"Proposal: {action}. Please vote (yes/no) and provide rationale.",
sender_id=proposer,
receiver_id=agent.name,
timestamp=asyncio.get_event_loop().time()
)
vote_requests.append((agent, vote_request))

votes = {}
for agent, request in vote_requests:
response = await agent.process_message(request)
vote = self.extract_vote(response.content) if response else {"vote": "abstain", "rationale": ""}
votes[agent.name] = vote

self.votes[proposal_id] = votes

# 计算共识
consensus_result = self.calculate_consensus(proposal_id)
self.proposals[proposal_id]["status"] = consensus_result["status"]
self.proposals[proposal_id]["consensus_percentage"] = consensus_result["percentage"]

return consensus_result

def extract_vote(self, content: str) -> Dict[str, str]:
"""提取投票"""
content_lower = content.lower()
if "yes" in content_lower or "accept" in content_lower or "agree" in content_lower:
return {"vote": "yes", "rationale": content}
elif "no" in content_lower or "reject" in content_lower or "disagree" in content_lower:
return {"vote": "no", "rationale": content}
else:
return {"vote": "abstain", "rationale": content}

def calculate_consensus(self, proposal_id: str) -> Dict[str, Any]:
"""计算共识"""
votes = self.votes[proposal_id]

yes_votes = sum(1 for vote_data in votes.values() if vote_data["vote"] == "yes")
total_votes = len(votes)

percentage = yes_votes / total_votes if total_votes > 0 else 0

status = "accepted" if percentage >= self.threshold else "rejected"

return {
"status": status,
"percentage": percentage,
"votes_cast": total_votes,
"yes_votes": yes_votes,
"threshold": self.threshold
}

实际应用案例

智能客服系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# 智能客服多智能体系统
class IntelligentCustomerServiceSystem:
"""智能客服系统"""
def __init__(self):
self.mas = MultiAgentSystem("CustomerServiceMAS")
self.setup_customer_service_agents()
self.conversation_context = {}

def setup_customer_service_agents(self):
"""设置客服智能体"""
# 客服助理智能体
support_agent = AutoGenAssistant(
"SupportAgent",
{"model": "gpt-4", "temperature": 0.3}
)
support_agent.add_skill(search_web)
support_agent.add_skill(calculate)

# 技术专家智能体
tech_expert = AutoGenAssistant(
"TechExpert",
{"model": "gpt-4", "temperature": 0.1}
)
tech_expert.add_skill(execute_code)
tech_expert.system_message = "You are a technical expert who can help with technical problems."

# 订单处理智能体
order_processor = UserProxyAgent("OrderProcessor", "NEVER")
order_processor.add_skill(self.process_order)
order_processor.add_skill(self.check_order_status)

# 投诉处理智能体
complaint_handler = AutoGenAssistant(
"ComplaintHandler",
{"model": "gpt-4", "temperature": 0.2}
)
complaint_handler.system_message = "You handle customer complaints with empathy and professionalism."

# 添加智能体到系统
self.mas.add_agent(support_agent)
self.mas.add_agent(tech_expert)
self.mas.add_agent(order_processor)
self.mas.add_agent(complaint_handler)

# 建立关系
self.mas.establish_relationship("SupportAgent", "TechExpert", "technical_support")
self.mas.establish_relationship("SupportAgent", "OrderProcessor", "order_handling")
self.mas.establish_relationship("ComplaintHandler", "SupportAgent", "escalation")

def process_order(self, order_details: Dict[str, Any]) -> str:
"""处理订单"""
# 模拟订单处理
import uuid
order_id = f"ORD-{uuid.uuid4().hex[:8].upper()}"
return f"Order {order_id} processed successfully. Expected delivery: {order_details.get('delivery_date', '3-5 business days')}"

def check_order_status(self, order_id: str) -> str:
"""检查订单状态"""
# 模拟订单状态检查
statuses = ["processing", "shipped", "out_for_delivery", "delivered"]
status = statuses[int(hash(order_id) % len(statuses))]
return f"Order {order_id} is currently: {status}"

async def handle_customer_query(self, customer_id: str, query: str) -> Dict[str, Any]:
"""处理客户查询"""
if customer_id not in self.conversation_context:
self.conversation_context[customer_id] = []

# 分析查询类型
query_analysis = self.analyze_query_type(query)

# 根据查询类型选择合适的智能体
if query_analysis["is_technical"]:
primary_agent = "TechExpert"
elif query_analysis["is_order_related"]:
primary_agent = "OrderProcessor"
elif query_analysis["is_complaint"]:
primary_agent = "ComplaintHandler"
else:
primary_agent = "SupportAgent"

# 构建上下文消息
context_messages = self.conversation_context[customer_id][-5:] # 最近5条消息
full_context = "\n".join([f"{msg['sender']}: {msg['content']}" for msg in context_messages])

initial_message = Message(
content=f"Context: {full_context}\nCustomer Query: {query}",
sender_id=customer_id,
receiver_id=primary_agent,
timestamp=asyncio.get_event_loop().time()
)

# 协调处理
result = await self.mas.coordinate_task(query)

# 记录对话
self.conversation_context[customer_id].append({
"sender": customer_id,
"content": query,
"timestamp": asyncio.get_event_loop().time(),
"handled_by": primary_agent
})

return {
"response": result,
"primary_agent": primary_agent,
"query_type": query_analysis["type"],
"timestamp": asyncio.get_event_loop().time()
}

def analyze_query_type(self, query: str) -> Dict[str, Any]:
"""分析查询类型"""
query_lower = query.lower()

is_technical = any(word in query_lower for word in [
"error", "bug", "crash", "not working", "technical", "code",
"installation", "setup", "configuration", "debug"
])

is_order_related = any(word in query_lower for word in [
"order", "purchase", "buy", "payment", "refund", "return",
"delivery", "shipment", "tracking", "invoice"
])

is_complaint = any(word in query_lower for word in [
"terrible", "awful", "horrible", "bad", "worst", "hate",
"complaint", "problem", "issue", "unsatisfied", "refund"
])

if is_technical:
query_type = "technical_support"
elif is_order_related:
query_type = "order_issue"
elif is_complaint:
query_type = "complaint"
else:
query_type = "general_inquiry"

return {
"is_technical": is_technical,
"is_order_related": is_order_related,
"is_complaint": is_complaint,
"type": query_type
}

# 使用示例
async def demo_customer_service():
"""客服系统演示"""
cs_system = IntelligentCustomerServiceSystem()

# 模拟客户查询
queries = [
"My application keeps crashing when I try to save files",
"I need to check the status of order ORD-12345",
"The service is terrible and I want a refund",
"How do I reset my password?"
]

for i, query in enumerate(queries):
print(f"\n--- Query {i+1}: {query} ---")
result = await cs_system.handle_customer_query(f"customer_{i}", query)
print(f"Handled by: {result['primary_agent']}")
print(f"Response: {result['response']}")

智能开发团队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# 智能开发团队系统
class IntelligentDevelopmentTeam:
"""智能开发团队系统"""
def __init__(self):
self.mas = MultiAgentSystem("DevTeamMAS")
self.project_context = {}
self.setup_development_agents()

def setup_development_agents(self):
"""设置开发智能体"""
# 项目经理智能体
project_manager = AutoGenAssistant(
"ProjectManager",
{"model": "gpt-4", "temperature": 0.4}
)
project_manager.system_message = "You are a project manager who coordinates development tasks."

# 前端开发智能体
frontend_dev = AutoGenAssistant(
"FrontendDeveloper",
{"model": "gpt-4", "temperature": 0.2}
)
frontend_dev.add_skill(execute_code)
frontend_dev.system_message = "You develop frontend applications using React, Vue, or Angular."

# 后端开发智能体
backend_dev = AutoGenAssistant(
"BackendDeveloper",
{"model": "gpt-4", "temperature": 0.2}
)
backend_dev.add_skill(execute_code)
backend_dev.system_message = "You develop backend services using Node.js, Python, Java, or Go."

# 测试工程师智能体
tester = AutoGenAssistant(
"Tester",
{"model": "gpt-4", "temperature": 0.1}
)
tester.system_message = "You write and execute tests to ensure code quality."

# DevOps工程师智能体
devops_eng = AutoGenAssistant(
"DevOpsEngineer",
{"model": "gpt-4", "temperature": 0.3}
)
devops_eng.system_message = "You handle deployment, CI/CD, and infrastructure management."

# 添加智能体到系统
self.mas.add_agent(project_manager)
self.mas.add_agent(frontend_dev)
self.mas.add_agent(backend_dev)
self.mas.add_agent(tester)
self.mas.add_agent(devops_eng)

# 建立协作关系
self.mas.establish_relationship("ProjectManager", "FrontendDeveloper", "task_assignment")
self.mas.establish_relationship("ProjectManager", "BackendDeveloper", "task_assignment")
self.mas.establish_relationship("FrontendDeveloper", "BackendDeveloper", "api_coordination")
self.mas.establish_relationship("BackendDeveloper", "Tester", "testing_coordination")
self.mas.establish_relationship("Tester", "DevOpsEngineer", "deployment_coordination")

async def develop_feature(self, feature_spec: str) -> Dict[str, Any]:
"""开发功能"""
print(f"🚀 Starting development of: {feature_spec}")

# 1. 项目规划
print("\n📋 Project Planning...")
planning_result = await self.mas.coordinator.coordinate(
f"Plan the implementation of: {feature_spec}",
["ProjectManager", "FrontendDeveloper", "BackendDeveloper"],
self.mas.agents
)

# 2. 前端开发
print("\n🎨 Frontend Development...")
frontend_result = await self.mas.agents["FrontendDeveloper"].process_message(Message(
content=f"Implement frontend for: {feature_spec}",
sender_id="ProjectManager",
receiver_id="FrontendDeveloper",
timestamp=asyncio.get_event_loop().time()
))

# 3. 后端开发
print("\n⚙️ Backend Development...")
backend_result = await self.mas.agents["BackendDeveloper"].process_message(Message(
content=f"Implement backend API for: {feature_spec}",
sender_id="ProjectManager",
receiver_id="BackendDeveloper",
timestamp=asyncio.get_event_loop().time()
))

# 4. 集成测试
print("\n🧪 Integration Testing...")
integration_result = await self.mas.agents["Tester"].process_message(Message(
content=f"Test the integration between frontend and backend for: {feature_spec}",
sender_id="ProjectManager",
receiver_id="Tester",
timestamp=asyncio.get_event_loop().time()
))

# 5. 部署
print("\n☁️ Deployment...")
deployment_result = await self.mas.agents["DevOpsEngineer"].process_message(Message(
content=f"Deploy the completed feature: {feature_spec}",
sender_id="ProjectManager",
receiver_id="DevOpsEngineer",
timestamp=asyncio.get_event_loop().time()
))

return {
"feature": feature_spec,
"planning": planning_result,
"frontend": frontend_result.content if frontend_result else "No frontend result",
"backend": backend_result.content if backend_result else "No backend result",
"testing": integration_result.content if integration_result else "No testing result",
"deployment": deployment_result.content if deployment_result else "No deployment result",
"completed_at": asyncio.get_event_loop().time()
}

# 使用示例
async def demo_dev_team():
"""开发团队演示"""
dev_team = IntelligentDevelopmentTeam()

feature_spec = """
Build a user authentication system with:
- Login/registration forms
- Password reset functionality
- Session management
- OAuth integration (Google, GitHub)
"""

result = await dev_team.develop_feature(feature_spec)
print(f"\n✅ Feature development completed!")
print(f"Frontend: {result['frontend'][:100]}...")
print(f"Backend: {result['backend'][:100]}...")
print(f"Testing: {result['testing'][:100]}...")
print(f"Deployment: {result['deployment'][:100]}...")

性能优化与最佳实践

智能体性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# 智能体性能监控系统
import time
import threading
from collections import defaultdict, deque
from typing import Dict, List, Tuple

class AgentPerformanceMonitor:
"""智能体性能监控"""
def __init__(self):
self.metrics = defaultdict(lambda: defaultdict(deque))
self.monitoring = True
self.stats_lock = threading.Lock()

def record_metric(self, agent_name: str, metric_name: str, value: float, timestamp: float = None):
"""记录指标"""
if timestamp is None:
timestamp = time.time()

with self.stats_lock:
self.metrics[agent_name][metric_name].append((timestamp, value))

# 保持最多1000个数据点
if len(self.metrics[agent_name][metric_name]) > 1000:
self.metrics[agent_name][metric_name].popleft()

def get_average_metric(self, agent_name: str, metric_name: str, window_minutes: int = 5) -> float:
"""获取平均指标值"""
with self.stats_lock:
if agent_name not in self.metrics or metric_name not in self.metrics[agent_name]:
return 0.0

current_time = time.time()
window_start = current_time - (window_minutes * 60)

values = [val for ts, val in self.metrics[agent_name][metric_name] if ts >= window_start]

if not values:
return 0.0

return sum(values) / len(values)

def get_utilization_rate(self, agent_name: str) -> float:
"""获取智能体利用率"""
response_times = self.get_recent_response_times(agent_name)
if not response_times:
return 0.0

# 简化的利用率计算(基于响应时间间隔)
intervals = [response_times[i+1][0] - response_times[i][0] for i in range(len(response_times)-1)]
if not intervals:
return 0.0

avg_interval = sum(intervals) / len(intervals)
return min(1.0, 0.1 / avg_interval) if avg_interval > 0 else 0.0

def get_recent_response_times(self, agent_name: str, count: int = 10) -> List[Tuple[float, float]]:
"""获取最近的响应时间"""
with self.stats_lock:
if agent_name not in self.metrics or 'response_time' not in self.metrics[agent_name]:
return []

return list(self.metrics[agent_name]['response_time'])[-count:]

class LoadBalancer:
"""负载均衡器"""
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
self.performance_monitor = AgentPerformanceMonitor()
self.agent_weights = {agent.name: 1.0 for agent in agents}

def select_agent(self, task_description: str) -> BaseAgent:
"""选择最适合的智能体"""
# 根据多个因素选择智能体
suitable_agents = []

for agent in self.agents:
score = self.calculate_agent_score(agent, task_description)
suitable_agents.append((agent, score))

# 按分数排序并选择
suitable_agents.sort(key=lambda x: x[1], reverse=True)
return suitable_agents[0][0] if suitable_agents else self.agents[0]

def calculate_agent_score(self, agent: BaseAgent, task_description: str) -> float:
"""计算智能体分数"""
# 基础能力匹配分数
capability_score = 0.0
task_lower = task_description.lower()

for skill in agent.skills:
if skill.__name__.lower() in task_lower:
capability_score += 0.8

# 性能分数
utilization = self.performance_monitor.get_utilization_rate(agent.name)
performance_score = max(0.1, 1.0 - utilization) # 利用率越低,分数越高

# 经验分数(基于历史成功率)
historical_success_rate = self.get_historical_success_rate(agent.name)

# 综合分数
total_score = (
capability_score * 0.5 +
performance_score * 0.3 +
historical_success_rate * 0.2
)

return total_score

def get_historical_success_rate(self, agent_name: str) -> float:
"""获取历史成功率"""
# 这里应该从持久化存储中获取实际的历史成功率
# 简单的模拟实现
return 0.85 # 假设平均成功率是85%

# 智能缓存系统
class SmartCache:
"""智能缓存系统"""
def __init__(self, max_size: int = 1000):
self.cache = {}
self.access_times = {}
self.hit_counts = defaultdict(int)
self.max_size = max_size

def get(self, key: str):
"""获取缓存项"""
if key in self.cache:
self.access_times[key] = time.time()
self.hit_counts[key] += 1
return self.cache[key]
return None

def put(self, key: str, value):
"""放入缓存项"""
if len(self.cache) >= self.max_size:
self.evict_least_valuable()

self.cache[key] = value
self.access_times[key] = time.time()

def evict_least_valuable(self):
"""驱逐最低价值的缓存项"""
if not self.cache:
return

# 基于访问频率和时间计算价值
def calculate_value(key):
age = time.time() - self.access_times[key]
hits = self.hit_counts[key]
return hits / (age + 1) # 频繁访问且较新的项目价值更高

# 找到价值最低的项目
least_valuable_key = min(self.cache.keys(), key=calculate_value)

del self.cache[least_valuable_key]
del self.access_times[least_valuable_key]
del self.hit_counts[least_valuable_key]

def get_hit_rate(self) -> float:
"""获取命中率"""
if sum(self.hit_counts.values()) == 0:
return 0.0

hits = sum(count for key, count in self.hit_counts.items() if key in self.cache)
total_accesses = sum(self.hit_counts.values())

return hits / total_accesses if total_accesses > 0 else 0.0

总结

  • AutoGen框架为多智能体系统奠定了基础架构
  • 群体智能模式实现了自组织协作
  • 层次化结构提供了清晰的指挥体系
  • 共识机制确保了决策一致性
  • 智能体性能监控保障了系统稳定性
  • 负载均衡优化了资源利用率
  • 智能缓存提高了响应速度

多智能体协同就像一支精密的交响乐团,每个智能体都有自己的角色和专长。通过精心设计的架构和协调机制,这些智能体能够和谐地协同工作,创造出超越单个智能体能力的智能效果。

未来发展趋势

  1. 自主进化: 智能体会自主学习和适应
  2. 情感计算: 更好的情感理解和表达
  3. 道德决策: 内置伦理和价值观考量
  4. 跨平台协作: 不同系统间的智能体协作
  5. 量子加速: 量子计算赋能的智能体

扩展阅读

  1. AutoGen Framework Documentation
  2. Multi-Agent Systems Research
  3. Swarm Intelligence Algorithms
bulb