Trong thế giới trading và phân tích crypto, dữ liệu lịch sử là vàng. Nhưng việc thu thập và làm sạch dữ liệu từ các sàn giao dịch có thể là cơn ác mộng thực sự. Bài viết này sẽ hướng dẫn bạn xây dựng một pipeline ETL hoàn chỉnh để xử lý dữ liệu crypto một cách chuyên nghiệp.

Mở đầu: Vì sao dữ liệu crypto cần ETL đặc biệt?

Dữ liệu từ các sàn giao dịch crypto có những đặc thù riêng mà các ngành khác không gặp phải. Mỗi giây có thể xảy ra hàng nghìn giao dịch, dữ liệu spread khác nhau giữa các cặp tiền, và các sàn liên tục thay đổi cấu trúc API của họ.

So sánh các phương án thu thập dữ liệu

Tiêu chí API sàn trực tiếp Dịch vụ Relay (CoinGecko, etc) HolySheep AI
Độ trễ trung bình 50-200ms 500ms-3s <50ms
Giới hạn rate limit 10-120 req/phút 10-50 req/phút Không giới hạn
Định dạng dữ liệu Raw, cần xử lý Đã chuẩn hóa JSON có cấu trúc rõ ràng
Chi phí Miễn phí (có giới hạn) $25-$200/tháng $0.42-$15/MTok
Hỗ trợ tiếng Việt Không Không Có, 24/7
Thanh toán Chỉ crypto Thẻ quốc tế WeChat/Alipay/VNPay

Pipeline ETL Crypto hoàn chỉnh

Kiến trúc tổng quan

+------------------+     +------------------+     +------------------+
|   DATA SOURCE    | --> |   TRANSFORM      | --> |   DATA WAREHOUSE |
|  (Exchange API)  |     |   (Clean/Enrich) |     |  (PostgreSQL)    |
+------------------+     +------------------+     +------------------+
        |                         |
        v                         v
+------------------+     +------------------+
|   ERROR HANDLER |     |   VALIDATION     |
+------------------+     +------------------+
        |                         |
        v                         v
+------------------+     +------------------+
|   ALERT SYSTEM   |     |   MONITORING     |
+------------------+     +------------------+

Bước 1: Kết nối và thu thập dữ liệu

import requests
import pandas as pd
from datetime import datetime, timedelta
import time
from typing import Dict, List, Optional

