跳到主要内容

6. 安全与护栏

"信任通过安全来赢得。护栏将实验性代理与生产系统区分开来。"

安全护栏是防止代理造成伤害的多层保护机制。它们在执行的每个阶段——执行前、执行中和执行后——运行，确保代理保持在预定义的边界内。


6.1 预执行检查

输入验证

/**
 * Pre-execution guardrail: cleans raw user input and runs it through a fixed sequence of
 * checks before it is handed on as a {@code ValidatedInput}.
 *
 * <p>The checks run in this order and the first failure aborts validation by throwing:
 * sanitization, format validation, prompt-injection screening, authorization, rate limiting,
 * resource availability.
 */
@Service
public class InputValidationService {

    /** Longest sanitized input accepted by {@link #isValidFormat(String)}, in {@code char}s. */
    private static final int MAX_INPUT_LENGTH = 10000;

    // Pattern instances are immutable, so they are compiled once instead of on every call.
    private static final Pattern MARKUP_TAG = Pattern.compile("<[^>]*>");
    private static final Pattern WHITESPACE_RUN = Pattern.compile("\\s+");

    // NOTE(review): without Pattern.UNICODE_CHARACTER_CLASS, \p{Print} only covers printable
    // ASCII, so any non-ASCII text (e.g. Chinese) is rejected as an invalid format.
    // Confirm that this ASCII-only restriction is intended.
    private static final Pattern ALLOWED_CHARACTERS = Pattern.compile("[\\p{Print}\\s]*");

    /** Phrases commonly used to try to override the system prompt. */
    private static final List<Pattern> INJECTION_PATTERNS = List.of(
        Pattern.compile("(?i)ignore\\s+(all\\s+)?(previous|above|system)\\s+instructions"),
        Pattern.compile("(?i)override\\s+system\\s+prompt"),
        Pattern.compile("(?i)disregard\\s+constraints"),
        Pattern.compile("(?i)new\\s+instructions?:"),
        Pattern.compile("(?i)act\\s+as\\s+if"),
        Pattern.compile("(?i)pretend\\s+to\\s+be")
    );

    @Autowired
    private PIISanitizationService piiSanitizer;

    @Autowired
    private PermissionService permissionService;

    @Autowired
    private RateLimiter rateLimiter;

    // The original snippet called checkResourceAvailability() as if it were a method of this
    // class; the method is defined on ResourceCheckService, so that service is injected here.
    @Autowired
    private ResourceCheckService resourceCheckService;

    /**
     * Sanitizes and validates one user input.
     *
     * @param input the raw input; its text, user id, operation and metadata are read
     * @return a {@code ValidatedInput} carrying the sanitized text plus the original user id,
     *         operation and metadata
     * @throws ValidationException if the sanitized text is too long or contains characters
     *         outside the allowed set
     * @throws SecurityException if the sanitized text matches a known prompt-injection phrase
     * @throws UnauthorizedException if the user lacks permission for the requested operation
     * @throws RateLimitExceededException if the rate limiter refuses the user
     */
    public ValidatedInput validate(UserInput input) {
        // 1. Sanitize the input
        String sanitized = sanitize(input.getText());

        // 2. Validate the format
        if (!isValidFormat(sanitized)) {
            throw new ValidationException("无效的输入格式");
        }

        // 3. Screen for prompt injection
        if (containsPromptInjection(sanitized)) {
            throw new SecurityException("检测到潜在的提示注入");
        }

        // 4. Authorization check
        if (!permissionService.hasPermission(input.getUserId(), input.getOperation())) {
            throw new UnauthorizedException("用户未授权执行操作");
        }

        // 5. Rate limiting
        if (!rateLimiter.tryAcquire(input.getUserId())) {
            throw new RateLimitExceededException();
        }

        // 6. Resource availability; throws if a required resource is unavailable
        resourceCheckService.checkResourceAvailability();

        return ValidatedInput.builder()
            .text(sanitized)
            .userId(input.getUserId())
            .operation(input.getOperation())
            .metadata(input.getMetadata())
            .build();
    }

    /**
     * Runs the PII sanitizer, strips anything that looks like an HTML/XML tag, then trims the
     * text and collapses every whitespace run into a single space.
     */
    private String sanitize(String input) {
        // Remove PII
        String sanitized = piiSanitizer.sanitize(input);

        // Remove HTML/XML tags
        sanitized = MARKUP_TAG.matcher(sanitized).replaceAll("");

        // Normalize whitespace
        return WHITESPACE_RUN.matcher(sanitized.trim()).replaceAll(" ");
    }

