프로덕션 환경에서 AI API를 운영할 때 가장 중요한 것 중 하나는 바로 보안 감사(Security Audit)입니다. 저는 지난 3년간 HolySheep AI 게이트웨이 환경에서 수백 개의 엔드포인트를 관리하면서ログ 관리와 접근 제어가 얼마나 중요한지 뼈저리게 느꼈습니다. 이번 튜토리얼에서는 실무에서 검증된 로그 탈감 기법과 역할 기반 접근 제어(RBAC)를 활용한 프로덕션 보안 아키텍처를 상세히 다룹니다.

1. 보안 감사 아키텍처 개요

AI API 보안 감사의 핵심은 세 가지 레이어로 구성됩니다:

"""
HolySheep AI 보안 감사 시스템 아키텍처
저장소 패턴 + 파사드 패턴을 활용한 모듈화된 설계
"""

from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
from datetime import datetime
import hashlib
import re
import threading
from collections import defaultdict


class SensitivityLevel(Enum):
    """데이터 민감도 등급"""
    PUBLIC = 0
    INTERNAL = 1
    CONFIDENTIAL = 2
    RESTRICTED = 3


@dataclass
class AuditLogEntry:
    """감사 로그 엔트리 구조체"""
    timestamp: datetime
    request_id: str
    user_id: str
    api_key_hash: str
    endpoint: str
    model: str
    tokens_used: int
    latency_ms: float
    status_code: int
    sanitized_payload: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "timestamp": self.timestamp.isoformat(),
            "request_id": self.request_id,
            "user_id_hash": hashlib.sha256(self.user_id.encode()).hexdigest()[:16],
            "api_key_fingerprint": self.api_key_hash[:12],
            "endpoint": self.endpoint,
            "model": self.model,
            "tokens": self.tokens_used,
            "latency_ms": self.latency_ms,
            "status": self.status_code,
            "sensitive_data_masked": True
        }


class LogSanitizer:
    """로그 탈감 처리기 - 정규식 기반 PII 감지"""
    
    # 패턴 컴파일 (스레드 안전을 위해 클래스 레벨에서 한 번만)
    PATTERNS = {
        "email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
        "phone": re.compile(r'\b\d{3}[-.]?\d{3,4}[-.]?\d{4}\b'),
        "credit_card": re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'),
        "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
        "ip_address": re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),
        "api_key": re.compile(r'sk-[A-Za-z0-9]{32,}'),
        "bearer_token": re.compile(r'Bearer\s+[A-Za-z0-9\-._~+/]+'),
    }
    
    MASKING_CONFIG = {
        "email": "***@***.***",
        "phone": "***-****-****",
        "credit_card": "****-****-****-****",
        "ssn": "***-**-****",
        "ip_address": "***.***.***.***",
        "api_key": "sk-************",
        "bearer_token": "Bearer ************",
    }
    
    def __init__(self, custom_patterns: Optional[Dict[str, str]] = None):
        self.custom_patterns = custom_patterns or {}
        self._stats = defaultdict(int)
        self._lock = threading.Lock()
    
    def sanitize(self, data: Any, depth: int = 0, max_depth: int = 10) -> Any:
        """재귀적 로그 탈감 처리"""
        if depth > max_depth:
            return "[MAX_DEPTH_EXCEEDED]"
        
        if isinstance(data, dict):
            return {
                k: self.sanitize(v, depth + 1, max_depth) 
                for k, v in data.items()
            }
        elif isinstance(data, list):
            return [self.sanitize(item, depth + 1, max_depth) for item in data]
        elif isinstance(data, str):
            return self._mask_sensitive_strings(data)
        else:
            return data
    
    def _mask_sensitive_strings(self, text: str) -> str:
        """문자열 내 민감 정보 마스킹"""
        result = text
        
        for pattern_name, compiled_pattern in self.PATTERNS.items():
            matches = compiled_pattern.findall(result)
            for match in matches:
                with self._lock:
                    self._stats[pattern_name] += len(matches)
                result = result.replace(match, self.MASKING_CONFIG[pattern_name])
        
        return result
    
    def get_stats(self) -> Dict[str, int]:
        """탈감 통계 반환"""
        with self._lock:
            return dict(self._stats)


HolySheep API 연동을 위한 HTTP 클라이언트 래퍼

