开启左侧

OpenManus 核心架构解析

[复制链接]
在线会员 mInr 发表于 2025-4-9 16:17:44 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题 |快速收录
OpenManus 介绍2025年3月5日早,Manus 的 Demo 引爆媒介;松交着 1天后,3月7日,海内DeepWisdom MetaGPT团队战CAMEL AI 团队别离拉进去启源了名目OpenManus战OWL,复刻Manus ,持续正在收集及Github社区激发普遍会商。

GitHub地点:https://github.com/mannaandpoem/OpenManus


写正在前面OpenManus团队只用1小时便完毕了中心体系,部分只用了3小时完毕终极上线,收获 Github万星,这样高的本钱,完整能够鉴于那套框架完毕自己的智能体,先钻研一下完毕


部分运行过程
OpenManus中心  架构剖析w2.jpg
先屏障失落架构完毕,仄展部分施行框架:
    Agent接纳 到用户恳求后,会启开 run loop,当 agent形状 为 FINISHED 时或者到达最年夜施行轮次时完毕部分运行run loop 期间
      每次轮回挪用 step办法 ,step 前去运行成果每次轮回,展示 step 前去的运行成果
    step办法 内乱,使用了典范的 ReAct 方法,即 Reason(思考)后,获得到施行提醒战需要挪用的 Tools(东西),终极施行 Act(举措),完毕具体的东西挪用
      此中Reason(思考)颠末挪用 LLM 完毕(需挑选撑持 Function Call 的模子)
        Agent内部 干了对于话影象的办理(Memory),屡屡挪用会将残破的对于话记载传进别的 Agent 上供给了一点儿可供使用东西散,挪用 LLM 时传进
      LLM前去照应实质(content)战需要干的东西挪用(ToolCalls)正在 Act(举措)办法内乱,挪用 Tools 的 exec办法 ,按照 LLM 前去的 ToolCalls 施行对于应的东西东西的运行成果会保存到 Memory 中,供下一次传进 LLM 使用
    当运行 step 的次数超越最年夜次数限定(max_steps)大概Agent捕捉到一点儿毛病的时候,Agent停止 run loop每轮step的运行成果城市被记载到 results,终极展示给用户
部分的施行框架仍是比力简朴且明了的,交下来阐发 OpenManus 的架构


具体架构
    OpenManus 的定位是一个极简的 Agent框架,能够干到 Tools 战 Prompt 拉拢的可插拔Prompt 掌握 Agent 的举动,Tools 界说了 Agent 的可举措的空间范畴,Prompt 战 Tools 构成一个 ReAct Agent能够将差别场景中的 Tools 拉拢到共同组成一定场景的 agent,只要改正Tools,没有改写内部逻辑便可适配目标场景
AgentOpenManus 干了根底Agent的分层设想,使患上每层的工作绝对大白,包管了可扩大性
    BaseAgent动作最根底的AgentReActAgent承袭 BaseAgentToolCallAgent承袭 ReActAgentManus承袭ToolCallAgent(那里的 Manus 可活络交流成具体场景下的 Agent)


