在生产环境中,AI API 的日志记录是调试和监控的核心手段,但未经处理的日志可能泄露用户隐私和商业机密,并带来合规风险。我曾在某金融科技公司主导日志安全改造项目,通过自研脱敏系统成功将敏感数据泄露风险降低 98%,同时将日志查询效率提升 40%。本文将深入探讨如何构建企业级日志脱敏方案,并无缝集成到 HolySheep AI 等主流 AI 平台。

一、为什么 AI API 日志必须脱敏

当调用 HolySheep AI 的 API 时,请求体和响应体可能包含多种敏感信息:用户身份证号、手机号、银行卡号、企业财务数据、医疗记录等。根据《个人信息保护法》和《数据安全法》,这些信息必须得到妥善保护。

常见敏感数据类型

常见的敏感数据包括:手机号、身份证号、银行卡号、邮箱地址等个人信息;API Key、Bearer Token、密码等访问凭据;以及企业财务数据、医疗记录等业务敏感信息。

二、脱敏策略与算法设计

2.1 核心脱敏策略

"""
AI API 日志脱敏系统核心实现
支持多种脱敏策略:掩码、哈希、替换、加密、分段保留
"""

import hashlib
import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Pattern, Union


class SanitizationStrategy(Enum):
    """How a matched sensitive value is rewritten before it reaches the log.

    Members, with their string values:

    * MASK     ("mask")     -- hide part of the value, e.g. 138****8888
    * HASH     ("hash")     -- replace with a truncated one-way hash, e.g. a3f2b8c1
    * REDACT   ("redact")   -- replace with the literal marker [REDACTED]
    * TOKENIZE ("tokenize") -- replace with a placeholder token
    * PARTIAL  ("partial")  -- intended to keep a short prefix and suffix,
                               e.g. 620***1234 (whether it is honoured depends
                               on the sanitizer implementation)
    """

    MASK = "mask"
    HASH = "hash"
    REDACT = "redact"
    TOKENIZE = "tokenize"
    PARTIAL = "partial"


@dataclass
class SensitiveField:
    """One category of sensitive data and how it should be sanitized.

    Attributes (documented as comments below):
        name: identifier of the rule; also part of its registry key.
        patterns: regexes that locate the sensitive text. Plain strings are
            compiled once in ``__post_init__``, so an invalid pattern fails
            when the rule is defined instead of on the first log line.
        strategy: how each match is rewritten.
        custom_processor: optional callable; when set, RegexSanitizer calls
            it on the whole value instead of applying ``patterns``.
        preserve_format: for MASK, keep a readable prefix of each match
            instead of replacing it with ``***``.
    """

    name: str
    patterns: List[Union[str, Pattern]] = field(default_factory=list)
    strategy: SanitizationStrategy = SanitizationStrategy.MASK
    custom_processor: Optional[Callable[[str], str]] = None
    preserve_format: bool = True  # keep part of the format to aid debugging

    def __post_init__(self):
        # Build a new list so a list supplied by the caller is not mutated;
        # already-compiled patterns are kept as-is.
        self.patterns = [
            re.compile(p) if isinstance(p, str) else p
            for p in (self.patterns or [])
        ]


class SanitizerRegistry:
    """Registry of sanitization rules, kept in priority order.

    Each rule is stored in ``_fields`` under the key ``"<priority>_<name>"``.
    ``_field_order`` lists those keys sorted by ascending *numeric* priority
    (ties broken by key), i.e. the order in which consumers should apply them.
    """

    def __init__(self):
        self._fields: Dict[str, SensitiveField] = {}
        self._field_order: List[str] = []
        # key -> numeric priority, used to sort _field_order numerically.
        self._priorities: Dict[str, int] = {}
        self._preprocessors: List[Callable[[str], str]] = []
        self._postprocessors: List[Callable[[str], str]] = []

    def register(self, field: "SensitiveField", priority: int = 100):
        """Register *field* under *priority*; lower priorities sort first.

        Registering the same name again at the same priority replaces the
        earlier rule instead of adding a duplicate entry to ``_field_order``.
        """
        key = f"{priority}_{field.name}"
        if key not in self._fields:
            self._field_order.append(key)
        self._fields[key] = field
        self._priorities[key] = priority
        # Sort numerically: a plain string sort would put "100_x" before "20_y".
        self._field_order.sort(key=lambda k: (self._priorities[k], k))

    def add_preprocessor(self, func: Callable[[str], str]):
        """Store a text hook in ``_preprocessors``.

        NOTE(review): nothing visible in this module invokes these hooks yet.
        """
        self._preprocessors.append(func)

    def add_postprocessor(self, func: Callable[[str], str]):
        """Store a text hook in ``_postprocessors``.

        NOTE(review): nothing visible in this module invokes these hooks yet.
        """
        self._postprocessors.append(func)