class HolySheepAuditClient: """HolySheep AI API를 활용한 감사 로그 수집 클라이언트""" BASE_URL = "https://api.holysheep.ai/v1" def __init__(self, api_key: str): self.api_key = api_key self.sanitizer = LogSanitizer() self._request_count = 0 self._total_cost = 0.0 def create_chat_completion(self, messages: List[Dict], model: str = "gpt-4.1") -> Dict: """HolySheep AI 채팅 완성 API 호출 + 자동 감사 로그 생성""" import json import time import urllib.request url = f"{self.BASE_URL}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": model, "messages": messages } # 요청 전 탈감 처리 sanitized_payload = self.sanitizer.sanitize(payload) start_time = time.time() req = urllib.request.Request( url, data=json.dumps(payload).encode('utf-8'), headers=headers, method='POST' ) try: with urllib.request.urlopen(req, timeout=30) as response: response_data = json.loads(response.read().decode('utf-8')) latency_ms = (time.time() - start_time) * 1000 # 응답도 탈감 처리 sanitized_response = self.sanitizer.sanitize(response_data) # 비용 계산 (HolySheep AI 실시간 가격) usage = response_data.get("usage", {}) tokens = usage.get("total_tokens", 0) cost = self._calculate_cost(model, tokens) # 감사 로그 엔트리 생성 audit_entry = AuditLogEntry( timestamp=datetime.utcnow(), request_id=response_data.get("id", "unknown"), user_id="system", api_key_hash=hashlib.sha256(self.api_key.encode()).hexdigest(), endpoint="/v1/chat/completions", model=model, tokens_used=tokens, latency_ms=latency_ms, status_code=200, sanitized_payload=sanitized_payload, metadata={"cost_usd": cost} ) self._request_count += 1 self._total_cost += cost return { "response": response_data, "audit_entry": audit_entry.to_dict() } except Exception as e: return {"error": str(e), "sanitized": sanitized_payload} def _calculate_cost(self, model: str, tokens: int) -> float: """HolySheep AI 가격표 기반 비용 계산""" pricing = { "gpt-4.1": 8.0, # $8.00/MTok "claude-sonnet-4": 15.0, # $15.00/MTok "gemini-2.5-flash": 2.5, # $2.50/MTok "deepseek-v3.2": 0.42, # $0.42/MTok } rate = pricing.get(model, 8.0) return (tokens / 1_000_000) * rate

2. 역할 기반 접근 제어(RBAC) 구현

저는 HolySheep AI 환경에서 최소 권한 원칙(Principle of Least Privilege)을 적용하여 팀별 접근 제어를 구성합니다. 각 역할에 따라 허용되는 모델, 토큰 제한,_RATE_LIMIT을 차등 적용하는 것이 핵심입니다.

/**
 * HolySheep AI를 위한 Role-Based Access Control (RBAC) 구현
 * TypeScript + Express.js 기반 미들웨어
 */

// ==================== 타입 정의 ====================
enum Role {
  ADMIN = "admin",
  DEVELOPER = "developer", 
  ANALYST = "analyst",
  READONLY = "readonly"
}

enum Permission {
  READ_MODELS = "read:models",
  WRITE_LOGS = "write:logs",
  READ_AUDIT = "read:audit",
  EXECUTE_API = "execute:api",
  MANAGE_USERS = "manage:users",
  VIEW_COSTS = "view:costs"
}

interface UserPolicy {
  role: Role;
  allowedModels: string[];
  maxTokensPerRequest: number;
  maxRequestsPerMinute: number;
  allowedEndpoints: string[];
  permissions: Permission[];
}

interface RateLimitConfig {
  windowMs: number;
  maxRequests: number;
  tokenCost: number;
}

// ==================== 역할 정책 정의 ====================
const ROLE_POLICIES: Record = {
  [Role.ADMIN]: {
    role: Role.ADMIN,
    allowedModels: ["gpt-4.1", "claude-sonnet-4", "gemini-2.5-flash", "deepseek-v3.2"],
    maxTokensPerRequest: 128000,
    maxRequestsPerMinute: 1000,
    allowedEndpoints: ["*"],
    permissions: Object.values(Permission)
  },
  [Role.DEVELOPER]: {
    role: Role.DEVELOPER,
    allowedModels: ["gpt-4.1", "gemini-2.5-flash", "deepseek-v3.2"],
    maxTokensPerRequest: 32768,
    maxRequestsPerMinute: 100,
    allowedEndpoints: ["/v1/chat/completions", "/v1/embeddings"],
    permissions: [
      Permission.READ_MODELS,
      Permission.EXECUTE_API,
      Permission.READ_AUDIT,
      Permission.VIEW_COSTS
    ]
  },
  [Role.ANALYST]: {
    role: Role.ANALYST,
    allowedModels: ["deepseek-v3.2", "gemini-2.5-flash"],
    maxTokensPerRequest: 8192,
    maxRequestsPerMinute: 30,
    allowedEndpoints: ["/v1/chat/completions"],
    permissions: [
      Permission.READ_MODELS,
      Permission.EXECUTE_API,
      Permission.READ_AUDIT
    ]
  },
  [Role.READONLY]: {
    role: Role.READONLY,
    allowedModels: [],
    maxTokensPerRequest: 0,
    maxRequestsPerMinute: 10,
    allowedEndpoints: ["/v1/models"],
    permissions: [Permission.READ_MODELS]
  }
};