    /** Returns whether the text is within the length limit and uses only allowed characters. */
    private boolean isValidFormat(String input) {
        if (input.length() > MAX_INPUT_LENGTH) {
            return false;
        }
        return ALLOWED_CHARACTERS.matcher(input).matches();
    }

    /** Returns whether any known injection phrase occurs anywhere in the text. */
    private boolean containsPromptInjection(String input) {
        for (Pattern pattern : INJECTION_PATTERNS) {
            if (pattern.matcher(input).find()) {
                return true;
            }
        }
        return false;
    }
}

权限检查

/**
 * Answers authorization questions from the permission record stored for a user.
 *
 * <p>When the repository has no record for a user, the result of {@code defaultPermission()}
 * is used instead.
 */
@Service
public class PermissionService {

    @Autowired
    private PermissionRepository permissionRepository;

    /**
     * Checks whether a user may perform an operation right now.
     *
     * @param userId the user whose permission record is looked up
     * @param operation the operation name that must appear in the record's allowed operations
     * @return {@code true} only if the operation is allowed, the record is active and the
     *         current local time falls inside the record's allowed hours
     */
    public boolean hasPermission(String userId, String operation) {
        return isOperationPermitted(loadPermission(userId), operation);
    }

    /**
     * Checks whether a user may call a tool with the given parameters.
     *
     * @param userId the user whose permission record is looked up
     * @param toolName the tool; the base permission checked is {@code "tool:" + toolName}
     * @param parameters the call parameters, validated against the tool's parameter constraints
     * @return {@code true} only if the base permission holds, a tool permission entry exists,
     *         its usage count is below its maximum and every constrained parameter validates
     */
    public boolean canExecuteTool(
        String userId,
        String toolName,
        Map<String, Object> parameters
    ) {
        // Load the record once and reuse it for both the base and the tool-specific checks.
        UserPermission permission = loadPermission(userId);

        // Base permission
        if (!isOperationPermitted(permission, "tool:" + toolName)) {
            return false;
        }

        // Tool-specific permission
        ToolPermission toolPermission = permission.getToolPermissions().get(toolName);
        if (toolPermission == null) {
            return false;
        }

        // Usage limit
        if (toolPermission.getUsageCount() >= toolPermission.getMaxUsage()) {
            return false;
        }

        // Parameter constraints
        return validateParameters(toolPermission, parameters);
    }

    /** Fetches the user's record, building the default only when none is stored. */
    private UserPermission loadPermission(String userId) {
        return permissionRepository
            .findByUserId(userId)
            .orElseGet(() -> defaultPermission());
    }

    /** Applies the allow-list, active-flag and allowed-hours checks to a loaded record. */
    private boolean isOperationPermitted(UserPermission permission, String operation) {
        // The operation must be on the allow list
        if (!permission.getAllowedOperations().contains(operation)) {
            return false;
        }

        // The user must be active
        if (!permission.isActive()) {
            return false;
        }

        // Time-based restriction
        return isWithinAllowedHours(permission);
    }

