Giới thiệu — Tại sao cần theo dõi API như một người canh gác

Mình từng làm việc tại một startup AI ở Việt Nam, và một ngày đẹp trời, hệ thống báo lỗi hàng loạt. Kiểm tra log mới phát hiện — có ai đó đang gọi API liên tục với tần suất bất thường, chi phí tăng vọt từ $200/tháng lên $4,800 chỉ trong 3 ngày. Đó là lúc mình nhận ra: nếu không có hệ thống giám sát và tự động ngăn chặn, tài khoản API của bạn có thể bị "khai thác" theo cách không ai ngờ tới. Trong bài viết này, mình sẽ chia sẻ cách xây dựng một hệ thống giám sát API hoàn chỉnh, phát hiện các mẫu gọi bất thường và tự động chặn trước khi thiệt hại lan rộng. Tất cả code đều sử dụng HolySheep AI — nền tảng có tỷ giá ¥1=$1 (tiết kiệm 85%+ so với các provider khác), hỗ trợ WeChat/Alipay, độ trễ dưới 50ms, và tín dụng miễn phí khi đăng ký.

API là gì? Hiểu đơn giản như đặt đồ ăn

Nếu bạn chưa từng làm việc với API, hãy tưởng tượng API như một "quầy bar" trong nhà hàng: Mỗi lần gọi API, bạn tốn tiền. Nếu có người "trà trộn" vào gọi món liên tục với thẻ của bạn — đó chính là "异常调用" (gọi bất thường). Bài viết hôm nay sẽ hướng dẫn bạn phát hiện và ngăn chặn điều này.

Kiến trúc hệ thống giám sát

Trước khi code, hãy hiểu bức tranh toàn cảnh:
┌─────────────────────────────────────────────────────────────┐
│                    HỆ THỐNG GIÁM SÁT API                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐               │
│  │  Proxy   │───▶│ Monitor  │───▶│ Auto-ban │               │
│  │  Layer   │    │  Service │    │  Service │               │
│  └──────────┘    └──────────┘    └──────────┘               │
│       │               │               │                      │
│       ▼               ▼               ▼                      │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐               │
│  │  Log     │    │ Pattern  │    │ Block    │               │
│  │  Store   │    │ Analysis │    │ Manager  │               │
│  └──────────┘    └──────────┘    └──────────┘               │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Phần 1: Thiết lập Logging Proxy — Ghi nhật ký mọi cuộc gọi

Đầu tiên, bạn cần ghi lại TẤT CẢ các yêu cầu API. Mình sử dụng một proxy đơn giản bằng Python:
# api_monitor_proxy.py

Proxy ghi nhật ký mọi cuộc gọi API HolySheep

import requests import time import json from datetime import datetime from collections import defaultdict

Cấu hình HolySheep API

BASE_URL = "https://api.holysheep.ai/v1" API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Thay bằng API key thật

Lưu trữ tạm thời cho logging

api_call_log = [] rate_limit_store = defaultdict(list) # Lưu timestamp theo IP/API key class APIMonitorProxy: def __init__(self): self.headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } def log_call(self, endpoint, method, status_code, latency_ms, request_data=None, ip_address="unknown"): """Ghi nhật ký một cuộc gọi API""" log_entry = { "timestamp": datetime.now().isoformat(), "endpoint": endpoint, "method": method, "status_code": status_code, "latency_ms": latency_ms, "ip_address": ip_address, "request_data": request_data, "tokens_used": request_data.get("max_tokens", 0) if request_data else 0 } api_call_log.append(log_entry) print(f"[{log_entry['timestamp']}] {method} {endpoint} - {status_code} - {latency_ms}ms") def call_api(self, endpoint, payload, ip_address="unknown"): """Gọi API thông qua proxy có giám sát""" start_time = time.time() try: response = requests.post( f"{BASE_URL}/{endpoint}", headers=self.headers, json=payload, timeout=30 ) latency_ms = round((time.time() - start_time) * 1000, 2) self.log_call(endpoint, "POST", response.status_code, latency_ms, payload, ip_address) return response except requests.exceptions.RequestException as e: latency_ms = round((time.time() - start_time) * 1000, 2) self.log_call(endpoint, "POST", "ERROR", latency_ms, payload, ip_address) raise

Khởi tạo proxy

monitor = APIMonitorProxy()

Ví dụ gọi API chatbot

