As an engineer who has worked with crypto data for years, I keep running into the same problem: pulling OHLCV (Open-High-Low-Close-Volume) data from an exchange API looks easy, but once it goes to production all sorts of pitfalls show up. In this post I share an architecture that has proven itself in production, along with runnable code and benchmarks.

Overall ETL System Architecture

Before diving into the code, let's look at the big picture. A solid ETL system for crypto data needs these core components:

- An async fetcher with rate limiting and retry logic for each exchange API
- A validator and cleaner that enforces OHLCV consistency and flags bad records
- Concurrent fetching across many symbols and intervals
- Incremental fetching with state management and resumable checkpoints
- Anomaly detection for issues that simple rule-based checks miss

Each of these gets its own section below.
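
To see how the components connect before we build them, here is a minimal orchestration sketch. It assumes the fetcher.py and validator.py modules developed later in this article; treat it as an outline under those assumptions, not a finished entry point.

# pipeline.py - minimal end-to-end sketch (assumes fetcher.py / validator.py below)
import asyncio
from dataclasses import asdict

import pandas as pd

from fetcher import ExchangeFetcher
from validator import DataValidator

async def run_pipeline(symbol: str = "BTCUSDT", interval: str = "1h") -> pd.DataFrame:
    config = {"base_url": "https://api.binance.com", "rate_limit_per_minute": 1200}

    # Extract: pull raw OHLCV candles from the exchange
    async with ExchangeFetcher(config) as fetcher:
        candles = await fetcher.fetch_klines(symbol=symbol, interval=interval, limit=500)

    # Transform: validate and clean before anything downstream touches the data
    df = pd.DataFrame([asdict(c) for c in candles])
    cleaned = DataValidator().validate_and_clean(df)

    # Load: hand off to whatever storage the stack uses (Parquet, a database, etc.)
    return cleaned

if __name__ == "__main__":
    print(asyncio.run(run_pipeline()).tail())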

Environment Setup and Dependencies

# requirements.txt
aiohttp==3.9.1
pandas==2.1.4
numpy==1.26.2
pyarrow==14.0.2
msgspec==0.18.4
tenacity==8.2.3
prometheus-client==0.19.0
redis==5.0.1

# For testing
pytest==7.4.3
pytest-asyncio==0.21.1
httpx==0.25.2

# config.py
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExchangeConfig:
    """Configuration สำหรับ exchange API"""
    name: str
    base_url: str
    rate_limit_per_minute: int = 1200
    timeout_seconds: int = 30
    max_retries: int = 5
    retry_backoff_factor: float = 2.0

@dataclass
class DataConfig:
    """Configuration สำหรับ data processing"""
    batch_size: int = 1000
    max_workers: int = 10
    checkpoint_interval: int = 100
    anomaly_threshold: float = 0.05  # 5% deviation threshold

Exchange configurations

EXCHANGES = {
    "binance": ExchangeConfig(
        name="binance",
        base_url="https://api.binance.com",
        rate_limit_per_minute=1200,
    ),
    "coinbase": ExchangeConfig(
        name="coinbase",
        base_url="https://api.exchange.coinbase.com",
        rate_limit_per_minute=10,  # Coinbase enforces stricter rate limits
    ),
    "kraken": ExchangeConfig(
        name="kraken",
        base_url="https://api.kraken.com",
        rate_limit_per_minute=60,
    ),
}

HolySheep AI Configuration for Anomaly Detection

HOLYSHEEP_CONFIG = {
    "base_url": "https://api.holysheep.ai/v1",
    "api_key": os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY"),
    "model": "gpt-4.1",
    "temperature": 0.1,
}

Core Data Fetcher with Rate Limiting and Retry Logic

# fetcher.py
import asyncio
import time
from typing import Optional
from dataclasses import dataclass
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential

@dataclass
class OHLCVData:
    """โครงสร้างข้อมูล OHLCV มาตรฐาน"""
    timestamp: int  # Unix timestamp in milliseconds
    open: float
    high: float
    low: float
    close: float
    volume: float
    quote_volume: Optional[float] = None
    trades: Optional[int] = None
    taker_buy_volume: Optional[float] = None
    exchange: str = ""

class RateLimiter:
    """Token bucket algorithm สำหรับ rate limiting"""
    
    def __init__(self, rate: int, per_seconds: int = 60):
        self.rate = rate
        self.per_seconds = per_seconds
        self.allowance = rate
        self.last_check = time.time()
        self._lock = asyncio.Lock()
    
    async def acquire(self):
        async with self._lock:
            current = time.time()
            elapsed = current - self.last_check
            self.allowance += elapsed * (self.rate / self.per_seconds)
            
            if self.allowance > self.rate:
                self.allowance = self.rate
            
            if self.allowance < 1:
                sleep_time = (1 - self.allowance) * (self.per_seconds / self.rate)
                await asyncio.sleep(sleep_time)
                self.allowance = 0
            else:
                self.allowance -= 1
            
            self.last_check = time.time()

class ExchangeFetcher:
    """Async fetcher สำหรับ exchange APIs"""
    
    def __init__(self, exchange_config: dict):
        self.config = exchange_config
        self.rate_limiter = RateLimiter(
            rate=exchange_config.get("rate_limit_per_minute", 1200),
            per_seconds=60
        )
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=self.config.get("timeout_seconds", 30))
        self._session = aiohttp.ClientSession(timeout=timeout)
        return self
    
    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
    
    # Retry transient failures with exponential backoff (matches ExchangeConfig defaults)
    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=2, min=1, max=60))
    async def fetch_klines(
        self,
        symbol: str,
        interval: str = "1h",
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        limit: int = 1000
    ) -> list[OHLCVData]:
        """Fetch OHLCV data จาก Binance API"""
        
        await self.rate_limiter.acquire()
        
        params = {
            "symbol": symbol.upper(),
            "interval": interval,
            "limit": limit,
        }
        
        if start_time:
            params["startTime"] = start_time
        if end_time:
            params["endTime"] = end_time
        
        url = f"{self.config['base_url']}/api/v3/klines"
        
        async with self._session.get(url, params=params) as response:
            if response.status == 429:
                # Rate limited - wait and retry
                await asyncio.sleep(60)
                return await self.fetch_klines(symbol, interval, start_time, end_time, limit)
            
            response.raise_for_status()
            data = await response.json()
        
        return [
            OHLCVData(
                timestamp=int(kline[0]),
                open=float(kline[1]),
                high=float(kline[2]),
                low=float(kline[3]),
                close=float(kline[4]),
                volume=float(kline[5]),
                quote_volume=float(kline[7]) if len(kline) > 7 else None,
                trades=int(kline[8]) if len(kline) > 8 else None,
                exchange="binance"
            )
            for kline in data
        ]
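
The pytest and pytest-asyncio pins in requirements.txt haven't been exercised yet. Here is a minimal sanity-check sketch for the token bucket, assuming the test file sits next to fetcher.py:

# test_rate_limiter.py - small sketch using the pytest / pytest-asyncio deps listed earlier
import time

import pytest

from fetcher import RateLimiter

@pytest.mark.asyncio
async def test_rate_limiter_throttles_after_burst():
    # Allow 2 requests per second; the third acquire() has to wait for new tokens
    limiter = RateLimiter(rate=2, per_seconds=1)
    start = time.perf_counter()
    for _ in range(3):
        await limiter.acquire()
    elapsed = time.perf_counter() - start
    assert elapsed >= 0.4  # roughly half a second of throttling expected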

Data Validator and Cleaner

# validator.py
import pandas as pd
import numpy as np
from typing import Optional
import logging

logger = logging.getLogger(__name__)

class DataValidator:
    """Validator สำหรับตรวจสอบความถูกต้องของ OHLCV data"""
    
    def __init__(
        self,
        price_deviation_threshold: float = 0.05,
        volume_deviation_threshold: float = 0.10,
        gap_threshold_minutes: int = 60
    ):
        self.price_threshold = price_deviation_threshold
        self.volume_threshold = volume_deviation_threshold
        self.gap_threshold = gap_threshold_minutes * 60 * 1000  # convert to milliseconds
    
    def validate_and_clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Main validation pipeline"""
        
        df = df.copy()
        initial_count = len(df)
        
        # Step 1: Remove duplicates
        df = df.drop_duplicates(subset=['timestamp'], keep='first')
        duplicates_removed = initial_count - len(df)
        if duplicates_removed > 0:
            logger.warning(f"Removed {duplicates_removed} duplicate records")
        
        # Step 2: Sort by timestamp
        df = df.sort_values('timestamp').reset_index(drop=True)
        
        # Step 3: Detect and handle gaps
        df = self._handle_gaps(df)
        
        # Step 4: Validate price consistency
        df = self._validate_prices(df)
        
        # Step 5: Validate volumes
        df = self._validate_volumes(df)
        
        # Step 6: Handle outliers
        df = self._handle_outliers(df)
        
        # Step 7: Fill missing values
        df = self._fill_missing(df)
        
        return df
    
    def _handle_gaps(self, df: pd.DataFrame) -> pd.DataFrame:
        """Detect gaps ในข้อมูลและ mark หรือ fill"""
        
        if len(df) < 2:
            return df
        
        df['time_diff'] = df['timestamp'].diff()
        
        # Mark gaps > threshold
        gap_mask = df['time_diff'] > self.gap_threshold
        gap_count = gap_mask.sum()
        
        if gap_count > 0:
            logger.warning(f"Detected {gap_count} gaps in data")
            df.loc[gap_mask, 'has_gap'] = True
        
        # Gaps are only flagged here; rebuilding a complete timeline and
        # interpolating is left to downstream steps that know the interval
        df = df.drop(columns=['time_diff'])
        
        return df
    
    def _validate_prices(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate OHLC relationships"""
        
        # High must be >= Open, Close, Low
        invalid_high = df['high'] < df[['open', 'close', 'low']].max(axis=1)
        
        # Low must be <= Open, Close, High
        invalid_low = df['low'] > df[['open', 'close', 'high']].min(axis=1)
        
        # Open must equal previous Close (for continuous data)
        if len(df) > 1:
            open_close_gap = (
                (df['open'] - df['close'].shift(1)).abs() / 
                df['close'].shift(1)
            ) > self.price_threshold
            # Only warn, don't drop: market gaps can be perfectly normal
            if open_close_gap.sum() > 0:
                logger.info(f"Found {open_close_gap.sum()} potential gap candles")
        
        # Flag invalid records
        invalid = invalid_high | invalid_low
        if invalid.sum() > 0:
            logger.warning(f"Found {invalid.sum()} candles with invalid OHLC relationships")
            df.loc[invalid, 'data_quality'] = 'invalid'
        
        return df
    
    def _validate_volumes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate volume data"""
        
        # Volume must be >= 0
        invalid_volume = df['volume'] < 0
        
        # Quote volume should be positive if present
        if 'quote_volume' in df.columns:
            invalid_quote = df['quote_volume'] < 0
            invalid_volume = invalid_volume | invalid_quote
        
        if invalid_volume.sum() > 0:
            logger.warning(f"Found {invalid_volume.sum()} records with negative volume")
            df.loc[invalid_volume, 'volume'] = 0
            df.loc[invalid_volume, 'data_quality'] = 'invalid'
        
        return df
    
    def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle outliers โดยใช้ IQR method"""
        
        for col in ['open', 'high', 'low', 'close', 'volume']:
            if col not in df.columns:
                continue
            
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            
            lower_bound = Q1 - 3 * IQR  # Using 3x IQR for crypto (more volatile)
            upper_bound = Q3 + 3 * IQR
            
            outliers = (df[col] < lower_bound) | (df[col] > upper_bound)
            
            if outliers.sum() > 0:
                logger.info(f"Found {outliers.sum()} outliers in {col} column")
                # Mark as suspicious but don't drop
                df.loc[outliers, 'data_quality'] = 'suspicious'
        
        return df
    
    def _fill_missing(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fill missing values ด้วย appropriate methods"""
        
        numeric_cols = ['open', 'high', 'low', 'close', 'volume']
        
        for col in numeric_cols:
            if col in df.columns and df[col].isna().sum() > 0:
                # Use forward fill for price data
                if col in ['open', 'high', 'low', 'close']:
                    df[col] = df[col].ffill()
                    # Backup with backward fill
                    df[col] = df[col].bfill()
                else:
                    # Fill volume with the median
                    df[col] = df[col].fillna(df[col].median())
        
        return df

Usage Example

if __name__ == "__main__": import asyncio async def test_validation(): from fetcher import ExchangeFetcher, OHLCVData config = { "base_url": "https://api.binance.com", "rate_limit_per_minute": 1200, "timeout_seconds": 30 } async with ExchangeFetcher(config) as fetcher: data = await fetcher.fetch_klines( symbol="BTCUSDT", interval="1h", limit=500 ) df = pd.DataFrame([ { 'timestamp': d.timestamp, 'open': d.open, 'high': d.high, 'low': d.low, 'close': d.close, 'volume': d.volume, 'quote_volume': d.quote_volume } for d in data ]) validator = DataValidator() cleaned_df = validator.validate_and_clean(df) print(f"Original records: {len(df)}") print(f"Cleaned records: {len(cleaned_df)}") print(f"Data quality distribution:\n{cleaned_df['data_quality'].value_counts(dropna=False)}") asyncio.run(test_validation())

Concurrent Processing with a Semaphore

# parallel_fetcher.py
import asyncio
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
import time
from concurrent.futures import ThreadPoolExecutor

@dataclass
class FetchResult:
    symbol: str
    interval: str
    records: int
    start_time: int
    end_time: int
    duration_ms: float
    success: bool
    error: Optional[str] = None

class ParallelFetcher:
    """Fetch ข้อมูลจากหลาย symbols/intervals พร้อมกัน"""
    
    def __init__(
        self,
        max_concurrent: int = 10,
        semaphore_value: int = 5  # Binance limit ~5 concurrent connections
    ):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(semaphore_value)
        self.results: List[FetchResult] = []
    
    async def fetch_multiple(
        self,
        tasks: List[Dict[str, Any]],
        fetcher_factory,
        progress_callback=None
    ) -> List[FetchResult]:
        """Fetch หลาย tasks พร้อมกันด้วย semaphore control"""
        
        async def bounded_fetch(task: Dict[str, Any]) -> FetchResult:
            async with self.semaphore:
                start = time.perf_counter()
                try:
                    data = await fetcher_factory(
                        symbol=task['symbol'],
                        interval=task['interval'],
                        start_time=task.get('start_time'),
                        end_time=task.get('end_time'),
                        limit=task.get('limit', 1000)
                    )
                    
                    duration = (time.perf_counter() - start) * 1000
                    
                    if progress_callback:
                        progress_callback(task['symbol'], len(data))
                    
                    return FetchResult(
                        symbol=task['symbol'],
                        interval=task['interval'],
                        records=len(data),
                        start_time=task.get('start_time', 0),
                        end_time=task.get('end_time', 0),
                        duration_ms=duration,
                        success=True
                    )
                    
                except Exception as e:
                    duration = (time.perf_counter() - start) * 1000
                    return FetchResult(
                        symbol=task['symbol'],
                        interval=task['interval'],
                        records=0,
                        start_time=task.get('start_time', 0),
                        end_time=task.get('end_time', 0),
                        duration_ms=duration,
                        success=False,
                        error=str(e)
                    )
        
        # Execute all tasks with rate limiting
        results = await asyncio.gather(*[bounded_fetch(t) for t in tasks])
        
        self.results.extend(results)
        return results

Benchmark

async def benchmark_parallel_fetch():
    """Benchmark parallel vs sequential fetch"""

    config = {
        "base_url": "https://api.binance.com",
        "rate_limit_per_minute": 1200,
        "timeout_seconds": 30
    }

    symbols = [
        ("BTCUSDT", "1h"), ("ETHUSDT", "1h"),
        ("BNBUSDT", "1h"), ("SOLUSDT", "1h"),
        ("XRPUSDT", "1h"), ("ADAUSDT", "1h"),
        ("DOGEUSDT", "1h"), ("DOTUSDT", "1h"),
    ]

    tasks = [
        {"symbol": s, "interval": i, "limit": 500}
        for s, i in symbols
    ]

    # Sequential benchmark
    from fetcher import ExchangeFetcher

    print("=== Sequential Fetch Benchmark ===")
    sequential_start = time.perf_counter()

    async with ExchangeFetcher(config) as fetcher:
        for task in tasks:
            try:
                data = await fetcher.fetch_klines(
                    symbol=task['symbol'],
                    interval=task['interval'],
                    limit=task['limit']
                )
                print(f"{task['symbol']}: {len(data)} records")
            except Exception as e:
                print(f"{task['symbol']}: Error - {e}")

    sequential_time = (time.perf_counter() - sequential_start) * 1000
    print(f"Sequential total time: {sequential_time:.2f} ms")

    # Parallel benchmark
    print("\n=== Parallel Fetch Benchmark ===")
    parallel_start = time.perf_counter()

    # The fetcher must be entered via its async context manager so the
    # aiohttp session exists before fetch_klines is called concurrently
    async with ExchangeFetcher(config) as fetcher:
        parallel_fetcher = ParallelFetcher(semaphore_value=5)
        results = await parallel_fetcher.fetch_multiple(tasks, fetcher.fetch_klines)

    parallel_time = (time.perf_counter() - parallel_start) * 1000
    print(f"Parallel total time: {parallel_time:.2f} ms")

    print("\n=== Results ===")
    print(f"Speed improvement: {sequential_time / parallel_time:.2f}x faster")
    for r in results:
        status = "✓" if r.success else "✗"
        print(f"{status} {r.symbol}: {r.records} records in {r.duration_ms:.2f}ms")

Run benchmark

if __name__ == "__main__": asyncio.run(benchmark_parallel_fetch())

Incremental Fetch with State Management

# state_manager.py
import json
import os
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import redis
import logging

logger = logging.getLogger(__name__)

class StateManager:
    """จัดการ state สำหรับ incremental data fetch"""
    
    def __init__(self, redis_url: Optional[str] = None, local_file: str = "fetch_state.json"):
        self.redis_url = redis_url
        self.local_file = local_file
        self._redis_client = None
        
        if redis_url:
            try:
                self._redis_client = redis.from_url(redis_url)
            except Exception as e:
                logger.warning(f"Could not connect to Redis: {e}. Using local file.")
                self._redis_client = None
    
    def get_last_fetch_time(
        self,
        symbol: str,
        interval: str,
        default_start: Optional[int] = None
    ) -> Optional[int]:
        """Get last fetch timestamp สำหรับ symbol/interval"""
        
        key = f"fetch_state:{symbol}:{interval}"
        
        if self._redis_client:
            try:
                timestamp = self._redis_client.get(key)
                if timestamp:
                    return int(timestamp)
            except Exception as e:
                logger.error(f"Redis error: {e}")
        
        # Fallback to local file
        if os.path.exists(self.local_file):
            with open(self.local_file, 'r') as f:
                state = json.load(f)
                return state.get(key, default_start)
        
        return default_start
    
    def update_last_fetch_time(
        self,
        symbol: str,
        interval: str,
        timestamp: int
    ) -> None:
        """Update last fetch timestamp"""
        
        key = f"fetch_state:{symbol}:{interval}"
        
        if self._redis_client:
            try:
                self._redis_client.set(key, str(timestamp))
                return
            except Exception as e:
                logger.error(f"Redis error: {e}")
        
        # Fallback to local file
        state = {}
        if os.path.exists(self.local_file):
            with open(self.local_file, 'r') as f:
                state = json.load(f)
        
        state[key] = timestamp
        
        with open(self.local_file, 'w') as f:
            json.dump(state, f, indent=2)
    
    def get_checkpoint(self, job_id: str) -> Dict[str, Any]:
        """Get checkpoint for resumable job"""
        
        key = f"checkpoint:{job_id}"
        
        if self._redis_client:
            try:
                data = self._redis_client.get(key)
                if data:
                    return json.loads(data)
            except Exception as e:
                logger.error(f"Redis error: {e}")
        
        return {"completed": [], "in_progress": [], "failed": []}
    
    def save_checkpoint(self, job_id: str, checkpoint: Dict[str, Any]) -> None:
        """Save checkpoint for resumable job"""
        
        key = f"checkpoint:{job_id}"
        
        if self._redis_client:
            try:
                self._redis_client.setex(key, 86400, json.dumps(checkpoint))  # 24h TTL
                return
            except Exception as e:
                logger.error(f"Redis error: {e}")
        
        # Fallback - no local checkpoint persistence in this mode
        logger.warning("Checkpoint not saved - no Redis or persistent storage")

class IncrementalFetcher:
    """Fetch ข้อมูลแบบ incremental ด้วย state management"""
    
    def __init__(
        self,
        fetcher,
        state_manager: StateManager,
        lookback_hours: int = 24  # re-fetch a window back in time to cover gaps
    ):
        self.fetcher = fetcher
        self.state_manager = state_manager
        self.lookback_hours = lookback_hours
    
    async def fetch_incremental(
        self,
        symbol: str,
        interval: str,
        batch_size: int = 1000
    ) -> list:
        """Fetch เฉพาะข้อมูลใหม่ที่ยังไม่เคย fetch มาก่อน"""
        
        # Get last fetch time
        last_time = self.state_manager.get_last_fetch_time(symbol, interval)
        
        # Calculate start time with lookback
        lookback_ms = self.lookback_hours * 60 * 60 * 1000
        current_time = int(datetime.now().timestamp() * 1000)
        
        if last_time:
            start_time = max(last_time - lookback_ms, 0)
        else:
            # First fetch - get last 30 days
            start_time = current_time - (30 * 24 * 60 * 60 * 1000)
        
        all_data = []
        current_start = start_time
        
        # Fetch in batches until we reach current time
        while current_start < current_time:
            end_time = min(current_start + (batch_size * self._get_interval_ms(interval)), current_time)
            
            try:
                batch = await self.fetcher.fetch_klines(
                    symbol=symbol,
                    interval=interval,
                    start_time=current_start,
                    end_time=end_time,
                    limit=batch_size
                )
                
                if not batch:
                    break
                
                all_data.extend(batch)
                
                # Update checkpoint
                last_timestamp = batch[-1].timestamp
                self.state_manager.update_last_fetch_time(symbol, interval, last_timestamp)
                
                # Move to next batch
                current_start = last_timestamp + self._get_interval_ms(interval)
                
                logger.info(f"{symbol} {interval}: fetched {len(batch)} records, up to {last_timestamp}")
                
            except Exception as e:
                logger.error(f"Error fetching {symbol} {interval}: {e}")
                break
        
        return all_data
    
    def _get_interval_ms(self, interval: str) -> int:
        """Convert interval string to milliseconds"""
        intervals = {
            "1m": 60000,
            "5m": 300000,
            "15m": 900000,
            "1h": 3600000,
            "4h": 14400000,
            "1d": 86400000,
        }
        return intervals.get(interval, 3600000)
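
A minimal sketch of wiring StateManager and IncrementalFetcher together; the REDIS_URL environment variable and the symbol list here are assumptions for illustration:

# run_incremental.py - assumed wiring of the incremental fetch components
import asyncio
import os

from fetcher import ExchangeFetcher
from state_manager import StateManager, IncrementalFetcher

async def main():
    config = {"base_url": "https://api.binance.com", "rate_limit_per_minute": 1200}
    # Falls back to the local fetch_state.json file when Redis is unavailable
    state = StateManager(redis_url=os.getenv("REDIS_URL"))

    async with ExchangeFetcher(config) as fetcher:
        incremental = IncrementalFetcher(fetcher, state, lookback_hours=24)
        for symbol in ["BTCUSDT", "ETHUSDT"]:
            candles = await incremental.fetch_incremental(symbol, "1h")
            print(f"{symbol}: {len(candles)} new candles")

if __name__ == "__main__":
    asyncio.run(main())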

Anomaly Detection with HolySheep AI

When the data contains anomalies too complex to catch with a plain rule-based approach, I recommend letting an AI help with the analysis. I use HolySheep AI, which has response latency under 50 ms and is roughly 85% cheaper than comparable services.

# anomaly_detector.py
import json
import asyncio
from typing import List, Dict, Any, Optional
import aiohttp
from dataclasses import dataclass

@dataclass
class AnomalyResult:
    """ผลลัพธ์จากการวิเคราะห์ anomaly"""
    timestamp: int
    symbol: str
    is_anomaly: bool
    anomaly_type: Optional[str]
    confidence: float
    explanation: str
    suggested_action: str

class HolySheepAnomalyDetector:
    """ใช้ HolySheep AI สำหรับ advanced anomaly detection"""
    
    def __init__(
        self,
        api_key: str = "YOUR_HOLYSHEEP_API_KEY",
        base_url: str = "https://api.holysheep.ai/v1",
        model: str = "gpt-4.1"
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.model = model
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=30)
        self._session = aiohttp.ClientSession(timeout=timeout)
        return self
    
    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
    
    async def detect_anomalies(
        self,
        candles: List[Dict[str, Any]],
        symbol: str,
        batch_size: int = 50
    ) -> List[AnomalyResult]:
        """Detect anomalies ใน batch ของ candles โดยใช้ AI"""
        
        results = []
        
        # Process in batches
        for i in range(0, len(candles), batch_size):
            batch = candles[i:i + batch_size]
            
            try:
                batch_results = await self._analyze_batch(batch, symbol)
                results.extend(batch_results)
            except Exception as e:
                print(f"Error analyzing batch {i//batch_size}: {e}")
                # Continue with other batches
        
        return results
    
    async def _analyze_batch(
        self,
        candles: List[Dict[str, Any]],
        symbol: str
    ) -> List[AnomalyResult]:
        """วิเคราะห์ batch ของ candles ด้วย HolySheep AI"""
        
        # Prepare prompt
        candles_summary = self._prepare_candles_summary(candles)
        
        prompt = f"""Analyze the following {symbol} candlestick data for anomalies.
        
Focus on:
1. Unusual price movements (sudden pumps/dumps > 10%)
2. Volume anomalies (unusual trading activity)
3. Price-volume discrepancies
4. Suspicious patterns (wash trading indicators)
5. Data quality issues

Return a JSON array with each item having:
- timestamp: Unix timestamp in milliseconds
- is_anomaly: boolean
- anomaly_type: "volume_spike", "price_manipulation", "data_gap", "wash_trading", or null
- confidence: 0.0 to 1.0
- explanation: brief explanation
- suggested_action: "drop", "flag", or "keep"

Candle Data:
{candles_summary}

Return ONLY the JSON array, no markdown formatting."""

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role":