As an engineer who has worked with crypto data for years, I've run into this problem again and again: pulling OHLCV (Open-High-Low-Close-Volume) data from an exchange API looks simple, but in production it's riddled with pitfalls. In this post I'll share an architecture that has proven itself in production, along with working code and benchmarks.
Overall ETL System Architecture
Before getting into code, let's look at the big picture. A solid ETL system for crypto data needs the following core components (a minimal sketch of how they fit together follows the list):
- Data Fetcher — pulls data from the exchange API, with rate limiting and retry logic
- Data Validator — checks data integrity, including anomaly detection
- Data Transformer — normalizes the data into a standard format
- Data Loader — writes the data to a data warehouse or database
- State Manager — tracks state for incremental fetches
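To make the flow concrete before we build each piece, here is a minimal sketch of one ETL cycle under assumed interfaces: the fetcher, validator, and state manager are implemented in the sections below, while transformer.to_dataframe and loader.write are hypothetical placeholders.

# pipeline_sketch.py — one ETL cycle under assumed interfaces
# (transformer.to_dataframe and loader.write are hypothetical placeholders)
async def run_etl_cycle(fetcher, validator, transformer, loader, state_manager,
                        symbol: str, interval: str) -> None:
    """Fetch only new data, validate it, transform it, load it, checkpoint."""
    start = state_manager.get_last_fetch_time(symbol, interval)
    raw = await fetcher.fetch_klines(symbol, interval, start_time=start)
    if not raw:
        return  # nothing new since the last checkpoint
    df = transformer.to_dataframe(raw)                      # hypothetical helper
    clean = validator.validate_and_clean(df)
    loader.write(clean, symbol=symbol, interval=interval)   # hypothetical helper
    # Persist progress so the next run resumes where this one stopped
    state_manager.update_last_fetch_time(symbol, interval, raw[-1].timestamp)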
Environment Setup and Dependencies
# requirements.txt
aiohttp==3.9.1
pandas==2.1.4
numpy==1.26.2
pyarrow==14.0.2
msgspec==0.18.4
tenacity==8.2.3
prometheus-client==0.19.0
redis==5.0.1
# For testing
pytest==7.4.3
pytest-asyncio==0.21.1
httpx==0.25.2
# config.py
import os
from dataclasses import dataclass
@dataclass
class ExchangeConfig:
"""Configuration สำหรับ exchange API"""
name: str
base_url: str
rate_limit_per_minute: int = 1200
timeout_seconds: int = 30
max_retries: int = 5
retry_backoff_factor: float = 2.0
@dataclass
class DataConfig:
"""Configuration สำหรับ data processing"""
batch_size: int = 1000
max_workers: int = 10
checkpoint_interval: int = 100
anomaly_threshold: float = 0.05 # 5% deviation threshold
# Exchange configurations
EXCHANGES = {
"binance": ExchangeConfig(
name="binance",
base_url="https://api.binance.com",
rate_limit_per_minute=1200,
),
"coinbase": ExchangeConfig(
name="coinbase",
base_url="https://api.exchange.coinbase.com",
        rate_limit_per_minute=10,  # Coinbase enforces a much stricter rate limit
),
"kraken": ExchangeConfig(
name="kraken",
base_url="https://api.kraken.com",
rate_limit_per_minute=60,
),
}
# HolySheep AI configuration for anomaly detection
HOLYSHEEP_CONFIG = {
"base_url": "https://api.holysheep.ai/v1",
"api_key": os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY"),
"model": "gpt-4.1",
"temperature": 0.1,
}
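One small wrinkle: the ExchangeFetcher in the next section takes a plain dict (it calls .get() on it), while EXCHANGES stores ExchangeConfig dataclasses. dataclasses.asdict bridges the two:

# Convert an ExchangeConfig dataclass into the plain dict ExchangeFetcher expects
from dataclasses import asdict

binance_config = asdict(EXCHANGES["binance"])
# -> {'name': 'binance', 'base_url': 'https://api.binance.com',
#     'rate_limit_per_minute': 1200, 'timeout_seconds': 30,
#     'max_retries': 5, 'retry_backoff_factor': 2.0}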
Core Data Fetcher with Rate Limiting and Retry Logic
# fetcher.py
import asyncio
import time
from typing import Optional
from dataclasses import dataclass
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential
@dataclass
class OHLCVData:
"""โครงสร้างข้อมูล OHLCV มาตรฐาน"""
timestamp: int # Unix timestamp in milliseconds
open: float
high: float
low: float
close: float
volume: float
quote_volume: Optional[float] = None
trades: Optional[int] = None
taker_buy_volume: Optional[float] = None
exchange: str = ""
class RateLimiter:
"""Token bucket algorithm สำหรับ rate limiting"""
def __init__(self, rate: int, per_seconds: int = 60):
self.rate = rate
self.per_seconds = per_seconds
self.allowance = rate
self.last_check = time.time()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
            current = time.time()
            elapsed = current - self.last_check
            self.last_check = current  # update immediately so refill isn't double-counted
            # Refill tokens based on elapsed time, capped at the bucket size
            self.allowance += elapsed * (self.rate / self.per_seconds)
            if self.allowance > self.rate:
                self.allowance = self.rate
            if self.allowance < 1:
                # Not enough tokens: sleep until one accrues, then spend it
                sleep_time = (1 - self.allowance) * (self.per_seconds / self.rate)
                await asyncio.sleep(sleep_time)
                self.allowance = 0
                self.last_check = time.time()
            else:
                self.allowance -= 1
class ExchangeFetcher:
"""Async fetcher สำหรับ exchange APIs"""
def __init__(self, exchange_config: dict):
self.config = exchange_config
self.rate_limiter = RateLimiter(
rate=exchange_config.get("rate_limit_per_minute", 1200),
per_seconds=60
)
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
timeout = aiohttp.ClientTimeout(total=self.config.get("timeout_seconds", 30))
self._session = aiohttp.ClientSession(timeout=timeout)
return self
async def __aexit__(self, *args):
if self._session:
await self._session.close()
    @retry(reraise=True, stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=1, min=1, max=60))
    async def fetch_klines(
self,
symbol: str,
interval: str = "1h",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = 1000
) -> list[OHLCVData]:
"""Fetch OHLCV data จาก Binance API"""
await self.rate_limiter.acquire()
params = {
"symbol": symbol.upper(),
"interval": interval,
"limit": limit,
}
if start_time:
params["startTime"] = start_time
if end_time:
params["endTime"] = end_time
url = f"{self.config['base_url']}/api/v3/klines"
async with self._session.get(url, params=params) as response:
            if response.status == 429:
                # Rate limited: honor Retry-After, then raise so tenacity
                # retries with exponential backoff instead of recursing forever
                retry_after = float(response.headers.get("Retry-After", 60))
                await asyncio.sleep(retry_after)
                response.raise_for_status()
response.raise_for_status()
data = await response.json()
return [
OHLCVData(
timestamp=int(kline[0]),
open=float(kline[1]),
high=float(kline[2]),
low=float(kline[3]),
close=float(kline[4]),
volume=float(kline[5]),
quote_volume=float(kline[7]) if len(kline) > 7 else None,
trades=int(kline[8]) if len(kline) > 8 else None,
exchange="binance"
)
for kline in data
]
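A quick way to convince yourself the token bucket behaves: with rate=60 per 60 seconds, the first 60 acquisitions drain the bucket instantly and the 61st has to wait roughly a second for a token to refill. A standalone sanity check (a sketch, assuming fetcher.py above is importable):

# rate_limiter_demo.py — sanity check for the token bucket (not part of the pipeline)
import asyncio
import time

from fetcher import RateLimiter


async def main():
    limiter = RateLimiter(rate=60, per_seconds=60)
    start = time.perf_counter()
    for _ in range(61):
        await limiter.acquire()
    # The first 60 calls drain the bucket; the 61st waits ~1s for a refill
    print(f"61 acquisitions took {time.perf_counter() - start:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())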
Data Validator and Cleaner
# validator.py
import pandas as pd
import numpy as np
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class DataValidator:
"""Validator สำหรับตรวจสอบความถูกต้องของ OHLCV data"""
def __init__(
self,
price_deviation_threshold: float = 0.05,
volume_deviation_threshold: float = 0.10,
gap_threshold_minutes: int = 60
):
self.price_threshold = price_deviation_threshold
self.volume_threshold = volume_deviation_threshold
        self.gap_threshold = gap_threshold_minutes * 60 * 1000  # convert to ms
def validate_and_clean(self, df: pd.DataFrame) -> pd.DataFrame:
"""Main validation pipeline"""
        df = df.copy()
        initial_count = len(df)
        df['data_quality'] = 'ok'  # downgraded below when issues are found
# Step 1: Remove duplicates
df = df.drop_duplicates(subset=['timestamp'], keep='first')
duplicates_removed = initial_count - len(df)
if duplicates_removed > 0:
logger.warning(f"Removed {duplicates_removed} duplicate records")
# Step 2: Sort by timestamp
df = df.sort_values('timestamp').reset_index(drop=True)
# Step 3: Detect and handle gaps
df = self._handle_gaps(df)
# Step 4: Validate price consistency
df = self._validate_prices(df)
# Step 5: Validate volumes
df = self._validate_volumes(df)
# Step 6: Handle outliers
df = self._handle_outliers(df)
# Step 7: Fill missing values
df = self._fill_missing(df)
return df
def _handle_gaps(self, df: pd.DataFrame) -> pd.DataFrame:
"""Detect gaps ในข้อมูลและ mark หรือ fill"""
if len(df) < 2:
return df
df['time_diff'] = df['timestamp'].diff()
# Mark gaps > threshold
gap_mask = df['time_diff'] > self.gap_threshold
gap_count = gap_mask.sum()
if gap_count > 0:
logger.warning(f"Detected {gap_count} gaps in data")
            df['has_gap'] = False
            df.loc[gap_mask, 'has_gap'] = True
        # Gaps are only flagged here; see the reindexing sketch after this class
        # if you want to materialize the missing candles as NaN rows
        df = df.drop(columns=['time_diff'])
return df
def _validate_prices(self, df: pd.DataFrame) -> pd.DataFrame:
"""Validate OHLC relationships"""
# High must be >= Open, Close, Low
invalid_high = df['high'] < df[['open', 'close', 'low']].max(axis=1)
# Low must be <= Open, Close, High
invalid_low = df['low'] > df[['open', 'close', 'high']].min(axis=1)
# Open must equal previous Close (for continuous data)
if len(df) > 1:
open_close_gap = (
(df['open'] - df['close'].shift(1)).abs() /
df['close'].shift(1)
) > self.price_threshold
            # Warn only, don't drop: gaps between candles can be a legitimate market event
if open_close_gap.sum() > 0:
logger.info(f"Found {open_close_gap.sum()} potential gap candles")
# Flag invalid records
invalid = invalid_high | invalid_low
if invalid.sum() > 0:
logger.warning(f"Found {invalid.sum()} candles with invalid OHLC relationships")
df.loc[invalid, 'data_quality'] = 'invalid'
return df
def _validate_volumes(self, df: pd.DataFrame) -> pd.DataFrame:
"""Validate volume data"""
# Volume must be >= 0
invalid_volume = df['volume'] < 0
# Quote volume should be positive if present
if 'quote_volume' in df.columns:
invalid_quote = df['quote_volume'] < 0
invalid_volume = invalid_volume | invalid_quote
if invalid_volume.sum() > 0:
logger.warning(f"Found {invalid_volume.sum()} records with negative volume")
df.loc[invalid_volume, 'volume'] = 0
df.loc[invalid_volume, 'data_quality'] = 'invalid'
return df
def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
"""Handle outliers โดยใช้ IQR method"""
for col in ['open', 'high', 'low', 'close', 'volume']:
if col not in df.columns:
continue
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 3 * IQR # Using 3x IQR for crypto (more volatile)
upper_bound = Q3 + 3 * IQR
outliers = (df[col] < lower_bound) | (df[col] > upper_bound)
if outliers.sum() > 0:
logger.info(f"Found {outliers.sum()} outliers in {col} column")
# Mark as suspicious but don't drop
df.loc[outliers, 'data_quality'] = 'suspicious'
return df
def _fill_missing(self, df: pd.DataFrame) -> pd.DataFrame:
"""Fill missing values ด้วย appropriate methods"""
numeric_cols = ['open', 'high', 'low', 'close', 'volume']
for col in numeric_cols:
if col in df.columns and df[col].isna().sum() > 0:
                # Forward-fill price data
if col in ['open', 'high', 'low', 'close']:
df[col] = df[col].ffill()
# Backup with backward fill
df[col] = df[col].bfill()
else:
                    # Fill volume with the median
df[col] = df[col].fillna(df[col].median())
return df
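As noted in _handle_gaps, the validator only flags gaps. If you want to materialize the missing candles as NaN rows so that _fill_missing can interpolate them, one approach is to reindex against a complete timeline. A sketch, assuming a fixed candle interval supplied by the caller (interval_ms is a hypothetical parameter; the validator itself doesn't know the interval):

# Sketch: materialize missing candles as NaN rows (assumes a fixed interval;
# interval_ms is a hypothetical parameter the caller must supply).
# Uses pandas imported as pd, as at the top of validator.py.
def reindex_to_full_timeline(df: pd.DataFrame, interval_ms: int) -> pd.DataFrame:
    full = pd.DataFrame({
        'timestamp': range(int(df['timestamp'].min()),
                           int(df['timestamp'].max()) + interval_ms,
                           interval_ms)
    })
    # Left-merge so missing candles appear as NaN rows, ready for _fill_missing
    return full.merge(df, on='timestamp', how='left')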
Example Usage
if __name__ == "__main__":
import asyncio
async def test_validation():
from fetcher import ExchangeFetcher, OHLCVData
config = {
"base_url": "https://api.binance.com",
"rate_limit_per_minute": 1200,
"timeout_seconds": 30
}
async with ExchangeFetcher(config) as fetcher:
data = await fetcher.fetch_klines(
symbol="BTCUSDT",
interval="1h",
limit=500
)
df = pd.DataFrame([
{
'timestamp': d.timestamp,
'open': d.open,
'high': d.high,
'low': d.low,
'close': d.close,
'volume': d.volume,
'quote_volume': d.quote_volume
}
for d in data
])
validator = DataValidator()
cleaned_df = validator.validate_and_clean(df)
print(f"Original records: {len(df)}")
print(f"Cleaned records: {len(cleaned_df)}")
print(f"Data quality distribution:\n{cleaned_df['data_quality'].value_counts(dropna=False)}")
asyncio.run(test_validation())
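The testing dependencies from requirements.txt pay off here. A minimal pytest sketch (a hypothetical test_validator.py) exercising the OHLC relationship check with a deliberately broken candle:

# test_validator.py — minimal pytest sketch for the OHLC relationship check
import pandas as pd

from validator import DataValidator


def test_invalid_high_is_flagged():
    df = pd.DataFrame([
        # high < open, so this candle violates the OHLC invariant
        {'timestamp': 1, 'open': 100.0, 'high': 90.0,
         'low': 80.0, 'close': 85.0, 'volume': 10.0},
        {'timestamp': 2, 'open': 85.0, 'high': 95.0,
         'low': 82.0, 'close': 93.0, 'volume': 12.0},
    ])
    cleaned = DataValidator().validate_and_clean(df)
    assert (cleaned['data_quality'] == 'invalid').sum() == 1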
Concurrent Processing with a Semaphore
# parallel_fetcher.py
import asyncio
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
import time
@dataclass
class FetchResult:
symbol: str
interval: str
records: int
start_time: int
end_time: int
duration_ms: float
success: bool
error: Optional[str] = None
class ParallelFetcher:
"""Fetch ข้อมูลจากหลาย symbols/intervals พร้อมกัน"""
def __init__(
self,
max_concurrent: int = 10,
semaphore_value: int = 5 # Binance limit ~5 concurrent connections
):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(semaphore_value)
self.results: List[FetchResult] = []
async def fetch_multiple(
self,
tasks: List[Dict[str, Any]],
fetcher_factory,
progress_callback=None
) -> List[FetchResult]:
"""Fetch หลาย tasks พร้อมกันด้วย semaphore control"""
async def bounded_fetch(task: Dict[str, Any]) -> FetchResult:
async with self.semaphore:
start = time.perf_counter()
try:
data = await fetcher_factory(
symbol=task['symbol'],
interval=task['interval'],
start_time=task.get('start_time'),
end_time=task.get('end_time'),
limit=task.get('limit', 1000)
)
duration = (time.perf_counter() - start) * 1000
if progress_callback:
progress_callback(task['symbol'], len(data))
return FetchResult(
symbol=task['symbol'],
interval=task['interval'],
records=len(data),
start_time=task.get('start_time', 0),
end_time=task.get('end_time', 0),
duration_ms=duration,
success=True
)
except Exception as e:
duration = (time.perf_counter() - start) * 1000
return FetchResult(
symbol=task['symbol'],
interval=task['interval'],
records=0,
start_time=task.get('start_time', 0),
end_time=task.get('end_time', 0),
duration_ms=duration,
success=False,
error=str(e)
)
# Execute all tasks with rate limiting
results = await asyncio.gather(*[bounded_fetch(t) for t in tasks])
self.results.extend(results)
return results
# Benchmark
async def benchmark_parallel_fetch():
"""Benchmark parallel vs sequential fetch"""
config = {
"base_url": "https://api.binance.com",
"rate_limit_per_minute": 1200,
"timeout_seconds": 30
}
symbols = [
("BTCUSDT", "1h"),
("ETHUSDT", "1h"),
("BNBUSDT", "1h"),
("SOLUSDT", "1h"),
("XRPUSDT", "1h"),
("ADAUSDT", "1h"),
("DOGEUSDT", "1h"),
("DOTUSDT", "1h"),
]
tasks = [
{"symbol": s, "interval": i, "limit": 500}
for s, i in symbols
]
# Sequential benchmark
from fetcher import ExchangeFetcher
print("=== Sequential Fetch Benchmark ===")
sequential_start = time.perf_counter()
async with ExchangeFetcher(config) as fetcher:
for task in tasks:
try:
data = await fetcher.fetch_klines(
symbol=task['symbol'],
interval=task['interval'],
limit=task['limit']
)
print(f"{task['symbol']}: {len(data)} records")
except Exception as e:
print(f"{task['symbol']}: Error - {e}")
sequential_time = (time.perf_counter() - sequential_start) * 1000
print(f"Sequential total time: {sequential_time:.2f} ms")
# Parallel benchmark
print("\n=== Parallel Fetch Benchmark ===")
parallel_start = time.perf_counter()
    # Enter the fetcher's async context so the aiohttp session actually exists
    async with ExchangeFetcher(config) as fetcher_instance:
        parallel_fetcher = ParallelFetcher(semaphore_value=5)
        results = await parallel_fetcher.fetch_multiple(tasks, fetcher_instance.fetch_klines)
parallel_time = (time.perf_counter() - parallel_start) * 1000
print(f"Parallel total time: {parallel_time:.2f} ms")
print(f"\n=== Results ===")
print(f"Speed improvement: {sequential_time / parallel_time:.2f}x faster")
for r in results:
status = "✓" if r.success else "✗"
print(f"{status} {r.symbol}: {r.records} records in {r.duration_ms:.2f}ms")
# Run benchmark
if __name__ == "__main__":
asyncio.run(benchmark_parallel_fetch())
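One hook the benchmark doesn't exercise is progress_callback, which fetch_multiple invokes after each successful task. A sketch of plugging in a simple reporter (fetch_with_progress and report_progress are hypothetical names):

# Sketch: progress reporting during a long backfill
from fetcher import ExchangeFetcher


def report_progress(symbol: str, records: int) -> None:
    print(f"[progress] {symbol}: {records} records")


async def fetch_with_progress(tasks):
    config = {"base_url": "https://api.binance.com",
              "rate_limit_per_minute": 1200, "timeout_seconds": 30}
    async with ExchangeFetcher(config) as fetcher:
        pf = ParallelFetcher(semaphore_value=5)
        return await pf.fetch_multiple(
            tasks, fetcher.fetch_klines, progress_callback=report_progress)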
Incremental Fetch with State Management
# state_manager.py
import json
import os
from datetime import datetime
from typing import Optional, Dict, Any
import redis
import logging
logger = logging.getLogger(__name__)
class StateManager:
"""จัดการ state สำหรับ incremental data fetch"""
def __init__(self, redis_url: Optional[str] = None, local_file: str = "fetch_state.json"):
self.redis_url = redis_url
self.local_file = local_file
self._redis_client = None
if redis_url:
try:
self._redis_client = redis.from_url(redis_url)
except Exception as e:
logger.warning(f"Could not connect to Redis: {e}. Using local file.")
self._redis_client = None
def get_last_fetch_time(
self,
symbol: str,
interval: str,
default_start: Optional[int] = None
) -> Optional[int]:
"""Get last fetch timestamp สำหรับ symbol/interval"""
key = f"fetch_state:{symbol}:{interval}"
if self._redis_client:
try:
timestamp = self._redis_client.get(key)
if timestamp:
return int(timestamp)
except Exception as e:
logger.error(f"Redis error: {e}")
# Fallback to local file
if os.path.exists(self.local_file):
with open(self.local_file, 'r') as f:
state = json.load(f)
return state.get(key, default_start)
return default_start
def update_last_fetch_time(
self,
symbol: str,
interval: str,
timestamp: int
) -> None:
"""Update last fetch timestamp"""
key = f"fetch_state:{symbol}:{interval}"
if self._redis_client:
try:
self._redis_client.set(key, str(timestamp))
return
except Exception as e:
logger.error(f"Redis error: {e}")
# Fallback to local file
state = {}
if os.path.exists(self.local_file):
with open(self.local_file, 'r') as f:
state = json.load(f)
state[key] = timestamp
with open(self.local_file, 'w') as f:
json.dump(state, f, indent=2)
def get_checkpoint(self, job_id: str) -> Dict[str, Any]:
"""Get checkpoint for resumable job"""
key = f"checkpoint:{job_id}"
if self._redis_client:
try:
data = self._redis_client.get(key)
if data:
return json.loads(data)
except Exception as e:
logger.error(f"Redis error: {e}")
return {"completed": [], "in_progress": [], "failed": []}
def save_checkpoint(self, job_id: str, checkpoint: Dict[str, Any]) -> None:
"""Save checkpoint for resumable job"""
key = f"checkpoint:{job_id}"
if self._redis_client:
try:
self._redis_client.setex(key, 86400, json.dumps(checkpoint)) # 24h TTL
return
except Exception as e:
logger.error(f"Redis error: {e}")
        # Fallback: no local checkpoint persistence in this mode
logger.warning("Checkpoint not saved - no Redis or persistent storage")
class IncrementalFetcher:
"""Fetch ข้อมูลแบบ incremental ด้วย state management"""
def __init__(
self,
fetcher,
state_manager: StateManager,
        lookback_hours: int = 24  # refetch a trailing window to cover gaps
):
self.fetcher = fetcher
self.state_manager = state_manager
self.lookback_hours = lookback_hours
async def fetch_incremental(
self,
symbol: str,
interval: str,
batch_size: int = 1000
) -> list:
"""Fetch เฉพาะข้อมูลใหม่ที่ยังไม่เคย fetch มาก่อน"""
# Get last fetch time
last_time = self.state_manager.get_last_fetch_time(symbol, interval)
# Calculate start time with lookback
lookback_ms = self.lookback_hours * 60 * 60 * 1000
current_time = int(datetime.now().timestamp() * 1000)
if last_time:
start_time = max(last_time - lookback_ms, 0)
else:
# First fetch - get last 30 days
start_time = current_time - (30 * 24 * 60 * 60 * 1000)
all_data = []
current_start = start_time
# Fetch in batches until we reach current time
while current_start < current_time:
end_time = min(current_start + (batch_size * self._get_interval_ms(interval)), current_time)
try:
batch = await self.fetcher.fetch_klines(
symbol=symbol,
interval=interval,
start_time=current_start,
end_time=end_time,
limit=batch_size
)
if not batch:
break
all_data.extend(batch)
# Update checkpoint
last_timestamp = batch[-1].timestamp
self.state_manager.update_last_fetch_time(symbol, interval, last_timestamp)
# Move to next batch
current_start = last_timestamp + self._get_interval_ms(interval)
logger.info(f"{symbol} {interval}: fetched {len(batch)} records, up to {last_timestamp}")
except Exception as e:
logger.error(f"Error fetching {symbol} {interval}: {e}")
break
return all_data
def _get_interval_ms(self, interval: str) -> int:
"""Convert interval string to milliseconds"""
intervals = {
"1m": 60000,
"5m": 300000,
"15m": 900000,
"1h": 3600000,
"4h": 14400000,
"1d": 86400000,
}
return intervals.get(interval, 3600000)
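Wiring it together: an end-to-end sketch of an incremental run plus resumable-job checkpointing. The Redis URL is an example; without Redis, StateManager falls back to the local JSON file (and note save_checkpoint has no file fallback):

# incremental_demo.py — sketch: incremental fetch plus checkpointing
# (redis://localhost:6379 is an example URL; ExchangeFetcher comes from fetcher.py)
import asyncio

from fetcher import ExchangeFetcher


async def main():
    state = StateManager(redis_url="redis://localhost:6379")
    config = {"base_url": "https://api.binance.com",
              "rate_limit_per_minute": 1200, "timeout_seconds": 30}
    async with ExchangeFetcher(config) as fetcher:
        inc = IncrementalFetcher(fetcher, state, lookback_hours=24)
        new_data = await inc.fetch_incremental("BTCUSDT", "1h")
        print(f"Fetched {len(new_data)} new candles")

    # Resumable-job checkpoint: record finished work so a restarted
    # backfill can skip symbols that already completed
    checkpoint = state.get_checkpoint("daily_backfill")
    checkpoint["completed"].append("BTCUSDT:1h")
    state.save_checkpoint("daily_backfill", checkpoint)


if __name__ == "__main__":
    asyncio.run(main())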
Anomaly Detection with HolySheep AI
When the data contains anomalies too complex to catch with a plain rule-based approach, I recommend bringing AI into the analysis. I use HolySheep AI, which responds in under 50 ms and costs about 85% less than comparable services.
# anomaly_detector.py
import json
import asyncio
from typing import List, Dict, Any, Optional
import aiohttp
from dataclasses import dataclass
@dataclass
class AnomalyResult:
"""ผลลัพธ์จากการวิเคราะห์ anomaly"""
timestamp: int
symbol: str
is_anomaly: bool
anomaly_type: Optional[str]
confidence: float
explanation: str
suggested_action: str
class HolySheepAnomalyDetector:
"""ใช้ HolySheep AI สำหรับ advanced anomaly detection"""
def __init__(
self,
api_key: str = "YOUR_HOLYSHEEP_API_KEY",
base_url: str = "https://api.holysheep.ai/v1",
model: str = "gpt-4.1"
):
self.api_key = api_key
self.base_url = base_url
self.model = model
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
timeout = aiohttp.ClientTimeout(total=30)
self._session = aiohttp.ClientSession(timeout=timeout)
return self
async def __aexit__(self, *args):
if self._session:
await self._session.close()
async def detect_anomalies(
self,
candles: List[Dict[str, Any]],
symbol: str,
batch_size: int = 50
) -> List[AnomalyResult]:
"""Detect anomalies ใน batch ของ candles โดยใช้ AI"""
results = []
# Process in batches
for i in range(0, len(candles), batch_size):
batch = candles[i:i + batch_size]
try:
batch_results = await self._analyze_batch(batch, symbol)
results.extend(batch_results)
except Exception as e:
print(f"Error analyzing batch {i//batch_size}: {e}")
# Continue with other batches
return results
async def _analyze_batch(
self,
candles: List[Dict[str, Any]],
symbol: str
) -> List[AnomalyResult]:
"""วิเคราะห์ batch ของ candles ด้วย HolySheep AI"""
# Prepare prompt
candles_summary = self._prepare_candles_summary(candles)
prompt = f"""Analyze the following {symbol} candlestick data for anomalies.
Focus on:
1. Unusual price movements (sudden pumps/dumps > 10%)
2. Volume anomalies (unusual trading activity)
3. Price-volume discrepancies
4. Suspicious patterns (wash trading indicators)
5. Data quality issues
Return a JSON array with each item having:
- timestamp: Unix timestamp in milliseconds
- is_anomaly: boolean
- anomaly_type: "volume_spike", "price_manipulation", "data_gap", "wash_trading", or null
- confidence: 0.0 to 1.0
- explanation: brief explanation
- suggested_action: "drop", "flag", or "keep"
Candle Data:
{candles_summary}
Return ONLY the JSON array, no markdown formatting."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": [
{
"role":