形态管理class BaseAgent:
...
# 形态初始化
state: AgentState = Field(
default=AgentState.IDLE, description="Current agent state"
)
# 允许执行的最多轮次
max_steps: int = Field(default=10, description="Maximum steps before termination")
# 当前已执行轮次
current_step: int = Field(default=0, description="Current step in execution")
...
# 形态流转操作
async def state_context(self, new_state: AgentState):
"""Context manager for safe agent state transitions.
Args:
new_state: The state to transition to during the context.
Yields:
None: Allows execution within the new state.
Raises:
ValueError: If the new_state is invalid.
"""
if not isinstance(new_state, AgentState):
raise ValueError(f"Invalid state: {new_state}")
previous_state = self.state
self.state = new_state
try:
yield
except Exception as e:
self.state = AgentState.ERROR # Transition to ERROR on failure
raise e
finally:
self.state = previous_state # Revert to previous state
# 判别当前Agent能否停在原地打转
def is_stuck(self) -> bool:
"""Check if the agent is stuck in a loop by detecting duplicate content"""
if len(self.memory.messages) < 2:
return False
last_message = self.memory.messages[-1]
if not last_message.content:
return False
# Count identical content occurrences
duplicate_count = sum(
1
for msg in reversed(self.memory.messages[:-1])
if msg.role == "assistant" and msg.content == last_message.content
)
return duplicate_count >= self.duplicate_threshold
# 处理应Agent原地打转时的处理操作(优化传递给 LLM 的prompt,以跳出原地打转的形态)
def handle_stuck_state(self):
"""Handle stuck state by adding a prompt to change strategy"""
stuck_prompt = "\
Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
...
Args:
role: The role of the message sender (user, system, assistant, tool).
content: The message content.
base64_image: Optional base64 encoded image.
**kwargs: Additional arguments (e.g., tool_call_id for tool messages).
Raises:
ValueError: If the role is unsupported.
"""
message_map = {
"user": Message.user_message,
"system": Message.system_message,
"assistant": Message.assistant_message,
"tool": lambda content, **kw: Message.tool_message(content, **kw),
}
if role not in message_map:
raise ValueError(f"Unsupported message role: {role}")
# Create message with appropriate parameters based on role
kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
self.memory.add_message(message_map[role](content, **kwargs))
# 获取记忆
@property
def messages(self) -> List[Message]:
"""Retrieve a list of messages from the agent's memory."""
return self.memory.messages
# 修正记忆
@messages.setter
def messages(self, value: List[Message]):
"""Set the list of messages in the agent's memory."""
self.memory.messages = value
...
Run loopasync def run(self, request: Optional[str] = None) -> str:
"""Execute the agent's main loop asynchronously.
Args:
request: Optional initial user request to process.
Returns:
A string summarizing the execution results.
Raises:
RuntimeError: If the agent is not in IDLE state at start.
"""
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run agent from state: {self.state}")
if request:
self.update_memory("user", request)
results: List[str] = []
async with self.state_context(AgentState.RUNNING):
while (
self.current_step < self.max_steps and self.state != AgentState.FINISHED
):
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")
step_result = await self.step()
# Check for stuck state
if self.is_stuck():
self.handle_stuck_state()
class ReActAgent(BaseAgent):
...
@abstractmethod
async def think(self) -> bool:
"""Process current state and decide next action"""
@abstractmethod
async def act(self) -> str:
"""Execute decided actions"""
async def step(self) -> str:
"""Execute a single step: think and act."""
should_act = await self.think()
if not should_act:
return "Thinking complete - no action needed"
return await self.act()ToolCallAgent
class ToolCollection:
"""A collection of defined tools."""
...
def __init__(self, *tools: BaseTool):
self.tools = tools
self.tool_map = {tool.name: tool for tool in tools}
...
# 执行工具集中的工具
async def execute(
self, *, name: str, tool_input: Dict[str, Any] = None
) -> ToolResult:
tool = self.tool_map.get(name)
if not tool:
return ToolFailure(error=f"Tool {name} is invalid")
try:
result = await tool(**tool_input)
return result
except ToolError as e:
return ToolFailure(error=e.message)BaseTool
定义工具的基本结构,以及核心方法
class BaseTool(ABC, BaseModel):
name: str
...
async def __call__(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""
return await self.execute(**kwargs)
@abstractmethod
async def execute(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""CreateChatCompletion以CreateChatCompletion为例,这里详细完成就不做深化,不同的工具不一样class CreateChatCompletion(BaseTool):
name: str = "create_chat_completion"
...
async def execute(self, required: list | None = None, **kwargs) -> Any:
"""Execute the chat completion with type conversion.
Args:
required: List of required field names or None
**kwargs: Response data
Returns:
Converted response based on response_type
"""
required = required or self.required
# Handle case when required is a list
if isinstance(required, list) and len(required) > 0:
if len(required) == 1:
required_field = required[0]
result = kwargs.get(required_field, "")
else:
# Return multiple fields as a dictionary
return {field: kwargs.get(field, "") for field in required}
else:
required_field = "response"
result = kwargs.get(required_field, "")
# Type conversion logic
if self.response_type == str:
return result
if isinstance(self.response_type, type) and issubclass(
self.response_type, BaseModel
):
return self.response_type(**kwargs)
if get_origin(self.response_type) in (list, dict):
return result # Assuming result is already in correct format
# Check if response is valid
if not response.choices or not response.choices[0].message:
print(response)
# raise ValueError("Invalid or empty response from LLM")
return None