class CryptoETL:
    """Pipeline ETL cho dữ liệu cryptocurrency"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })
        
        # Cache để tránh request trùng lặp
        self.cache = {}
        self.cache_ttl = 300  # 5 phút
        
    def fetch_historical_klines(
        self,
        symbol: str,
        interval: str = "1h",
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        limit: int = 1000
    ) -> pd.DataFrame:
        """
        Thu thập dữ liệu nến lịch sử từ Binance-style API
        
        Args:
            symbol: Cặp tiền, ví dụ: BTCUSDT
            interval: Khung thời gian (1m, 5m, 1h, 1d)
            start_time: Timestamp bắt đầu (ms)
            end_time: Timestamp kết thúc (ms)
            limit: Số lượng nến tối đa (1-1000)
        """
        # Sử dụng HolySheep AI cho việc gọi API
        prompt = f"""Truy vấn dữ liệu kline lịch sử:
        Symbol: {symbol}
        Interval: {interval}
        Start: {start_time or (int(time.time()*1000) - 86400000)}
        End: {end_time or int(time.time()*1000)}
        Limit: {limit}
        
        Trả về dữ liệu theo format:
        [[timestamp, open, high, low, close, volume, close_time, quote_volume], ...]
        """
        
        payload = {
            "model": "gpt-4.1",
            "messages": [
                {"role": "system", "content": "Bạn là API proxy cho Binance."},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.1
        }
        
        try:
            response = self.session.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
            
            # Parse kết quả
            content = data['choices'][0]['message']['content']
            # Xử lý logic để lấy dữ liệu thực tế
            return self._parse_klines(content)
            
        except requests.exceptions.RequestException as e:
            print(f"Lỗi kết nối API: {e}")
            return pd.DataFrame()
    
    def fetch_trade_ticks(
        self,
        symbol: str,
        from_id: Optional[int] = None,
        limit: int = 1000
    ) -> pd.DataFrame:
        """Thu thập chi tiết từng giao dịch"""
        endpoint = f"{self.base_url}/trades"
        
        params = {
            "symbol": symbol,
            "limit": min(limit, 1000),
            "fromId": from_id
        }
        
        try:
            response = self.session.get(endpoint, params=params)
            response.raise_for_status()
            
            trades = response.json()
            df = pd.DataFrame(trades)
            
            if not df.empty:
                df['timestamp'] = pd.to_datetime(df['time'], unit='ms')
                df['price'] = df['price'].astype(float)
                df['qty'] = df['qty'].astype(float)
                df['quote_qty'] = df['quoteQty'].astype(float)
                
            return df
            
        except Exception as e:
            print(f"Lỗi lấy trade ticks: {e}")
            return pd.DataFrame()
    
    def _parse_klines(self, content: str) -> pd.DataFrame:
        """Parse dữ liệu kline từ response"""
        import json
        import re
        
        # Tìm và parse JSON array trong content
        match = re.search(r'\[.*\]', content, re.DOTALL)
        if match:
            try:
                data = json.loads(match.group())
                df = pd.DataFrame(data, columns=[
                    'open_time', 'open', 'high', 'low', 'close', 'volume',
                    'close_time', 'quote_volume', 'trades', 'taker_buy_base',
                    'taker_buy_quote', 'ignore'
                ])
                return df
            except:
                pass
        return pd.DataFrame()

Sử dụng

etl = CryptoETL(api_key="YOUR_HOLYSHEEP_API_KEY") df_btc = etl.fetch_historical_klines("BTCUSDT", "1h", limit=500)

Bước 2: Làm sạch và xử lý dữ liệu

import numpy as np
from scipy import stats
from typing import Tuple

class DataCleaner:
    """Xử lý làm sạch dữ liệu crypto"""
    
    @staticmethod
    def remove_outliers_zscore(
        df: pd.DataFrame, 
        columns: List[str], 
        threshold: float = 3.0
    ) -> pd.DataFrame:
        """
        Loại bỏ outliers sử dụng Z-score method
        
        Args:
            df: DataFrame đầu vào
            columns: Các cột cần kiểm tra outliers
            threshold: Ngưỡng Z-score (mặc định 3.0 = 99.7% confidence)
        """
        df_clean = df.copy()
        
        for col in columns:
            if col in df_clean.columns:
                z_scores = np.abs(stats.zscore(df_clean[col].astype(float)))
                mask = z_scores < threshold
                removed = (~mask).sum()
                
                if removed > 0:
                    print(f"Đã loại bỏ {removed} outliers từ cột {col}")
                    df_clean = df_clean[mask]
        
        return df_clean.reset_index(drop=True)
    
    @staticmethod
    def handle_missing_values(
        df: pd.DataFrame,
        strategy: str = "interpolate"
    ) -> pd.DataFrame:
        """
        Xử lý giá trị missing
        
        Strategies:
        - 'drop': Xóa rows có missing
        - 'interpolate': Nội suy tuyến tính
        - 'forward': Forward fill
        - 'backward': Backward fill
        - 'mean': Thay bằng mean
        """
        df_fixed = df.copy()
        
        # Kiểm tra missing values
        missing_count = df_fixed.isnull().sum()
        if missing_count.sum() > 0:
            print("Missing values trước khi xử lý:")
            print(missing_count[missing_count > 0])
        
        numeric_cols = df_fixed.select_dtypes(include=[np.number]).columns
        
        if strategy == "drop":
            df_fixed = df_fixed.dropna()
            
        elif strategy == "interpolate":
            df_fixed[numeric_cols] = df_fixed[numeric_cols].interpolate(
                method='linear',
                limit_direction='both'
            )
            # Fill remaining với forward/backward
            df_fixed = df_fixed.fillna(method='ffill').fillna(method='bfill')
            
        elif strategy == "forward":
            df_fixed = df_fixed.fillna(method='ffill')
            
        elif strategy == "backward":
            df_fixed = df_fixed.fillna(method='bfill')
            
        elif strategy == "mean":
            for col in numeric_cols:
                df_fixed[col] = df_fixed[col].fillna(df_fixed[col].mean())
        
        print(f"Missing values sau khi xử lý: {df_fixed.isnull().sum().sum()}")
        return df_fixed
    
    @staticmethod
    def detect_and_fill_gaps(
        df: pd.DataFrame,
        time_col: str,
        freq: str = "1h"
    ) -> pd.DataFrame:
        """
        Phát hiện và điền các gap trong dữ liệu thời gian
        
        Ví dụ: Dữ liệu 1h nhưng thiếu vài giờ
        """
        df_time = df.copy()
        df_time[time_col] = pd.to_datetime(df_time[time_col])
        df_time = df_time.set_index(time_col)
        
        # Tạo date range hoàn chỉnh
        full_range = pd.date_range(
            start=df_time.index.min(),
            end=df_time.index.max(),
            freq=freq
        )
        
        # Reindex để fill gaps
        df_reindexed = df_time.reindex(full_range)
        
        # Đánh dấu các điểm được fill
        original_idx = set(df_time.index)
        new_idx = set(full_range)
        filled_idx = new_idx - original_idx
        
        if filled_idx:
            print(f"Đã điền {len(filled_idx)} gaps trong dữ liệu")
        
        df_reindexed = df_reindexed.reset_index().rename(
            columns={'index': time_col}
        )
        
        return df_reindexed
    
    @staticmethod
    def validate_data_quality(
        df: pd.DataFrame
    ) -> Tuple[bool, Dict]:
        """
        Kiểm tra chất lượng dữ liệu
        
        Returns:
            (is_valid, issues_dict)
        """
        issues = {}
        
        # 1. Kiểm tra duplicate
        duplicates = df.duplicated().sum()
        if duplicates > 0:
            issues['duplicates'] = duplicates
            
        # 2. Kiểm tra giá trị âm (volume, price phải dương)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in ['open', 'high', 'low', 'close', 'volume']:
            if col in df.columns:
                negative = (df[col] < 0).sum()
                if negative > 0:
                    issues[f'{col}_negative'] = negative
        
        # 3. Kiểm tra OHLC logic
        if all(col in df.columns for col in ['open', 'high', 'low', 'close']):
            invalid_ohlc = (
                (df['high'] < df['low']) |
                (df['high'] < df['open']) |
                (df['high'] < df['close']) |
                (df['low'] > df['open']) |
                (df['low'] > df['close'])
            ).sum()
            if invalid_ohlc > 0:
                issues['invalid_ohlc'] = invalid_ohlc
        
        # 4. Kiểm tra timestamp trùng lặp
        if 'open_time' in df.columns:
            dup_time = df['open_time'].duplicated().sum()
            if dup_time > 0:
                issues['duplicate_timestamps'] = dup_time
        
        is_valid = len(issues) == 0
        return is_valid, issues

Sử dụng

cleaner = DataCleaner()

Loại bỏ outliers

df_clean = cleaner.remove_outliers_zscore( df_btc, columns=['open', 'high', 'low', 'close', 'volume'] )

Xử lý missing values

df_clean = cleaner.handle_missing_values(df_clean, strategy="interpolate")

Validate chất lượng

is_valid, issues = cleaner.validate_data_quality(df_clean) print(f"Dữ liệu hợp lệ: {is_valid}") if issues: print(f"Các vấn đề: {issues}")

Bước 3: Enrichment và Feature Engineering

import ta  # Technical Analysis Library
from ta.volatility import BollingerBands, AverageTrueRange
from ta.momentum import RSIIndicator, StochasticOscillator
from ta.trend import MACD, SMAIndicator, EMAIndicator

class CryptoFeatureEngineer:
    """Tạo features cho machine learning và phân tích"""
    
    def __init__(self):
        self.lookback_periods = [7, 14, 21, 50, 200]
        
    def add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Thêm các chỉ báo kỹ thuật phổ biến"""
        df_feat = df.copy()
        
        # Đảm bảo các cột là float
        for col in ['open', 'high', 'low', 'close', 'volume']:
            if col in df_feat.columns:
                df_feat[col] = df_feat[col].astype(float)
        
        # RSI
        rsi_14 = RSIIndicator(close=df_feat['close'], window=14)
        df_feat['rsi_14'] = rsi_14.rsi()
        
        # MACD
        macd = MACD(close=df_feat['close'])
        df_feat['macd'] = macd.macd()
        df_feat['macd_signal'] = macd.macd_signal()
        df_feat['macd_diff'] = macd.macd_diff()
        
        # Bollinger Bands
        bb = BollingerBands(close=df_feat['close'], window=20, window_dev=2)
        df_feat['bb_high'] = bb.bollinger_hband()
        df_feat['bb_low'] = bb.bollinger_lband()
        df_feat['bb_mid'] = bb.bollinger_mavg()
        df_feat['bb_width'] = (df_feat['bb_high'] - df_feat['bb_low']) / df_feat['bb_mid']
        
        # ATR
        atr = AverageTrueRange(
            high=df_feat['high'],
            low=df_feat['low'],
            close=df_feat['close'],
            window=14
        )
        df_feat['atr'] = atr.average_true_range()
        
        # Moving Averages
        for period in [7, 21, 50, 200]:
            sma = SMAIndicator(close=df_feat['close'], window=period)
            df_feat[f'sma_{period}'] = sma.sma_indicator()
            
            ema = EMAIndicator(close=df_feat['close'], window=period)
            df_feat[f'ema_{period}'] = ema.ema_indicator()
        
        # Price returns
        df_feat['returns'] = df_feat['close'].pct_change()
        df_feat['log_returns'] = np.log(df_feat['close'] / df_feat['close'].shift(1))
        
        # Volatility
        df_feat['volatility_7d'] = df_feat['returns'].rolling(window=7).std()
        df_feat['volatility_14d'] = df_feat['returns'].rolling(window=14).std()
        df_feat['volatility_30d'] = df_feat['returns'].rolling(window=30).std()
        
        # Volume features
        df_feat['volume_sma_7'] = df_feat['volume'].rolling(window=7).mean()
        df_feat['volume_sma_14'] = df_feat['volume'].rolling(window=14).mean()
        df_feat['volume_ratio'] = df_feat['volume'] / df_feat['volume_sma_14']
        
        return df_feat
    
    def add_lagged_features(
        self, 
        df: pd.DataFrame, 
        columns: List[str],
        lags: List[int] = [1, 2, 3, 5, 7]
    ) -> pd.DataFrame:
        """Thêm các lagged features cho time series"""
        df_lagged = df.copy()
        
        for col in columns:
            for lag in lags:
                df_lagged[f'{col}_lag_{lag}'] = df_lagged[col].shift(lag)
        
        return df_lagged
    
    def add_rolling_statistics(
        self,
        df: pd.DataFrame,
        column: str,
        windows: List[int] = [7, 14, 30]
    ) -> pd.DataFrame:
        """Thêm các rolling statistics"""
        df_roll = df.copy()
        
        for window in windows:
            # Mean
            df_roll[f'{column}_mean_{window}'] = df_roll[column].rolling(window).mean()
            # Std
            df_roll[f'{column}_std_{window}'] = df_roll[column].rolling(window).std()
            # Min/Max
            df_roll[f'{column}_min_{window}'] = df_roll[column].rolling(window).min()
            df_roll[f'{column}_max_{window}'] = df_roll[column].rolling(window).max()
        
        return df_roll

