Khi triển khai hệ thống AI cho một nền tảng thương mại điện tử quy mô 2 triệu người dùng, tôi đã phải đối mặt với một cơn ác mộng thực sự: function calling bị khai thác để truy cập trái phép dữ liệu khách hàng. Chỉ trong 3 ngày đầu launch, hệ thống đã ghi nhận hơn 47.000 lượt gọi hàm với tham số được tiêm độc từ prompt. Bài viết này chia sẻ chiến lược bảo mật mà tôi đã xây dựng để ngăn chặn hoàn toàn cuộc tấn công này.
Bối Cảnh Thực Tế: Khi Chatbot AI Bị Biến Thành Công Cụ Khai Thác
Trong dự án triển khai AI assistant cho hotline chăm sóc khách hàng, tôi sử dụng nền tảng HolySheep AI với chi phí chỉ $0.42/MTok cho DeepSeek V3.2 — tiết kiệm 85% so với GPT-4.1 ($8/MTok). Tuy nhiên, sau 2 tuần vận hành, đội bảo mật phát hiện:
- Người dùng chủ động inject JSON payload vào prompt
- Tham số hàm bị override qua multi-turn conversation
- SQL injection qua function parameters chưa sanitize
- Path traversal attack qua file operation functions
Đây là lỗ hổng nghiêm trọng mà tôi sẽ hướng dẫn bạn fix triệt để.
Giải Pháp 1: Schema Validation Layer Với Pydantic
Đầu tiên, tôi xây dựng một validation layer hoàn chỉnh trước khi gọi bất kỳ function nào. Sử dụng Pydantic để đảm bảo tham số đầu vào phải tuân theo schema nghiêm ngặt.
# secure_function_calling.py
from pydantic import BaseModel, Field, field_validator
from typing import Optional, Literal
import json
import re
class SecureFunctionSchema:
"""Layer bảo mật validation cho function calling"""
@staticmethod
def sanitize_string(value: str, max_length: int = 500) -> str:
"""Sanitize input string - loại bỏ ký tự độc hại"""
if not isinstance(value, str):
raise ValueError("Tham số phải là string")
# Loại bỏ null bytes và control characters
sanitized = value.replace('\x00', '').strip()
# Giới hạn độ dài
if len(sanitized) > max_length:
raise ValueError(f"String vượt quá {max_length} ký tự")
# Kiểm tra potential injection patterns
dangerous_patterns = [
r'\{\{.*?\}\}', # Template injection
r'.*?', # XSS
r'\.\./', # Path traversal
r'\$;', # Command injection
]
for pattern in dangerous_patterns:
if re.search(pattern, sanitized, re.IGNORECASE):
raise ValueError(f"Phát hiện pattern nguy hiểm: {pattern}")
return sanitized
@staticmethod
def validate_email(email: str) -> str:
"""Validate email format"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, email):
raise ValueError("Email không hợp lệ")
return email.lower()
@staticmethod
def validate_numeric_range(value: float, min_val: float, max_val: float) -> float:
"""Validate số trong khoảng cho phép"""
if not isinstance(value, (int, float)):
raise ValueError("Tham số phải là số")
if not min_val <= value <= max_val:
raise ValueError(f"Giá trị phải nằm trong khoảng [{min_val}, {max_val}]")
return float(value)
class ProductQuerySchema(BaseModel):
"""Schema cho function get_product_info"""
product_id: str = Field(..., min_length=1, max_length=50)
include_reviews: bool = Field(default=False)
language: Literal['vi', 'en', 'zh'] = Field(default='vi')
@field_validator('product_id')
@classmethod
def validate_product_id(cls, v):
# Chỉ cho phép alphanumeric và dấu gạch ngang
if not re.match(r'^[a-zA-Z0-9\-]+$', v):
raise ValueError("Product ID chỉ chứa alphanumeric và dấu -")
return SecureFunctionSchema.sanitize_string(v, max_length=50)
class OrderQuerySchema(BaseModel):
"""Schema cho function get_order_status"""
order_id: str
customer_email: str
@field_validator('order_id')
@classmethod
def validate_order_id(cls, v):
if not re.match(r'^ORD\d{8,}$', v):
raise ValueError("Order ID phải có format ORD + 8+ số")
return v
@field_validator('customer_email')
@classmethod
def validate_email(cls, v):
return SecureFunctionSchema.validate_email(v)
print("✅ Secure schema validation loaded")
Giải Pháp 2: AI Gateway Với Request Sanitization
Tier thứ hai bảo mật là AI Gateway — một proxy layer kiểm soát mọi request trước khi đến LLM. Tôi triển khai gateway này với HolySheep AI, đạt latency trung bình dưới 50ms.
# ai_gateway.py
import httpx
import json
import hashlib
import time
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
@dataclass
class FunctionDefinition:
"""Định nghĩa function được phép gọi"""
name: str
parameters: Dict[str, Any]
max_calls_per_minute: int = 60
required_role: Optional[str] = None
@dataclass
class SecurityConfig:
"""Cấu hình bảo mật toàn cục"""
max_function_calls_per_request: int = 5
max_recursion_depth: int = 2
blocklisted_patterns: List[str] = field(default_factory=lambda: [
"admin", "root", "sudo", "__import__", "eval", "exec",
"os.system", "subprocess", "pickle", " marshal"
])
class AISecureGateway:
"""
Gateway bảo mật cho AI API calls
- Sanitizes prompts
- Validates function calls
- Rate limiting
- Audit logging
"""
def __init__(self, api_key: str, config: SecurityConfig):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.config = config
self.allowed_functions: Dict[str, FunctionDefinition] = {}
self._rate_limit_cache: Dict[str, List[float]] = {}
def register_function(self, func_def: FunctionDefinition):
"""Đăng ký function được phép sử dụng"""
self.allowed_functions[func_def.name] = func_def
print(f"🔐 Registered function: {func_def.name}")
def _sanitize_function_call(self, function_call: Dict) -> Dict:
"""Sanitize và validate một function call"""
func_name = function_call.get('name') or function_call.get('function', {}).get('name')
if not func_name:
raise ValueError("Function name missing")
# Kiểm tra function được phép
if func_name not in self.allowed_functions:
raise ValueError(f"Function '{func_name}' không được phép gọi")
# Validate tham số
params = function_call.get('arguments') or function_call.get('function', {}).get('arguments', {})
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError:
raise ValueError("Invalid JSON in arguments")
# Check for blocklisted patterns
params_str = json.dumps(params)
for pattern in self.config.blocklisted_patterns:
if pattern.lower() in params_str.lower():
raise ValueError(f"Phát hiện pattern nguy hiểm: {pattern}")
return {
'name': func_name,
'arguments': params
}
def _check_rate_limit(self, user_id: str, function_name: str) -> bool:
"""Kiểm tra rate limit cho user/function"""
key = f"{user_id}:{function_name}"
now = time.time()
window = 60 # 1 phút
if key not in self._rate_limit_cache:
self._rate_limit_cache[key] = []
# Clean old entries
self._rate_limit_cache[key] = [
t for t in self._rate_limit_cache[key]
if now - t < window
]
func_def = self.allowed_functions.get(function_name)
max_calls = func_def.max_calls_per_minute if func_def else 60
if len(self._rate_limit_cache[key]) >= max_calls:
return False
self._rate_limit_cache[key].append(now)
return True
async def call_with_functions(
self,
prompt: str,
user_id: str,
functions: List[Dict],
user_role: Optional[str] = None
) -> Dict:
"""Gọi AI với function calling có bảo mật"""
# Validate và sanitize tất cả function definitions
sanitized_functions = []
for func in functions:
sanitized = self._sanitize_function_call(func)
func_def = self.allowed_functions.get(sanitized['name'])
if func_def and func_def.required_role:
if user_role != func_def.required_role:
raise PermissionError(f"Cần quyền {func_def.required_role}")
sanitized_functions.append(sanitized)
# Kiểm tra rate limit cho tất cả functions
for func in sanitized_functions:
if not self._check_rate_limit(user_id, func['name']):
raise PermissionError(f"Rate limit exceeded for {func['name']}")
# Gọi HolySheep AI
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": "deepseek-v3.2",
"messages": [{"role": "user", "content": prompt}],
"functions": sanitized_functions
}
)
if response.status_code != 200:
raise Exception(f"API Error: {response.status_code}")
return response.json()
Khởi tạo gateway
gateway = AISecureGateway(
api_key="YOUR_HOLYSHEEP_API_KEY",
config=SecurityConfig()
)
Đăng ký các functions được phép
gateway.register_function(FunctionDefinition(
name="get_product_info",
parameters={
"type": "object",
"properties": {
"product_id": {"type": "string"},
"include_reviews": {"type": "boolean"}
}
},
max_calls_per_minute=100
))
gateway.register_function(FunctionDefinition(
name="get_order_status",
parameters={
"type": "object",
"properties": {
"order_id": {"type": "string"},
"customer_email": {"type": "string"}
}
},
max_calls_per_minute=30,
required_role="customer"
))
print("🛡️ AI Secure Gateway initialized")
Giải Pháp 3: Output Sanitization Và Response Validation
Không chỉ đầu vào cần bảo mật. Output từ AI model cũng có thể chứa malicious content hoặc sensitive data leak. Tôi xây dựng output sanitizer để filter mọi response.
# output_sanitizer.py
import re
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
@dataclass
class SensitiveDataPattern:
"""Pattern cho dữ liệu nhạy cảm cần mask"""
name: str
pattern: str
mask_char: str = '*'
class OutputSanitizer:
"""Sanitize và validate AI output"""
def __init__(self):
self.sensitive_patterns = [
SensitiveDataPattern(
name="email",
pattern=r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
),
SensitiveDataPattern(
name="phone_vn",
pattern=r'\b(0[1-9]{1,3}[0-9]{8,9})\b'
),
SensitiveDataPattern(
name="credit_card",
pattern=r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'
),
SensitiveDataPattern(
name="ssn",
pattern=r'\b\d{3}[- ]?\d{2}[- ]?\d{4}\b'
),
]
self.injection_patterns = [
r'',
r'javascript:',
r'on\w+\s*=',
r'eval\s*\(',
r'document\.cookie',
r'window\.location',
]
def mask_sensitive_data(self, text: str, user_authorized: bool = False) -> str:
"""Mask dữ liệu nhạy cảm trong text"""
result = text
for pattern in self.sensitive_patterns:
# Chỉ unmask nếu user có quyền
if not user_authorized:
result = re.sub(
pattern.pattern,
f'[{pattern.name}_masked]',
result
)
return result
def detect_injection_attempt(self, text: str) -> Optional[str]:
"""Phát hiện cố gắng injection trong output"""
for pattern in self.injection_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return f"Detected: {pattern} at position {match.start()}"
return None
def sanitize_function_result(
self,
result: Any,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Sanitize kết quả từ function execution"""
if isinstance(result, dict):
sanitized = {}
for key, value in result.items():
# Check key name for sensitivity hints
sensitivity_hints = ['password', 'token', 'key', 'secret', 'ssn', 'credit']
is_sensitive = any(hint in key.lower() for hint in sensitivity_hints)
if is_sensitive and not context.get('user_can_see_sensitive'):
sanitized[key] = '***REDACTED***'
elif isinstance(value, str):
sanitized[key] = self.mask_sensitive_data(
value,
context.get('user_can_see_pii', False)
)
else:
sanitized[key] = value
return sanitized
return result
def validate_response_structure(self, response: Dict) -> bool:
"""Validate cấu trúc response không có anomaly"""
if not isinstance(response, dict):
return False
# Check for recursive depth (possible DoS)
def check_depth(obj, current_depth=0, max_depth=10):
if current_depth > max_depth:
return False
if isinstance(obj, dict):
return all(check_depth(v, current_depth + 1, max_depth) for v in obj.values())
if isinstance(obj, list):
return all(check_depth(item, current_depth + 1, max_depth) for item in obj)
return True
return check_depth(response)
Demo
sanitizer = OutputSanitizer()
test_output = """
Kết quả đơn hàng cho khách hàng:
Email: [email protected]
SĐT: 0912345678
Mã thẻ: 1234-5678-9012-3456
<script>alert('hacked')</script>
"""
print("=== Original ===")
print(test_output)
print("\n=== Sanitized (user not authorized) ===")
print(sanitizer.mask_sensitive_data(test_output, user_authorized=False))
print("\n=== Injection Check ===")
print(sanitizer.detect_injection_attempt(test_output))
Kiến Trúc Hoàn Chỉnh: Secure AI Pipeline
Kết hợp cả 3 layer bảo mật, đây là pipeline hoàn chỉnh tôi triển khai cho production:
# secure_ai_pipeline.py
import asyncio
import httpx
import json
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Import các module đã định nghĩa
from ai_gateway import AISecureGateway, FunctionDefinition, SecurityConfig
from output_sanitizer import OutputSanitizer
class SecureAIPipeline:
"""
Pipeline hoàn chỉnh cho AI interactions với bảo mật nhiều lớp
Layer 1: Input Sanitization (Schema Validation)
Layer 2: AI Gateway (Proxy + Rate Limiting)
Layer 3: Output Sanitization (Data Masking)
Layer 4: Audit Logging
"""
def __init__(
self,
api_key: str,
allowed_functions: List[FunctionDefinition],
enable_audit: bool = True
):
self.gateway = AISecureGateway(
api_key=api_key,
config=SecurityConfig()
)
self.sanitizer = OutputSanitizer()
self.enable_audit = enable_audit
self.audit_log: List[Dict] = []
# Register all allowed functions
for func in allowed_functions:
self.gateway.register_function(func)
logger.info("🔒 Secure AI Pipeline initialized")
def _log_audit(
self,
event_type: str,
user_id: str,
data: Dict[str, Any],
success: bool
):
"""Audit log cho mọi interaction"""
if not self.enable_audit:
return
entry = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': event_type,
'user_id': user_id,
'data_hash': hash(str(data)), # Không lưu data thực
'success': success
}
self.audit_log.append(entry)
# Log suspicious events
if not success or event_type in ['injection_detected', 'rate_limited']:
logger.warning(f"⚠️ Security Event: {entry}")
async def process_request(
self,
user_id: str,
prompt: str,
user_role: Optional[str] = None,
context: Optional[Dict] = None
) -> Dict[str, Any]:
"""
Process request qua toàn bộ pipeline bảo mật
Args:
user_id: ID người dùng
prompt: Prompt từ người dùng
user_role: Vai trò người dùng (admin, customer, guest)
context: Context bổ sung (user_can_see_pii, etc.)
"""
context = context or {}
start_time = asyncio.get_event_loop().time()
try:
# === LAYER 1: Input Validation ===
# Validate prompt không chứa obvious injection
injection_check = self.sanitizer.detect_injection_attempt(prompt)
if injection_check:
self._log_audit('injection_detected', user_id, {'prompt': prompt}, False)
return {
'success': False,
'error': 'Request blocked by security filter',
'code': 'SECURITY_VIOLATION'
}
# === LAYER 2: AI Gateway Processing ===
# Chuyển prompt qua gateway để sanitize và rate limit
functions = context.get('available