1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
| class SwarmIntelligence: """群体智能模式""" def __init__(self): self.agents = [] self.swarm_state = {} self.emergent_behaviors = []
def add_agent(self, agent): """添加智能体到群体""" self.agents.append(agent) self.initialize_agent_state(agent)
def initialize_agent_state(self, agent): """初始化智能体状态""" self.swarm_state[agent.name] = { "position": [0, 0], "energy": 100, "knowledge": [], "relationships": {} }
def evolve_behavior(self, iterations: int = 100): """演化行为模式""" for iteration in range(iterations): for agent in self.agents: self.update_agent_behavior(agent)
emergent_behavior = self.detect_emergent_behavior() if emergent_behavior: self.emergent_behaviors.append(emergent_behavior)
def update_agent_behavior(self, agent): """更新智能体行为""" neighbors = self.get_neighbors(agent) environment_state = self.get_environment_state()
self.apply_swarm_rules(agent, neighbors, environment_state)
def get_neighbors(self, agent, radius: float = 5.0): """获取邻居智能体""" neighbors = [] agent_pos = self.swarm_state[agent.name]["position"]
for other_agent in self.agents: if other_agent.name != agent.name: other_pos = self.swarm_state[other_agent.name]["position"] distance = self.calculate_distance(agent_pos, other_pos) if distance <= radius: neighbors.append((other_agent, distance))
return neighbors
def calculate_distance(self, pos1, pos2): """计算距离""" import math return math.sqrt(sum((a - b) ** 2 for a, b in zip(pos1, pos2)))
def get_environment_state(self): """获取环境状态""" return { "resources": len([a for a in self.agents if self.swarm_state[a.name]["energy"] > 50]), "tasks_available": len([a for a in self.agents if len(self.swarm_state[a.name]["knowledge"]) < 10]) }
def apply_swarm_rules(self, agent, neighbors, environment_state): """应用群体规则""" if neighbors: avg_position = [sum(n[0].swarm_state[n[0].name]["position"][i] for n in neighbors) / len(neighbors) for i in range(len(neighbors[0][0].swarm_state[neighbors[0][0].name]["position"]))]
agent_pos = self.swarm_state[agent.name]["position"] new_position = [(p + ap) / 2 for p, ap in zip(agent_pos, avg_position)] self.swarm_state[agent.name]["position"] = new_position
self.swarm_state[agent.name]["energy"] -= 0.1
if environment_state["tasks_available"] > 0: self.swarm_state[agent.name]["knowledge"].append(f"Knowledge gained at iteration {len(self.swarm_state[agent.name]['knowledge'])}")
class HierarchicalAgentSystem: """层次化智能体系统""" def __init__(self): self.levels = {} self.command_structure = {}
def add_level(self, level_name: str, agents: List[BaseAgent]): """添加层级""" self.levels[level_name] = agents
if len(agents) > 1: commander = agents[0] subordinates = agents[1:]
self.command_structure[commander.name] = { "level": level_name, "subordinates": [s.name for s in subordinates], "authority": len(subordinates) }
for subordinate in subordinates: self.command_structure[subordinate.name] = { "level": level_name, "commander": commander.name, "responsibility": "execution" }
async def hierarchical_execute(self, command: str, level_filter: Optional[str] = None): """分层执行命令""" results = {}
for level_name, agents in self.levels.items(): if level_filter and level_name != level_filter: continue
if level_name in self.command_structure: commander_name = agents[0].name if agents else None if commander_name and commander_name in self.command_structure: commander = next((a for a in agents if a.name == commander_name), None) if commander: command_result = await commander.process_message(Message( content=f"Hierarchical command: {command}", sender_id="system", receiver_id=commander_name, timestamp=asyncio.get_event_loop().time() ))
results[commander_name] = command_result
subordinate_results = await self.delegate_to_subordinates( commander, command, level_name ) results.update(subordinate_results)
return results
async def delegate_to_subordinates(self, commander, command, level_name): """向下属分配任务""" results = {}
for agent_name, struct in self.command_structure.items(): if struct.get("commander") == commander.name and struct["level"] == level_name: agent = next((a for a in self.levels[level_name] if a.name == agent_name), None) if agent: delegated_command = f"Delegated task from {commander.name}: {command}" result = await agent.process_message(Message( content=delegated_command, sender_id=commander.name, receiver_id=agent_name, timestamp=asyncio.get_event_loop().time() )) results[agent_name] = result
return results
class ConsensusMechanism: """共识机制""" def __init__(self, consensus_threshold: float = 0.6): self.threshold = consensus_threshold self.proposals = {} self.votes = {}
async def propose_action(self, proposal_id: str, action: str, proposer: str, agents: List[BaseAgent]): """提出行动方案""" self.proposals[proposal_id] = { "action": action, "proposer": proposer, "timestamp": asyncio.get_event_loop().time(), "status": "voting" }
self.votes[proposal_id] = {}
vote_requests = [] for agent in agents: vote_request = Message( content=f"Proposal: {action}. Please vote (yes/no) and provide rationale.", sender_id=proposer, receiver_id=agent.name, timestamp=asyncio.get_event_loop().time() ) vote_requests.append((agent, vote_request))
votes = {} for agent, request in vote_requests: response = await agent.process_message(request) vote = self.extract_vote(response.content) if response else {"vote": "abstain", "rationale": ""} votes[agent.name] = vote
self.votes[proposal_id] = votes
consensus_result = self.calculate_consensus(proposal_id) self.proposals[proposal_id]["status"] = consensus_result["status"] self.proposals[proposal_id]["consensus_percentage"] = consensus_result["percentage"]
return consensus_result
def extract_vote(self, content: str) -> Dict[str, str]: """提取投票""" content_lower = content.lower() if "yes" in content_lower or "accept" in content_lower or "agree" in content_lower: return {"vote": "yes", "rationale": content} elif "no" in content_lower or "reject" in content_lower or "disagree" in content_lower: return {"vote": "no", "rationale": content} else: return {"vote": "abstain", "rationale": content}
def calculate_consensus(self, proposal_id: str) -> Dict[str, Any]: """计算共识""" votes = self.votes[proposal_id]
yes_votes = sum(1 for vote_data in votes.values() if vote_data["vote"] == "yes") total_votes = len(votes)
percentage = yes_votes / total_votes if total_votes > 0 else 0
status = "accepted" if percentage >= self.threshold else "rejected"
return { "status": status, "percentage": percentage, "votes_cast": total_votes, "yes_votes": yes_votes, "threshold": self.threshold }
|