职贝云数AI新零售门户

Title: OpenManus Core Architecture Analysis

Author: mInr    Time: 2025-4-9 16:17
Title: OpenManus Core Architecture Analysis
Introducing OpenManus
On the evening of March 5, 2025, the Manus demo took the media by storm. Just one day later, on March 7, the DeepWisdom MetaGPT team and the CAMEL AI team each released open-source projects reproducing Manus, named OpenManus and OWL, which went on to spark widespread discussion across the web and the GitHub community.

GitHub地址:https://github.com/mannaandpoem/OpenManus


A note before diving in
The OpenManus team built the core system in just 1 hour and took only 3 hours in total to ship the first release, earning over ten thousand GitHub stars. At such low cost, you could well build your own agent on top of this framework, so let's first study how it is implemented.


Overall execution flow
[Figure: overall execution flow diagram]
Setting the implementation details aside for a moment and laying the overall execution framework out flat: the flow is fairly simple and clear. Next, let's analyze the OpenManus architecture.


Detailed architecture
Agent
OpenManus uses a layered design for its agents, giving each layer a clearly defined responsibility and keeping the whole system extensible.
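The layering described below (base agent, ReAct loop, tool calling, concrete agent) can be sketched in miniature like this. All class and method names here are illustrative stand-ins rather than the actual OpenManus definitions, and the real code is async while this sketch is synchronous for brevity:

```python
class SketchBaseAgent:
    """Layer 1: state, memory, and the run loop."""
    def __init__(self):
        self.memory: list[str] = []

class SketchReActAgent(SketchBaseAgent):
    """Layer 2: one step = think (decide) + act (execute)."""
    def think(self) -> bool:
        raise NotImplementedError
    def act(self) -> str:
        raise NotImplementedError
    def step(self) -> str:
        return self.act() if self.think() else "no action needed"

class SketchToolCallAgent(SketchReActAgent):
    """Layer 3: holds a name-to-tool map that act() dispatches into."""
    def __init__(self, tools):
        super().__init__()
        self.tools = {t.__name__: t for t in tools}

class SketchManus(SketchToolCallAgent):
    """Layer 4: a concrete agent that plugs in a specific tool set."""
    def think(self) -> bool:
        return bool(self.tools)
    def act(self) -> str:
        return ", ".join(sorted(self.tools))
```

Each layer adds exactly one concern, which is what makes it cheap to swap in your own concrete agent at the bottom of the hierarchy.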


BaseAgent
BaseAgent takes care of the most basic concerns: state management, memory management, and the execution run loop.
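A rough sketch of those responsibilities, assuming a simplified memory and a fixed step budget (MiniBaseAgent and MiniMemory are hypothetical names; the real BaseAgent tracks richer state and more message metadata):

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class MiniMemory:
    messages: list = field(default_factory=list)

    def add_message(self, msg: str) -> None:
        self.messages.append(msg)

@dataclass
class MiniBaseAgent:
    state: str = "IDLE"
    max_steps: int = 3
    memory: MiniMemory = field(default_factory=MiniMemory)

    async def step(self) -> str:
        """Overridden by subclasses (e.g. with a think/act pair)."""
        return "noop"

    async def run(self, request: str) -> str:
        """The run loop: record the request, then step until finished
        or the step budget is exhausted."""
        self.state = "RUNNING"
        self.memory.add_message(f"user: {request}")
        results = []
        for i in range(self.max_steps):
            results.append(f"Step {i + 1}: {await self.step()}")
            if self.state == "FINISHED":
                break
        self.state = "IDLE"
        return "\n".join(results)
```

Subclasses end a run early by flipping state to FINISHED, which is exactly the role the Terminate tool plays further down.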
ReActAgent
class ReActAgent(BaseAgent):
    ...
    @abstractmethod
    async def think(self) -> bool:
        """Process current state and decide next action"""
    @abstractmethod
    async def act(self) -> str:
        """Execute decided actions"""

    async def step(self) -> str:
        """Execute a single step: think and act."""
        should_act = await self.think()
        if not should_act:
            return "Thinking complete - no action needed"
        return await self.act()

ToolCallAgent
class ToolCallAgent(ReActAgent):
  """Base agent class for handling tool/function calls with enhanced abstraction"""
  ...
  # the set of available tools
  available_tools: ToolCollection = ToolCollection(
        CreateChatCompletion(), Terminate()
    )
  # tool calls waiting to be executed
  tool_calls: List[ToolCall] = Field(default_factory=list)
  ...
  async def think(self) -> bool:
    ...
    # Get response with tool options
    response = await self.llm.ask_tool(
        messages=self.messages,
        system_msgs=(
            [Message.system_message(self.system_prompt)]
            if self.system_prompt
            else None
        ),
        tools=self.available_tools.to_params(),
        tool_choice=self.tool_choices,
    )
    ...
    self.tool_calls = tool_calls = (
            response.tool_calls if response and response.tool_calls else []
        )
    content = response.content if response and response.content else ""
    ...
    assistant_msg = (
        Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
        if self.tool_calls
        else Message.assistant_message(content)
    )
    self.memory.add_message(assistant_msg)
    ...
    return bool(self.tool_calls)
  ...
  async def act(self) -> str:
    results = []
    for command in self.tool_calls:
        ...
        result = await self.execute_tool(command)

        if self.max_observe:
            result = result[: self.max_observe]

        logger.info(
            f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
        )

        # Add tool response to memory
        tool_msg = Message.tool_message(
            content=result,
            tool_call_id=command.id,
            name=command.function.name,
            ...
        )
        self.memory.add_message(tool_msg)
        results.append(result)

    return "\n\n".join(results)

Manus
Manus is the concrete, scenario-specific agent implementation. It inherits from ToolCallAgent, redefines the tool space, and enhances the think method.
class Manus(ToolCallAgent):
  """A versatile general-purpose agent."""
  ...
  # Add general-purpose tools to the tool collection
  available_tools: ToolCollection = Field(
      default_factory=lambda: ToolCollection(
          PythonExecute(), BrowserUseTool(), StrReplaceEditor(), Terminate()
      )
  )
  # browser operation context
  browser_context_helper: Optional[BrowserContextHelper] = None
  ...
  # enhanced think method
  async def think(self) -> bool:
    """Process current state and decide next actions with appropriate context."""
    original_prompt = self.next_step_prompt
    recent_messages = self.memory.messages[-3:] if self.memory.messages else []
    browser_in_use = any(
        tc.function.name == BrowserUseTool().name
        for msg in recent_messages
        if msg.tool_calls
        for tc in msg.tool_calls
    )

    if browser_in_use:
        self.next_step_prompt = (
            await self.browser_context_helper.format_next_step_prompt()
        )

    result = await super().think()

    # Restore original prompt
    self.next_step_prompt = original_prompt

    return result

Tools
As seen in the code above, ToolCallAgent executes tool calls by invoking the execute_tool method.
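The dispatch idea behind execute_tool is simply a lookup from tool name to tool object followed by an awaited call. A minimal self-contained sketch, with hypothetical tools and error strings rather than the actual OpenManus API:

```python
import asyncio

# Two stand-in tools; real tools would be BaseTool subclasses.
async def echo(text: str) -> str:
    return f"echo: {text}"

async def add(a: int, b: int) -> str:
    return str(a + b)

TOOLS = {"echo": echo, "add": add}

async def execute_tool(name: str, tool_input: dict) -> str:
    """Look the tool up by name and run it; unknown names and bad
    arguments are reported as error strings instead of raising."""
    tool = TOOLS.get(name)
    if tool is None:
        return f"Error: tool '{name}' is invalid"
    try:
        return await tool(**tool_input)
    except TypeError as e:  # malformed arguments from the model
        return f"Error: {e}"
```

Returning error strings instead of raising keeps a single bad tool call from aborting the whole agent loop, which is the same choice ToolCollection makes below.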
ToolCollection
class ToolCollection:
    """A collection of defined tools."""
    ...
    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}
    ...
    # execute a tool from the collection
    async def execute(
        self, *, name: str, tool_input: Dict[str, Any] = None
    ) -> ToolResult:
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            result = await tool(**tool_input)
            return result
        except ToolError as e:
            return ToolFailure(error=e.message)

BaseTool
class BaseTool(ABC, BaseModel):
    name: str
    ...
    async def __call__(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""

CreateChatCompletion
Take CreateChatCompletion as an example. We will not dig into the implementation details here, since every tool is different.
class CreateChatCompletion(BaseTool):
  name: str = "create_chat_completion"
  ...
  async def execute(self, required: list | None = None, **kwargs) -> Any:
      """Execute the chat completion with type conversion.

      Args:
          required: List of required field names or None
          **kwargs: Response data

      Returns:
          Converted response based on response_type
      """
      required = required or self.required

      # Handle case when required is a list
      if isinstance(required, list) and len(required) > 0:
          if len(required) == 1:
              required_field = required[0]
              result = kwargs.get(required_field, "")
          else:
              # Return multiple fields as a dictionary
              return {field: kwargs.get(field, "") for field in required}
      else:
          required_field = "response"
          result = kwargs.get(required_field, "")

      # Type conversion logic
      if self.response_type == str:
          return result

      if isinstance(self.response_type, type) and issubclass(
          self.response_type, BaseModel
      ):
          return self.response_type(**kwargs)

      if get_origin(self.response_type) in (list, dict):
          return result  # Assuming result is already in correct format

      try:
          return self.response_type(result)
      except (ValueError, TypeError):
          return result

LLM
The LLM class shields the rest of the system from differences between models. Here we only cover the core methods and attributes relevant to the main flow.
class LLM:
  ...
  async def ask_tool(
        self,
        messages: List[Union[dict, Message]],
        system_msgs: Optional[List[Union[dict, Message]]] = None,
        tools: Optional[List[dict]] = None,
        ...
        **kwargs,
    ) -> ChatCompletionMessage | None:
    ...
    # convert the Message list into the message format the model accepts
    supports_images = self.model in MULTIMODAL_MODELS
    if system_msgs:
        system_msgs = self.format_messages(system_msgs, supports_images)
        messages = system_msgs + self.format_messages(messages, supports_images)
    else:
        messages = self.format_messages(messages, supports_images)
   
    # check whether the token count exceeds the limit
    input_tokens = self.count_message_tokens(messages)
    tools_tokens = 0
    if tools:
        for tool in tools:
            tools_tokens += self.count_tokens(str(tool))
    input_tokens += tools_tokens

    if not self.check_token_limit(input_tokens):
        error_message = self.get_limit_error_message(input_tokens)
        raise TokenLimitExceeded(error_message)
   ...
  
   # send the request to the LLM
   params = {
        "model": self.model,
        "messages": messages,
        "tools": tools,
        ...
        **kwargs,
    }
   response: ChatCompletion = await self.client.chat.completions.create(
        **params
    )

    # Check if response is valid
    if not response.choices or not response.choices[0].message:
        print(response)
        # raise ValueError("Invalid or empty response from LLM")
        return None

    # Update token counts
    self.update_token_count(
        response.usage.prompt_tokens, response.usage.completion_tokens
    )

    return response.choices[0].message

We will not dwell on Function Call here; to learn more, see the OpenAI function-calling documentation.
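For orientation, the tools parameter passed to chat.completions.create uses the OpenAI function-calling schema. The hand-built dict below shows roughly the shape each tool contributes (the terminate tool's description text here is illustrative), along with the JSON-string arguments the model sends back on a tool call:

```python
import json

# Hand-built example of one entry in the `tools` list, i.e. roughly
# the shape a tool's parameter schema takes in the request.
terminate_param = {
    "type": "function",
    "function": {
        "name": "terminate",
        "description": "Finish the interaction when the task is done.",
        "parameters": {
            "type": "object",
            "properties": {
                "status": {"type": "string", "enum": ["success", "failure"]}
            },
            "required": ["status"],
        },
    },
}

# On a tool call, the assistant message carries tool_calls whose
# function.arguments field is a JSON string; the agent must parse it
# before dispatching to the tool.
raw_arguments = '{"status": "success"}'
parsed = json.loads(raw_arguments)
```

Each returned tool call also carries an id and a function.name, which is why the tool result message above echoes tool_call_id and name back when it is added to memory.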


Finally
Going forward, OpenManus plans to build the following on top of the current stable architecture:



