Trong thế giới AI ngày nay, bảo mật cho các hệ thống LLM (Large Language Model) đã trở thành ưu tiên hàng đầu của mọi doanh nghiệp. Bài viết này sẽ hướng dẫn bạn xây dựng một hệ thống phát hiện và cảnh báo Prompt Injection từ đầu đến cuối, giúp bảo vệ ứng dụng AI của bạn trước các cuộc tấn công tinh vi.
Bối Cảnh Thực Tế: Khi Chatbot Thương Mại Điện Tử Bị Tấn Công
Tôi vẫn nhớ rõ ngày hôm đó - một buổi sáng thứ Hai đầu tuần, hệ thống chatbot AI của một khách hàng thương mại điện tử lớn bất ngờ "nói những điều kỳ lạ". Thay vì trả lời về sản phẩm, chatbot lại tiết lộ toàn bộ cơ sở dữ liệu khách hàng, bao gồm email, địa chỉ và lịch sử mua hàng. Đó là một cuộc tấn công Prompt Injection điển hình.
Sau 72 giờ khắc phục khẩn cấp, tôi quyết định xây dựng một hệ thống bảo mật hoàn chỉnh. Kết quả? Trong 6 tháng tiếp theo, hệ thống đã chặn đứng 847 cuộc tấn công và tiết kiệm được khoảng $12,000 chi phí API - nhờ sử dụng HolySheep AI với giá chỉ $0.42/MTok cho DeepSeek V3.2.
Prompt Injection Là Gì?
Prompt Injection là kỹ thuật tấn công mà kẻ xấu chèn các chỉ thị độc hại vào input của LLM, khiến model thực hiện hành vi ngoài ý muốn của nhà phát triển. Các dạng phổ biến bao gồm:
- Direct Injection: Chèn指令 trực tiếp vào prompt người dùng
- Indirect Injection: Chèn mã độc qua dữ liệu từ external sources (RAG, web scraping)
- Context Overflow: Tràn bộ nhớ context để bypass safety filters
Xây Dựng Hệ Thống Detection Engine
1. Cài Đặt Môi Trường
pip install fastapi uvicorn httpx python-dotenv
pip install transformers torch scikit-learn
pip install redis asyncio aiohttp
2. Prompt Injection Detector Class
import re
import httpx
import asyncio
from typing import Dict, List, Tuple
from dataclasses import dataclass
from enum import Enum
class ThreatLevel(Enum):
SAFE = "safe"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DetectionResult:
is_injection: bool
threat_level: ThreatLevel
matched_patterns: List[str]
confidence: float
analysis: str
class PromptInjectionDetector:
def __init__(self, api_key: str):
self.base_url = "https://api.holysheep.ai/v1"
self.api_key = api_key
# Các pattern đã biết
self.injection_patterns = [
r"ignore\s+(previous|all|above)\s+(instructions?|prompts?|commands?)",
r"(system|developer|admin)\s*:\s*",
r"\[\s*INST\s*\]",
r"<\s*/\s*.*?>", # HTML/XML injection
r"{{\s*.*?\s*}}", # Template injection
r"``\s*.*?``", # Code block injection
r"sql\s*:\s*",
r"exec\(|eval\(|os\.system\(",
]
self.compiled_patterns = [
re.compile(p, re.IGNORECASE) for p in self.injection_patterns
]
async def analyze_with_llm(self, prompt: str) -> Dict:
"""Sử dụng AI để phân tích sâu prompt"""
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": """Bạn là chuyên gia bảo mật AI. Phân tích prompt sau
và trả về JSON: {"injection": bool, "reason": str, "risk_score": float(0-1)}"""
},
{
"role": "user",
"content": f"Analyze: {prompt}"
}
],
"temperature": 0.1
}
)
return response.json()
def pattern_matching(self, prompt: str) -> Tuple[List[str], float]:
"""Pattern-based detection"""
matched = []
for i, pattern in enumerate(self.compiled_patterns):
if pattern.search(prompt):
matched.append(self.injection_patterns[i])
confidence = min(len(matched) * 0.3, 1.0)
return matched, confidence
async def detect(self, prompt: str) -> DetectionResult:
"""Main detection method"""
# Layer 1: Pattern matching (nhanh, ~5ms)
patterns, pattern_conf = self.pattern_matching(prompt)
# Layer 2: LLM analysis (chính xác, ~150ms với HolySheep <50ms)
llm_result = await self.analyze_with_llm(prompt)
# Kết hợp kết quả
is_injection = pattern_conf > 0.3 or llm_result.get("injection", False)
if pattern_conf > 0.8:
threat = ThreatLevel.CRITICAL
elif pattern_conf > 0.5:
threat = ThreatLevel.HIGH
elif pattern_conf > 0.3:
threat = ThreatLevel.MEDIUM
else:
threat = ThreatLevel.LOW if is_injection else ThreatLevel.SAFE
return DetectionResult(
is_injection=is_injection,
threat_level=threat,
matched_patterns=patterns,
confidence=max(pattern_conf, llm_result.get("risk_score", 0)),
analysis=llm_result.get("reason", "")
)
Usage example
detector = PromptInjectionDetector(api_key="YOUR_HOLYSHEEP_API_KEY")
Real-time Alert System với WebSocket
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
import json
import logging
from datetime import datetime
from collections import defaultdict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="AI Security Alert System")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = defaultdict(list)
self.alert_history: List[Dict] = []
async def connect(self, websocket: WebSocket, channel: str):
await websocket.accept()
self.active_connections[channel].append(websocket)
logger.info(f"Client connected to channel: {channel}")
def disconnect(self, websocket: WebSocket, channel: str):
if websocket in self.active_connections[channel]:
self.active_connections[channel].remove(websocket)
async def broadcast(self, channel: str, message: Dict):
disconnected = []
for connection in self.active_connections[channel]:
try:
await connection.send_json(message)
except:
disconnected.append(connection)
for conn in disconnected:
self.disconnect(conn, channel)
# Lưu vào history
self.alert_history.append({
**message,
"timestamp": datetime.utcnow().isoformat()
})
manager = ConnectionManager()
class AlertSystem:
def __init__(self, detector: PromptInjectionDetector):
self.detector = detector
self.alert_thresholds = {
ThreatLevel.LOW: False, # Không alert
ThreatLevel.MEDIUM: True, # Log only
ThreatLevel.HIGH: True, # Alert + Block
ThreatLevel.CRITICAL: True # Alert + Block + Kill session
}
async def process_prompt(self, prompt: str, session_id: str) -> Dict:
result = await self.detector.detect(prompt)
alert_data = {
"type": "detection_result",
"session_id": session_id,
"is_injection": result.is_injection,
"threat_level": result.threat_level.value,
"confidence": result.confidence,
"matched_patterns": result.matched_patterns,
"analysis": result.analysis,
"timestamp": datetime.utcnow().isoformat()
}
# Broadcast real-time alert
await manager.broadcast("security_alerts", alert_data)
# Log critical threats
if result.threat_level == ThreatLevel.CRITICAL:
logger.critical(f"CRITICAL THREAT detected in session {session_id}")
await manager.broadcast("critical_alerts", alert_data)
return alert_data
detector = PromptInjectionDetector(api_key="YOUR_HOLYSHEEP_API_KEY")
alert_system = AlertSystem(detector)
@app.websocket("/ws/alerts")
async def websocket_endpoint(websocket: WebSocket):
channel = websocket.query_params.get("channel", "security_alerts")
await manager.connect(websocket, channel)
try:
while True:
# Nhận prompt từ client
data = await websocket.receive_text()
payload = json.loads(data)
result = await alert_system.process_prompt(
payload["prompt"],
payload.get("session_id", "unknown")
)
# Gửi kết quả về cho client
await websocket.send_json({
"action": "analyzed",
"result": result
})
except WebSocketDisconnect:
manager.disconnect(websocket, channel)
Demo endpoint
@app.post("/analyze")
async def analyze_prompt(prompt: str, session_id: str = "demo"):
return await alert_system.process_prompt(prompt, session_id)
Khởi động server
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Frontend Dashboard với Real-time Updates
<!-- dashboard.html -->
<!DOCTYPE html>
<html lang="vi">
<head>
<meta charset="UTF-8">
<title>AI Security Dashboard</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: 'Segoe UI', sans-serif; background: #0f1419; color: #fff; }
.container { max-width: 1400px; margin: 0 auto; padding: 20px; }
.header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 30px; }
.stats { display: grid; grid-template-columns: repeat(4, 1fr); gap: 20px; margin-bottom: 30px; }
.stat-card { background: linear-gradient(135deg, #1a2332, #2d3748); padding: 25px; border-radius: 12px; }
.stat-card h3 { color: #a0aec0; font-size: 14px; margin-bottom: 10px; }
.stat-card .value { font-size: 32px; font-weight: bold; }
.stat-card.safe .value { color: #48bb78; }
.stat-card.danger .value { color: #fc8181; }
.alert-list { background: #1a2332; border-radius: 12px; padding: 20px; max-height: 500px; overflow-y: auto; }
.alert-item { padding: 15px; margin-bottom: 10px; border-radius: 8px; border-left: 4px solid; }
.alert-item.critical { background: #2d1f1f; border-color: #fc8181; }
.alert-item.high { background: #2d2a1f; border-color: #f6e05e; }
.alert-item.medium { background: #1f2d2d; border-color: #63b3ed; }
.prompt-input { width: 100%; height: 150px; background: #2d3748; border: none; border-radius: 8px; padding: 15px; color: #fff; font-family: monospace; margin-bottom: 15px; }
.btn-test { background: linear-gradient(135deg, #667eea, #764ba2); border: none; padding: 12px 30px; border-radius: 8px; color: #fff; font-weight: bold; cursor: pointer; }
.btn-test:hover { opacity: 0.9; }
.connection-status { display: flex; align-items: center; gap: 8px; }
.status-dot { width: 10px; height: 10px; border-radius: 50%; }
.status-dot.connected { background: #48bb78; }
.status-dot.disconnected { background: #fc8181; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🛡️ AI Security Monitor</h1>
<div class="connection-status">
<div class="status-dot" id="statusDot"></div>
<span id="statusText">Connecting...</span>
</div>
</div>
<div class="stats">
<div class="stat-card safe">
<h3>✅ Safe Requests</h3>
<div class="value" id="safeCount">0</div>
</div>
<div class="stat-card danger">
<h3>🚨 Blocked Attacks</h3>
<div class="value" id="blockedCount">0</div>
</div>
<div class="stat-card">
<h3>📊 Detection Rate</h3>
<div class="value" id="detectionRate">0%</div>
</div>
<div class="stat-card">
<h3>⚡ Avg Response</h3>
<div class="value" id="avgResponse">0ms</div>
</div>
</div>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
<div>
<h2 style="margin-bottom: 15px;">🔍 Test Prompt</h2>
<textarea class="prompt-input" id="promptInput" placeholder="Nhập prompt để test..."></textarea>
<button class="btn-test" onclick="testPrompt()">Analyze</button>
</div>
<div class="alert-list" id="alertList">
<h2 style="margin-bottom: 15px;">🚨 Real-time Alerts</h2>
<div id="alerts"></div>
</div>
</div>
</div>
<script>
let ws;
let stats = { safe: 0, blocked: 0, total: 0, responseTimes: [] };
function connect() {
ws = new WebSocket('ws://localhost:8000/ws/alerts?channel=security_alerts');
ws.onopen = () => {
document.getElementById('statusDot').className = 'status-dot connected';
document.getElementById('statusText').textContent = 'Connected';
};
ws.onclose = () => {
document.getElementById('statusDot').className = 'status-dot disconnected';
document.getElementById('statusText').textContent = 'Disconnected';
setTimeout(connect, 3000);
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'detection_result') {
addAlert(data);
updateStats(data);
}
};
}
async function testPrompt() {
const prompt = document.getElementById('promptInput').value;
if (!prompt) return;
const start = Date.now();
const response = await fetch('/analyze', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt, session_id: 'test-' + Date.now() })
});
const result = await response.json();
stats.responseTimes.push(Date.now() - start);
document.getElementById('avgResponse').textContent =
Math.round(stats.responseTimes.reduce((a,b) => a+b, 0) / stats.responseTimes.length) + 'ms';
}
function addAlert(data) {
const container = document.getElementById('alerts');
const alert = document.createElement('div');
alert.className = alert-item ${data.threat_level};
alert.innerHTML = `
<strong>${data.threat_level.toUpperCase()}</strong>
(Confidence: ${(data.confidence * 100).toFixed(1)}%)
<br><small>${data.analysis || 'Pattern matched: ' + data.matched_patterns.join(', ')}</small>
<br><small>${new Date().toLocaleTimeString()}</small>
`;
container.insertBefore(alert, container.firstChild);
}
function updateStats(data) {
stats.total++;
if (data.is_injection) {
stats.blocked++;
} else {
stats.safe++;
}
document.getElementById('safeCount').textContent = stats.safe;
document.getElementById('blockedCount').textContent = stats.blocked;
document.getElementById('detectionRate').textContent =
((stats.blocked / stats.total) * 100).toFixed(1) + '%';
}
connect();
</script>
</body>
</html>
Tối Ưu Chi Phí với HolySheep AI
Một trong những điểm mạnh của HolySheep AI là chi phí cực kỳ thấp. Với hệ thống detection của chúng ta, mỗi lần phân tích prompt tiêu tốn khoảng 500 tokens. Nếu xử lý 10,000 requests/ngày: