3. 状态管理
"状态是 agent 的记忆。没有适当的状态管理,agents 无法学习、恢复或改进。"
状态管理是指捕获、持久化和恢复 agent 状态(State)跨执行的实践。它使 agents 能够从故障中恢复,在长时间运行的工作流中维护上下文,并在多个 agent 实例之间协调。
3.1 Agent 状态类型
状态分类
对话状态(Conversation State)
对话历史和上下文。
@Document(collection = "conversation_state")
public class ConversationState {
@Id
private String id;
private String agentId;
private String userId;
private String sessionId;
private List<Message> messages;
private Map<String, Object> context;
private Map<String, Object> metadata;
private Instant createdAt;
private Instant updatedAt;
@Getter
@Setter
public static class Message {
private String role; // user, assistant, system, tool
private String content;
private Instant timestamp;
private Map<String, Object> metadata;
}
}
@Service
public class ConversationStateManager {
@Autowired
private MongoTemplate mongoTemplate;
public ConversationState save(ConversationState state) {
state.setUpdatedAt(Instant.now());
return mongoTemplate.save(state);
}
public Optional<ConversationState> load(String sessionId) {
return Optional.ofNullable(
mongoTemplate.findById(
sessionId,
ConversationState.class
)
);
}
public void addMessage(
String sessionId,
Message message
) {
ConversationState state = load(sessionId)
.orElseGet(() -> createNew(sessionId));
state.getMessages().add(message);
save(state);
}
public List<Message> getRecentMessages(
String sessionId,
int limit
) {
return load(sessionId)
.map(ConversationState::getMessages)
.map(messages -> messages.stream()
.skip(Math.max(0, messages.size() - limit))
.toList())
.orElse(List.of());
}
}
任务状态(Task State)
当前任务的进度和状态。
@Document(collection = "task_state")
public class TaskState {
@Id
private String id;
private String agentId;
private String taskId;
private String userId;
private TaskStatus status;
private String currentStep;
private List<String> completedSteps;
private Map<String, Object> inputData;
private Map<String, Object> outputData;
private Map<String, Object> intermediateData;
private ErrorInfo lastError;
private int retryCount;
private Instant createdAt;
private Instant startedAt;
private Instant completedAt;
public enum TaskStatus {
PENDING,
IN_PROGRESS,
BLOCKED,
COMPLETED,
FAILED,
CANCELLED
}
@Getter
@Setter
public static class ErrorInfo {
private String code;
private String message;
private String stackTrace;
private Instant timestamp;
}
}
@Service
public class TaskStateManager {
@Autowired
private MongoTemplate mongoTemplate;
public TaskState create(TaskState state) {
state.setCreatedAt(Instant.now());
state.setStatus(TaskStatus.PENDING);
return mongoTemplate.save(state);
}
public TaskState update(
String taskId,
Consumer<TaskState> updater
) {
TaskState state = load(taskId);
updater.accept(state);
state.setUpdatedAt(Instant.now());
return mongoTemplate.save(state);
}
public void markInProgress(String taskId) {
update(taskId, state -> {
state.setStatus(TaskStatus.IN_PROGRESS);
if (state.getStartedAt() == null) {
state.setStartedAt(Instant.now());
}
});
}
public void markCompleted(
String taskId,
Map<String, Object> output
) {
update(taskId, state -> {
state.setStatus(TaskStatus.COMPLETED);
state.setCompletedAt(Instant.now());
state.setOutputData(output);
});
}
public void markFailed(
String taskId,
String error,
String stackTrace
) {
update(taskId, state -> {
state.setStatus(TaskStatus.FAILED);
state.setCompletedAt(Instant.now());
TaskState.ErrorInfo errorInfo = new TaskState.ErrorInfo();
errorInfo.setCode("TASK_FAILED");
errorInfo.setMessage(error);
errorInfo.setStackTrace(stackTrace);
errorInfo.setTimestamp(Instant.now());
state.setLastError(errorInfo);
});
}
}
记忆状态(Memory State)
学到的信息和知识。
@Document(collection = "memory_state")
public class MemoryState {
@Id
private String id;
private String agentId;
private String userId;
private Map<String, Object> facts; // Entity memories(实体记忆)
private List<Memory> episodicMemories; // Past experiences(过去经历)
private Map<String, Object> semanticMemory; // Knowledge(知识)
private Instant createdAt;
private Instant updatedAt;
@Getter
@Setter
public static class Memory {
private String id;
private String type;
private String content;
private Map<String, Object> metadata;
private Instant timestamp;
private double importance; // 0-1
}
}
@Service
public class MemoryStateManager {
@Autowired
private MongoTemplate mongoTemplate;
public void storeFact(
String agentId,
String key,
Object value
) {
Query query = Query.query(
Criteria.where("agentId").is(agentId)
);
Update update = new Update()
.set("facts." + key, value)
.set("updatedAt", Instant.now());
mongoTemplate.upsert(query, update, MemoryState.class);
}
public Optional<Object> getFact(
String agentId,
String key
) {
return Optional.ofNullable(
mongoTemplate.findOne(
Query.query(
Criteria.where("agentId").is(agentId)
),
MemoryState.class
)
).map(state -> state.getFacts().get(key));
}
public void addEpisodicMemory(
String agentId,
String content,
double importance
) {
Memory memory = new Memory();
memory.setId(UUID.randomUUID().toString());
memory.setType("episodic");
memory.setContent(content);
memory.setTimestamp(Instant.now());
memory.setImportance(importance);
Query query = Query.query(
Criteria.where("agentId").is(agentId)
);
Update update = new Update()
.push("episodicMemories", memory)
.set("updatedAt", Instant.now());
mongoTemplate.upsert(query, update, MemoryState.class);
}
public List<Memory> getImportantMemories(
String agentId,
double minImportance,
int limit
) {
return Optional.ofNullable(
mongoTemplate.findOne(
Query.query(
Criteria.where("agentId").is(agentId)
),
MemoryState.class
)
)
.map(MemoryState::getEpisodicMemories)
.map(memories -> memories.stream()
.filter(m -> m.getImportance() >= minImportance)
.sorted(Comparator
.comparing(Memory::getTimestamp)
.reversed())
.limit(limit)
.toList())
.orElse(List.of());
}
}