Sử dụng

engineer = CryptoFeatureEngineer()

Thêm technical indicators

df_features = engineer.add_technical_indicators(df_clean)

Thêm lagged features

df_features = engineer.add_lagged_features( df_features, columns=['close', 'volume', 'returns'] )

Thêm rolling statistics

df_features = engineer.add_rolling_statistics( df_features, column='close', windows=[7, 14, 30] )

Drop NaN từ lagged/rolling features

df_final = df_features.dropna().reset_index(drop=True) print(f"Kích thước dataset cuối cùng: {df_final.shape}")

Lỗi thường gặp và cách khắc phục

1. Lỗi Rate Limit (429 Too Many Requests)

# VẤN ĐỀ: Bị chặn do gọi API quá nhiều

MÃ LỖI: 429 Client Error: Too Many Requests

GIẢI PHÁP: Implement exponential backoff

import time import functools from ratelimit import limits, sleep_and_retry class RateLimitedClient: """Client với rate limiting thông minh""" def __init__(self, calls: int = 10, period: int = 60): self.calls = calls self.period = period def with_rate_limit(self, func): """Decorator để giới hạn số lần gọi API""" @functools.wraps(func) @sleep_and_retry @limits(calls=self.calls, period=self.period) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper @staticmethod def exponential_backoff( func, max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0 ): """Retry với exponential backoff""" def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: if attempt == max_retries - 1: raise e # Kiểm tra nếu là lỗi rate limit if '429' in str(e) or 'rate limit' in str(e).lower(): delay = min(base_delay * (2 ** attempt), max_delay) print(f"Rate limited. Đợi {delay:.1f}s... (attempt {attempt+1})") time.sleep(delay) else: raise e return wrapper