// ==================== 토큰 기반 Rate Limiter ====================
class TokenBucketRateLimiter {
  private buckets: Map = new Map();
  private config: RateLimitConfig;

  constructor(config: RateLimitConfig) {
    this.config = config;
  }

  async checkLimit(userId: string, tokenCost: number = 1): Promise {
    const now = Date.now();
    const bucket = this.buckets.get(userId);

    if (!bucket) {
      this.buckets.set(userId, {
        tokens: this.config.maxRequests - tokenCost,
        lastRefill: now
      });
      return true;
    }

    // 시간 경과에 따른 토큰 복구
    const elapsed = now - bucket.lastRefill;
    const refillRate = (this.config.windowMs / 1000) / this.config.maxRequests;
    const tokensToAdd = elapsed / 1000 / refillRate;

    bucket.tokens = Math.min(
      this.config.maxRequests,
      bucket.tokens + tokensToAdd
    );
    bucket.lastRefill = now;

    if (bucket.tokens >= tokenCost) {
      bucket.tokens -= tokenCost;
      return true;
    }

    return false;
  }

  getRemainingRequests(userId: string): number {
    const bucket = this.buckets.get(userId);
    return bucket ? Math.floor(bucket.tokens) : this.config.maxRequests;
  }
}

// ==================== HolySheep API Gateway 미들웨어 ====================
class HolySheepAccessControl {
  private rateLimiter: TokenBucketRateLimiter;
  private apiKeyPolicyMap: Map = new Map();
  private auditLogger: AuditLogger;

  constructor() {
    this.rateLimiter = new TokenBucketRateLimiter({
      windowMs: 60000, // 1분
      maxRequests: 100,
      tokenCost: 1
    });
    this.auditLogger = new AuditLogger();
  }

  // API 키 등록
  registerApiKey(apiKey: string, role: Role): void {
    this.apiKeyPolicyMap.set(apiKey, role);
    console.log([RBAC] API Key registered: ${apiKey.slice(0, 8)}... -> ${role});
  }

  // 접근 제어 미들웨어
  createMiddleware() {
    return async (req: any, res: any, next: any) => {
      const apiKey = req.headers.authorization?.replace("Bearer ", "");
      const startTime = Date.now();

      if (!apiKey) {
        return res.status(401).json({
          error: "UNAUTHORIZED",
          message: "API 키가 제공되지 않았습니다"
        });
      }

      const role = this.apiKeyPolicyMap.get(apiKey);
      if (!role) {
        await this.auditLogger.log({
          type: "AUTH_FAILURE",
          apiKey: this.hashApiKey(apiKey),
          ip: req.ip,
          endpoint: req.path
        });
        return res.status(403).json({
          error: "FORBIDDEN",
          message: "유효하지 않은 API 키입니다"
        });
      }

      const policy = ROLE_POLICIES[role];

      // 1. 엔드포인트 접근 권한 확인
      const endpoint = req.path;
      const endpointAllowed = policy.allowedEndpoints.includes("*") ||
        policy.allowedEndpoints.some(e => endpoint.startsWith(e));

      if (!endpointAllowed) {
        return res.status(403).json({
          error: "ENDPOINT_NOT_ALLOWED",
          message: 이 엔드포인트(${endpoint})에 대한 접근 권한이 없습니다,
          required_permission: "특정 엔드포인트 접근 필요"
        });
      }

      // 2. 모델 접근 권한 확인
      if (req.body?.model && !policy.allowedModels.includes(req.body.model)) {
        return res.status(403).json({
          error: "MODEL_NOT_ALLOWED",
          message: 선택한 모델(${req.body.model})에 대한 접근 권한이 없습니다,
          allowed_models: policy.allowedModels
        });
      }

      // 3. Rate Limit 확인
      const allowed = await this.rateLimiter.checkLimit(apiKey);
      if (!allowed) {
        res.set("X-RateLimit-Remaining", "0");
        res.set("Retry-After", "60");
        return res.status(429).json({
          error: "RATE_LIMIT_EXCEEDED",
          message: "분당 요청 제한을 초과했습니다",
          retry_after: 60
        });
      }

      // 4. 토큰 제한 확인
      const requestedTokens = req.body?.max_tokens || 2048;
      if (requestedTokens > policy.maxTokensPerRequest) {
        return res.status(400).json({
          error: "TOKEN_LIMIT_EXCEEDED",
          message: 토큰 제한(${policy.maxTokensPerRequest})을 초과했습니다,
          requested: requestedTokens
        });
      }

      // 5. 감사 로그 기록
      const latencyMs = Date.now() - startTime;
      await this.auditLogger.log({
        type: "API_REQUEST",
        apiKey: this.hashApiKey(apiKey),
        role: role,
        endpoint: endpoint,
        model: req.body?.model,
        tokens: requestedTokens,
        latency_ms: latencyMs,
        status: "SUCCESS"
      });

      // 응답 헤더에 Rate Limit 정보 추가
      res.set("X-RateLimit-Remaining", 
        String(this.rateLimiter.getRemainingRequests(apiKey)));
      res.set("X-User-Role", role);

      next();
    };
  }

