6. 安全与护栏
"信任通过安全来赢得。护栏将实验性代理与生产系统区分开来。"
安全护栏是防止代理造成伤害的多层保护机制。它们在执行的每个阶段——执行前、执行中和执行后——运行,确保代理保持在预定义的边界内。
6.1 执行前检查
输入验证
/**
 * Pre-execution guardrail: sanitizes and validates raw user input before the agent sees it.
 *
 * <p>{@link #validate(UserInput)} applies, in order: sanitization, format validation,
 * prompt-injection screening, authorization, rate limiting and a resource-availability
 * check. The first failing step aborts validation by throwing.
 */
@Service
public class InputValidationService {

    /** Maximum accepted length of the sanitized text, in UTF-16 code units. */
    private static final int MAX_INPUT_LENGTH = 10000;

    /**
     * Printable characters and whitespace. UNICODE_CHARACTER_CLASS is needed because the
     * POSIX character classes are ASCII-only by default, which would reject every
     * non-ASCII input (for example any Chinese text).
     */
    private static final Pattern ALLOWED_CHARACTERS =
            Pattern.compile("[\\p{Print}\\s]*", Pattern.UNICODE_CHARACTER_CLASS);

    /**
     * Phrases commonly seen in prompt-injection attempts. Compiled once instead of on
     * every call. This is a heuristic deny-list, not a complete defense.
     */
    private static final List<Pattern> INJECTION_PATTERNS = List.of(
            Pattern.compile("(?i)ignore\\s+(all\\s+)?(previous|above|system)\\s+instructions"),
            Pattern.compile("(?i)override\\s+system\\s+prompt"),
            Pattern.compile("(?i)disregard\\s+constraints"),
            Pattern.compile("(?i)new\\s+instructions?:"),
            Pattern.compile("(?i)act\\s+as\\s+if"),
            Pattern.compile("(?i)pretend\\s+to\\s+be")
    );

    @Autowired
    private PIISanitizationService piiSanitizer;

    @Autowired
    private PermissionService permissionService;

    @Autowired
    private RateLimiter rateLimiter;

    // The availability check lives on ResourceCheckService; it is not a method of this class.
    @Autowired
    private ResourceCheckService resourceCheckService;

    /**
     * Validates a user request and returns its sanitized form.
     *
     * @param input the raw request
     * @return the request with sanitized text and the original user id, operation and metadata
     * @throws ValidationException if the input or its text is null, too long, or contains
     *         non-printable characters
     * @throws SecurityException if the text matches a known prompt-injection phrase
     * @throws UnauthorizedException if the user may not perform the requested operation
     * @throws RateLimitExceededException if the user's rate limit is exhausted
     */
    public ValidatedInput validate(UserInput input) {
        if (input == null || input.getText() == null) {
            throw new ValidationException("无效的输入格式");
        }

        // 1. Sanitize
        String sanitized = sanitize(input.getText());

        // 2. Validate format
        if (!isValidFormat(sanitized)) {
            throw new ValidationException("无效的输入格式");
        }

        // 3. Screen for prompt injection
        if (containsPromptInjection(sanitized)) {
            throw new SecurityException("检测到潜在的提示注入");
        }

        // 4. Authorize
        if (!permissionService.hasPermission(input.getUserId(), input.getOperation())) {
            throw new UnauthorizedException("用户未授权执行操作");
        }

        // 5. Rate-limit
        if (!rateLimiter.tryAcquire(input.getUserId())) {
            throw new RateLimitExceededException();
        }

        // 6. Check that downstream resources are available (throws when they are not)
        resourceCheckService.checkResourceAvailability();

        return ValidatedInput.builder()
                .text(sanitized)
                .userId(input.getUserId())
                .operation(input.getOperation())
                .metadata(input.getMetadata())
                .build();
    }

    /** Strips PII and HTML/XML tags, then collapses every whitespace run to a single space. */
    private String sanitize(String input) {
        String withoutPii = piiSanitizer.sanitize(input);
        String withoutTags = withoutPii.replaceAll("<[^>]*>", "");
        return withoutTags.trim().replaceAll("\\s+", " ");
    }

    /** Accepts text that is within the length limit and made only of printable characters. */
    private boolean isValidFormat(String input) {
        return input.length() <= MAX_INPUT_LENGTH
                && ALLOWED_CHARACTERS.matcher(input).matches();
    }

    /** Returns true when the text contains any of the known injection phrases. */
    private boolean containsPromptInjection(String input) {
        for (Pattern pattern : INJECTION_PATTERNS) {
            if (pattern.matcher(input).find()) {
                return true;
            }
        }
        return false;
    }
}
权限检查
/**
 * Answers authorization questions from the permission record stored for a user.
 *
 * <p>Users without a stored record are evaluated against {@code defaultPermission()}.
 */