Sử dụng

@sleep_and_retry @limits(calls=10, period=60) def fetch_with_limit(client, symbol): return client.fetch_historical_klines(symbol)

2. Lỗi Missing Data (Nến thiếu hoặc trùng lặp)

# VẤN ĐỀ: Dữ liệu bị gián đoạn hoặc có timestamp trùng lặp

GIẢI PHÁP: Comprehensive gap detection và resolution

class GapResolver: """Giải quyết các vấn đề gap trong dữ liệu""" @staticmethod def find_gaps(df: pd.DataFrame, time_col: str, freq: str) -> List[Dict]: """ Tìm tất cả các gap trong chuỗi thời gian Returns: List of gap information dictionaries """ df_sorted = df.sort_values(time_col).copy() df_sorted['time_diff'] = df_sorted[time_col].diff() expected_diff = pd.Timedelta(freq) gap_threshold = expected_diff * 1.5 # 50% tolerance gaps = [] for idx, row in df_sorted.iterrows(): if pd.notna(row['time_diff']) and row['time_diff'] > gap_threshold: gaps.append({ 'before_time': df_sorted.loc[idx-1, time_col] if idx > 0 else None, 'gap_start': row[time_col], 'gap_duration': row['time_diff'], 'expected_candles': int(row['time_diff'] / expected_diff) - 1 }) return gaps @staticmethod def handle_duplicate_timestamps(df: pd.DataFrame, time_col: str) -> pd.DataFrame: """ Xử lý timestamp trùng lặp bằng cách: 1. Group by timestamp 2. Lấy giá trị trung bình hoặc lấy record mới nhất """ # Đếm số lượng duplicates dup_count = df[time_col].duplicated().sum() print(f"Phát hiện {dup_count} timestamp trùng lặp") if dup_count > 0: # Chọn cách xử lý: 'mean' hoặc 'last' numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns # Group và aggregate df_resolved = df.groupby(time_col, as_index=False).agg({ col: 'mean' for col in numeric_cols }) return df_resolved return df