    /**
     * Validates each supplied parameter that has a constraint; parameters without a
     * constraint are accepted as they are.
     */
    private boolean validateParameters(
        ToolPermission permission,
        Map<String, Object> parameters
    ) {
        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
            ParameterConstraint constraint =
                permission.getParameterConstraints().get(entry.getKey());

            if (constraint != null && !constraint.validate(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns whether the current local time is inside the record's allowed window, both
     * ends inclusive.
     *
     * <p>A missing start means no restriction. A missing end is treated as end of day
     * (previously this threw a {@code NullPointerException}). When start is after end the
     * window is taken to span midnight, e.g. 22:00-06:00 (previously such a window could
     * never match).
     */
    private boolean isWithinAllowedHours(UserPermission permission) {
        LocalTime start = permission.getAllowedHoursStart();
        if (start == null) {
            return true; // no restriction
        }

        LocalTime end = permission.getAllowedHoursEnd();
        if (end == null) {
            end = LocalTime.MAX;
        }

        LocalTime now = LocalTime.now();
        boolean atOrAfterStart = !now.isBefore(start);
        boolean atOrBeforeEnd = !now.isAfter(end);

        if (start.isAfter(end)) {
            // Window wraps past midnight: allowed late in the day OR early in the day.
            return atOrAfterStart || atOrBeforeEnd;
        }
        return atOrAfterStart && atOrBeforeEnd;
    }
}

资源可用性

/**
 * Pre-execution check that the resources a task depends on are currently usable.
 */
@Service
public class ResourceCheckService {

    @Autowired
    private QuotaService quotaService;

    @Autowired
    private HealthCheckService healthCheckService;

    /**
     * Runs the checks in order (token quota, API rate limit, service health, database) and
     * throws on the first one that fails.
     *
     * @throws ResourceUnavailableException if fewer than 1000 tokens remain, the "openai" API
     *         is rate limited, any service is reported unhealthy, or the database check fails
     */
    public void checkResourceAvailability() {
        // LLM quota
        boolean quotaTooLow = quotaService.getRemainingTokens() < 1000;
        if (quotaTooLow) {
            throw new ResourceUnavailableException("Token配额不足");
        }

        // API rate limit
        boolean rateLimited = quotaService.isRateLimited("openai");
        if (rateLimited) {
            throw new ResourceUnavailableException("API速率限制已超出");
        }

        // Service health
        List<String> down = healthCheckService.getUnhealthyServices();
        if (!down.isEmpty()) {
            String names = String.join(", ", down);
            throw new ResourceUnavailableException("必要服务不可用: " + names);
        }

        // Database connection
        if (databaseHealthCheck()) {
            return;
        }
        throw new ResourceUnavailableException("数据库连接不可用");
    }

    /**
     * Placeholder for a real connectivity probe: nothing is tested yet, so this currently
     * always reports the database as healthy.
     */
    private boolean databaseHealthCheck() {
        boolean reachable;
        try {
            // Test the database connection here
            reachable = true;
        } catch (Exception e) {
            reachable = false;
        }
        return reachable;
    }
}

6.2 运行时约束

Token限制

/**
 * Tracks a per-task token budget using a reserve-then-settle scheme.
 *
 * <p>{@link #checkTokenLimit(String, int)} reserves tokens before a call and
 * {@link #releaseTokens(String, int)} converts a reservation into actual usage afterwards.
 * Entries are never removed from the internal map by this class.
 */
@Service
public class TokenLimitService {

    private final Map<String, TokenLimit> limits =
        new ConcurrentHashMap<>();

    @Value("${agent.limits.max-tokens-per-task}")
    private int maxTokensPerTask;

    @Value("${agent.limits.max-tokens-per-tool-call}")
    private int maxTokensPerToolCall;

    /**
     * Reserves tokens for a task, creating the task's budget on first use.
     *
     * <p>The request is measured against what is left after both used and already reserved
     * tokens are subtracted; the original check ignored outstanding reservations, so
     * back-to-back reservations could exceed the budget. The check and the reservation
     * happen under the budget's monitor so two callers cannot both pass the same check.
     *
     * @param taskId the task whose budget is charged
     * @param requestedTokens the number of tokens the caller is about to consume
     * @throws TokenLimitExceededException if the request does not fit in the remaining budget
     */
    public void checkTokenLimit(
        String taskId,
        int requestedTokens
    ) {
        TokenLimit limit = limits.computeIfAbsent(
            taskId,
            k -> new TokenLimit(maxTokensPerTask)
        );

        synchronized (limit) {
            int remaining = limit.getRemainingTokens();

            if (requestedTokens > remaining) {
                // Report used plus reserved tokens so the message matches the rejection.
                throw new TokenLimitExceededException(
                    String.format(
                        "Token限制超出: %d/%d",
                        limit.getMaxTokens() - remaining,
                        limit.getMaxTokens()
                    )
                );
            }

            limit.reserve(requestedTokens);
        }
    }

    /**
     * Records the tokens a call actually consumed and releases the same amount of
     * reservation. Does nothing for a task with no budget entry.
     *
     * <p>Only {@code actualTokens} is released: if a call used fewer tokens than it
     * reserved, the difference stays reserved for the rest of the task.
     *
     * @param taskId the task whose budget is updated
     * @param actualTokens the number of tokens the call consumed
     */
    public void releaseTokens(
        String taskId,
        int actualTokens
    ) {
        TokenLimit limit = limits.get(taskId);

        if (limit != null) {
            synchronized (limit) {
                limit.updateUsage(actualTokens);
            }
        }
    }

    /** Mutable budget of one task; callers synchronize on the instance before mutating it. */
    @Data
    private static class TokenLimit {
        private final int maxTokens;
        private int reservedTokens;
        private int usedTokens;

        public TokenLimit(int maxTokens) {
            this.maxTokens = maxTokens;
        }

        public void reserve(int tokens) {
            this.reservedTokens += tokens;
        }

        public void updateUsage(int tokens) {
            this.usedTokens += tokens;
            // A call may consume more than it reserved; never let the reservation go negative,
            // which would otherwise inflate the remaining budget.
            this.reservedTokens = Math.max(0, this.reservedTokens - tokens);
        }

        public int getRemainingTokens() {
            return maxTokens - usedTokens - reservedTokens;
        }
    }
}

时间限制

/**
 * Enforces a wall-clock budget per task: callers register a task with its timeout, poll
 * {@link #checkTimeLimit(String)} while it runs, and deregister it when it finishes.
 */
@Service
public class TimeLimitService {

    private final Map<String, Instant> taskStartTimes =
        new ConcurrentHashMap<>();

    private final Map<String, Duration> taskTimeouts =
        new ConcurrentHashMap<>();

    /** Records the current instant as the task's start and stores its timeout. */
    public void startTask(String taskId, Duration timeout) {
        Instant startedAt = Instant.now();
        taskStartTimes.put(taskId, startedAt);
        taskTimeouts.put(taskId, timeout);
    }

    /**
     * Throws if the task has been running longer than its timeout.
     *
     * @param taskId a task previously registered with {@link #startTask(String, Duration)}
     * @throws IllegalStateException if no start time or timeout is stored for the task
     * @throws TimeoutException if the elapsed time is strictly greater than the timeout
     */
    public void checkTimeLimit(String taskId) {
        Instant startedAt = taskStartTimes.get(taskId);
        Duration budget = taskTimeouts.get(taskId);

        boolean registered = startedAt != null && budget != null;
        if (!registered) {
            throw new IllegalStateException("任务未开始");
        }

        Duration spent = Duration.between(startedAt, Instant.now());
        if (spent.compareTo(budget) <= 0) {
            return; // still within budget
        }

        String message = String.format(
            "任务超时: %dms / %dms",
            spent.toMillis(),
            budget.toMillis()
        );
        throw new TimeoutException(message);
    }

    /** Forgets the task's start time and timeout. */
    public void endTask(String taskId) {
        taskStartTimes.remove(taskId);
        taskTimeouts.remove(taskId);
    }
}

工具使用限制

/**
 * Caps how many times each tool may be called within a single task.
 */
@Service
public class ToolUsageLimitService {

    private final Map<String, ToolUsage> usage =
        new ConcurrentHashMap<>();

    /**
     * Counts one call of a tool for a task, refusing the call once the tool's cap is reached.
     *
     * @param taskId the task the call belongs to; its counters are created on first use
     * @param toolName the tool being called
     * @throws ToolUsageLimitExceededException if the tool has already been called the maximum
     *         number of times for this task
     */
    public void checkToolLimit(
        String taskId,
        String toolName
    ) {
        ToolUsage counters = usage.computeIfAbsent(taskId, id -> new ToolUsage());

        int callsSoFar = counters.getToolCallCount(toolName);
        int allowed = getMaxCallsForTool(toolName);

        if (callsSoFar >= allowed) {
            String message = String.format(
                "工具使用限制超出: %s (%d 次调用)",
                toolName,
                callsSoFar
            );
            throw new ToolUsageLimitExceededException(message);
        }

        counters.recordCall(toolName);
    }

    /** Returns the per-task call cap for a tool; tools not listed here get 100. */
    private int getMaxCallsForTool(String toolName) {
        switch (toolName) {
            case "web_search":
                return 10;
            case "database_query":
                return 20;
            case "llm_call":
                return 50;
            default:
                return 100;
        }
    }

    /** Call counters of one task, keyed by tool name. */
    @Data
    private static class ToolUsage {
        private final Map<String, Integer> callCounts = new HashMap<>();

        /** Returns how many calls have been recorded for the tool, 0 if none. */
        public int getToolCallCount(String toolName) {
            Integer recorded = callCounts.get(toolName);
            return recorded == null ? 0 : recorded;
        }

        /** Adds one to the tool's counter. */
        public void recordCall(String toolName) {
            callCounts.put(toolName, getToolCallCount(toolName) + 1);
        }
    }
}

6.3 后执行验证

输出净化

/**
 * Post-execution guardrail that cleans agent output before it is returned.
 *
 * <p>The markup removal is regex based and best effort only; it is not a full HTML sanitizer.
 */
@Service
public class OutputSanitizationService {

    @Autowired
    private PIISanitizationService piiSanitizer;

    /**
     * Runs the PII sanitizer over the output, strips script/iframe elements and inline
     * {@code onclick} attributes, then normalizes line endings and blank lines.
     *
     * @param output the raw agent output
     * @return the sanitized, trimmed output
     */
    public String sanitize(String output) {
        // Remove PII. The original referenced an undeclared "piiSanitizationService";
        // the injected field is piiSanitizer.
        String sanitized = piiSanitizer.sanitize(output);

        // Remove malicious code patterns
        sanitized = removeMaliciousPatterns(sanitized);

        // Normalize content
        sanitized = normalizeContent(sanitized);

        return sanitized;
    }

    /**
     * Strips script and iframe elements and quoted {@code onclick} handlers.
     *
     * <p>The patterns are case-insensitive ({@code (?i)}) and, for the element patterns,
     * let {@code .} cross line breaks ({@code (?s)}); without those flags {@code <SCRIPT>}
     * or a script spread over several lines was left in place.
     */
    private String removeMaliciousPatterns(String output) {
        // Remove script elements
        output = output.replaceAll("(?is)<script[^>]*>.*?</script\\s*>", "");

        // Remove iframe elements
        output = output.replaceAll("(?is)<iframe[^>]*>.*?</iframe\\s*>", "");

        // Remove onclick handlers
        output = output.replaceAll("(?i)onclick\\s*=\\s*['\"][^'\"]*['\"]", "");

        return output;
    }

    /** Converts CRLF to LF, collapses three or more consecutive newlines to two, and trims. */
    private String normalizeContent(String output) {
        // Normalize line endings
        output = output.replaceAll("\\r\\n", "\n");

        // Remove surplus blank lines
        output = output.replaceAll("\\n{3,}", "\n\n");

        return output.trim();
    }
}

结果验证

/**
 * Post-execution guardrail that asks an LLM to judge whether a result adequately solves
 * its task, optionally combined with programmatic constraints.
 */
@Service
public class ResultVerificationService {

    @Autowired
    private ChatClient chatClient;

    /**
     * Sends the task and result to the LLM and parses its JSON verdict.
     *
     * <p>NOTE(review): {@code result} is untrusted model output and is placed in the prompt
     * verbatim, so it could itself try to steer the verifier.
     *
     * @param task the task description given to the agent
     * @param result the agent's result to be judged
     * @return the verdict parsed from the LLM's reply
     */
    public VerificationResult verify(
        String task,
        String result
    ) {
        // Verify the result with the LLM.
        // String.formatted substitutes %s placeholders; the original "{task}" / "{result}"
        // markers were never replaced, so the model saw the literal placeholders instead of
        // the actual task and result.
        String verification = chatClient.prompt()
            .system("""
                您是一个结果验证器。
                检查结果是否充分解决了任务。
                返回JSON:
                {
                "adequate": true/false,
                "completeness": 0-100,
                "accuracy": 0-100,
                "issues": ["问题列表"]
                }
                """)
            .user("""
                任务: %s
                结果: %s
                """.formatted(task, result))
            .call()
            .content();

        return parseVerification(verification);
    }

    /**
     * Runs {@link #verify(String, String)} and then checks the result against each constraint.
     *
     * @param task the task description given to the agent
     * @param result the agent's result to be judged
     * @param constraints the constraints the result must satisfy
     * @return the LLM verdict unchanged when every constraint holds; otherwise a verdict
     *         marked inadequate that keeps the LLM's completeness and accuracy scores and
     *         lists the descriptions of the violated constraints as its issues
     */
    public VerificationResult verifyWithConstraints(
        String task,
        String result,
        List<Constraint> constraints
    ) {
        VerificationResult basicResult = verify(task, result);

        // Check the constraints
        List<String> constraintViolations = new ArrayList<>();

        for (Constraint constraint : constraints) {
            if (!constraint.check(result)) {
                constraintViolations.add(constraint.getDescription());
            }
        }

        if (!constraintViolations.isEmpty()) {
            return VerificationResult.builder()
                .adequate(false)
                .completeness(basicResult.getCompleteness())
                .accuracy(basicResult.getAccuracy())
                .issues(constraintViolations)
                .build();
        }

        return basicResult;
    }
}

安全检查

/**
 * Post-execution guardrail that runs agent output through a fixed list of safety checkers
 * and aggregates their findings into one report.
 */
@Service
public class SafetyCheckService {

    private final List<SafetyChecker> checkers = List.of(
        new HarmfulContentChecker(),
        new BiasChecker(),
        new FactualityChecker(),
        new PolicyComplianceChecker()
    );

    /**
     * Runs every checker over the output.
     *
     * @param output the agent output to inspect
     * @return a report that is safe only when no checker found a violation, together with
     *         all collected violations and the severity computed from them
     */
    public SafetyReport performSafetyChecks(String output) {
        List<SafetyViolation> violations = new ArrayList<>();

        for (SafetyChecker checker : checkers) {
            SafetyCheckResult result = checker.check(output);

            if (!result.isSafe()) {
                violations.addAll(result.getViolations());
            }
        }

        return SafetyReport.builder()
            .safe(violations.isEmpty())
            .violations(violations)
            .severity(calculateSeverity(violations))
            .build();
    }

    /**
     * Flags output containing any of a small set of keywords, compared case-insensitively
     * as plain substrings (so "harm" also matches inside longer words).
     */
    private static class HarmfulContentChecker implements SafetyChecker {

        private static final List<String> HARMFUL_PATTERNS = List.of(
            "violence",
            "illegal",
            "harm",
            "hurt"
        );

        @Override
        public SafetyCheckResult check(String output) {
            // Lower-case once, outside the loop, and with a fixed locale: the default-locale
            // overload maps "I" to a dotless i under a Turkish locale, which would let
            // "ILLEGAL" slip past the "illegal" keyword.
            String normalized = output.toLowerCase(java.util.Locale.ROOT);

            List<SafetyViolation> violations = new ArrayList<>();

            for (String pattern : HARMFUL_PATTERNS) {
                if (normalized.contains(pattern)) {
                    violations.add(SafetyViolation.builder()
                        .type("有害内容")
                        .severity(Severity.HIGH)
                        .description("包含有害内容: " + pattern)
                        .build());
                }
            }

            return SafetyCheckResult.builder()
                .safe(violations.isEmpty())
                .violations(violations)
                .build();
        }
    }
}

6.4 人工监督

审批工作流

/**
 * Human-approval workflow: creates approval requests, lets a caller block until a request
 * is decided, and records approvers' decisions.
 */
@Service
public class ApprovalWorkflowService {

    /** How long a new request stays open for a decision. */
    private static final Duration APPROVAL_TTL = Duration.ofMinutes(5);

    /** Pause between two status polls in {@link #waitForApproval(String)}. */
    private static final long POLL_INTERVAL_MILLIS = 1000;

    @Autowired
    private ApprovalRepository approvalRepository;

    @Autowired
    private NotificationService notificationService;

    /**
     * Creates and stores a pending approval request, then notifies the approvers.
     *
     * @param taskId the task that needs approval
     * @param operation the operation awaiting approval
     * @param reason why approval is being requested
     * @param context additional data attached to the request
     * @return the stored request, with a random UUID as id and an expiry five minutes after
     *         its creation time
     */
    public ApprovalRequest requestApproval(
        String taskId,
        String operation,
        String reason,
        Map<String, Object> context
    ) {
        // Sample the clock once so the expiry is exactly APPROVAL_TTL after createdAt.
        Instant now = Instant.now();

        ApprovalRequest request = ApprovalRequest.builder()
            .id(UUID.randomUUID().toString())
            .taskId(taskId)
            .operation(operation)
            .reason(reason)
            .context(context)
            .status(ApprovalStatus.PENDING)
            .createdAt(now)
            .expiresAt(now.plus(APPROVAL_TTL))
            .build();

        approvalRepository.save(request);

        // Notify the approvers
        notificationService.notifyApprovers(request);

        return request;
    }

    /**
     * Blocks, polling the repository once per second, until the request is decided or its
     * own expiry time has passed.
     *
     * <p>The original loop gave up after 60 polls (about one minute) and reported
     * {@code EXPIRED} although the request stayed open for five minutes, so an approver could
     * still approve a request the agent had already abandoned. The request's
     * {@code expiresAt} is now the only deadline.
     *
     * @param requestId the id of a stored request
     * @return the request's status once it is no longer pending, or {@code EXPIRED} if the
     *         expiry time passes or the waiting thread is interrupted
     * @throws IllegalArgumentException if no request with this id exists
     */
    public ApprovalStatus waitForApproval(
        String requestId
    ) {
        ApprovalRequest request = approvalRepository.findById(requestId)
            .orElseThrow(() ->
                new IllegalArgumentException("请求未找到")
            );

        // Poll for the decision (or use WebSocket)
        while (true) {
            if (request.getStatus() != ApprovalStatus.PENDING) {
                return request.getStatus();
            }

            if (Instant.now().isAfter(request.getExpiresAt())) {
                return ApprovalStatus.EXPIRED;
            }

            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return ApprovalStatus.EXPIRED;
            }

            request = approvalRepository.findById(requestId)
                .orElseThrow();
        }
    }

    /**
     * Marks a pending, unexpired request as approved.
     *
     * @param requestId the id of the request to approve
     * @param approverId who approved it
     * @param comment the approver's comment
     * @throws IllegalStateException if the request was already decided or has expired
     */
    public void approve(
        String requestId,
        String approverId,
        String comment
    ) {
        ApprovalRequest request = requirePending(requestId);

        request.setStatus(ApprovalStatus.APPROVED);
        request.setApproverId(approverId);
        request.setApprovedAt(Instant.now());
        request.setComment(comment);

        approvalRepository.save(request);
    }

    /**
     * Marks a pending, unexpired request as denied.
     *
     * @param requestId the id of the request to deny
     * @param approverId who denied it
     * @param reason why it was denied; stored as the request's comment
     * @throws IllegalStateException if the request was already decided or has expired
     */
    public void deny(
        String requestId,
        String approverId,
        String reason
    ) {
        ApprovalRequest request = requirePending(requestId);

        request.setStatus(ApprovalStatus.DENIED);
        request.setApproverId(approverId);
        request.setDeniedAt(Instant.now());
        request.setComment(reason);

        approvalRepository.save(request);
    }

    /**
     * Loads a request and refuses to hand it out for a decision unless it is still pending
     * and not past its expiry, so a decision cannot be overwritten or made too late.
     */
    private ApprovalRequest requirePending(String requestId) {
        ApprovalRequest request = approvalRepository.findById(requestId)
            .orElseThrow();

        if (request.getStatus() != ApprovalStatus.PENDING) {
            throw new IllegalStateException(
                "审批请求已处理: " + request.getStatus()
            );
        }

        if (Instant.now().isAfter(request.getExpiresAt())) {
            throw new IllegalStateException("审批请求已过期");
        }

        return request;
    }
}

干预机制

/**
 * Lets an agent ask connected human operators to step in and then acts on their answer.
 *
 * <p>This is a simplified example: the wait for an operator and the four agent actions
 * are stubs.
 */
@Service
public class InterventionService {

    @Autowired
    private WebSocketMessenger messenger;

    /**
     * Builds an intervention request, sends it to the operators, obtains a response and
     * dispatches on the response's action.
     *
     * @param agentId the agent asking for help
     * @param taskId the task the agent is working on
     * @param type the kind of intervention requested
     * @param message free-text explanation for the operators
     */
    public void requestIntervention(
        String agentId,
        String taskId,
        InterventionType type,
        String message
    ) {
        Instant requestedAt = Instant.now();
        InterventionRequest intervention = InterventionRequest.builder()
            .agentId(agentId)
            .taskId(taskId)
            .type(type)
            .message(message)
            .timestamp(requestedAt)
            .build();

        // Send to the connected human operators
        messenger.sendToOperators("intervention_request", intervention);

        // Wait for a response.
        // NOTE(review): the builder call above never sets an id, yet getId() is used here to
        // correlate the response. Confirm the id is populated elsewhere.
        Duration maxWait = Duration.ofMinutes(5);
        InterventionResponse answer = waitForResponse(intervention.getId(), maxWait);

        handleResponse(intervention, answer);
    }

    /**
     * Stub: does not wait at all and ignores {@code timeout}; it immediately returns a
     * response with action {@code CONTINUE} for the given request id.
     */
    private InterventionResponse waitForResponse(
        String requestId,
        Duration timeout
    ) {
        // Implement the waiting logic or use WebSocket.
        // This is a simplified example.
        InterventionAction defaultAction = InterventionAction.CONTINUE;
        return InterventionResponse.builder()
            .requestId(requestId)
            .action(defaultAction)
            .build();
    }

    /** Invokes the agent action that matches the operator's response. */
    private void handleResponse(
        InterventionRequest request,
        InterventionResponse response
    ) {
        switch (response.getAction()) {
            // Resume agent execution
            case CONTINUE -> resumeAgent(request.getTaskId(), response.getData());

            // Modify the agent's state
            case MODIFY -> modifyAgent(request.getTaskId(), response.getData());

            // Stop agent execution
            case STOP -> stopAgent(request.getTaskId());

            // Redirect the agent to a different task
            case REROUTE -> rerouteAgent(request.getTaskId(), response.getData());
        }
    }

    private void resumeAgent(String taskId, Map<String, Object> data) {
        // Resume the agent using the supplied data
    }

    private void modifyAgent(String taskId, Map<String, Object> data) {
        // Modify the agent's state
    }

    private void stopAgent(String taskId) {
        // Stop agent execution
    }

    private void rerouteAgent(String taskId, Map<String, Object> data) {
        // Redirect the agent to a new task
    }
}

紧急停止

/**
 * Emergency stop switch for agents.
 *
 * <p>A stop signal is kept both in memory and in a repository, and a scheduled job stops
 * the active tasks of every signalled agent.
 *
 * <p>NOTE(review): {@code emergencyStopRepository}, {@code taskRepository},
 * {@code taskService} and {@code log} are used below but are not declared in the visible
 * class. Confirm they are provided (injected fields / logger) in the full source.
 */
@Service
public class EmergencyStopService {

    private final Map<String, Boolean> stopSignals =
        new ConcurrentHashMap<>();

    /** Raises the stop signal for an agent, in memory and in the repository. */
    public void signalStop(String agentId) {
        stopSignals.put(agentId, true);

        // Also persist to the database for cross-instance communication
        emergencyStopRepository.signalStop(agentId);
    }

    /**
     * Returns whether a stop has been signalled for the agent, consulting the in-memory
     * signal first and the repository otherwise.
     */
    public boolean shouldStop(String agentId) {
        // Check the in-memory signal
        if (stopSignals.getOrDefault(agentId, false)) {
            return true;
        }

        // Check the database (for signals raised by other instances)
        return emergencyStopRepository.shouldStop(agentId);
    }

    /** Removes the agent's stop signal from memory and from the repository. */
    public void clearStopSignal(String agentId) {
        stopSignals.remove(agentId);
        emergencyStopRepository.clearStop(agentId);
    }

    /**
     * Scheduled every second: stops every active task whose agent has a stop signal, then
     * clears the signals that were acted on.
     *
     * <p>Signals are cleared only after the whole task list has been processed. The
     * original cleared the signal right after stopping the first matching task, so any
     * further active tasks of the same agent kept running.
     */
    @Scheduled(fixedRate = 1000) // check every second
    public void checkEmergencyStops() {
        List<AgentTask> activeTasks =
            taskRepository.findActiveTasks();

        // Agents for which at least one task was stopped in this pass.
        java.util.Set<String> stoppedAgents = new java.util.LinkedHashSet<>();

        for (AgentTask task : activeTasks) {
            String agentId = task.getAgentId();

            if (stoppedAgents.contains(agentId) || shouldStop(agentId)) {
                log.warn("为代理触发紧急停止: {}",
                    agentId);

                // Stop the task
                taskService.stopTask(task.getId());

                stoppedAgents.add(agentId);
            }
        }

        // Clear the signals that were handled
        for (String agentId : stoppedAgents) {
            clearStopSignal(agentId);
        }
    }
}

6.5 关键要点

多层保护

| 层级 | 目的 | 示例 |
| --- | --- | --- |
| 预执行 | 防止问题 | 输入验证、权限检查 |
| 运行时 | 强制限制 | Token限制、时间限制 |
| 后执行 | 验证结果 | 输出净化、安全检查 |
| 人工 | 最终监督 | 审批工作流、紧急停止 |

安全第一

验证 → 授权 → 限制 → 验证 → 审批

生产环境检查清单

  • 输入验证和净化
  • 权限和授权检查
  • 速率限制
  • Token、时间和工具使用限制
  • 输出净化
  • 结果验证
  • 安全检查
  • 审批工作流
  • 紧急停止机制

6.6 下一步

继续您的旅程:


深度防御

使用多层保护。每层都应该能够独立运行。

假设突破

设计系统时要假设每层都可能失败。如果验证漏掉了什么会发生?

人工监督至关重要

即使有了所有自动防护,人工监督对生产系统仍然必不可少。