class BaseSanitizer(ABC):
    """Abstract sanitizer interface plus reusable, stateless masking helpers.

    Concrete subclasses implement :meth:`sanitize`. The static helpers need
    no instance and can be called as ``BaseSanitizer.mask_phone(...)``.
    """

    @abstractmethod
    def sanitize(self, value: str, field: SensitiveField) -> str:
        """Return *value* rewritten according to the rules of *field*."""

    @staticmethod
    def mask_phone(phone: str) -> str:
        """Mask a phone number, e.g. ``13812348888`` -> ``138****8888``.

        Inputs shorter than 11 characters become one ``*`` per character.
        """
        size = len(phone)
        if size < 11:
            return "*" * size
        return phone[:3] + "****" + phone[-4:]

    @staticmethod
    def mask_id_card(id_card: str) -> str:
        """Mask an 18-character ID number, e.g. ``110***1234``.

        Any other length collapses to the fixed string ``***``.
        """
        if len(id_card) != 18:
            return "***"
        return id_card[:3] + "***" + id_card[-4:]

    @staticmethod
    def mask_bank_card(card: str) -> str:
        """Show only the last four digits: ``**** **** **** 1234``.

        Whitespace is removed first; if fewer than 8 characters remain the
        result is ``****``.
        """
        compact = re.sub(r'\s', '', card)
        if len(compact) < 8:
            return "****"
        return "**** **** **** " + compact[-4:]

    @staticmethod
    def hash_value(value: str, salt: str = "") -> str:
        """Return the first 16 hex characters of SHA-256(value + salt).

        NOTE(review): with the default empty salt this is a plain truncated
        SHA-256, which can be reversed by enumeration for low-entropy inputs
        such as phone numbers; consider a secret key (HMAC).
        """
        digest = hashlib.sha256((value + salt).encode()).hexdigest()
        return digest[:16]