BaseAgentBaseAgent 担当一点儿十分根底的事情,如形状办理、影象办理和施行过程run loop
    形状办理class BaseAgent:
      ...
      #形状 初初化
      state: AgentState = Field(
            default=AgentState.IDLE, description="Current agent state"
        )
      # 许可施行的至多轮次
      max_steps: int = Field(default=10, description="Maximum steps before termination")
      #以后 已经施行轮次
      current_step: int = Field(default=0, description="Current step in execution")
      ...
      #形状 流转操纵
      async def state_context(self, new_state: AgentState):
            """Context manager for safe agent state transitions.

            Args:
                new_state: The state to transition to during the context.

            Yields:
                None: Allows execution within the new state.

            Raises:
                ValueError: If the new_state is invalid.
            """
            if not isinstance(new_state, AgentState):
                raise ValueError(f"Invalid state: {new_state}")

            previous_state = self.state
            self.state = new_state
            try:
                yield
            except Exception as e:
                self.state = AgentState.ERROR  # Transition to ERROR on failure
                raise e
            finally:
                self.state = previous_state  # Revert to previous state
      # 鉴别目前Agent可否停正在本天挨转
      def is_stuck(self) -> bool:
            """Check if the agent is stuck in a loop by detecting duplicate content"""
            if len(self.memory.messages) < 2:
                return False

            last_message = self.memory.messages[-1]
            if not last_message.content:
                return False

            # Count identical content occurrences
            duplicate_count = sum(
                1
                for msg in reversed(self.memory.messages[:-1])
                if msg.role == "assistant" and msg.content == last_message.content
            )

            return duplicate_count >= self.duplicate_threshold
      #处置 应Agent本天挨转时的处置操纵(劣化通报给 LLM 的prompt,以跳出本天挨转的形状)
      def handle_stuck_state(self):
            """Handle stuck state by adding a prompt to change strategy"""
            stuck_prompt = "\
            Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
            self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
            logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
      ...
      state:目前Agent形状max_steps:许可施行的至多step轮次current_step:目前已经施行的step轮次state_context():Agent形状流转is_stuck():鉴别目前 Agent 可否正在本天挨转handle_stuck_state():处置应Agent本天挨转时的处置操纵(劣化通报给 LLM 的prompt,以跳出本天挨转的形状)
    影象办理class BaseAgent:
      ...
      memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
      ...
      # 增加影象
      def update_memory(
            self,
            role: ROLE_TYPE,  # type: ignore
            content: str,
            base64_image: Optional[str] = None,
            **kwargs,
        ) -> None:
            """Add a message to the agent's memory.

            Args:
                role: The role of the message sender (user, system, assistant, tool).
                content: The message content.
                base64_image: Optional base64 encoded image.
                **kwargs: Additional arguments (e.g., tool_call_id for tool messages).

            Raises:
                ValueError: If the role is unsupported.
            """
            message_map = {
                "user": Message.user_message,
                "system": Message.system_message,
                "assistant": Message.assistant_message,
                "tool": lambda content, **kw: Message.tool_message(content, **kw),
            }

            if role not in message_map:
                raise ValueError(f"Unsupported message role: {role}")

            # Create message with appropriate parameters based on role
            kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
            self.memory.add_message(message_map[role](content, **kwargs))
      # 获得影象
      @property
      def messages(self) -> List[Message]:
          """Retrieve a list of messages from the agent's memory."""
          return self.memory.messages
      # 改正影象
      @messages.setter
      def messages(self, value: List[Message]):
          """Set the list of messages in the agent's memory."""
          self.memory.messages = value
      ...
      memory:目前影象,正在影象里会保存许多品种型的疑息,如AI消息(assistant_message)、用户消息(user_message)、体系消息(system_message)、东西施行记载(tool_message)update_memory():背影象里增加messagegetter messages:获得影象里的 messagessetter messages:改正影象里的 messages
    Run loopasync def run(self, request: Optional[str] = None) -> str:
        """Execute the agent's main loop asynchronously.

        Args:
            request: Optional initial user request to process.

        Returns:
            A string su妹妹arizing the execution results.

        Raises:
            RuntimeError: If the agent is not in IDLE state at start.
        """
        if self.state != AgentState.IDLE:
            raise RuntimeError(f"Cannot run agent from state: {self.state}")

        if request:
            self.update_memory("user", request)

        results: List[str] = []
        async with self.state_context(AgentState.RUNNING):
            while (
                self.current_step < self.max_steps and self.state != AgentState.FINISHED
            ):
                self.current_step += 1
                logger.info(f"Executing step {self.current_step}/{self.max_steps}")
                step_result = await self.step()

                # Check for stuck state
                if self.is_stuck():
                    self.handle_stuck_state()

                results.append(f"Step {self.current_step}: {step_result}")

            if self.current_step >= self.max_steps:
                self.current_step = 0
                self.state = AgentState.IDLE
                results.append(f"Terminated: Reached max steps ({self.max_steps})")
        await SANDBOX_CLIENT.cleanup()
        return "\n".join(results) if results else "No steps executed"
      假设Agent形状没有是 FINISHED或许 step 次数没有超越 max_steps,便不竭施行中心干了Agent stuck的检测处置将每次 step 的施行成果存到 results 里run loop完毕后
        一点儿 tools可以 会颠末沙箱情况会见操纵体系,完毕后施行沙箱情况清理前去 results 施行成果


ReActAgent
    ReActAgent界说了 step施行的过程,限制为先 think(思考),假设思考的成果需要act(挪用东西),则施行 actReActAgent 界说了 think 战 act 的抽象办法