@Service
public class PermissionService {

    @Autowired
    private PermissionRepository permissionRepository;

    /**
     * Checks whether a user may perform an operation right now.
     *
     * @param userId the user to check
     * @param operation the operation identifier looked up in the user's allow-list
     * @return true only if the operation is allow-listed, the user is active and the
     *         current time is inside the user's allowed hours
     */
    public boolean hasPermission(String userId, String operation) {
        return isOperationAllowed(loadPermission(userId), operation);
    }

    /**
     * Checks whether a user may call a tool with the given parameters.
     *
     * @param userId the user to check
     * @param toolName the tool name; the base permission checked is {@code "tool:" + toolName}
     * @param parameters the call parameters; null is treated as "no parameters"
     * @return true only if the base permission holds, a tool-specific permission exists,
     *         its usage count is below its maximum and every constrained parameter validates
     */
    public boolean canExecuteTool(
            String userId,
            String toolName,
            Map<String, Object> parameters
    ) {
        // Load the record once so both checks see the same snapshot.
        UserPermission permission = loadPermission(userId);

        if (!isOperationAllowed(permission, "tool:" + toolName)) {
            return false;
        }

        ToolPermission toolPermission = permission.getToolPermissions().get(toolName);
        if (toolPermission == null) {
            return false;
        }

        // This only reads the usage counter; nothing here increments it.
        if (toolPermission.getUsageCount() >= toolPermission.getMaxUsage()) {
            return false;
        }

        return validateParameters(toolPermission, parameters);
    }

    /** Loads the stored permission record, building the default only when none exists. */
    private UserPermission loadPermission(String userId) {
        // NOTE(review): defaultPermission() is not defined in this excerpt; confirm where it lives.
        return permissionRepository
                .findByUserId(userId)
                .orElseGet(() -> defaultPermission());
    }

    /** Applies the allow-list, active-flag and allowed-hours checks to one record. */
    private boolean isOperationAllowed(UserPermission permission, String operation) {
        return permission.getAllowedOperations().contains(operation)
                && permission.isActive()
                && isWithinAllowedHours(permission);
    }