Sử dụng

resolver = GapResolver()

Tìm gaps

gaps = resolver.find_gaps(df_clean, 'open_time', '1h') print(f"Tìm thấy {len(gaps)} gaps trong dữ liệu")

Xử lý duplicates

df_resolved = resolver.handle_duplicate_timestamps(df_clean, 'open_time')

3. Lỗi Invalid OHLC (High < Low hoặc giá âm)

# VẤN ĐỀ: Dữ liệu OHLC không hợp lệ từ sàn giao dịch

Nguyên nhân: Lỗi snapshot, flash crash, hoặc dữ liệu từ nguồn không đáng tin cậy

class OHLCValidator: """Validate và sửa dữ liệu OHLC""" @staticmethod def fix_invalid_ohlc(df: pd.DataFrame) -> pd.DataFrame: """ Sửa các OHLC không hợp lệ: - high phải >= open, close, low - low phải <= open, close, high """ df_fixed = df.copy() # Đếm số lượng cần fix invalid_mask = ( (df_fixed['high'] < df_fixed['low']) | (df_fixed['high'] < df_fixed['open']) | (df_fixed['high'] < df_fixed['close']) | (df_fixed['low'] > df_fixed['open']) | (df_fixed['low'] > df_fixed['close']) ) print(f"Số nến cần fix: {invalid_mask.sum()}") # Fix: Đặt high = max(open, close, previous_high) # và low = min(open, close, previous_low) for idx in df_fixed[invalid_mask].index: row = df_fixed.loc[idx] # Tính giá trị hợp lệ high_values = [row['open'], row['close']] low_values = [row['open'], row['close']] # Tham khảo giá trị trước đó nếu có if idx > 0: prev_row = df_fixed.loc[idx-1] high_values.append(prev_row['high'] * 1.01) # +1% buffer low_values.append(prev_row['low'] * 0.99) # -1% buffer df_fixed.loc[idx, 'high'] = max(high_values) df_fixed.loc[idx, 'low'] = min(low_values) return df_fixed @staticmethod def fix_negative_prices(df: pd.DataFrame) -> pd.DataFrame: """Thay thế giá âm bằng giá trị hợp lệ gần nhất""" df_fixed = df.copy() price_cols = ['open', 'high', 'low', 'close'] for col in price_cols: if col in df_fixed.columns: negative_count = (df_fixed[col] < 0).sum() if negative_count > 0: print(f"Cột {col}: {negative_count} giá trị âm") df_fixed[col] = df_fixed[col].apply( lambda x: x if x > 0 else np.nan ) # Forward fill các giá trị NaN df_fixed[col] = df_fixed[col].fillna(method='ffill') return df_fixed