class ReActAgent(BaseAgent):
    ...
    @abstractmethod
    async def think(self) -> bool:
        """Process current state and decide next action"""
    @abstractmethod
    async def act(self) -> str:
        """Execute decided actions"""

    async def step(self) -> str:
        """Execute a single step: think and act."""
        should_act = await self.think()
        if not should_act:
            return "Thinking complete - no action needed"
        return await self.act()ToolCallAgent
    ToolCallAgent完毕了 think 战 act办法 think():假设存留next_step_prompt,则将其调整退影象战目前东西散一齐传给 LLM,读与 LLM 前去成果
      获得到 response.content,将其促进影象获得到 response.tool_calls,如为空,则前去false,没有需要act,如没有为空则前去 true
    act():遍历挪用正在 think 阶段产出的 tools,顺次把tool施行成果参加到影象中,并前去
class ToolCallAgent(ReActAgent):
  """Base agent class for handling tool/function calls with enhanced abstraction"""
  ...
  # 可用的东西散
  available_tools: ToolCollection = ToolCollection(
        CreateChatCompletion(), Terminate()
    )
  # 待施行的东西挪用
  tool_calls: List[ToolCall] = Field(default_factory=list)
  ...
  async def think(self) -> bool:
    ...
    # Get response with tool options
    response = await self.llm.ask_tool(
        messages=self.messages,
        system_msgs=(
            [Message.system_message(self.system_prompt)]
            if self.system_prompt
            else None
        ),
        tools=self.available_tools.to_params(),
        tool_choice=self.tool_choices,
    )
    ...
    self.tool_calls = tool_calls = (
            response.tool_calls if response and response.tool_calls else []
        )
    content = response.content if response and response.content else ""
    ...
    assistant_msg = (
        Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
        if self.tool_calls
        else Message.assistant_message(content)
    )
    self.memory.add_message(assistant_msg)
    ...
    return bool(self.tool_calls)
  ...
  async def act(self) -> str:
    results = []
    for co妹妹and in self.tool_calls:
        ...
        result = await self.execute_tool(co妹妹and)

        if self.max_observe:
            result = result[: self.max_observe]

        logger.info(
            f"🎯 Tool '{co妹妹and.function.name}' completed its mission! Result: {result}"
        )

        # Add tool response to memory
        tool_msg = Message.tool_message(
            content=result,
            tool_call_id=co妹妹and.id,
            name=co妹妹and.function.name,
            ...
        )
        self.memory.add_message(tool_msg)
        results.append(result)

    return "\n\n".join(results)Manus那里指具体场景下的Agent完毕,承袭自ToolCallAgent,从头界说了 tools 空间,增强了 think办法
    从头界说东西散 available_tools:python 剧本施行器、浏览器挪用东西、字符串交流编纂器增强think办法:正在施行默认的 think 过程前,因为东西中使用了 BrowserUser,以是需要检测近来的对于话中可否存留有 BrowserUser 的东西挪用,
      假设存留,需要将目前浏览器形状搁到 next_step_prompt 中,让LLM感知到,以更佳输出下一步的施行步调

class Manus(ToolCallAgent):
  """A versatile general-purpose agent."""
  ...
  # Add general-purpose tools to the tool collection
  available_tools: ToolCollection = Field(
      default_factory=lambda: ToolCollection(
          PythonExecute(), BrowserUseTool(), StrWordStrEditor(), Terminate()
      )
  )
  # 浏览器操纵情况
  browser_context_helper: Optional[BrowserContextHelper] = None
  ...
  #增强 think办法
  async def think(self) -> bool:
    """Process current state and decide next actions with appropriate context."""
    original_prompt = self.next_step_prompt
    recent_messages = self.memory.messages[-3:] if self.memory.messages else []
    browser_in_use = any(
        tc.function.name == BrowserUseTool().name
        for msg in recent_messages
        if msg.tool_calls
        for tc in msg.tool_calls
    )

    if browser_in_use:
        self.next_step_prompt = (
            await self.browser_context_helper.format_next_step_prompt()
        )

    result = await super().think()

    # Restore original prompt
    self.next_step_prompt = original_prompt

    return resultTools正在以前的代码中能够瞅到,ToolCallAgent 颠末挪用 execute_tool办法 去施行 tools 挪用
    tools 挪用颠末 东西散 ToolCollection 的 execute 施行单个 Tool承袭 自 BaseTool,颠末完毕 execute办法 ,界说具体施行历程
ToolCollection
    颠末挪用 execute办法 ,施行一定东西
class ToolCollection:
    """A collection of defined tools."""
    ...
    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}
    ...
    # 施行东西集合的东西
    async def execute(
        self, *, name: str, tool_input: Dict[str, Any] = None
    ) -> ToolResult:
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            result = await tool(**tool_input)
            return result
        except ToolError as e:
            return ToolFailure(error=e.message)BaseTool
    界说东西的根本构造,和中心办法
