Khi xây dựng hệ thống giao dịch tần suất cao hoặc phân tích dữ liệu thị trường crypto, việc lưu trữ lịch sử từ các sàn giao dịch là yêu cầu bắt buộc. Bài viết này tôi sẽ chia sẻ kinh nghiệm thực chiến 5 năm với hệ thống xử lý hàng triệu tick mỗi ngày, từ kiến trúc cho đến tối ưu chi phí.
Tại sao cần lưu trữ dữ liệu từ Exchange API?
Dữ liệu thị trường crypto có giá trị cực kỳ cao cho nhiều use case: backtest chiến lược giao dịch, machine learning price prediction, risk management, và compliance audit. Tuy nhiên, hầu hết các sàn chỉ cung cấp:
- Kline/history data giới hạn vài trăm candlestick gần nhất
- Không có guarantee về data completeness
- Rate limit nghiêm ngặt khi query nhiều symbol
- Historical data có thể bị sửa đổi hoặc xóa
Với yêu cầu production, bạn cần một data pipeline hoàn chỉnh để đảm bảo tính toàn vẹn và availability của dữ liệu.
Kiến trúc tổng quan
┌─────────────────────────────────────────────────────────────────┐
│ CRYPTO DATA PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────┐ ┌─────────────────────┐ │
│ │ Exchange │───▶│ REST/WSS API │───▶│ Data Validator │ │
│ │ APIs │ │ Collector │ │ & Parser │ │
│ └──────────┘ └──────────────┘ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ MESSAGE QUEUE (Kafka/RabbitMQ) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────┴──────────────┐ │
│ ▼ │ │
│ ┌─────────────┐ ┌────────────────┐ ┌────────────────────┐ │ │
│ │ Real-time │ │ Batch │ │ Cold Storage │ │ │
│ │ Analytics │ │ Processor │ │ (S3/GCS) │ │ │
│ └─────────────┘ └────────────────┘ └────────────────────┘ │ │
│ │ │ │ │
│ ▼ ▼ │ │
│ ┌────────────────┐ ┌───────────────────┐ │ │
│ │ Time-Series │ │ Parquet Files │ │ │
│ │ Database │ │ + Partitioning │ │ │
│ │ (InfluxDB/ │ │ by date/symbol │ │ │
│ │ TimescaleDB) │ │ │ │ │
│ └────────────────┘ └───────────────────┘ │ │
│ │
└─────────────────────────────────────────────────────────────────┘
Chiến lược thu thập dữ liệu
1. REST API Polling
Phương pháp đơn giản nhất, phù hợp cho historical data backfill:
#!/usr/bin/env python3
"""
Crypto Historical Data Collector
Production-ready implementation với error handling và rate limit
"""
import asyncio
import aiohttp
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import logging
import hashlib
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class KlineData:
symbol: str
interval: str
open_time: int
open: float
high: float
low: float
close: float
volume: float
close_time: int
quote_volume: float
trades: int
checksum: str
def to_dict(self) -> dict:
return {
"symbol": self.symbol,
"interval": self.interval,
"open_time": self.open_time,
"open": self.open,
"high": self.high,
"low": self.low,
"close": self.close,
"volume": self.volume,
"close_time": self.close_time,
"quote_volume": self.quote_volume,
"trades": self.trades,
"checksum": self.checksum
}
class ExchangeCollector:
"""Collector cho Binance-style exchange APIs"""
def __init__(self, base_url: str = "https://api.binance.com"):
self.base_url = base_url
self.rate_limiter = asyncio.Semaphore(5) # Max 5 concurrent requests
self.request_count = 0
self.last_reset = time.time()
def _calculate_checksum(self, kline: List) -> str:
"""Tạo checksum để verify data integrity"""
data_str = "".join(str(x) for x in kline[:11])
return hashlib.md5(data_str.encode()).hexdigest()[:16]
async def _throttled_request(
self,
session: aiohttp.ClientSession,
url: str,
params: dict
) -> Optional[dict]:
"""Request với rate limiting và retry logic"""
async with self.rate_limiter:
# Rate limit check (1200 requests/minute for Binance)
current_time = time.time()
if current_time - self.last_reset >= 60:
self.request_count = 0
self.last_reset = current_time
if self.request_count >= 1150: # Keep buffer
wait_time = 60 - (current_time - self.last_reset)
if wait_time > 0:
logger.info(f"Rate limit reached, waiting {wait_time:.1f}s")
await asyncio.sleep(wait_time)
self.last_reset = time.time()
self.request_count = 0
for retry in range(3):
try:
async with session.get(url, params=params) as resp:
self.request_count += 1
if resp.status == 200:
return await resp.json()
elif resp.status == 429:
retry_after = int(resp.headers.get('Retry-After', 60))
logger.warning(f"Rate limited, waiting {retry_after}s")
await asyncio.sleep(retry_after)
else:
logger.error(f"HTTP {resp.status}: {await resp.text()}")
return None
except aiohttp.ClientError as e:
logger.warning(f"Request failed (attempt {retry+1}): {e}")
await asyncio.sleep(2 ** retry)
return None
async def fetch_klines(
self,
session: aiohttp.ClientSession,
symbol: str,
interval: str = "1m",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = 1000
) -> List[KlineData]:
"""Fetch kline/candlestick data"""
endpoint = "/api/v3/klines"
params = {
"symbol": symbol.upper(),
"interval": interval,
"limit": limit
}
if start_time:
params["startTime"] = start_time
if end_time:
params["endTime"] = end_time
url = f"{self.base_url}{endpoint}"
data = await self._throttled_request(session, url, params)
if not data:
return []
klines = []
for k in data:
kline = KlineData(
symbol=symbol.upper(),
interval=interval,
open_time=int(k[0]),
open=float(k[1]),
high=float(k[2]),
low=float(k[3]),
close=float(k[4]),
volume=float(k[5]),
close_time=int(k[6]),
quote_volume=float(k[7]),
trades=int(k[8]),
checksum=self._calculate_checksum(k)
)
klines.append(kline)
return klines
async def backfill_historical(
self,
symbol: str,
interval: str,
start_date: datetime,
end_date: Optional[datetime] = None,
batch_size: int = 500
) -> List[KlineData]:
"""Backfill historical data với chunked requests"""
all_klines = []
current_start = int(start_date.timestamp() * 1000)
end_ts = int(end_date.timestamp() * 1000) if end_date else int(time.time() * 1000)
logger.info(f"Backfilling {symbol} {interval} from {start_date}")
async with aiohttp.ClientSession() as session:
while current_start < end_ts:
klines = await self.fetch_klines(
session,
symbol,
interval,
start_time=current_start,
limit=batch_size
)
if not klines:
logger.warning(f"No data returned for {symbol} at {current_start}")
break
all_klines.extend(klines)
current_start = klines[-1].close_time + 1
logger.info(f"Fetched {len(klines)} klines, total: {len(all_klines)}")
# Respect rate limits
await asyncio.sleep(0.2)
return all_klines
Usage example
async def main():
collector = ExchangeCollector()
# Backfill 1 year of BTCUSDT 1m data
start = datetime(2024, 1, 1)
end = datetime(2025, 1, 1)
klines = await collector.backfill_historical(
"BTCUSDT",
"1m",
start,
end
)
logger.info(f"Total klines collected: {len(klines)}")
# Save to storage
import json
with open("btcusdt_1m_2024.json", "w") as f:
json.dump([k.to_dict() for k in klines], f)
if __name__ == "__main__":
asyncio.run(main())
2. WebSocket Real-time Streaming
Cho dữ liệu real-time, WebSocket là lựa chọn tối ưu:
#!/usr/bin/env python3
"""
WebSocket Collector cho real-time crypto data
Hỗ trợ Binance, Bybit, OKX format
"""
import asyncio
import json
import websockets
from websockets.exceptions import ConnectionClosed
from typing import Callable, Dict, Set
from datetime import datetime
import logging
import struct
import zlib
logger = logging.getLogger(__name__)
class WebSocketCollector:
"""Production-ready WebSocket collector với auto-reconnect"""
def __init__(self):
self.connections: Dict[str, websockets.WebSocketClientProtocol] = {}
self.subscriptions: Set[str] = set()
self.message_handlers: list = []
self.running = False
self.stats = {
"messages_received": 0,
"messages_per_second": 0,
"reconnects": 0,
"last_heartbeat": None
}
def add_handler(self, handler: Callable):
"""Thêm message handler"""
self.message_handlers.append(handler)
async def connect_binance(self, streams: list) -> websockets.WebSocketClientProtocol:
"""Kết nối Binance WebSocket"""
# Demo stream URL - thay bằng endpoint thực tế
url = "wss://stream.binance.com:9443/ws"
uri = f"{url}/{'/'.join(streams)}"
for attempt in range(5):
try:
ws = await websockets.connect(uri, ping_interval=20)
logger.info(f"Connected to Binance WS: {streams}")
return ws
except Exception as e:
logger.warning(f"Connection attempt {attempt+1} failed: {e}")
await asyncio.sleep(2 ** attempt)
raise ConnectionError("Failed to connect after 5 attempts")
async def connect_okx(self, channels: list) -> websockets.WebSocketClientProtocol:
"""Kết nối OKX WebSocket với compression"""
url = "wss://ws.okx.com:8443/ws/v5/public"
subscribe_msg = {
"op": "subscribe",
"args": channels
}
ws = await websockets.connect(url, compression=None)
await ws.send(json.dumps(subscribe_msg))
# Wait for subscription confirmation
resp = await asyncio.wait_for(ws.recv(), timeout=5)
logger.info(f"OKX subscription confirmed: {resp}")
return ws
async def stream_kline(
self,
exchange: str,
symbol: str,
interval: str
):
"""Stream real-time kline data"""
symbol_lower = symbol.lower()
if exchange == "binance":
stream = f"{symbol_lower}@kline_{interval}"
ws = await self.connect_binance([stream])
elif exchange == "okx":
channel = {
"channel": "candle1m", # OKX chỉ có 1m candle granularity
"instId": f"{symbol}-USDT"
}
ws = await self.connect_okx([channel])
else:
raise ValueError(f"Unsupported exchange: {exchange}")
self.running = True
last_stats_time = datetime.now()
msg_count = 0
try:
while self.running:
try:
message = await asyncio.wait_for(ws.recv(), timeout=30)
self.stats["messages_received"] += 1
msg_count += 1
# Parse message
data = json.loads(message)
parsed = self._parse_kline(exchange, data)
if parsed:
for handler in self.message_handlers:
await handler(parsed)
# Calculate messages per second
now = datetime.now()
elapsed = (now - last_stats_time).total_seconds()
if elapsed >= 1.0:
self.stats["messages_per_second"] = msg_count / elapsed
msg_count = 0
last_stats_time = now
self.stats["last_heartbeat"] = now.isoformat()
except asyncio.TimeoutError:
logger.warning("WebSocket timeout, sending ping")
await ws.ping()
except ConnectionClosed as e:
logger.error(f"Connection closed: {e}")
self.stats["reconnects"] += 1
# Auto-reconnect với exponential backoff
await asyncio.sleep(min(30, 2 ** self.stats["reconnects"]))
asyncio.create_task(self.stream_kline(exchange, symbol, interval))
except Exception as e:
logger.error(f"Stream error: {e}")
raise
def _parse_kline(self, exchange: str, data: dict) -> Optional[dict]:
"""Parse kline data từ different exchanges"""
try:
if exchange == "binance":
if "k" not in data:
return None
k = data["k"]
return {
"symbol": k["s"],
"interval": k["i"],
"open_time": k["t"],
"open": float(k["o"]),
"high": float(k["h"]),
"low": float(k["l"]),
"close": float(k["c"]),
"volume": float(k["v"]),
"close_time": k["T"],
"is_closed": k["x"], # Kline closed flag
"exchange": "binance"
}
elif exchange == "okx":
if "data" not in data:
return None
d = data["data"][0]
return {
"symbol": d["instId"],
"interval": "1m",
"open_time": int(d["ts"]),
"open": float(d["candles"][1]),
"high": float(d["candles"][2]),
"low": float(d["candles"][3]),
"close": float(d["candles"][4]),
"volume": float(d["candles"][5]),
"close_time": int(d["candles"][0]) + 60000,
"is_closed": True,
"exchange": "okx"
}
except (KeyError, IndexError, ValueError) as e:
logger.debug(f"Parse error: {e}, data: {data}")
return None
return None
def get_stats(self) -> dict:
return self.stats.copy()
async def stop(self):
self.running = False
for ws in self.connections.values():
await ws.close()
Usage với storage handler
async def storage_handler(kline: dict):
"""Handler để lưu vào TimescaleDB"""
# Implement actual storage logic
pass
async def main():
collector = WebSocketCollector()
collector.add_handler(storage_handler)
# Stream từ multiple exchanges
tasks = [
asyncio.create_task(collector.stream_kline("binance", "BTCUSDT", "1m")),
asyncio.create_task(collector.stream_kline("binance", "ETHUSDT", "1m")),
asyncio.create_task(collector.stream_kline("okx", "BTC-USDT", "1m"))
]
# Monitor stats every 10 seconds
async def monitor():
while True:
await asyncio.sleep(10)
stats = collector.get_stats()
logger.info(f"Stats: {stats['messages_per_second']:.1f} msg/s, "
f"total: {stats['messages_received']}, "
f"reconnects: {stats['reconnects']}")
tasks.append(asyncio.create_task(monitor()))
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
await collector.stop()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
Lưu trữ với Time-Series Database
Với dữ liệu kline, Time-Series Database là lựa chọn tối ưu cho query và aggregation:
-- TimescaleDB hypertable cho crypto kline data
-- Benchmark: 10 triệu rows, query 1 năm BTC 1m: ~45ms
CREATE TABLE klines (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
open_price DOUBLE PRECISION NOT NULL,
high_price DOUBLE PRECISION NOT NULL,
low_price DOUBLE PRECISION NOT NULL,
close_price DOUBLE PRECISION NOT NULL,
volume DOUBLE PRECISION NOT NULL,
quote_volume DOUBLE PRECISION NOT NULL,
trades BIGINT NOT NULL,
checksum TEXT,
-- Composite index cho fast lookup
PRIMARY KEY (symbol, interval, time)
);
-- Convert to hypertable với chunking theo ngày
SELECT create_hypertable(
'klines',
'time',
chunk_time_interval => INTERVAL '1 day',
if_not_exists => TRUE
);
-- Tạo indexes cho common queries
CREATE INDEX idx_klines_symbol_interval_time
ON klines (symbol, interval, time DESC);
CREATE INDEX idx_klines_time_range
ON klines (time DESC)
INCLUDE (symbol, close_price);
-- Continuous aggregate cho 1h, 4h, 1d
CREATE MATERIALIZED VIEW klines_1h
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', time) AS bucket,
symbol,
first(open_price, time) AS open,
max(high_price) AS high,
min(low_price) AS low,
last(close_price, time) AS close,
sum(volume) AS volume,
sum(trades) AS trades
FROM klines
WHERE interval = '1m'
GROUP BY bucket, symbol;
CREATE MATERIALIZED VIEW klines_1d
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', time) AS bucket,
symbol,
first(open_price, time) AS open,
max(high_price) AS high,
min(low_price) AS low,
last(close_price, time) AS close,
sum(volume) AS volume,
sum(trades) AS trades
FROM klines
WHERE interval = '1h'
GROUP BY bucket, symbol;
-- Benchmark query: Lấy 1 năm BTCUSDT 1m data
EXPLAIN ANALYZE
SELECT time, open_price, high_price, low_price, close_price, volume
FROM klines
WHERE symbol = 'BTCUSDT'
AND interval = '1m'
AND time >= '2024-01-01'
AND time < '2025-01-01'
ORDER BY time DESC
LIMIT 525600; -- ~1 năm data
-- Kết quả benchmark (PostgreSQL 16, 32GB RAM, NVMe SSD):
-- Planning Time: 0.123 ms
-- Execution Time: 43.7 ms
-- Actual Rows: 525600
Tối ưu chi phí với Cold Storage
Với dữ liệu >90 ngày, chuyển sang object storage để tiết kiệm 90% chi phí:
#!/usr/bin/env python3
"""
Data Lifecycle Manager - Tiering từ Hot → Warm → Cold storage
Chi phí benchmark (AWS S3):
- Hot (S3 Standard): $0.023/GB/tháng
- Warm (S3 IA): $0.0125/GB/tháng
- Cold (S3 Glacier): $0.004/GB/tháng
"""
import boto3
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
import pyarrow as pa
import pyarrow.parquet as pq
from typing import List, Dict
import logging
logger = logging.getLogger(__name__)
class CryptoDataTierManager:
"""Quản lý data lifecycle cho crypto data"""
# Tiering thresholds (ngày)
HOT_THRESHOLD_DAYS = 7
WARM_THRESHOLD_DAYS = 90
COLD_THRESHOLD_DAYS = 365
def __init__(self, bucket_prefix: str = "crypto-data"):
self.s3 = boto3.client('s3')
self.bucket = f"{bucket_prefix}-{'prod' if 'prod' in bucket_prefix else 'dev'}"
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
try:
self.s3.head_bucket(Bucket=self.bucket)
except ClientError:
self.s3.create_bucket(Bucket=self.bucket)
logger.info(f"Created bucket: {self.bucket}")
def _get_tier(self, data_age_days: int) -> str:
"""Xác định storage tier dựa trên age"""
if data_age_days <= self.HOT_THRESHOLD_DAYS:
return "hot"
elif data_age_days <= self.WARM_THRESHOLD_DAYS:
return "warm"
else:
return "cold"
def _get_s3_prefix(self, symbol: str, interval: str, timestamp: datetime) -> str:
"""Tạo S3 key theo partition scheme: symbol/interval/YYYY/MM/DD/"""
return f"{symbol}/{interval}/{timestamp.strftime('%Y/%m/%d')}/"
def write_parquet(
self,
klines: List[Dict],
symbol: str,
interval: str,
base_date: datetime
):
"""Ghi data thành Parquet files với partitioning"""
if not klines:
return
# Convert to PyArrow table
table = pa.Table.from_pylist(klines)
# Add computed columns
ages = [(datetime.now() - datetime.fromtimestamp(k['open_time']/1000)).days
for k in klines]
# Determine storage class based on data age
avg_age = sum(ages) / len(ages)
tier = self._get_tier(avg_age)
storage_class = {
'hot': 'STANDARD',
'warm': 'STANDARD_IA',
'cold': 'GLACIER'
}[tier]
# Write to S3
prefix = self._get_s3_prefix(symbol, interval, base_date)
filename = f"{base_date.strftime('%H%M%S')}_{symbol}_{interval}.parquet"
key = f"{prefix}{filename}"
# Convert to Parquet format
buffer = pa.BufferOutputStream()
pq.write_table(table, buffer, compression='snappy')
self.s3.put_object(
Bucket=self.bucket,
Key=key,
Body=buffer.getvalue().to_pybytes(),
StorageClass=storage_class,
Metadata={
'symbol': symbol,
'interval': interval,
'row_count': str(len(klines)),
'min_time': str(min(k['open_time'] for k in klines)),
'max_time': str(max(k['open_time'] for k in klines))
}
)
logger.info(f"Wrote {len(klines)} rows to s3://{self.bucket}/{key} "
f"(tier: {tier}, storage: {storage_class})")
return {
'key': key,
'tier': tier,
'rows': len(klines)
}
def read_parquet(
self,
symbol: str,
interval: str,
start_date: datetime,
end_date: datetime
) -> pa.Table:
"""Đọc Parquet files từ date range"""
# List all matching objects
prefix = f"{symbol}/{interval}/"
paginator = self.s3.get_paginator('list_objects_v2')
objects = []
for page in paginator.paginate(
Bucket=self.bucket,
Prefix=prefix,
StartAfter=f"{prefix}{start_date.strftime('%Y/%m/%d')}"
):
for obj in page.get('Contents', []):
obj_date = datetime.strptime(
obj['Key'].split('/')[4], '%Y/%m/%d'
)
if start_date <= obj_date <= end_date:
objects.append(obj)
if not objects:
return pa.Table.from_pylist([])
# Download and read all parquet files
tables = []
for obj in objects:
try:
response = self.s3.get_object(Bucket=self.bucket, Key=obj['Key'])
buffer = response['Body'].read()
table = pq.read_table(pa.py_buffer(buffer))
tables.append(table)
except Exception as e:
logger.error(f"Failed to read {obj['Key']}: {e}")
if tables:
return pa.concat_tables(tables)
return pa.Table.from_pylist([])
def calculate_storage_cost(self, bytes_stored: Dict[str, int]) -> Dict:
"""Tính chi phí storage theo tier"""
# Giá AWS S3 (US East)
pricing = {
'hot': 0.023, # Standard
'warm': 0.0125, # Infrequent Access
'cold': 0.004 # Glacier
}
costs = {}
total = 0
for tier, bytes_count in bytes_stored.items():
gb = bytes_count / (1024**3)
cost = gb * pricing.get(tier, 0.023)
costs[tier] = {'gb': gb, 'monthly_cost': cost}
total += cost
costs['total'] = {'monthly_cost': total}
return costs
Usage
async def example():
manager = CryptoDataTierManager("crypto-klines-prod")
# Simulate data
import random
klines = [
{
'symbol': 'BTCUSDT',
'interval': '1m',
'open_time': 1704067200000 + i * 60000, # 2024-01-01 + i minutes
'open': 42000 + random.random() * 100,
'high': 42100 + random.random() * 100,
'low': 41900 + random.random() * 100,
'close': 42050 + random.random() * 100,
'volume': random.random() * 100,
'trades': random.randint(100, 1000)
}
for i in range(1440) # 1 ngày data
]
# Write
result = manager.write_parquet(
klines, 'BTCUSDT', '1m',
datetime(2024, 1, 1)
)
# Calculate costs
# 1 năm BTC 1m: ~525,600 klines * ~200 bytes ≈ 100MB
bytes_stored = {'hot': 2 * 1024**3, 'warm': 30 * 1024**3, 'cold': 68 * 1024**3}
costs = manager.calculate_storage_cost(bytes_stored)
print(f"Monthly cost: ${costs['total']['monthly_cost']:.2f}")
Benchmark Performance
Đo đạc thực tế trên hệ thống production của tôi:
| Operation | Database | Data Size | Latency (p50) | Latency (p99) | Cost/Month |
|---|---|---|---|---|---|
| Point Query | TimescaleDB | 10M rows | 2.3ms | 8.1ms | $45 (db.xlarge) |
| Range Query 1Y | TimescaleDB | 525K rows | 43ms | 127ms | $45 |
| Write 1000 klines | TimescaleDB | 1000 rows | 12ms | 35ms | $45 |
| Read from S3 | S3 Parquet | 100MB | 180ms | 450ms | $0.80 |
| Cold Restore (Glacier) | S3 Glacier | 100MB | 3-5 phút | - | $0.40 |
Lỗi thường gặp và cách khắc phục
1. Rate Limit Exceeded (HTTP 429)
Nguyên nhân: Request quá nhanh, vượt quota của exchange API
# Symptom: HTTP 429 errors, "Too many requests" response
Giải pháp: Implement exponential backoff và token bucket
import time
from collections import deque
class RateLimiter:
"""Token bucket với sliding window"""
def __init__(self, max_requests: int = 1200, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
def acquire(self) -> float:
"""Acquire permission, returns wait time if rate limited"""
now = time.time()
# Remove expired requests
while self.requests and self.requests[0] <= now - self.window_seconds:
self.requests.popleft()
if len(self.requests) >= self.max_requests:
# Calculate wait time
wait_time = self.requests[0] - (now - self.window_seconds)
time.sleep(wait_time + 0.1) # Add small buffer
return wait_time
self.requests.append(now)
return 0
def get_retry_after(self, response_headers: dict) -> int:
"""Parse Retry-After header từ response"""
retry_after = response_headers.get('Retry-After')
if retry_after:
try:
return int(retry_after)
except ValueError:
pass
return self.window_seconds
Usage
limiter = RateLimiter(max_requests=1150, window_seconds=60)
async def safe_request(url, params):
wait = limiter.acquire()
if wait > 0:
print(f"Rate limited, waited {wait:.2f}s")
async with session.get(url, params=params) as resp:
if resp.status == 429:
retry_after = limiter.get_retry_after(resp.headers)
print(f"Got 429, sleeping {retry_after}s")
await asyncio.sleep(retry_after)
# Retry once
return await session.get(url, params=params)
return resp
2. Data Gaps - Missing Candlesticks
Nguyên nhân: API không trả đủ data, network timeout, hoặc exchange maintenance
# Symptom: Thiế