Giới thiệu — Tại sao cần theo dõi API như một người canh gác
Mình từng làm việc tại một startup AI ở Việt Nam, và một ngày đẹp trời, hệ thống báo lỗi hàng loạt. Kiểm tra log mới phát hiện — có ai đó đang gọi API liên tục với tần suất bất thường, chi phí tăng vọt từ $200/tháng lên $4,800 chỉ trong 3 ngày. Đó là lúc mình nhận ra: nếu không có hệ thống giám sát và tự động ngăn chặn, tài khoản API của bạn có thể bị "khai thác" theo cách không ai ngờ tới.
Trong bài viết này, mình sẽ chia sẻ cách xây dựng một hệ thống giám sát API hoàn chỉnh, phát hiện các mẫu gọi bất thường và tự động chặn trước khi thiệt hại lan rộng. Tất cả code đều sử dụng
HolySheep AI — nền tảng có tỷ giá ¥1=$1 (tiết kiệm 85%+ so với các provider khác), hỗ trợ WeChat/Alipay, độ trễ dưới 50ms, và tín dụng miễn phí khi đăng ký.
API là gì? Hiểu đơn giản như đặt đồ ăn
Nếu bạn chưa từng làm việc với API, hãy tưởng tượng API như một "quầy bar" trong nhà hàng:
- Bạn (ứng dụng) gọi món qua API = đưa yêu cầu cho bartender
- Bartender (server API) pha chế và trả đồ = gửi kết quả về
- Bạn trả tiền = trả phí theo số lượng yêu cầu
Mỗi lần gọi API, bạn tốn tiền. Nếu có người "trà trộn" vào gọi món liên tục với thẻ của bạn — đó chính là "异常调用" (gọi bất thường). Bài viết hôm nay sẽ hướng dẫn bạn phát hiện và ngăn chặn điều này.
Kiến trúc hệ thống giám sát
Trước khi code, hãy hiểu bức tranh toàn cảnh:
┌─────────────────────────────────────────────────────────────┐
│ HỆ THỐNG GIÁM SÁT API │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Proxy │───▶│ Monitor │───▶│ Auto-ban │ │
│ │ Layer │ │ Service │ │ Service │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Log │ │ Pattern │ │ Block │ │
│ │ Store │ │ Analysis │ │ Manager │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Phần 1: Thiết lập Logging Proxy — Ghi nhật ký mọi cuộc gọi
Đầu tiên, bạn cần ghi lại TẤT CẢ các yêu cầu API. Mình sử dụng một proxy đơn giản bằng Python:
# api_monitor_proxy.py
Proxy ghi nhật ký mọi cuộc gọi API HolySheep
import requests
import time
import json
from datetime import datetime
from collections import defaultdict
Cấu hình HolySheep API
BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Thay bằng API key thật
Lưu trữ tạm thời cho logging
api_call_log = []
rate_limit_store = defaultdict(list) # Lưu timestamp theo IP/API key
class APIMonitorProxy:
def __init__(self):
self.headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
def log_call(self, endpoint, method, status_code, latency_ms,
request_data=None, ip_address="unknown"):
"""Ghi nhật ký một cuộc gọi API"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"endpoint": endpoint,
"method": method,
"status_code": status_code,
"latency_ms": latency_ms,
"ip_address": ip_address,
"request_data": request_data,
"tokens_used": request_data.get("max_tokens", 0) if request_data else 0
}
api_call_log.append(log_entry)
print(f"[{log_entry['timestamp']}] {method} {endpoint} - {status_code} - {latency_ms}ms")
def call_api(self, endpoint, payload, ip_address="unknown"):
"""Gọi API thông qua proxy có giám sát"""
start_time = time.time()
try:
response = requests.post(
f"{BASE_URL}/{endpoint}",
headers=self.headers,
json=payload,
timeout=30
)
latency_ms = round((time.time() - start_time) * 1000, 2)
self.log_call(endpoint, "POST", response.status_code,
latency_ms, payload, ip_address)
return response
except requests.exceptions.RequestException as e:
latency_ms = round((time.time() - start_time) * 1000, 2)
self.log_call(endpoint, "POST", "ERROR", latency_ms, payload, ip_address)
raise
Khởi tạo proxy
monitor = APIMonitorProxy()
Ví dụ gọi API chatbot
def chatbot_completion(user_message, ip="127.0.0.1"):
payload = {
"model": "gpt-4.1", # $8/MTok - giá rẻ hơn 85%
"messages": [
{"role": "system", "content": "Bạn là trợ lý AI thân thiện"},
{"role": "user", "content": user_message}
],
"max_tokens": 1000,
"temperature": 0.7
}
response = monitor.call_api("chat/completions", payload, ip)
return response.json()
Test thử
if __name__ == "__main__":
print("=== Test API Monitor Proxy ===")
result = chatbot_completion("Xin chào, hãy giới thiệu về HolySheep AI", "192.168.1.100")
print(f"\nKết quả: {json.dumps(result, indent=2, ensure_ascii=False)}")
print(f"\nTổng cuộc gọi đã ghi: {len(api_call_log)}")
Chạy thử code trên:
# Terminal output:
=== Test API Monitor Proxy ===
[2026-01-15T10:30:45.123] POST chat/completions - 200 - 45ms
Kết quả: {..."choices": [...]}...
Tổng cuộc gọi đã ghi: 1
Phần 2: Phát hiện mẫu gọi bất thường
Đây là phần quan trọng nhất — phân tích các mẫu (pattern) và phát hiện bất thường. Mình đã rút kinh nghiệm từ vụ việc khiến công ty mình thiệt hại nặng nề:
# anomaly_detector.py
Phát hiện các mẫu gọi API bất thường
from collections import defaultdict
from datetime import datetime, timedelta
import statistics
class AnomalyDetector:
def __init__(self):
# Ngưỡng cảnh báo (có thể điều chỉnh)
self.thresholds = {
"requests_per_minute": 60, # Quá 60 req/phút = bất thường
"requests_per_hour": 500, # Quá 500 req/giờ = nghi ngờ
"tokens_per_minute": 50000, # Quá 50K tokens/phút
"failed_requests_percent": 0.3, # Quá 30% lỗi = có vấn đề
"unique_ips_per_key": 5, # Nhiều IP bất thường
"avg_latency_ms": 2000, # Latency cao bất thường
}
# Lưu trữ theo thời gian
self.key_stats = defaultdict(lambda: {
"requests": [],
"tokens": [],
"ips": set(),
"errors": [],
"latencies": []
})
def analyze_request(self, api_key, log_entry):
"""Phân tích một yêu cầu và cập nhật thống kê"""
key = api_key or "anonymous"
stats = self.key_stats[key]
timestamp = datetime.fromisoformat(log_entry["timestamp"])
stats["requests"].append(timestamp)
stats["tokens"].append(log_entry.get("tokens_used", 0))
stats["ips"].add(log_entry["ip_address"])
stats["latencies"].append(log_entry["latency_ms"])
if log_entry["status_code"] != 200:
stats["errors"].append({
"timestamp": timestamp,
"status": log_entry["status_code"]
})
# Phát hiện bất thường
anomalies = self.detect_anomalies(key)
return anomalies
def detect_anomalies(self, key):
"""Phát hiện các bất thường cho một API key"""
stats = self.key_stats[key]
anomalies = []
now = datetime.now()
# 1. Kiểm tra tần suất theo phút
one_minute_ago = now - timedelta(minutes=1)
recent_requests = [r for r in stats["requests"] if r > one_minute_ago]
if len(recent_requests) > self.thresholds["requests_per_minute"]:
anomalies.append({
"type": "HIGH_FREQUENCY",
"severity": "CRITICAL",
"message": f"Quá {len(recent_requests)} yêu cầu trong 1 phút (ngưỡng: {self.thresholds['requests_per_minute']})",
"value": len(recent_requests),
"threshold": self.thresholds["requests_per_minute"]
})
# 2. Kiểm tra tần suất theo giờ
one_hour_ago = now - timedelta(hours=1)
hourly_requests = [r for r in stats["requests"] if r > one_hour_ago]
hourly_tokens = sum(stats["tokens"][-len([r for r in stats["requests"] if r > one_hour_ago]):])
if len(hourly_requests) > self.thresholds["requests_per_hour"]:
anomalies.append({
"type": "VERY_HIGH_FREQUENCY",
"severity": "WARNING",
"message": f"Quá {len(hourly_requests)} yêu cầu trong 1 giờ",
"value": len(hourly_requests),
"threshold": self.thresholds["requests_per_hour"]
})
# 3. Kiểm tra tokens/phút
tokens_this_minute = sum([
stats["tokens"][i]
for i, t in enumerate(stats["requests"])
if t > one_minute_ago
])
if tokens_this_minute > self.thresholds["tokens_per_minute"]:
anomalies.append({
"type": "HIGH_TOKEN_USAGE",
"severity": "CRITICAL",
"message": f"Sử dụng {tokens_this_minute:,} tokens trong 1 phút",
"value": tokens_this_minute,
"threshold": self.thresholds["tokens_per_minute"]
})
# 4. Kiểm tra tỷ lệ lỗi
total_requests = len(stats["requests"])
total_errors = len(stats["errors"])
if total_requests > 10:
error_rate = total_errors / total_requests
if error_rate > self.thresholds["failed_requests_percent"]:
anomalies.append({
"type": "HIGH_ERROR_RATE",
"severity": "WARNING",
"message": f"Tỷ lệ lỗi {error_rate*100:.1f}% (ngưỡng: {self.thresholds['failed_requests_percent']*100}%)",
"value": error_rate,
"threshold": self.thresholds["failed_requests_percent"]
})
# 5. Kiểm tra nhiều IP khác nhau
if len(stats["ips"]) > self.thresholds["unique_ips_per_key"]:
anomalies.append({
"type": "MULTIPLE_IPS",
"severity": "WARNING",
"message": f"API key được sử dụng từ {len(stats['ips'])} IP khác nhau",
"value": len(stats["ips"]),
"threshold": self.thresholds["unique_ips_per_key"],
"ips": list(stats["ips"])
})
# 6. Kiểm tra latency bất thường
if len(stats["latencies"]) >= 5:
avg_latency = statistics.mean(stats["latencies"][-10:])
if avg_latency > self.thresholds["avg_latency_ms"]:
anomalies.append({
"type": "HIGH_LATENCY",
"severity": "INFO",
"message": f"Latency trung bình {avg_latency:.0f}ms (ngưỡng: {self.thresholds['avg_latency_ms']}ms)",
"value": avg_latency,
"threshold": self.thresholds["avg_latency_ms"]
})
return anomalies
def get_stats_summary(self, key):
"""Lấy tóm tắt thống kê cho một API key"""
stats = self.key_stats[key]
now = datetime.now()
return {
"api_key": key[:8] + "..." if len(key) > 8 else key,
"total_requests": len(stats["requests"]),
"unique_ips": len(stats["ips"]),
"total_tokens": sum(stats["tokens"]),
"error_count": len(stats["errors"]),
"requests_last_hour": len([r for r in stats["requests"] if r > now - timedelta(hours=1)]),
"requests_last_minute": len([r for r in stats["requests"] if r > now - timedelta(minutes=1)])
}
Demo phát hiện bất thường
if __name__ == "__main__":
detector = AnomalyDetector()
# Giả lập: API key bị gọi từ nhiều IP với tần suất cao
print("=== Demo phát hiện bất thường ===\n")
# Gọi 70 lần trong 1 phút (vượt ngưỡng 60)
for i in range(70):
log = {
"timestamp": datetime.now().isoformat(),
"ip_address": f"192.168.1.{i % 10}",
"status_code": 200,
"tokens_used": 1000
}
anomalies = detector.analyze_request("sk-test-key-12345", log)
# Kiểm tra stats
summary = detector.get_stats_summary("sk-test-key-12345")
print(f"Tổng quan: {summary}")
print(f"\n⚠️ Bất thường phát hiện được: {len(anomalies)}")
for anomaly in anomalies:
print(f"\n🚨 [{anomaly['severity']}] {anomaly['type']}")
print(f" {anomaly['message']}")
print(f" Giá trị: {anomaly['value']} (ngưỡng: {anomaly['threshold']})")
Kết quả chạy demo:
# === Demo phát hiện bất thường ===
Tổng quan: {'api_key': 'sk-test-k...', 'total_requests': 70, 'unique_ips': 10, 'total_tokens': 70000, 'error_count': 0, 'requests_last_hour': 70, 'requests_last_minute': 70}
#
⚠️ Bất thường phát hiện được: 3
#
🚨 [CRITICAL] HIGH_FREQUENCY
Quá 70 yêu cầu trong 1 phút (ngưỡng: 60)
Giá trị: 70 (ngưỡng: 60)
#
🚨 [WARNING] HIGH_TOKEN_USAGE
Sử dụng 70,000 tokens trong 1 phút
Giá trị: 70000 (ngưỡng: 50000)
#
🚨 [WARNING] MULTIPLE_IPS
API key được sử dụng từ 10 IP khác nhau
Giá trị: 10 (ngưỡng: 5)
Phần 3: Hệ thống tự động chặn (Auto-ban)
Khi phát hiện bất thường nghiêm trọng, hệ thống cần tự động chặn để bảo vệ tài khoản:
# auto_ban_manager.py
Hệ thống tự động chặn IP/API key nghi ngờ
import time
from datetime import datetime, timedelta
from collections import defaultdict
import json
class AutoBanManager:
def __init__(self):
# Danh sách chặn
self.banned_ips = {} # IP -> {reason, banned_at, expires_at}
self.banned_keys = {} # API Key -> {reason, banned_at, expires_at}
self.ban_history = [] #
Tài nguyên liên quan
Bài viết liên quan