class BaseTool(ABC, BaseModel):
    name: str
    ...
    async def __call__(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""CreateChatCompletion以CreateChatCompletion为例,那里具体完毕便没有干深入,差别的东西纷歧样class CreateChatCompletion(BaseTool):
  name: str = "create_chat_completion"
  ...
  async def execute(self, required: list | None = None, **kwargs) -> Any:
      """Execute the chat completion with type conversion.

      Args:
          required: List of required field names or None
          **kwargs: Response data

      Returns:
          Converted response based on response_type
      """
      required = required or self.required

      # Handle case when required is a list
      if isinstance(required, list) and len(required) > 0:
          if len(required) == 1:
              required_field = required[0]
              result = kwargs.get(required_field, "")
          else:
              # Return multiple fields as a dictionary
              return {field: kwargs.get(field, "") for field in required}
      else:
          required_field = "response"
          result = kwargs.get(required_field, "")

      # Type conversion logic
      if self.response_type == str:
          return result

      if isinstance(self.response_type, type) and issubclass(
          self.response_type, BaseModel
      ):
          return self.response_type(**kwargs)

      if get_origin(self.response_type) in (list, dict):
          return result  # Assuming result is already in correct format

      try:
          return self.response_type(result)
      except (ValueError, TypeError):
            return resultLLMLLM 对于中屏障模子之间的差别,那里只会商跟支流程相干的中心办法战属性
    client:年夜模子 SDK client 工具,用去挪用年夜模子从 ToolCallAgent 的 think(),办法中能够瞅到,agent颠末挪用 ask_tool办法 获得思考成果
      恳求参数
        messages:消息列表system_msgs:体系 prompttools:可挪用的东西散
      施行历程
        将传退的Message列表变换成 年夜模子的领受的消息 格局计较行将倡议的年夜模子挪用 message + toocall token数目可否超越限定倡议年夜模子恳求,领受前去革新模子token消耗,终极前去年夜模子 reason后果


class LLM:
  ...
  async def ask_tool(
        self,
        messages: List[Union[dict, Message]],
        system_msgs: Optional[List[Union[dict, Message]]] = None,
        tools: Optional[List[dict]] = None,
        ...
        **kwargs,
    ) -> ChatCompletionMessage | None:
    ...
    # Message 列表变换成 年夜模子领受的消息 格局
    supports_images = self.model in MULTIMODAL_MODELS
    if system_msgs:
        system_msgs = self.format_messages(system_msgs, supports_images)
        messages = system_msgs + self.format_messages(messages, supports_images)
    else:
        messages = self.format_messages(messages, supports_images)
   
    # 计较token数目可否超越限定
    input_tokens = self.count_message_tokens(messages)
    tools_tokens = 0
    if tools:
        for tool in tools:
            tools_tokens += self.count_tokens(str(tool))
    input_tokens += tools_tokens

    if not self.check_token_limit(input_tokens):
        error_message = self.get_limit_error_message(input_tokens)
        raise TokenLimitExceeded(error_message)
   ...
  
   # 倡议年夜模子恳求
   params = {
        "model": self.model,
        "messages": messages,
        "tools": tools,
        ...
        **kwargs,
    }
   response: ChatCompletion = await self.client.chat.completions.create(
        **params
    )

    # Check if response is valid
    if not response.choices or not response.choices[0].message:
        print(response)
        # raise ValueError("Invalid or empty response from LLM")
        return None

    # Update token counts
    self.update_token_count(
        response.usage.prompt_tokens, response.usage.completion_tokens
    )

    return response.choices[0].message对于 Function Call 正在那里便再也不赘述,念理解的能够瞅 OpenAI function-calling 文档


最初OpenManus后绝会鉴于目前颠簸的根底架构上会干上面多少件工作
    Planning才干 (东西):颠末 planning tool 装解任务,处置幻想天下庞大成就适配更多模子:扩大 Claude 及 DeepSeek 系列模子,劣化高本钱使用场景容器化布置:简化情况启用过程RAG 模块散成:颠末内部常识库增强 Agent 结果
您需要登录后才可以回帖 登录 | 立即注册 qq_login

本版积分规则

发布主题
阅读排行更多+
用专业创造成效
400-778-7781
周一至周五 9:00-18:00
意见反馈:server@mailiao.group
紧急联系:181-67184787
ftqrcode

扫一扫关注我们

Powered by 职贝云数A新零售门户 X3.5© 2004-2025 职贝云数 Inc.( 蜀ICP备2024104722号 )