def chatbot_completion(user_message, ip="127.0.0.1"): payload = { "model": "gpt-4.1", # $8/MTok - giá rẻ hơn 85% "messages": [ {"role": "system", "content": "Bạn là trợ lý AI thân thiện"}, {"role": "user", "content": user_message} ], "max_tokens": 1000, "temperature": 0.7 } response = monitor.call_api("chat/completions", payload, ip) return response.json()

Test thử

if __name__ == "__main__": print("=== Test API Monitor Proxy ===") result = chatbot_completion("Xin chào, hãy giới thiệu về HolySheep AI", "192.168.1.100") print(f"\nKết quả: {json.dumps(result, indent=2, ensure_ascii=False)}") print(f"\nTổng cuộc gọi đã ghi: {len(api_call_log)}")
Chạy thử code trên:
# Terminal output:

=== Test API Monitor Proxy ===

[2026-01-15T10:30:45.123] POST chat/completions - 200 - 45ms

Kết quả: {..."choices": [...]}...

Tổng cuộc gọi đã ghi: 1

Phần 2: Phát hiện mẫu gọi bất thường

Đây là phần quan trọng nhất — phân tích các mẫu (pattern) và phát hiện bất thường. Mình đã rút kinh nghiệm từ vụ việc khiến công ty mình thiệt hại nặng nề:
# anomaly_detector.py

Phát hiện các mẫu gọi API bất thường

from collections import defaultdict from datetime import datetime, timedelta import statistics class AnomalyDetector: def __init__(self): # Ngưỡng cảnh báo (có thể điều chỉnh) self.thresholds = { "requests_per_minute": 60, # Quá 60 req/phút = bất thường "requests_per_hour": 500, # Quá 500 req/giờ = nghi ngờ "tokens_per_minute": 50000, # Quá 50K tokens/phút "failed_requests_percent": 0.3, # Quá 30% lỗi = có vấn đề "unique_ips_per_key": 5, # Nhiều IP bất thường "avg_latency_ms": 2000, # Latency cao bất thường } # Lưu trữ theo thời gian self.key_stats = defaultdict(lambda: { "requests": [], "tokens": [], "ips": set(), "errors": [], "latencies": [] }) def analyze_request(self, api_key, log_entry): """Phân tích một yêu cầu và cập nhật thống kê""" key = api_key or "anonymous" stats = self.key_stats[key] timestamp = datetime.fromisoformat(log_entry["timestamp"]) stats["requests"].append(timestamp) stats["tokens"].append(log_entry.get("tokens_used", 0)) stats["ips"].add(log_entry["ip_address"]) stats["latencies"].append(log_entry["latency_ms"]) if log_entry["status_code"] != 200: stats["errors"].append({ "timestamp": timestamp, "status": log_entry["status_code"] }) # Phát hiện bất thường anomalies = self.detect_anomalies(key) return anomalies def detect_anomalies(self, key): """Phát hiện các bất thường cho một API key""" stats = self.key_stats[key] anomalies = [] now = datetime.now() # 1. Kiểm tra tần suất theo phút one_minute_ago = now - timedelta(minutes=1) recent_requests = [r for r in stats["requests"] if r > one_minute_ago] if len(recent_requests) > self.thresholds["requests_per_minute"]: anomalies.append({ "type": "HIGH_FREQUENCY", "severity": "CRITICAL", "message": f"Quá {len(recent_requests)} yêu cầu trong 1 phút (ngưỡng: {self.thresholds['requests_per_minute']})", "value": len(recent_requests), "threshold": self.thresholds["requests_per_minute"] }) # 2. Kiểm tra tần suất theo giờ one_hour_ago = now - timedelta(hours=1) hourly_requests = [r for r in stats["requests"] if r > one_hour_ago] hourly_tokens = sum(stats["tokens"][-len([r for r in stats["requests"] if r > one_hour_ago]):]) if len(hourly_requests) > self.thresholds["requests_per_hour"]: anomalies.append({ "type": "VERY_HIGH_FREQUENCY", "severity": "WARNING", "message": f"Quá {len(hourly_requests)} yêu cầu trong 1 giờ", "value": len(hourly_requests), "threshold": self.thresholds["requests_per_hour"] }) # 3. Kiểm tra tokens/phút tokens_this_minute = sum([ stats["tokens"][i] for i, t in enumerate(stats["requests"]) if t > one_minute_ago ]) if tokens_this_minute > self.thresholds["tokens_per_minute"]: anomalies.append({ "type": "HIGH_TOKEN_USAGE", "severity": "CRITICAL", "message": f"Sử dụng {tokens_this_minute:,} tokens trong 1 phút", "value": tokens_this_minute, "threshold": self.thresholds["tokens_per_minute"] }) # 4. Kiểm tra tỷ lệ lỗi total_requests = len(stats["requests"]) total_errors = len(stats["errors"]) if total_requests > 10: error_rate = total_errors / total_requests if error_rate > self.thresholds["failed_requests_percent"]: anomalies.append({ "type": "HIGH_ERROR_RATE", "severity": "WARNING", "message": f"Tỷ lệ lỗi {error_rate*100:.1f}% (ngưỡng: {self.thresholds['failed_requests_percent']*100}%)", "value": error_rate, "threshold": self.thresholds["failed_requests_percent"] }) # 5. Kiểm tra nhiều IP khác nhau if len(stats["ips"]) > self.thresholds["unique_ips_per_key"]: anomalies.append({ "type": "MULTIPLE_IPS", "severity": "WARNING", "message": f"API key được sử dụng từ {len(stats['ips'])} IP khác nhau", "value": len(stats["ips"]), "threshold": self.thresholds["unique_ips_per_key"], "ips": list(stats["ips"]) }) # 6. Kiểm tra latency bất thường if len(stats["latencies"]) >= 5: avg_latency = statistics.mean(stats["latencies"][-10:]) if avg_latency > self.thresholds["avg_latency_ms"]: anomalies.append({ "type": "HIGH_LATENCY", "severity": "INFO", "message": f"Latency trung bình {avg_latency:.0f}ms (ngưỡng: {self.thresholds['avg_latency_ms']}ms)", "value": avg_latency, "threshold": self.thresholds["avg_latency_ms"] }) return anomalies def get_stats_summary(self, key): """Lấy tóm tắt thống kê cho một API key""" stats = self.key_stats[key] now = datetime.now() return { "api_key": key[:8] + "..." if len(key) > 8 else key, "total_requests": len(stats["requests"]), "unique_ips": len(stats["ips"]), "total_tokens": sum(stats["tokens"]), "error_count": len(stats["errors"]), "requests_last_hour": len([r for r in stats["requests"] if r > now - timedelta(hours=1)]), "requests_last_minute": len([r for r in stats["requests"] if r > now - timedelta(minutes=1)]) }

Demo phát hiện bất thường

if __name__ == "__main__": detector = AnomalyDetector() # Giả lập: API key bị gọi từ nhiều IP với tần suất cao print("=== Demo phát hiện bất thường ===\n") # Gọi 70 lần trong 1 phút (vượt ngưỡng 60) for i in range(70): log = { "timestamp": datetime.now().isoformat(), "ip_address": f"192.168.1.{i % 10}", "status_code": 200, "tokens_used": 1000 } anomalies = detector.analyze_request("sk-test-key-12345", log) # Kiểm tra stats summary = detector.get_stats_summary("sk-test-key-12345") print(f"Tổng quan: {summary}") print(f"\n⚠️ Bất thường phát hiện được: {len(anomalies)}") for anomaly in anomalies: print(f"\n🚨 [{anomaly['severity']}] {anomaly['type']}") print(f" {anomaly['message']}") print(f" Giá trị: {anomaly['value']} (ngưỡng: {anomaly['threshold']})")
Kết quả chạy demo:
# === Demo phát hiện bất thường ===

Tổng quan: {'api_key': 'sk-test-k...', 'total_requests': 70, 'unique_ips': 10, 'total_tokens': 70000, 'error_count': 0, 'requests_last_hour': 70, 'requests_last_minute': 70}

#

⚠️ Bất thường phát hiện được: 3

#

🚨 [CRITICAL] HIGH_FREQUENCY

Quá 70 yêu cầu trong 1 phút (ngưỡng: 60)

Giá trị: 70 (ngưỡng: 60)

#

🚨 [WARNING] HIGH_TOKEN_USAGE

Sử dụng 70,000 tokens trong 1 phút

Giá trị: 70000 (ngưỡng: 50000)

#

🚨 [WARNING] MULTIPLE_IPS

API key được sử dụng từ 10 IP khác nhau

Giá trị: 10 (ngưỡng: 5)

Phần 3: Hệ thống tự động chặn (Auto-ban)

Khi phát hiện bất thường nghiêm trọng, hệ thống cần tự động chặn để bảo vệ tài khoản:
# auto_ban_manager.py

Hệ thống tự động chặn IP/API key nghi ngờ

import time from datetime import datetime, timedelta from collections import defaultdict import json class AutoBanManager: def __init__(self): # Danh sách chặn self.banned_ips = {} # IP -> {reason, banned_at, expires_at} self.banned_keys = {} # API Key -> {reason, banned_at, expires_at} self.ban_history = [] #