OpenManus 介绍2025年3月5日早,Manus 的 Demo 引爆媒介;松交着 1天后,3月7日,海内DeepWisdom MetaGPT团队战CAMEL AI 团队别离拉进去启源了名目OpenManus战OWL,复刻Manus ,持续正在收集及Github社区激发普遍会商。
GitHub地点:https://github.com/mannaandpoem/OpenManus
写正在前面OpenManus团队只用1小时便完毕了中心体系,部分只用了3小时完毕终极上线,收获 Github万星,这样高的本钱,完整能够鉴于那套框架完毕自己的智能体,先钻研一下完毕
部分运行过程
先屏障失落架构完毕,仄展部分施行框架:Agent接纳 到用户恳求后,会启开 run loop,当 agent形状 为 FINISHED 时或者到达最年夜施行轮次时完毕部分运行run loop 期间每次轮回挪用 step办法 ,step 前去运行成果每次轮回,展示 step 前去的运行成果
step办法 内乱,使用了典范的 ReAct 方法,即 Reason(思考)后,获得到施行提醒战需要挪用的 Tools(东西),终极施行 Act(举措),完毕具体的东西挪用此中Reason(思考)颠末挪用 LLM 完毕(需挑选撑持 Function Call 的模子)Agent内部 干了对于话影象的办理(Memory),屡屡挪用会将残破的对于话记载传进别的 Agent 上供给了一点儿可供使用东西散,挪用 LLM 时传进
LLM前去照应实质(content)战需要干的东西挪用(ToolCalls)正在 Act(举措)办法内乱,挪用 Tools 的 exec办法 ,按照 LLM 前去的 ToolCalls 施行对于应的东西东西的运行成果会保存到 Memory 中,供下一次传进 LLM 使用
当运行 step 的次数超越最年夜次数限定(max_steps)大概Agent捕捉到一点儿毛病的时候,Agent停止 run loop每轮step的运行成果城市被记载到 results,终极展示给用户
部分的施行框架仍是比力简朴且明了的,交下来阐发 OpenManus 的架构
具体架构
OpenManus 的定位是一个极简的 Agent框架,能够干到 Tools 战 Prompt 拉拢的可插拔Prompt 掌握 Agent 的举动,Tools 界说了 Agent 的可举措的空间范畴,Prompt 战 Tools 构成一个 ReAct Agent能够将差别场景中的 Tools 拉拢到共同组成一定场景的 agent,只要改正Tools,没有改写内部逻辑便可适配目标场景
AgentOpenManus 干了根底Agent的分层设想,使患上每层的工作绝对大白,包管了可扩大性
BaseAgent动作最根底的AgentReActAgent承袭 BaseAgentToolCallAgent承袭 ReActAgentManus承袭ToolCallAgent(那里的 Manus 可活络交流成具体场景下的 Agent)
BaseAgentBaseAgent 担当一点儿十分根底的事情,如形状办理、影象办理和施行过程run loop
形状办理class BaseAgent:
...
#形状 初初化
state: AgentState = Field(
default=AgentState.IDLE, description="Current agent state"
)
# 许可施行的至多轮次
max_steps: int = Field(default=10, description="Maximum steps before termination")
#以后 已经施行轮次
current_step: int = Field(default=0, description="Current step in execution")
...
#形状 流转操纵
async def state_context(self, new_state: AgentState):
"""Context manager for safe agent state transitions.
Args:
new_state: The state to transition to during the context.
Yields:
None: Allows execution within the new state.
Raises:
ValueError: If the new_state is invalid.
"""
if not isinstance(new_state, AgentState):
raise ValueError(f"Invalid state: {new_state}")
previous_state = self.state
self.state = new_state
try:
yield
except Exception as e:
self.state = AgentState.ERROR # Transition to ERROR on failure
raise e
finally:
self.state = previous_state # Revert to previous state
# 鉴别目前Agent可否停正在本天挨转
def is_stuck(self) -> bool:
"""Check if the agent is stuck in a loop by detecting duplicate content"""
if len(self.memory.messages) < 2:
return False
last_message = self.memory.messages[-1]
if not last_message.content:
return False
# Count identical content occurrences
duplicate_count = sum(
1
for msg in reversed(self.memory.messages[:-1])
if msg.role == "assistant" and msg.content == last_message.content
)
return duplicate_count >= self.duplicate_threshold
#处置 应Agent本天挨转时的处置操纵(劣化通报给 LLM 的prompt,以跳出本天挨转的形状)
def handle_stuck_state(self):
"""Handle stuck state by adding a prompt to change strategy"""
stuck_prompt = "\
Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
...
state:目前Agent形状max_steps:许可施行的至多step轮次current_step:目前已经施行的step轮次state_context():Agent形状流转is_stuck():鉴别目前 Agent 可否正在本天挨转handle_stuck_state():处置应Agent本天挨转时的处置操纵(劣化通报给 LLM 的prompt,以跳出本天挨转的形状)
影象办理class BaseAgent:
...
memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
...
# 增加影象
def update_memory(
self,
role: ROLE_TYPE, # type: ignore
content: str,
base64_image: Optional[str] = None,
**kwargs,
) -> None:
"""Add a message to the agent's memory.
Args:
role: The role of the message sender (user, system, assistant, tool).
content: The message content.
base64_image: Optional base64 encoded image.
**kwargs: Additional arguments (e.g., tool_call_id for tool messages).
Raises:
ValueError: If the role is unsupported.
"""
message_map = {
"user": Message.user_message,
"system": Message.system_message,
"assistant": Message.assistant_message,
"tool": lambda content, **kw: Message.tool_message(content, **kw),
}
if role not in message_map:
raise ValueError(f"Unsupported message role: {role}")
# Create message with appropriate parameters based on role
kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
self.memory.add_message(message_map[role](content, **kwargs))
# 获得影象
@property
def messages(self) -> List[Message]:
"""Retrieve a list of messages from the agent's memory."""
return self.memory.messages
# 改正影象
@messages.setter
def messages(self, value: List[Message]):
"""Set the list of messages in the agent's memory."""
self.memory.messages = value
...
memory:目前影象,正在影象里会保存许多品种型的疑息,如AI消息(assistant_message)、用户消息(user_message)、体系消息(system_message)、东西施行记载(tool_message)update_memory():背影象里增加messagegetter messages:获得影象里的 messagessetter messages:改正影象里的 messages
Run loopasync def run(self, request: Optional[str] = None) -> str:
"""Execute the agent's main loop asynchronously.
Args:
request: Optional initial user request to process.
Returns:
A string su妹妹arizing the execution results.
Raises:
RuntimeError: If the agent is not in IDLE state at start.
"""
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run agent from state: {self.state}")
if request:
self.update_memory("user", request)
results: List[str] = []
async with self.state_context(AgentState.RUNNING):
while (
self.current_step < self.max_steps and self.state != AgentState.FINISHED
):
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")
step_result = await self.step()
# Check for stuck state
if self.is_stuck():
self.handle_stuck_state()
results.append(f"Step {self.current_step}: {step_result}")
if self.current_step >= self.max_steps:
self.current_step = 0
self.state = AgentState.IDLE
results.append(f"Terminated: Reached max steps ({self.max_steps})")
await SANDBOX_CLIENT.cleanup()
return "\n".join(results) if results else "No steps executed"
假设Agent形状没有是 FINISHED或许 step 次数没有超越 max_steps,便不竭施行中心干了Agent stuck的检测处置将每次 step 的施行成果存到 results 里run loop完毕后
一点儿 tools可以 会颠末沙箱情况会见操纵体系,完毕后施行沙箱情况清理前去 results 施行成果
ReActAgent
ReActAgent界说了 step施行的过程,限制为先 think(思考),假设思考的成果需要act(挪用东西),则施行 actReActAgent 界说了 think 战 act 的抽象办法
class ReActAgent(BaseAgent):
...
@abstractmethod
async def think(self) -> bool:
"""Process current state and decide next action"""
@abstractmethod
async def act(self) -> str:
"""Execute decided actions"""
async def step(self) -> str:
"""Execute a single step: think and act."""
should_act = await self.think()
if not should_act:
return "Thinking complete - no action needed"
return await self.act()ToolCallAgent
ToolCallAgent完毕了 think 战 act办法 think():假设存留next_step_prompt,则将其调整退影象战目前东西散一齐传给 LLM,读与 LLM 前去成果
获得到 response.content,将其促进影象获得到 response.tool_calls,如为空,则前去false,没有需要act,如没有为空则前去 true
act():遍历挪用正在 think 阶段产出的 tools,顺次把tool施行成果参加到影象中,并前去
class ToolCallAgent(ReActAgent):
"""Base agent class for handling tool/function calls with enhanced abstraction"""
...
# 可用的东西散
available_tools: ToolCollection = ToolCollection(
CreateChatCompletion(), Terminate()
)
# 待施行的东西挪用
tool_calls: List[ToolCall] = Field(default_factory=list)
...
async def think(self) -> bool:
...
# Get response with tool options
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=(
[Message.system_message(self.system_prompt)]
if self.system_prompt
else None
),
tools=self.available_tools.to_params(),
tool_choice=self.tool_choices,
)
...
self.tool_calls = tool_calls = (
response.tool_calls if response and response.tool_calls else []
)
content = response.content if response and response.content else ""
...
assistant_msg = (
Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
if self.tool_calls
else Message.assistant_message(content)
)
self.memory.add_message(assistant_msg)
...
return bool(self.tool_calls)
...
async def act(self) -> str:
results = []
for co妹妹and in self.tool_calls:
...
result = await self.execute_tool(co妹妹and)
if self.max_observe:
result = result[: self.max_observe]
logger.info(
f"🎯 Tool '{co妹妹and.function.name}' completed its mission! Result: {result}"
)
# Add tool response to memory
tool_msg = Message.tool_message(
content=result,
tool_call_id=co妹妹and.id,
name=co妹妹and.function.name,
...
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)Manus那里指具体场景下的Agent完毕,承袭自ToolCallAgent,从头界说了 tools 空间,增强了 think办法
从头界说东西散 available_tools:python 剧本施行器、浏览器挪用东西、字符串交流编纂器增强think办法:正在施行默认的 think 过程前,因为东西中使用了 BrowserUser,以是需要检测近来的对于话中可否存留有 BrowserUser 的东西挪用,
假设存留,需要将目前浏览器形状搁到 next_step_prompt 中,让LLM感知到,以更佳输出下一步的施行步调
class Manus(ToolCallAgent):
"""A versatile general-purpose agent."""
...
# Add general-purpose tools to the tool collection
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
PythonExecute(), BrowserUseTool(), StrWordStrEditor(), Terminate()
)
)
# 浏览器操纵情况
browser_context_helper: Optional[BrowserContextHelper] = None
...
#增强 think办法
async def think(self) -> bool:
"""Process current state and decide next actions with appropriate context."""
original_prompt = self.next_step_prompt
recent_messages = self.memory.messages[-3:] if self.memory.messages else []
browser_in_use = any(
tc.function.name == BrowserUseTool().name
for msg in recent_messages
if msg.tool_calls
for tc in msg.tool_calls
)
if browser_in_use:
self.next_step_prompt = (
await self.browser_context_helper.format_next_step_prompt()
)
result = await super().think()
# Restore original prompt
self.next_step_prompt = original_prompt
return resultTools正在以前的代码中能够瞅到,ToolCallAgent 颠末挪用 execute_tool办法 去施行 tools 挪用
tools 挪用颠末 东西散 ToolCollection 的 execute 施行单个 Tool承袭 自 BaseTool,颠末完毕 execute办法 ,界说具体施行历程
ToolCollection
class ToolCollection:
"""A collection of defined tools."""
...
def __init__(self, *tools: BaseTool):
self.tools = tools
self.tool_map = {tool.name: tool for tool in tools}
...
# 施行东西集合的东西
async def execute(
self, *, name: str, tool_input: Dict[str, Any] = None
) -> ToolResult:
tool = self.tool_map.get(name)
if not tool:
return ToolFailure(error=f"Tool {name} is invalid")
try:
result = await tool(**tool_input)
return result
except ToolError as e:
return ToolFailure(error=e.message)BaseTool
class BaseTool(ABC, BaseModel):
name: str
...
async def __call__(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""
return await self.execute(**kwargs)
@abstractmethod
async def execute(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""CreateChatCompletion以CreateChatCompletion为例,那里具体完毕便没有干深入,差别的东西纷歧样class CreateChatCompletion(BaseTool):
name: str = "create_chat_completion"
...
async def execute(self, required: list | None = None, **kwargs) -> Any:
"""Execute the chat completion with type conversion.
Args:
required: List of required field names or None
**kwargs: Response data
Returns:
Converted response based on response_type
"""
required = required or self.required
# Handle case when required is a list
if isinstance(required, list) and len(required) > 0:
if len(required) == 1:
required_field = required[0]
result = kwargs.get(required_field, "")
else:
# Return multiple fields as a dictionary
return {field: kwargs.get(field, "") for field in required}
else:
required_field = "response"
result = kwargs.get(required_field, "")
# Type conversion logic
if self.response_type == str:
return result
if isinstance(self.response_type, type) and issubclass(
self.response_type, BaseModel
):
return self.response_type(**kwargs)
if get_origin(self.response_type) in (list, dict):
return result # Assuming result is already in correct format
try:
return self.response_type(result)
except (ValueError, TypeError):
return resultLLMLLM 对于中屏障模子之间的差别,那里只会商跟支流程相干的中心办法战属性
client:年夜模子 SDK client 工具,用去挪用年夜模子从 ToolCallAgent 的 think(),办法中能够瞅到,agent颠末挪用 ask_tool办法 获得思考成果
恳求参数
messages:消息列表system_msgs:体系 prompttools:可挪用的东西散
施行历程将传退的Message列表变换成 年夜模子的领受的消息 格局计较行将倡议的年夜模子挪用 message + toocall token数目可否超越限定倡议年夜模子恳求,领受前去革新模子token消耗,终极前去年夜模子 reason后果
class LLM:
...
async def ask_tool(
self,
messages: List[Union[dict, Message]],
system_msgs: Optional[List[Union[dict, Message]]] = None,
tools: Optional[List[dict]] = None,
...
**kwargs,
) -> ChatCompletionMessage | None:
...
# Message 列表变换成 年夜模子领受的消息 格局
supports_images = self.model in MULTIMODAL_MODELS
if system_msgs:
system_msgs = self.format_messages(system_msgs, supports_images)
messages = system_msgs + self.format_messages(messages, supports_images)
else:
messages = self.format_messages(messages, supports_images)
# 计较token数目可否超越限定
input_tokens = self.count_message_tokens(messages)
tools_tokens = 0
if tools:
for tool in tools:
tools_tokens += self.count_tokens(str(tool))
input_tokens += tools_tokens
if not self.check_token_limit(input_tokens):
error_message = self.get_limit_error_message(input_tokens)
raise TokenLimitExceeded(error_message)
...
# 倡议年夜模子恳求
params = {
"model": self.model,
"messages": messages,
"tools": tools,
...
**kwargs,
}
response: ChatCompletion = await self.client.chat.completions.create(
**params
)
# Check if response is valid
if not response.choices or not response.choices[0].message:
print(response)
# raise ValueError("Invalid or empty response from LLM")
return None
# Update token counts
self.update_token_count(
response.usage.prompt_tokens, response.usage.completion_tokens
)
return response.choices[0].message对于 Function Call 正在那里便再也不赘述,念理解的能够瞅 OpenAI function-calling 文档
最初OpenManus后绝会鉴于目前颠簸的根底架构上会干上面多少件工作Planning才干 (东西):颠末 planning tool 装解任务,处置幻想天下庞大成就适配更多模子:扩大 Claude 及 DeepSeek 系列模子,劣化高本钱使用场景容器化布置:简化情况启用过程RAG 模块散成:颠末内部常识库增强 Agent 结果
|