class RegexSanitizer(BaseSanitizer):
    """Regex-driven sanitizer: rewrites every match of a field's patterns.

    The field's ``strategy`` selects the rewrite. Strategies this class does
    not recognise fail closed (matches become ``[REDACTED]``) instead of
    letting the sensitive text through unchanged.
    """

    # Upper bound on remembered value -> token pairs (see _tokenize_match).
    MAX_TOKENS = 10_000

    def __init__(self, registry: SanitizerRegistry):
        self.registry = registry
        # Pattern string -> compiled regex, so string patterns are compiled
        # once per sanitizer rather than on every sanitize() call.
        self._compiled_patterns: Dict[str, re.Pattern] = {}
        # Value -> token map shared by all fields and calls on this instance.
        # NOTE(review): holds raw sensitive values in memory and is not
        # synchronized; use one instance per worker thread.
        self._token_map: Dict[str, str] = {}
        self._next_token = 0

    def sanitize(self, value: str, field: SensitiveField) -> str:
        """Apply *field* to *value* and return the sanitized text.

        A ``custom_processor`` on the field, when set, replaces the pattern
        pipeline entirely.
        """
        if field.custom_processor:
            return field.custom_processor(value)

        strategy = field.strategy
        result = value
        for raw_pattern in field.patterns:
            pattern = self._compile(raw_pattern)

            if strategy == SanitizationStrategy.MASK:
                if field.preserve_format:
                    result = self._mask_with_format(result, pattern)
                else:
                    result = pattern.sub("***", result)

            elif strategy == SanitizationStrategy.HASH:
                result = pattern.sub(
                    lambda m: self.hash_value(m.group()),
                    result
                )

            elif strategy == SanitizationStrategy.REDACT:
                result = pattern.sub("[REDACTED]", result)

            elif strategy == SanitizationStrategy.TOKENIZE:
                result = self._tokenize_match(result, pattern)

            elif strategy == SanitizationStrategy.PARTIAL:
                result = self._partial_match(result, pattern)

            else:
                # Unknown strategy: fail closed rather than leak the match.
                result = pattern.sub("[REDACTED]", result)

        return result

    def _compile(self, pattern):
        """Return *pattern* as a compiled regex, caching string compilations."""
        if not isinstance(pattern, str):
            return pattern
        compiled = self._compiled_patterns.get(pattern)
        if compiled is None:
            compiled = re.compile(pattern)
            self._compiled_patterns[pattern] = compiled
        return compiled

    def _mask_with_format(self, text: str, pattern: re.Pattern) -> str:
        """Mask each match, keeping its first quarter visible.

        Matches of 4 characters or fewer are fully starred.
        """
        def replacer(match):
            matched = match.group()
            length = len(matched)
            if length <= 4:
                return "*" * length
            return matched[:length//4] + "*" * (length - length//4)
        return pattern.sub(replacer, text)

    def _partial_match(self, text: str, pattern: re.Pattern) -> str:
        """Keep the first 3 and last 4 characters of each match (620***1234).

        Matches shorter than 11 characters are fully starred, so short values
        are not mostly revealed (same threshold as ``mask_phone``).
        """
        def replacer(match):
            matched = match.group()
            if len(matched) < 11:
                return "*" * len(matched)
            return matched[:3] + "***" + matched[-4:]
        return pattern.sub(replacer, text)

    def _tokenize_match(self, text: str, pattern: re.Pattern) -> str:
        """Replace each match with a ``TOKEN_nnnn`` placeholder.

        Tokens come from one instance-wide counter, so within this instance a
        token never stands for two different values (previously every call
        restarted at TOKEN_0000). Once MAX_TOKENS values are remembered the
        map is cleared while the counter keeps increasing: memory stays
        bounded, at the cost of a repeated value possibly getting a new token.
        """
        def replacer(match):
            value = match.group()
            token = self._token_map.get(value)
            if token is None:
                if len(self._token_map) >= self.MAX_TOKENS:
                    self._token_map.clear()
                token = f"TOKEN_{self._next_token:04d}"
                self._next_token += 1
                self._token_map[value] = token
            return token

        return pattern.sub(replacer, text)


使用示例

def create_default_registry() -> SanitizerRegistry:
    r"""Return a registry pre-loaded with common PII and credential rules.

    Rules are registered, with matching priorities, from most to least
    specific: credentials, e-mail, ID card, bank card, then phone. Insertion
    order and priority order agree, so the sequence is the same whichever one
    a consumer iterates. Running the longer numeric rules before the 11-digit
    phone rule stops a phone-shaped slice of an ID card or card number from
    being masked on its own.

    Numeric patterns are wrapped in (?<!\d) / (?!\d) rather than \b: CJK
    characters are word characters in Python's Unicode regexes, so \b would
    not fire in text such as "电话13812345678".
    """
    registry = SanitizerRegistry()

    # API keys and bearer tokens: removed outright.
    registry.register(SensitiveField(
        name="api_key",
        patterns=[
            r"sk-[A-Za-z0-9_-]{20,}",             # OpenAI-style keys, incl. "sk-proj-..."
            r"(?i)bearer\s+[A-Za-z0-9._~+/=-]+",  # Bearer tokens (JWT / base64 charset)
        ],
        strategy=SanitizationStrategy.REDACT,
    ), priority=10)

    # "password=..." / "secret: ..." style assignments inside free text.
    registry.register(SensitiveField(
        name="password",
        patterns=[
            r'(?i)(password|passwd|pwd|secret)\s*[:=]\s*["\']?[^"\'}\s]+["\']?',
        ],
        strategy=SanitizationStrategy.REDACT,
    ), priority=20)

    # E-mail addresses: before the phone rule, so an all-digit local part
    # ("13812345678@qq.com") is masked together with its domain.
    registry.register(SensitiveField(
        name="email",
        patterns=[
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        ],
        strategy=SanitizationStrategy.MASK,
    ), priority=30)

    # 18-character mainland ID card numbers.
    registry.register(SensitiveField(
        name="id_card",
        patterns=[
            r"(?<!\d)\d{17}[\dXx](?!\d)",
        ],
        strategy=SanitizationStrategy.MASK,
    ), priority=40)

    # Bank card numbers, contiguous or grouped by spaces/dashes.
    registry.register(SensitiveField(
        name="bank_card",
        patterns=[
            r"(?<!\d)\d{16,19}(?!\d)",
            r"(?<!\d)\d{4}(?:[ -]\d{4}){3}(?:[ -]?\d{1,3})?(?!\d)",
        ],
        strategy=SanitizationStrategy.MASK,
        preserve_format=False,
    ), priority=50)

    # Mainland mobile numbers, bare or written as 138-1234-5678.
    registry.register(SensitiveField(
        name="phone",
        patterns=[
            r"(?<!\d)1[3-9]\d{9}(?!\d)",
            r"(?<!\d)\d{3}-\d{4}-\d{4}(?!\d)",
        ],
        strategy=SanitizationStrategy.MASK,
    ), priority=60)

    return registry

2.2 JSON 结构递归脱敏

"""
JSON 结构递归脱敏处理器
支持嵌套结构、数组、特殊字段自动识别
"""

from typing import Any, Union, List, Dict
import copy


class JSONSanitizer:
    """Recursively sanitize JSON-like data before it is written to logs.

    For every dict entry the rules are checked in this order:

    1. Keys that look like credentials (``_sensitive_keys``, or any key
       ending in one of ``_SENSITIVE_SUFFIXES``) have their value replaced
       by ``"[REDACTED]"``.
    2. String values under identifier keys (``_hash_sensitive_keys``) are
       replaced by a 12-hex-digit SHA-256 prefix.
    3. Everything else is walked recursively. Every string is passed through
       all rules of the wrapped sanitizer's registry, in the registry's
       ``_field_order``.

    Key matching is case-insensitive, and ``-`` / spaces count as ``_``.
    """

    # Key suffixes that mark a credential even when the exact key is not
    # listed, e.g. "db_password" or "github_token" ("max_tokens" is unaffected).
    _SENSITIVE_SUFFIXES = (
        '_password', '_passwd', '_secret', '_token',
        '_api_key', '_apikey', '_private_key',
    )
    # Depth limit for walks started by the payload/response entry points;
    # same value as sanitize_json's default.
    _DEFAULT_MAX_DEPTH = 50

    def __init__(self, base_sanitizer: "RegexSanitizer"):
        self.sanitizer = base_sanitizer
        self._sensitive_keys = {
            'password', 'passwd', 'pwd', 'secret', 'token', 'apikey',
            'api_key', 'access_token', 'refresh_token', 'auth', 'authorization',
            'ssn', 'social_security', 'credit_card', 'card_number',
            'cvv', 'pin', 'private_key', 'encryption_key'
        }
        # NOTE(review): an unsalted SHA-256 of a low-entropy identifier can be
        # reversed by enumeration; consider a keyed HMAC.
        self._hash_sensitive_keys = {
            'id', 'account', 'user_id', 'order_id', 'transaction_id'
        }

    def sanitize_json(self, data: Any, depth: int = 0, max_depth: int = 50) -> Any:
        """Return a sanitized copy of *data*; the input is not modified.

        Tuples come back as lists. Objects of other types are converted with
        ``str()`` and then scanned, so their text cannot bypass the regex rules.
        """
        if depth > max_depth:
            return "[MAX_DEPTH_EXCEEDED]"

        if data is None or isinstance(data, (bool, int, float)):
            return data

        if isinstance(data, str):
            return self._sanitize_string(data)

        if isinstance(data, (list, tuple)):
            return [self.sanitize_json(item, depth + 1, max_depth)
                    for item in data]

        if isinstance(data, dict):
            return self._sanitize_dict(data, depth, max_depth)

        return self._sanitize_string(str(data))

    def _sanitize_dict(self, data: Dict, depth: int, max_depth: int) -> Dict:
        """Sanitize every entry of *data* with key-aware rules."""
        return {
            key: self._sanitize_entry(key, value, depth, max_depth)
            for key, value in data.items()
        }

    def _sanitize_entry(self, key: Any, value: Any, depth: int, max_depth: int) -> Any:
        """Sanitize one dict value, applying the key-based rules first."""
        # str(): Python dicts may carry non-string keys (e.g. ints).
        normalized = str(key).lower().replace('-', '_').replace(' ', '_')

        if (normalized in self._sensitive_keys
                or normalized.endswith(self._SENSITIVE_SUFFIXES)):
            return "[REDACTED]"

        if normalized in self._hash_sensitive_keys and isinstance(value, str):
            return hashlib.sha256(value.encode()).hexdigest()[:12]

        return self.sanitize_json(value, depth + 1, max_depth)

    def _sanitize_string(self, text: str) -> str:
        """Apply every registered rule to *text*, in registry priority order."""
        registry = self.sanitizer.registry
        result = text
        # dict.fromkeys drops duplicate keys while preserving order.
        for key in dict.fromkeys(registry._field_order):
            result = self.sanitizer.sanitize(result, registry._fields[key])
        return result

    def sanitize_api_payload(self, payload: Dict) -> Dict:
        """Sanitize an AI API request body for logging.

        ``messages`` entries go through :meth:`_sanitize_message`. Every other
        top-level field (``system``, ``response_format``, ``tools``, ``user``,
        ...) goes through the generic key/value rules, so fields this method
        does not know about are never logged raw. The input is not modified.
        """
        if not isinstance(payload, dict):
            return self.sanitize_json(payload)

        sanitized = {}
        for key, value in payload.items():
            if key == 'messages' and isinstance(value, (list, tuple)):
                sanitized[key] = [self._sanitize_message(msg) for msg in value]
            else:
                sanitized[key] = self._sanitize_entry(
                    key, value, 0, self._DEFAULT_MAX_DEPTH
                )
        return sanitized

    def _sanitize_message(self, message: Dict) -> Dict:
        """Sanitize one chat message.

        ``role`` is kept and ``name`` is replaced by ``[USER_MASKED]``.
        ``content`` may be a string, None, or a list of content parts; its
        structure is preserved. Other keys use the generic key/value rules.
        """
        if not isinstance(message, dict):
            return self.sanitize_json(message)

        sanitized = {}
        for key, value in message.items():
            if key == 'role':
                sanitized[key] = value
            elif key == 'name':
                sanitized[key] = "[USER_MASKED]"
            elif key == 'content':
                sanitized[key] = self.sanitize_json(value)
            else:
                sanitized[key] = self._sanitize_entry(
                    key, value, 0, self._DEFAULT_MAX_DEPTH
                )
        return sanitized

    def sanitize_response(self, response_data: Any) -> Any:
        """Sanitize an API response for logging.

        For dict responses only ``model``, ``usage`` (kept raw for billing
        monitoring) and a sanitized ``choices`` list are returned. For each
        choice, ``message`` / ``delta`` (role + sanitized content) and
        ``finish_reason`` are kept. Other response shapes go through
        :meth:`sanitize_json`.
        """
        if not isinstance(response_data, dict):
            return self.sanitize_json(response_data)

        result = {
            'model': response_data.get('model'),
            'usage': response_data.get('usage'),
            'choices': []
        }

        for choice in response_data.get('choices') or []:
            if not isinstance(choice, dict):
                result['choices'].append(self.sanitize_json(choice))
                continue

            sanitized_choice = {}
            # 'delta' carries the content of streaming chunks.
            for part in ('message', 'delta'):
                if part in choice:
                    sanitized_choice[part] = self._sanitize_response_message(
                        choice[part]
                    )
            if 'finish_reason' in choice:
                sanitized_choice['finish_reason'] = choice['finish_reason']
            result['choices'].append(sanitized_choice)

        return result

    def _sanitize_response_message(self, message: Any) -> Any:
        """Keep only role and sanitized content of a response message/delta."""
        if not isinstance(message, dict):
            return self.sanitize_json(message)
        return {
            'role': message.get('role'),
            # content may legitimately be None (e.g. tool-call replies).
            'content': self.sanitize_json(message.get('content', '')),
        }


# Module-level alias so existing ``sanitize_response(sanitizer, data)`` calls
# keep working now that the function is a JSONSanitizer method.
sanitize_response = JSONSanitizer.sanitize_response

三、生产级日志脱敏架构

3.1 异步日志处理管道

"""
生产级日志脱敏架构
支持异步处理、批量写入、缓冲队列、熔断机制
"""

import asyncio
import logging
import time
import uuid
from datetime import datetime
from queue import Queue, Empty
from threading import Thread, Event
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
from contextlib import asynccontextmanager
import gzip
import base64

配置日志

logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class LogEntry: """日志条目""" trace_id: str timestamp: str level: str service: str operation: str request_id: str duration_ms: float api_endpoint: str request_payload: Dict[str, Any] response_payload: Dict[str, Any] sanitized_request: Optional[str] = None sanitized_response: Optional[str] = None status_code: Optional[int] = None error_message: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { 'trace_id': self.trace_id, 'timestamp': self.timestamp, 'level': self.level, 'service': self.service, 'operation': self.operation, 'request_id': self.request_id, 'duration_ms': self.duration_ms, 'api_endpoint': self.api_endpoint, 'sanitized_request': self.sanitized_request, 'sanitized_response': self.sanitized_response, 'status_code': self.status_code, 'error_message': self.error_message } class AsyncLogPipeline: """异步日志处理管道""" def __init__( self, sanitizer: JSONSanitizer, batch_size: int = 100, flush_interval: float = 5.0, max_queue_size: int = 10000 ): self.sanitizer = sanitizer self.batch_size = batch_size self.flush_interval = flush_interval self.queue: Queue = Queue(maxsize=max_queue_size) self.buffer: List[LogEntry] = [] self.last_flush_time = time.time() self._shutdown_event = Event() self._worker_thread: Optional[Thread] = None self._stats = { 'total_processed': 0, 'total_sanitized': 0, 'total_dropped': 0, 'errors': 0 } def start(self): """启动日志处理管道""" self._worker_thread = Thread(target=self._process_loop, daemon=True) self._worker_thread.start() logger.info("日志脱敏管道已启动") def _process_loop(self): """后台处理循环""" while not self._shutdown_event.is_set(): try: # 非阻塞获取日志 try: entry = self.queue.get(timeout=1.0) self._process_entry(entry) except Empty: pass # 定期刷新缓冲 if self._should_flush(): self._flush_buffer() except Exception as e: logger.error(f"日志处理错误: {e}") self._stats['errors'] += 1 def _should_flush(self) -> bool: return ( len(self.buffer) >= self.batch_size or 
(time.time() - self.last_flush_time) >= self.flush_interval ) def _process_entry(self, entry: LogEntry): """处理单个日志条目""" try: # 脱敏请求 sanitized_req = self.sanitizer.sanitize_api_payload( entry.request_payload ) entry.sanitized_request = json.dumps(sanitized_req, ensure_ascii=False) # 脱敏响应 if entry.response_payload: sanitized_resp = self.sanitizer.sanitize_response( entry.response_payload ) entry.sanitized_response = json.dumps(sanitized_resp, ensure_ascii=False) self.buffer.append(entry) self._stats['total_sanitized'] += 1 except Exception as e: logger.warning(f"脱敏失败: {e}") # 记录原始数据但不保存 entry.sanitized_request = "[SANITIZATION_FAILED]" self.buffer.append(entry) def _flush_buffer(self): """刷新缓冲区到存储""" if not self.buffer: return try: batch = self.buffer[:self.batch_size] self.buffer = self.buffer[self.batch_size:] # 批量写入逻辑(可接入 Elasticsearch、Kafka 等) self._write_batch(batch) self.last_flush_time = time.time() self._stats['total_processed'] += len(batch) logger.debug(f"已写入 {len(batch)} 条日志") except Exception as e: logger.error(f"批量写入失败: {e}") self._stats['errors'] += 1 def _write_batch(self, batch: List[LogEntry]): """写入批次数据(可扩展到多种存储后端)""" # 示例:写入到标准输出(生产环境替换为实际存储) for entry in batch: log_line = json.dumps(entry.to_dict(), ensure_ascii=False) print(log_line) def enqueue(self, entry: LogEntry): """入队日志条目""" try: self.queue.put_nowait(entry) self._stats['total_processed'] += 1 except Exception: self._stats['total_dropped'] += 1 def shutdown(self): """优雅关闭""" self._shutdown_event.set() if self._worker_thread: self._worker_thread.join(timeout=10.0) self._flush_buffer() logger.info(f"日志管道关闭,统计: {self._stats}") def get_stats(self) -> Dict[str, int]: return self._stats.copy() class HolySheepAPIClient: """HolySheep AI API 客户端(带日志脱敏)""" def __init__( self, api_key: str, base_url: str = "https://api.holysheep.ai/v1", log_pipeline: Optional[AsyncLogPipeline] = None ): self.api_key = api_key self.base_url = base_url self.log_pipeline = log_pipeline self._session = None async def 
chat_completions( self, messages: List[Dict[str, str]], model: str = "gpt-4.1", **kwargs ) -> Dict[str, Any]: """调用 Chat Completions API""" trace_id = str(uuid.uuid4()) start_time = time.time() headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": model, "messages": messages, **{k: v for k, v in kwargs.items() if v is not None} } entry = LogEntry( trace_id=trace_id, timestamp=datetime.utcnow().isoformat(), level="INFO", service="holysheep-api", operation="chat.completions", request_id=str(uuid.uuid4()), duration_ms=0, api_endpoint=f"{self.base_url}/chat/completions", request_payload=payload.copy(), response_payload={} ) try: # 这里使用实际的 HTTP 请求 async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/chat/completions", headers=headers, json=payload, timeout=60.0 ) response.raise_for_status() result = response.json() entry.duration_ms = (time.time() - start_time) * 1000 entry.response_payload