  private hashApiKey(apiKey: string): string {
    const crypto = require('crypto');
    return crypto.createHash('sha256').update(apiKey).digest('hex').slice(0, 12);
  }
}

// ==================== 감사 로거 ====================
class AuditLogger {
  private logs: any[] = [];

  async log(entry: any): Promise {
    const logEntry = {
      timestamp: new Date().toISOString(),
      ...entry
    };
    this.logs.push(logEntry);
    
    // HolySheep API로 감사 로그 전송 (별도 스레드)
    this.sendToRemote(logEntry);
  }

  private async sendToRemote(entry: any): Promise {
    // HolySheep Audit Endpoint에 비동기 전송
    console.log([AUDIT] ${JSON.stringify(entry)});
  }

  getLogs(filter?: any): any[] {
    return this.logs.filter(log => {
      if (!filter) return true;
      if (filter.role && log.role !== filter.role) return false;
      if (filter.type && log.type !== filter.type) return false;
      return true;
    });
  }
}

// ==================== 사용 예제 ====================
const accessControl = new HolySheepAccessControl();

// API 키 등록 (실제 환경에서는 DB에서 로드)
accessControl.registerApiKey("YOUR_HOLYSHEEP_API_KEY", Role.DEVELOPER);
accessControl.registerApiKey("admin-key-xxx", Role.ADMIN);
accessControl.registerApiKey("analyst-key-yyy", Role.ANALYST);

// Express 미들웨어로 사용
const express = require('express');
const app = express();

app.use(express.json());
app.use("/v1", accessControl.createMiddleware());

// HolySheep AI 프록시 라우트
app.post("/v1/chat/completions", async (req, res) => {
  // 이미 middleware에서 모든 검증을 통과함
  const response = await forwardToHolySheep(req.body);
  res.json(response);
});

console.log("HolySheep AI RBAC Gateway initialized");
console.log("Base URL: https://api.holysheep.ai/v1");

3. HolySheep AI 통합 감사 대시보드

저는 매일 아침 HolySheep AI 대시보드를 통해 비용 추이, API 호출 패턴, 보안 이벤트를 점검합니다. 이를 자동화하는 모니터링 스크립트를 공유합니다.

"""
HolySheep AI 보안 감사 대시보드 백엔드
실시간 비용 추적 + 이상 행동 탐지
"""

import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional
import statistics


@dataclass
class CostSnapshot:
    """비용 스냅샷"""
    timestamp: datetime
    model: str
    total_tokens: int
    total_cost_usd: float
    request_count: int
    avg_latency_ms: float
    error_rate: float


class SecurityAnomalyDetector:
    """보안 이상 행동 탐지기"""
    
    def __init__(self, baseline_window_hours: int = 24):
        self.baseline_window = timedelta(hours=baseline_window_hours)
        self.user_baselines: Dict[str, Dict] = {}
        self.alerts: List[Dict] = []
    
    def update_baseline(self, user_id: str, metrics: Dict):
        """사용자 기준선 업데이트"""
        if user_id not in self.user_baselines:
            self.user_baselines[user_id] = {
                "requests_per_hour": [],
                "tokens_per_request": [],
                "unique_endpoints": set(),
                "unique_models": set()
            }
        
        baseline = self.user_baselines[user_id]
        baseline["requests_per_hour"].append(metrics.get("requests", 0))
        baseline["tokens_per_request"].append(metrics.get("avg_tokens", 0))
        baseline["unique_endpoints"].update(metrics.get("endpoints", []))
        baseline["unique_models"].update(metrics.get("models", []))
        
        # 윈도우 사이즈 유지
        if len(baseline["requests_per_hour"]) > self.baseline_window_hours if hasattr(self, 'baseline_window_hours') else 24:
            baseline["requests_per_hour"].pop(0)
            baseline["tokens_per_request"].pop(0)