Sử dụng

validator = OHLCValidator()

Fix invalid OHLC

df_fixed = validator.fix_invalid_ohlc(df_clean)

Fix negative prices

df_fixed = validator.fix_negative_prices(df_fixed)

4. Lỗi Timestamp Timezone và Format

# VẤN ĐỀ: Timestamp không nhất quán giữa các sàn

Một số dùng UTC, một số dùng local time

class TimestampNormalizer: """Chuẩn hóa timestamp về UTC""" SUPPORTED_FORMATS = [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%d %H:%M:%S.%f', '%Y/%m/%d %H:%M:%S', ] @staticmethod def parse_timestamp( value: any, unit: str = 'ms' ) -> pd.Timestamp: """ Parse timestamp từ nhiều định dạng khác nhau Args: value: Giá trị timestamp (int, str, datetime) unit: 's' (seconds) hoặc 'ms' (milliseconds) """ if pd.isna(value): return pd.NaT if isinstance(value, (int, float)): # Convert sang milliseconds nếu cần val = int(value) if unit == 's': val = val * 1000 return pd.to_datetime(val, unit='ms', utc=True) if isinstance(value, str): for fmt in TimestampNormalizer.SUPPORTED_FORMATS: try: return pd.to_datetime(value, format=fmt, utc=True) except ValueError: continue # Thử parse tự động return pd.to_datetime(value, utc=True) if isinstance(value, (datetime, pd.Timestamp)): return pd.Timestamp(value, tz='UTC') return pd.NaT @staticmethod def normalize_dataframe_timestamps( df: pd.DataFrame, time_columns: List[str], unit: str = 'ms' ) -> pd.DataFrame: """Chuẩn hóa tất cả các cột timestamp trong DataFrame""" df_norm = df.copy() for col in time_columns: if col in df_norm.columns: print(f"Đang chuẩn hóa cột: {col}") df_norm[col] = df_norm[col].apply( lambda x: TimestampNormalizer.parse_timestamp(x, unit) ) # Chuyển về UTC và loại bỏ timezone info df_norm[col] = df_norm[col].dt.tz_convert('UTC').dt.tz_localize(None) return df_norm

Sử dụng

normalizer = TimestampNormalizer()

Chuẩn hóa các cột thời gian

df_normalized = normalizer.normalize_dataframe_timestamps( df_fixed, time_columns=['open_time', 'close_time'], unit='ms' )

Lưu trữ và xuất dữ liệu

from sqlalchemy import create_engine
import pyarrow as pa
import pyarrow.parquet as pq

class DataWarehouse:
    """Lưu trữ dữ liệu đã xử lý"""
    
    def __init__(self, connection_string: str = None):
        if connection_string:
            self.engine = create_engine(connection_string)
        self.blob_storage = {}  # Local storage fallback
        
    def save_to_postgresql(
        self,
        df: pd.DataFrame,
        table_name: str,
        if_exists: str = 'append',
        index: bool = False
    ):
        """Lưu vào PostgreSQL với partitioned tables"""
        df.to_sql(
            name=table_name,
            con=self.engine,
            if_exists=if_exists,
            index=index,
            method='multi',
            chunksize=1000
        )
        print(f"Đã lưu {len(df)} records vào bảng {table_name}")
    
    def save_to_parquet(
        self,
        df: pd.DataFrame,
        filepath: str,
        partition_cols: List[str] = None
    ):
        """Lưu thành Parquet với partitioning"""
        table = pa.Table.from_pandas(df)
        
        pq.write_to_dataset(
            table,
            root_path=filepath,
            partition_cols=partition_cols,
            compression='snappy'
        )
        print(f"Đã lưu Parquet vào {filepath}")
    
    def save_incremental(
        self,
        df: pd.Data