    /**
     * Validates each supplied parameter against its constraint. Parameters that have no
     * constraint are accepted as-is.
     */
    private boolean validateParameters(
            ToolPermission permission,
            Map<String, Object> parameters
    ) {
        if (parameters == null) {
            return true;
        }
        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
            ParameterConstraint constraint =
                    permission.getParameterConstraints().get(entry.getKey());
            if (constraint != null && !constraint.validate(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks the current local time against the allowed window (both bounds inclusive).
     *
     * <p>A null start means no restriction. A null end leaves the window open-ended. A
     * window whose end is before its start (for example 22:00-06:00) is treated as
     * crossing midnight.
     */
    private boolean isWithinAllowedHours(UserPermission permission) {
        LocalTime start = permission.getAllowedHoursStart();
        if (start == null) {
            return true; // no restriction
        }

        // Uses the JVM's default time zone.
        LocalTime now = LocalTime.now();
        LocalTime end = permission.getAllowedHoursEnd();
        if (end == null) {
            return !now.isBefore(start);
        }
        if (end.isBefore(start)) {
            // Overnight window: allowed late in the day OR early the next day.
            return !now.isBefore(start) || !now.isAfter(end);
        }
        return !now.isBefore(start) && !now.isAfter(end);
    }
}
资源可用性
/**
 * Verifies that the resources an agent run depends on are available before work starts.
 */
@Service
public class ResourceCheckService {

    /** Minimum number of remaining LLM tokens required to start a run. */
    private static final int MIN_REMAINING_TOKENS = 1000;

    /** Provider key passed to the quota service's rate-limit check. */
    private static final String LLM_PROVIDER = "openai";

    /** Seconds to wait for the database to answer the validity probe. */
    private static final int DB_VALIDATION_TIMEOUT_SECONDS = 2;

    @Autowired
    private QuotaService quotaService;

    @Autowired
    private HealthCheckService healthCheckService;

    // Optional: when no DataSource bean exists the database probe is skipped.
    @Autowired(required = false)
    private javax.sql.DataSource dataSource;

    /**
     * Runs every availability check and throws on the first failure.
     *
     * @throws ResourceUnavailableException if the token quota is too low, the LLM API is
     *         rate-limited, a required service is unhealthy, or the database is unreachable
     */
    public void checkResourceAvailability() {
        // LLM token quota
        if (quotaService.getRemainingTokens() < MIN_REMAINING_TOKENS) {
            throw new ResourceUnavailableException("Token配额不足");
        }

        // LLM API rate limit
        if (quotaService.isRateLimited(LLM_PROVIDER)) {
            throw new ResourceUnavailableException("API速率限制已超出");
        }

        // Health of dependent services
        List<String> unhealthyServices = healthCheckService.getUnhealthyServices();
        if (!unhealthyServices.isEmpty()) {
            throw new ResourceUnavailableException(
                    "必要服务不可用: " + String.join(", ", unhealthyServices));
        }

        // Database connectivity
        if (!databaseHealthCheck()) {
            throw new ResourceUnavailableException("数据库连接不可用");
        }
    }

    /**
     * Probes the database by borrowing a connection and asking the driver to validate it.
     *
     * @return true if the connection is valid, or if no DataSource is configured (this
     *         keeps the previous always-true result for such deployments); false if the
     *         connection cannot be obtained or validated
     */
    private boolean databaseHealthCheck() {
        if (dataSource == null) {
            return true;
        }
        try (java.sql.Connection connection = dataSource.getConnection()) {
            return connection.isValid(DB_VALIDATION_TIMEOUT_SECONDS);
        } catch (java.sql.SQLException | RuntimeException e) {
            // Any failure to obtain or validate a connection means "unavailable";
            // the caller turns this into a ResourceUnavailableException.
            return false;
        }
    }
}
6.2 运行时约束
Token限制
/**
 * Enforces a per-task token budget using a reserve-then-settle protocol: callers reserve
 * tokens before an LLM call and report the actual consumption afterwards.
 */
@Service
public class TokenLimitService {

    private final Map<String, TokenLimit> limits = new ConcurrentHashMap<>();

    @Value("${agent.limits.max-tokens-per-task}")
    private int maxTokensPerTask;

    // NOTE(review): injected but not enforced anywhere in this class; confirm whether
    // checkTokenLimit should also reject single requests above this value.
    @Value("${agent.limits.max-tokens-per-tool-call}")
    private int maxTokensPerToolCall;

    /**
     * Reserves tokens for a task, failing if the budget would be exceeded.
     *
     * <p>Both consumed tokens and outstanding reservations count against the budget, and
     * the check and the reservation happen atomically per task.
     *
     * @param taskId the task whose budget is charged; a budget is created on first use
     * @param requestedTokens the number of tokens to reserve; must not be negative
     * @throws IllegalArgumentException if {@code requestedTokens} is negative
     * @throws TokenLimitExceededException if the reservation does not fit in the budget
     */
    public void checkTokenLimit(String taskId, int requestedTokens) {
        requireNonNegative(requestedTokens, "requestedTokens");
        TokenLimit limit = limits.computeIfAbsent(
                taskId,
                k -> new TokenLimit(maxTokensPerTask)
        );
        limit.reserve(requestedTokens);
    }

    /**
     * Records actual consumption and releases the same amount from the reservation.
     *
     * <p>This overload does not know how much was originally reserved, so a reservation
     * larger than {@code actualTokens} stays partly held. Prefer
     * {@link #releaseTokens(String, int, int)} when the reserved amount is known.
     *
     * @param taskId the task to settle; unknown tasks are ignored
     * @param actualTokens the tokens actually consumed; must not be negative
     * @throws IllegalArgumentException if {@code actualTokens} is negative
     */
    public void releaseTokens(String taskId, int actualTokens) {
        releaseTokens(taskId, actualTokens, actualTokens);
    }

    /**
     * Releases a reservation in full and records the actual consumption.
     *
     * @param taskId the task to settle; unknown tasks are ignored
     * @param reservedTokens the amount previously passed to {@link #checkTokenLimit}
     * @param actualTokens the tokens actually consumed
     * @throws IllegalArgumentException if either amount is negative
     */
    public void releaseTokens(String taskId, int reservedTokens, int actualTokens) {
        requireNonNegative(reservedTokens, "reservedTokens");
        requireNonNegative(actualTokens, "actualTokens");
        TokenLimit limit = limits.get(taskId);
        if (limit != null) {
            limit.settle(reservedTokens, actualTokens);
        }
    }

    /** Drops the budget state of a finished task so the map does not grow without bound. */
    public void clearTask(String taskId) {
        limits.remove(taskId);
    }

    private static void requireNonNegative(int value, String name) {
        if (value < 0) {
            throw new IllegalArgumentException(name + " must not be negative: " + value);
        }
    }

    /** Mutable budget of one task. All access is synchronized on the instance. */
    private static final class TokenLimit {
        private final int maxTokens;
        private int reservedTokens;
        private int usedTokens;

        TokenLimit(int maxTokens) {
            this.maxTokens = maxTokens;
        }

        /** Reserves tokens or throws; the check and the update form one atomic step. */
        synchronized void reserve(int tokens) {
            if (tokens > getRemainingTokens()) {
                throw new TokenLimitExceededException(
                        String.format(
                                "Token限制超出: %d/%d",
                                usedTokens + reservedTokens,
                                maxTokens
                        )
                );
            }
            reservedTokens += tokens;
        }

        /** Adds to the used total and releases a reservation, never going below zero. */
        synchronized void settle(int released, int actual) {
            usedTokens += actual;
            reservedTokens = Math.max(0, reservedTokens - released);
        }

        /** Budget left after consumed tokens and outstanding reservations. */
        synchronized int getRemainingTokens() {
            return maxTokens - usedTokens - reservedTokens;
        }
    }
}
时间限制
/**
 * Tracks a start time and a timeout per task and reports when a task has run too long.
 */
@Service
public class TimeLimitService {

    // One entry per task so the start time and the timeout are always published together.
    private final Map<String, TaskTimer> timers = new ConcurrentHashMap<>();

    /**
     * Starts (or restarts) the timer of a task.
     *
     * @param taskId the task identifier
     * @param timeout the maximum allowed run time
     * @throws NullPointerException if either argument is null; nothing is stored in that case
     */
    public void startTask(String taskId, Duration timeout) {
        java.util.Objects.requireNonNull(taskId, "taskId");
        java.util.Objects.requireNonNull(timeout, "timeout");
        timers.put(taskId, new TaskTimer(System.nanoTime(), timeout));
    }

    /**
     * Verifies that a task is still inside its time budget.
     *
     * @param taskId the task identifier
     * @throws IllegalStateException if the task was never started or has already ended
     * @throws TimeoutException if the elapsed time is greater than the timeout
     */
    // NOTE(review): if TimeoutException is java.util.concurrent.TimeoutException it is a
    // checked exception and this method needs a throws clause; confirm which type is meant.
    public void checkTimeLimit(String taskId) {
        TaskTimer timer = timers.get(taskId);
        if (timer == null) {
            throw new IllegalStateException("任务未开始");
        }

        // The monotonic clock keeps wall-clock adjustments from distorting the elapsed time.
        Duration elapsed = Duration.ofNanos(System.nanoTime() - timer.startNanos);
        if (elapsed.compareTo(timer.timeout) > 0) {
            throw new TimeoutException(
                    String.format(
                            "任务超时: %dms / %dms",
                            elapsed.toMillis(),
                            timer.timeout.toMillis()
                    )
            );
        }
    }

    /** Forgets a task; later {@link #checkTimeLimit} calls for it will fail as "not started". */
    public void endTask(String taskId) {
        timers.remove(taskId);
    }

    /** Immutable start reading and timeout of one task. */
    private static final class TaskTimer {
        private final long startNanos;
        private final Duration timeout;

        TaskTimer(long startNanos, Duration timeout) {
            this.startNanos = startNanos;
            this.timeout = timeout;
        }
    }
}
工具使用限制
/**
 * Caps how many times each tool may be called within a single task.
 */
@Service
public class ToolUsageLimitService {

    private final Map<String, ToolUsage> usage = new ConcurrentHashMap<>();

    /**
     * Records one call of a tool for a task, failing if the tool's cap is already reached.
     *
     * <p>The limit check and the counter increment happen atomically per task, so
     * concurrent callers cannot push the count past the cap.
     *
     * @param taskId the task making the call; usage tracking is created on first use
     * @param toolName the tool being called
     * @throws ToolUsageLimitExceededException if the tool has no calls left for this task
     */
    public void checkToolLimit(String taskId, String toolName) {
        ToolUsage taskUsage = usage.computeIfAbsent(
                taskId,
                k -> new ToolUsage()
        );

        if (!taskUsage.tryRecordCall(toolName, getMaxCallsForTool(toolName))) {
            throw new ToolUsageLimitExceededException(
                    String.format(
                            "工具使用限制超出: %s (%d 次调用)",
                            toolName,
                            taskUsage.getToolCallCount(toolName)
                    )
            );
        }
    }

    /** Drops the counters of a finished task so the map does not grow without bound. */
    public void clearTask(String taskId) {
        usage.remove(taskId);
    }

    /** Returns the per-task call cap of a tool; tools not listed get the default of 100. */
    private int getMaxCallsForTool(String toolName) {
        return switch (toolName) {
            case "web_search" -> 10;
            case "database_query" -> 20;
            case "llm_call" -> 50;
            default -> 100;
        };
    }

    /** Call counters of one task. All access is synchronized on the instance. */
    private static final class ToolUsage {
        private final Map<String, Integer> callCounts = new HashMap<>();

        synchronized int getToolCallCount(String toolName) {
            return callCounts.getOrDefault(toolName, 0);
        }

        /**
         * Increments the counter only if it is still below {@code maxCalls}.
         *
         * @return true if the call was recorded, false if the cap was already reached
         */
        synchronized boolean tryRecordCall(String toolName, int maxCalls) {
            if (callCounts.getOrDefault(toolName, 0) >= maxCalls) {
                return false;
            }
            callCounts.merge(toolName, 1, Integer::sum);
            return true;
        }
    }
}
6.3 执行后验证
输出净化
/**
 * Post-execution guardrail that cleans agent output before it is returned to the caller.
 *
 * <p>The markup stripping below is a regex-based best effort, not a full HTML sanitizer.
 */
@Service
public class OutputSanitizationService {

    @Autowired
    private PIISanitizationService piiSanitizer;

    /**
     * Removes PII and known-dangerous markup, then normalizes line breaks.
     *
     * @param output the raw agent output
     * @return the sanitized, trimmed output
     */
    public String sanitize(String output) {
        // The injected field is piiSanitizer; no other sanitizer field exists on this class.
        String withoutPii = piiSanitizer.sanitize(output);
        String withoutMarkup = removeMaliciousPatterns(withoutPii);
        return normalizeContent(withoutMarkup);
    }

    /**
     * Strips script and iframe elements and inline onclick handlers.
     *
     * <p>Matching is case-insensitive and spans line breaks, so {@code <SCRIPT>} and
     * multi-line elements are removed as well. Elements that are never closed are left
     * in place.
     */
    private String removeMaliciousPatterns(String output) {
        // (?i) ignores case; (?s) lets '.' match line terminators.
        String cleaned = output.replaceAll("(?is)<script[^>]*>.*?</script\\s*>", "");
        cleaned = cleaned.replaceAll("(?is)<iframe[^>]*>.*?</iframe\\s*>", "");
        // The closing quote must match the opening one, so a value such as
        // "alert('x')" is removed whole instead of being cut at the first inner quote.
        cleaned = cleaned.replaceAll("(?i)onclick\\s*=\\s*(\"[^\"]*\"|'[^']*')", "");
        return cleaned;
    }

    /** Converts CRLF to LF, collapses three or more newlines to two, and trims. */
    private String normalizeContent(String output) {
        String unixLineEndings = output.replaceAll("\\r\\n", "\n");
        return unixLineEndings.replaceAll("\\n{3,}", "\n\n").trim();
    }
}
结果验证
/**
 * Post-execution guardrail that asks an LLM whether a result adequately solves its task
 * and optionally applies programmatic constraints on top.
 */
@Service
public class ResultVerificationService {

    @Autowired
    private ChatClient chatClient;

    /**
     * Asks the LLM to grade a result against its task.
     *
     * @param task the task description
     * @param result the result produced for that task
     * @return the verification parsed from the model's JSON reply
     */
    public VerificationResult verify(
            String task,
            String result
    ) {
        // NOTE(review): the system text contains literal JSON braces; confirm the
        // ChatClient version in use does not run it through the template renderer.
        String verification = chatClient.prompt()
                .system("""
                        您是一个结果验证器。
                        检查结果是否充分解决了任务。
                        返回JSON:
                        {
                        "adequate": true/false,
                        "completeness": 0-100,
                        "accuracy": 0-100,
                        "issues": ["问题列表"]
                        }
                        """)
                // {task} and {result} are template placeholders, so they are bound as
                // template parameters. String.formatted() only fills %-specifiers and
                // would leave both placeholders unfilled.
                .user(u -> u
                        .text("""
                                任务: {task}
                                结果: {result}
                                """)
                        .param("task", task)
                        .param("result", result))
                .call()
                .content();

        // NOTE(review): parseVerification is not defined in this excerpt.
        return parseVerification(verification);
    }

    /**
     * Runs the LLM verification and then checks the result against explicit constraints.
     *
     * @param task the task description
     * @param result the result produced for that task
     * @param constraints the constraints to check; null is treated as an empty list
     * @return the LLM verification when every constraint holds; otherwise an inadequate
     *         result that keeps the LLM's completeness and accuracy scores and lists the
     *         descriptions of the violated constraints as its issues
     */
    public VerificationResult verifyWithConstraints(
            String task,
            String result,
            List<Constraint> constraints
    ) {
        VerificationResult basicResult = verify(task, result);
        if (constraints == null) {
            return basicResult;
        }

        List<String> constraintViolations = new ArrayList<>();
        for (Constraint constraint : constraints) {
            if (!constraint.check(result)) {
                constraintViolations.add(constraint.getDescription());
            }
        }

        if (constraintViolations.isEmpty()) {
            return basicResult;
        }

        // NOTE(review): issues reported by the LLM are replaced, not merged, here.
        return VerificationResult.builder()
                .adequate(false)
                .completeness(basicResult.getCompleteness())
                .accuracy(basicResult.getAccuracy())
                .issues(constraintViolations)
                .build();
    }
}
安全检查
/**
 * Runs agent output through a fixed list of safety checkers and aggregates their findings.
 */
@Service
public class SafetyCheckService {

    // NOTE(review): BiasChecker, FactualityChecker, PolicyComplianceChecker and
    // calculateSeverity are not defined in this excerpt.
    private final List<SafetyChecker> checkers = List.of(
            new HarmfulContentChecker(),
            new BiasChecker(),
            new FactualityChecker(),
            new PolicyComplianceChecker()
    );

    /**
     * Applies every checker to the output.
     *
     * @param output the agent output to inspect
     * @return a report that is safe only if no checker reported a violation, carrying
     *         all collected violations and the severity computed from them
     */
    public SafetyReport performSafetyChecks(String output) {
        List<SafetyViolation> violations = new ArrayList<>();

        for (SafetyChecker checker : checkers) {
            SafetyCheckResult result = checker.check(output);
            if (!result.isSafe()) {
                violations.addAll(result.getViolations());
            }
        }

        return SafetyReport.builder()
                .safe(violations.isEmpty())
                .violations(violations)
                .severity(calculateSeverity(violations))
                .build();
    }

    /** Flags output that contains any of a small list of harm-related keywords. */
    private static class HarmfulContentChecker implements SafetyChecker {

        // Plain substring keywords, built once rather than on every check.
        // NOTE(review): substring matching also hits harmless words such as "pharmacy".
        private static final List<String> HARMFUL_PATTERNS = List.of(
                "violence",
                "illegal",
                "harm",
                "hurt"
        );

        @Override
        public SafetyCheckResult check(String output) {
            // Lower-case once, with a fixed locale: under a Turkish default locale
            // "ILLEGAL" would otherwise lower-case to a dotless i and evade the match.
            String normalized = output.toLowerCase(java.util.Locale.ROOT);

            List<SafetyViolation> violations = new ArrayList<>();
            for (String pattern : HARMFUL_PATTERNS) {
                if (normalized.contains(pattern)) {
                    violations.add(SafetyViolation.builder()
                            .type("有害内容")
                            .severity(Severity.HIGH)
                            .description("包含有害内容: " + pattern)
                            .build());
                }
            }

            return SafetyCheckResult.builder()
                    .safe(violations.isEmpty())
                    .violations(violations)
                    .build();
        }
    }
}
6.4 人工监督
审批工作流
/**
 * Human-in-the-loop approval: creates approval requests, waits for a decision and records
 * the approver's verdict.
 */
@Service
public class ApprovalWorkflowService {

    /** How long a request stays open for a decision. */
    private static final Duration APPROVAL_TTL = Duration.ofMinutes(5);

    /** Delay between two polls of the repository while waiting for a decision. */
    private static final long POLL_INTERVAL_MILLIS = 1000;

    @Autowired
    private ApprovalRepository approvalRepository;

    @Autowired
    private NotificationService notificationService;

    /**
     * Creates a pending approval request, stores it and notifies the approvers.
     *
     * @param taskId the task asking for approval
     * @param operation the operation that needs approval
     * @param reason why approval is requested
     * @param context additional data shown to the approver
     * @return the stored request, which expires {@link #APPROVAL_TTL} after creation
     */
    public ApprovalRequest requestApproval(
            String taskId,
            String operation,
            String reason,
            Map<String, Object> context
    ) {
        // One clock reading, so the expiry is exactly createdAt + TTL.
        Instant now = Instant.now();
        ApprovalRequest request = ApprovalRequest.builder()
                .id(UUID.randomUUID().toString())
                .taskId(taskId)
                .operation(operation)
                .reason(reason)
                .context(context)
                .status(ApprovalStatus.PENDING)
                .createdAt(now)
                .expiresAt(now.plus(APPROVAL_TTL))
                .build();

        approvalRepository.save(request);
        notificationService.notifyApprovers(request);
        return request;
    }

    /**
     * Blocks until the request is decided or expires, polling the repository.
     *
     * <p>The wait is bounded by the request's own expiry time rather than a separate
     * shorter timeout, so EXPIRED is never reported while the request can still be
     * approved.
     *
     * @param requestId the request to wait for
     * @return the final status, or EXPIRED if the request expired or the waiting thread
     *         was interrupted
     * @throws IllegalArgumentException if the request does not exist
     */
    public ApprovalStatus waitForApproval(
            String requestId
    ) {
        ApprovalRequest request = approvalRepository.findById(requestId)
                .orElseThrow(() ->
                        new IllegalArgumentException("请求未找到")
                );

        while (true) {
            if (request.getStatus() != ApprovalStatus.PENDING) {
                return request.getStatus();
            }
            if (Instant.now().isAfter(request.getExpiresAt())) {
                return ApprovalStatus.EXPIRED;
            }

            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return ApprovalStatus.EXPIRED;
            }

            request = approvalRepository.findById(requestId)
                    .orElseThrow();
        }
    }

    /**
     * Approves a pending request.
     *
     * @param requestId the request to approve
     * @param approverId who approved it
     * @param comment the approver's comment
     * @throws IllegalStateException if the request was already decided or has expired
     */
    public void approve(
            String requestId,
            String approverId,
            String comment
    ) {
        ApprovalRequest request = loadOpenRequest(requestId);
        request.setStatus(ApprovalStatus.APPROVED);
        request.setApproverId(approverId);
        request.setApprovedAt(Instant.now());
        request.setComment(comment);
        approvalRepository.save(request);
    }

    /**
     * Denies a pending request.
     *
     * @param requestId the request to deny
     * @param approverId who denied it
     * @param reason why it was denied; stored as the request's comment
     * @throws IllegalStateException if the request was already decided or has expired
     */
    public void deny(
            String requestId,
            String approverId,
            String reason
    ) {
        ApprovalRequest request = loadOpenRequest(requestId);
        request.setStatus(ApprovalStatus.DENIED);
        request.setApproverId(approverId);
        request.setDeniedAt(Instant.now());
        request.setComment(reason);
        approvalRepository.save(request);
    }

    /**
     * Loads a request and ensures a decision may still be recorded on it, so that a
     * denied or expired request cannot be flipped to approved afterwards.
     */
    // NOTE(review): two concurrent decisions can still both pass this check; confirm
    // whether the repository provides optimistic locking.
    private ApprovalRequest loadOpenRequest(String requestId) {
        ApprovalRequest request = approvalRepository.findById(requestId)
                .orElseThrow();
        if (request.getStatus() != ApprovalStatus.PENDING) {
            throw new IllegalStateException("审批请求已处理: " + request.getStatus());
        }
        if (Instant.now().isAfter(request.getExpiresAt())) {
            throw new IllegalStateException("审批请求已过期");
        }
        return request;
    }
}
干预机制
/**
 * Escalates a running agent task to human operators and applies their decision.
 */
@Service
public class InterventionService {

    @Autowired
    private WebSocketMessenger messenger;

    // Requests that are waiting for an operator, keyed by request id.
    private final Map<String, java.util.concurrent.CompletableFuture<InterventionResponse>>
            pendingResponses = new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Sends an intervention request to the connected operators, waits up to five minutes
     * for a response and applies it. With no response in time the task is stopped.
     *
     * @param agentId the agent that needs help
     * @param taskId the task the agent is working on
     * @param type the kind of intervention requested
     * @param message the explanation shown to the operator
     */
    public void requestIntervention(
            String agentId,
            String taskId,
            InterventionType type,
            String message
    ) {
        InterventionRequest request = InterventionRequest.builder()
                // The id correlates the operator's response with this request; without
                // it request.getId() has nothing to return.
                // NOTE(review): assumes the builder exposes id(...); confirm.
                .id(java.util.UUID.randomUUID().toString())
                .agentId(agentId)
                .taskId(taskId)
                .type(type)
                .message(message)
                .timestamp(Instant.now())
                .build();

        // Register before sending so a fast response cannot arrive unobserved.
        pendingResponses.put(
                request.getId(), new java.util.concurrent.CompletableFuture<>());
        try {
            messenger.sendToOperators(
                    "intervention_request",
                    request
            );
        } catch (RuntimeException e) {
            pendingResponses.remove(request.getId());
            throw e;
        }

        InterventionResponse response =
                waitForResponse(request.getId(), Duration.ofMinutes(5));
        handleResponse(request, response);
    }

    /**
     * Delivers an operator's response to the request that is waiting for it. Intended to
     * be called by whatever receives operator messages.
     *
     * @param requestId the id of the intervention request being answered
     * @param response the operator's decision
     * @return true if a waiting request received the response, false if none was waiting
     *         (unknown id, already answered, or timed out)
     * @throws IllegalArgumentException if {@code response} is null
     */
    public boolean submitResponse(String requestId, InterventionResponse response) {
        if (response == null) {
            throw new IllegalArgumentException("response must not be null");
        }
        java.util.concurrent.CompletableFuture<InterventionResponse> pending =
                pendingResponses.get(requestId);
        return pending != null && pending.complete(response);
    }

    /**
     * Waits for the operator's response. On timeout, interruption or failure it returns
     * a STOP response, so an unanswered request never lets the agent continue.
     */
    private InterventionResponse waitForResponse(
            String requestId,
            Duration timeout
    ) {
        java.util.concurrent.CompletableFuture<InterventionResponse> pending =
                pendingResponses.computeIfAbsent(
                        requestId, id -> new java.util.concurrent.CompletableFuture<>());
        try {
            return pending.get(
                    timeout.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return failSafeStop(requestId);
        } catch (java.util.concurrent.ExecutionException
                | java.util.concurrent.TimeoutException e) {
            return failSafeStop(requestId);
        } finally {
            pendingResponses.remove(requestId);
        }
    }

    /** Builds the response used when no operator decision is available. */
    private InterventionResponse failSafeStop(String requestId) {
        return InterventionResponse.builder()
                .requestId(requestId)
                .action(InterventionAction.STOP)
                .build();
    }

    /** Dispatches on the operator's chosen action. */
    private void handleResponse(
            InterventionRequest request,
            InterventionResponse response
    ) {
        switch (response.getAction()) {
            case CONTINUE -> resumeAgent(request.getTaskId(), response.getData());
            case MODIFY -> modifyAgent(request.getTaskId(), response.getData());
            case STOP -> stopAgent(request.getTaskId());
            case REROUTE -> rerouteAgent(request.getTaskId(), response.getData());
        }
    }

    private void resumeAgent(String taskId, Map<String, Object> data) {
        // Placeholder: resume the agent with the supplied data.
    }

    private void modifyAgent(String taskId, Map<String, Object> data) {
        // Placeholder: modify the agent's state.
    }

    private void stopAgent(String taskId) {
        // Placeholder: stop the agent's execution.
    }

    private void rerouteAgent(String taskId, Map<String, Object> data) {
        // Placeholder: redirect the agent to a different task.
    }
}
紧急停止
@Service
public class EmergencyStopService {
private final Map<String, Boolean> stopSignals =
new ConcurrentHashMap<>();
public void signalStop(String agentId) {
stopSignals.put(agentId, true);
// 也持久化到数据库以实现跨实例通信
emergencyStopRepository.signalStop(agentId);
}
public boolean shouldStop(String agentId) {
// 检查内存信号
if (stopSignals.getOrDefault(agentId, false)) {
return true;
}
// 检查数据库(用于其他实例)
return emergencyStopRepository.shouldStop(agentId);
}
public void clearStopSignal(String agentId) {
stopSignals.remove(agentId);
emergencyStopRepository.clearStop(agentId);
}
@Scheduled(fixedRate = 1000) // 每秒检查
public void checkEmergencyStops() {
List<AgentTask> activeTasks =
taskRepository.findActiveTasks();
for (AgentTask task : activeTasks) {
if (shouldStop(task.getAgentId())) {
log.warn("为代理触发紧急停止: {}",
task.getAgentId());
// 停止任务
taskService.stopTask(task.getId());
// 清除信号
clearStopSignal(task.getAgentId());
}
}
}
}
6.5 关键要点
多层保护
| 层级 | 目的 | 示例 |
|---|---|---|
| 执行前 | 防止问题 | 输入验证、权限检查 |
| 运行时 | 强制限制 | Token限制、时间限制 |
| 执行后 | 验证结果 | 输出净化、安全检查 |
| 人工 | 最终监督 | 审批工作流、紧急停止 |
安全第一
输入验证 → 授权 → 限制 → 结果验证 → 审批
生产环境检查清单
- 输入验证和净化
- 权限和授权检查
- 速率限制
- Token、时间和工具使用限制
- 输出净化
- 结果验证
- 安全检查
- 审批工作流
- 紧急停止机制
6.6 下一步
继续您的旅程:
- → 7. 生产模式 - 真实世界的实现
深度防御
使用多层保护。每层都应该能够独立运行。
假设防线会被突破
设计系统时要假设每一层都可能失效。如果验证漏掉了某些问题,会发生什么?
人工监督至关重要
即使有了所有自动防护,人工监督对生产系统仍然必不可少。