Trong thế giới trading và phân tích crypto, dữ liệu lịch sử là vàng. Nhưng việc thu thập và làm sạch dữ liệu từ các sàn giao dịch có thể là cơn ác mộng thực sự. Bài viết này sẽ hướng dẫn bạn xây dựng một pipeline ETL hoàn chỉnh để xử lý dữ liệu crypto một cách chuyên nghiệp.
Mở đầu: Vì sao dữ liệu crypto cần ETL đặc biệt?
Dữ liệu từ các sàn giao dịch crypto có những đặc thù riêng mà các ngành khác không gặp phải. Mỗi giây có thể xảy ra hàng nghìn giao dịch, dữ liệu spread khác nhau giữa các cặp tiền, và các sàn liên tục thay đổi cấu trúc API của họ.
So sánh các phương án thu thập dữ liệu
| Tiêu chí | API sàn trực tiếp | Dịch vụ Relay (CoinGecko, etc) | HolySheep AI |
|---|---|---|---|
| Độ trễ trung bình | 50-200ms | 500ms-3s | <50ms |
| Giới hạn rate limit | 10-120 req/phút | 10-50 req/phút | Không giới hạn |
| Định dạng dữ liệu | Raw, cần xử lý | Đã chuẩn hóa | JSON có cấu trúc rõ ràng |
| Chi phí | Miễn phí (có giới hạn) | $25-$200/tháng | $0.42-$15/MTok |
| Hỗ trợ tiếng Việt | Không | Không | Có, 24/7 |
| Thanh toán | Chỉ crypto | Thẻ quốc tế | WeChat/Alipay/VNPay |
Pipeline ETL Crypto hoàn chỉnh
Kiến trúc tổng quan
+------------------+ +------------------+ +------------------+
| DATA SOURCE | --> | TRANSFORM | --> | DATA WAREHOUSE |
| (Exchange API) | | (Clean/Enrich) | | (PostgreSQL) |
+------------------+ +------------------+ +------------------+
| |
v v
+------------------+ +------------------+
| ERROR HANDLER | | VALIDATION |
+------------------+ +------------------+
| |
v v
+------------------+ +------------------+
| ALERT SYSTEM | | MONITORING |
+------------------+ +------------------+
Bước 1: Kết nối và thu thập dữ liệu
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
from typing import Dict, List, Optional
class CryptoETL:
"""Pipeline ETL cho dữ liệu cryptocurrency"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
})
# Cache để tránh request trùng lặp
self.cache = {}
self.cache_ttl = 300 # 5 phút
def fetch_historical_klines(
self,
symbol: str,
interval: str = "1h",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = 1000
) -> pd.DataFrame:
"""
Thu thập dữ liệu nến lịch sử từ Binance-style API
Args:
symbol: Cặp tiền, ví dụ: BTCUSDT
interval: Khung thời gian (1m, 5m, 1h, 1d)
start_time: Timestamp bắt đầu (ms)
end_time: Timestamp kết thúc (ms)
limit: Số lượng nến tối đa (1-1000)
"""
# Sử dụng HolySheep AI cho việc gọi API
prompt = f"""Truy vấn dữ liệu kline lịch sử:
Symbol: {symbol}
Interval: {interval}
Start: {start_time or (int(time.time()*1000) - 86400000)}
End: {end_time or int(time.time()*1000)}
Limit: {limit}
Trả về dữ liệu theo format:
[[timestamp, open, high, low, close, volume, close_time, quote_volume], ...]
"""
payload = {
"model": "gpt-4.1",
"messages": [
{"role": "system", "content": "Bạn là API proxy cho Binance."},
{"role": "user", "content": prompt}
],
"temperature": 0.1
}
try:
response = self.session.post(
f"{self.base_url}/chat/completions",
json=payload,
timeout=30
)
response.raise_for_status()
data = response.json()
# Parse kết quả
content = data['choices'][0]['message']['content']
# Xử lý logic để lấy dữ liệu thực tế
return self._parse_klines(content)
except requests.exceptions.RequestException as e:
print(f"Lỗi kết nối API: {e}")
return pd.DataFrame()
def fetch_trade_ticks(
self,
symbol: str,
from_id: Optional[int] = None,
limit: int = 1000
) -> pd.DataFrame:
"""Thu thập chi tiết từng giao dịch"""
endpoint = f"{self.base_url}/trades"
params = {
"symbol": symbol,
"limit": min(limit, 1000),
"fromId": from_id
}
try:
response = self.session.get(endpoint, params=params)
response.raise_for_status()
trades = response.json()
df = pd.DataFrame(trades)
if not df.empty:
df['timestamp'] = pd.to_datetime(df['time'], unit='ms')
df['price'] = df['price'].astype(float)
df['qty'] = df['qty'].astype(float)
df['quote_qty'] = df['quoteQty'].astype(float)
return df
except Exception as e:
print(f"Lỗi lấy trade ticks: {e}")
return pd.DataFrame()
def _parse_klines(self, content: str) -> pd.DataFrame:
"""Parse dữ liệu kline từ response"""
import json
import re
# Tìm và parse JSON array trong content
match = re.search(r'\[.*\]', content, re.DOTALL)
if match:
try:
data = json.loads(match.group())
df = pd.DataFrame(data, columns=[
'open_time', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
return df
except:
pass
return pd.DataFrame()
Sử dụng
etl = CryptoETL(api_key="YOUR_HOLYSHEEP_API_KEY")
df_btc = etl.fetch_historical_klines("BTCUSDT", "1h", limit=500)
Bước 2: Làm sạch và xử lý dữ liệu
import numpy as np
from scipy import stats
from typing import Tuple
class DataCleaner:
"""Xử lý làm sạch dữ liệu crypto"""
@staticmethod
def remove_outliers_zscore(
df: pd.DataFrame,
columns: List[str],
threshold: float = 3.0
) -> pd.DataFrame:
"""
Loại bỏ outliers sử dụng Z-score method
Args:
df: DataFrame đầu vào
columns: Các cột cần kiểm tra outliers
threshold: Ngưỡng Z-score (mặc định 3.0 = 99.7% confidence)
"""
df_clean = df.copy()
for col in columns:
if col in df_clean.columns:
z_scores = np.abs(stats.zscore(df_clean[col].astype(float)))
mask = z_scores < threshold
removed = (~mask).sum()
if removed > 0:
print(f"Đã loại bỏ {removed} outliers từ cột {col}")
df_clean = df_clean[mask]
return df_clean.reset_index(drop=True)
@staticmethod
def handle_missing_values(
df: pd.DataFrame,
strategy: str = "interpolate"
) -> pd.DataFrame:
"""
Xử lý giá trị missing
Strategies:
- 'drop': Xóa rows có missing
- 'interpolate': Nội suy tuyến tính
- 'forward': Forward fill
- 'backward': Backward fill
- 'mean': Thay bằng mean
"""
df_fixed = df.copy()
# Kiểm tra missing values
missing_count = df_fixed.isnull().sum()
if missing_count.sum() > 0:
print("Missing values trước khi xử lý:")
print(missing_count[missing_count > 0])
numeric_cols = df_fixed.select_dtypes(include=[np.number]).columns
if strategy == "drop":
df_fixed = df_fixed.dropna()
elif strategy == "interpolate":
df_fixed[numeric_cols] = df_fixed[numeric_cols].interpolate(
method='linear',
limit_direction='both'
)
# Fill remaining với forward/backward
df_fixed = df_fixed.fillna(method='ffill').fillna(method='bfill')
elif strategy == "forward":
df_fixed = df_fixed.fillna(method='ffill')
elif strategy == "backward":
df_fixed = df_fixed.fillna(method='bfill')
elif strategy == "mean":
for col in numeric_cols:
df_fixed[col] = df_fixed[col].fillna(df_fixed[col].mean())
print(f"Missing values sau khi xử lý: {df_fixed.isnull().sum().sum()}")
return df_fixed
@staticmethod
def detect_and_fill_gaps(
df: pd.DataFrame,
time_col: str,
freq: str = "1h"
) -> pd.DataFrame:
"""
Phát hiện và điền các gap trong dữ liệu thời gian
Ví dụ: Dữ liệu 1h nhưng thiếu vài giờ
"""
df_time = df.copy()
df_time[time_col] = pd.to_datetime(df_time[time_col])
df_time = df_time.set_index(time_col)
# Tạo date range hoàn chỉnh
full_range = pd.date_range(
start=df_time.index.min(),
end=df_time.index.max(),
freq=freq
)
# Reindex để fill gaps
df_reindexed = df_time.reindex(full_range)
# Đánh dấu các điểm được fill
original_idx = set(df_time.index)
new_idx = set(full_range)
filled_idx = new_idx - original_idx
if filled_idx:
print(f"Đã điền {len(filled_idx)} gaps trong dữ liệu")
df_reindexed = df_reindexed.reset_index().rename(
columns={'index': time_col}
)
return df_reindexed
@staticmethod
def validate_data_quality(
df: pd.DataFrame
) -> Tuple[bool, Dict]:
"""
Kiểm tra chất lượng dữ liệu
Returns:
(is_valid, issues_dict)
"""
issues = {}
# 1. Kiểm tra duplicate
duplicates = df.duplicated().sum()
if duplicates > 0:
issues['duplicates'] = duplicates
# 2. Kiểm tra giá trị âm (volume, price phải dương)
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in ['open', 'high', 'low', 'close', 'volume']:
if col in df.columns:
negative = (df[col] < 0).sum()
if negative > 0:
issues[f'{col}_negative'] = negative
# 3. Kiểm tra OHLC logic
if all(col in df.columns for col in ['open', 'high', 'low', 'close']):
invalid_ohlc = (
(df['high'] < df['low']) |
(df['high'] < df['open']) |
(df['high'] < df['close']) |
(df['low'] > df['open']) |
(df['low'] > df['close'])
).sum()
if invalid_ohlc > 0:
issues['invalid_ohlc'] = invalid_ohlc
# 4. Kiểm tra timestamp trùng lặp
if 'open_time' in df.columns:
dup_time = df['open_time'].duplicated().sum()
if dup_time > 0:
issues['duplicate_timestamps'] = dup_time
is_valid = len(issues) == 0
return is_valid, issues
Sử dụng
cleaner = DataCleaner()
Loại bỏ outliers
df_clean = cleaner.remove_outliers_zscore(
df_btc,
columns=['open', 'high', 'low', 'close', 'volume']
)
Xử lý missing values
df_clean = cleaner.handle_missing_values(df_clean, strategy="interpolate")
Validate chất lượng
is_valid, issues = cleaner.validate_data_quality(df_clean)
print(f"Dữ liệu hợp lệ: {is_valid}")
if issues:
print(f"Các vấn đề: {issues}")
Bước 3: Enrichment và Feature Engineering
import ta # Technical Analysis Library
from ta.volatility import BollingerBands, AverageTrueRange
from ta.momentum import RSIIndicator, StochasticOscillator
from ta.trend import MACD, SMAIndicator, EMAIndicator
class CryptoFeatureEngineer:
"""Tạo features cho machine learning và phân tích"""
def __init__(self):
self.lookback_periods = [7, 14, 21, 50, 200]
def add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Thêm các chỉ báo kỹ thuật phổ biến"""
df_feat = df.copy()
# Đảm bảo các cột là float
for col in ['open', 'high', 'low', 'close', 'volume']:
if col in df_feat.columns:
df_feat[col] = df_feat[col].astype(float)
# RSI
rsi_14 = RSIIndicator(close=df_feat['close'], window=14)
df_feat['rsi_14'] = rsi_14.rsi()
# MACD
macd = MACD(close=df_feat['close'])
df_feat['macd'] = macd.macd()
df_feat['macd_signal'] = macd.macd_signal()
df_feat['macd_diff'] = macd.macd_diff()
# Bollinger Bands
bb = BollingerBands(close=df_feat['close'], window=20, window_dev=2)
df_feat['bb_high'] = bb.bollinger_hband()
df_feat['bb_low'] = bb.bollinger_lband()
df_feat['bb_mid'] = bb.bollinger_mavg()
df_feat['bb_width'] = (df_feat['bb_high'] - df_feat['bb_low']) / df_feat['bb_mid']
# ATR
atr = AverageTrueRange(
high=df_feat['high'],
low=df_feat['low'],
close=df_feat['close'],
window=14
)
df_feat['atr'] = atr.average_true_range()
# Moving Averages
for period in [7, 21, 50, 200]:
sma = SMAIndicator(close=df_feat['close'], window=period)
df_feat[f'sma_{period}'] = sma.sma_indicator()
ema = EMAIndicator(close=df_feat['close'], window=period)
df_feat[f'ema_{period}'] = ema.ema_indicator()
# Price returns
df_feat['returns'] = df_feat['close'].pct_change()
df_feat['log_returns'] = np.log(df_feat['close'] / df_feat['close'].shift(1))
# Volatility
df_feat['volatility_7d'] = df_feat['returns'].rolling(window=7).std()
df_feat['volatility_14d'] = df_feat['returns'].rolling(window=14).std()
df_feat['volatility_30d'] = df_feat['returns'].rolling(window=30).std()
# Volume features
df_feat['volume_sma_7'] = df_feat['volume'].rolling(window=7).mean()
df_feat['volume_sma_14'] = df_feat['volume'].rolling(window=14).mean()
df_feat['volume_ratio'] = df_feat['volume'] / df_feat['volume_sma_14']
return df_feat
def add_lagged_features(
self,
df: pd.DataFrame,
columns: List[str],
lags: List[int] = [1, 2, 3, 5, 7]
) -> pd.DataFrame:
"""Thêm các lagged features cho time series"""
df_lagged = df.copy()
for col in columns:
for lag in lags:
df_lagged[f'{col}_lag_{lag}'] = df_lagged[col].shift(lag)
return df_lagged
def add_rolling_statistics(
self,
df: pd.DataFrame,
column: str,
windows: List[int] = [7, 14, 30]
) -> pd.DataFrame:
"""Thêm các rolling statistics"""
df_roll = df.copy()
for window in windows:
# Mean
df_roll[f'{column}_mean_{window}'] = df_roll[column].rolling(window).mean()
# Std
df_roll[f'{column}_std_{window}'] = df_roll[column].rolling(window).std()
# Min/Max
df_roll[f'{column}_min_{window}'] = df_roll[column].rolling(window).min()
df_roll[f'{column}_max_{window}'] = df_roll[column].rolling(window).max()
return df_roll
Sử dụng
engineer = CryptoFeatureEngineer()
Thêm technical indicators
df_features = engineer.add_technical_indicators(df_clean)
Thêm lagged features
df_features = engineer.add_lagged_features(
df_features,
columns=['close', 'volume', 'returns']
)
Thêm rolling statistics
df_features = engineer.add_rolling_statistics(
df_features,
column='close',
windows=[7, 14, 30]
)
Drop NaN từ lagged/rolling features
df_final = df_features.dropna().reset_index(drop=True)
print(f"Kích thước dataset cuối cùng: {df_final.shape}")
Lỗi thường gặp và cách khắc phục
1. Lỗi Rate Limit (429 Too Many Requests)
# VẤN ĐỀ: Bị chặn do gọi API quá nhiều
MÃ LỖI: 429 Client Error: Too Many Requests
GIẢI PHÁP: Implement exponential backoff
import time
import functools
from ratelimit import limits, sleep_and_retry
class RateLimitedClient:
"""Client với rate limiting thông minh"""
def __init__(self, calls: int = 10, period: int = 60):
self.calls = calls
self.period = period
def with_rate_limit(self, func):
"""Decorator để giới hạn số lần gọi API"""
@functools.wraps(func)
@sleep_and_retry
@limits(calls=self.calls, period=self.period)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@staticmethod
def exponential_backoff(
func,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
):
"""Retry với exponential backoff"""
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise e
# Kiểm tra nếu là lỗi rate limit
if '429' in str(e) or 'rate limit' in str(e).lower():
delay = min(base_delay * (2 ** attempt), max_delay)
print(f"Rate limited. Đợi {delay:.1f}s... (attempt {attempt+1})")
time.sleep(delay)
else:
raise e
return wrapper
Sử dụng
@sleep_and_retry
@limits(calls=10, period=60)
def fetch_with_limit(client, symbol):
return client.fetch_historical_klines(symbol)
2. Lỗi Missing Data (Nến thiếu hoặc trùng lặp)
# VẤN ĐỀ: Dữ liệu bị gián đoạn hoặc có timestamp trùng lặp
GIẢI PHÁP: Comprehensive gap detection và resolution
class GapResolver:
"""Giải quyết các vấn đề gap trong dữ liệu"""
@staticmethod
def find_gaps(df: pd.DataFrame, time_col: str, freq: str) -> List[Dict]:
"""
Tìm tất cả các gap trong chuỗi thời gian
Returns:
List of gap information dictionaries
"""
df_sorted = df.sort_values(time_col).copy()
df_sorted['time_diff'] = df_sorted[time_col].diff()
expected_diff = pd.Timedelta(freq)
gap_threshold = expected_diff * 1.5 # 50% tolerance
gaps = []
for idx, row in df_sorted.iterrows():
if pd.notna(row['time_diff']) and row['time_diff'] > gap_threshold:
gaps.append({
'before_time': df_sorted.loc[idx-1, time_col] if idx > 0 else None,
'gap_start': row[time_col],
'gap_duration': row['time_diff'],
'expected_candles': int(row['time_diff'] / expected_diff) - 1
})
return gaps
@staticmethod
def handle_duplicate_timestamps(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
"""
Xử lý timestamp trùng lặp bằng cách:
1. Group by timestamp
2. Lấy giá trị trung bình hoặc lấy record mới nhất
"""
# Đếm số lượng duplicates
dup_count = df[time_col].duplicated().sum()
print(f"Phát hiện {dup_count} timestamp trùng lặp")
if dup_count > 0:
# Chọn cách xử lý: 'mean' hoặc 'last'
numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
# Group và aggregate
df_resolved = df.groupby(time_col, as_index=False).agg({
col: 'mean' for col in numeric_cols
})
return df_resolved
return df
Sử dụng
resolver = GapResolver()
Tìm gaps
gaps = resolver.find_gaps(df_clean, 'open_time', '1h')
print(f"Tìm thấy {len(gaps)} gaps trong dữ liệu")
Xử lý duplicates
df_resolved = resolver.handle_duplicate_timestamps(df_clean, 'open_time')
3. Lỗi Invalid OHLC (High < Low hoặc giá âm)
# VẤN ĐỀ: Dữ liệu OHLC không hợp lệ từ sàn giao dịch
Nguyên nhân: Lỗi snapshot, flash crash, hoặc dữ liệu từ nguồn không đáng tin cậy
class OHLCValidator:
"""Validate và sửa dữ liệu OHLC"""
@staticmethod
def fix_invalid_ohlc(df: pd.DataFrame) -> pd.DataFrame:
"""
Sửa các OHLC không hợp lệ:
- high phải >= open, close, low
- low phải <= open, close, high
"""
df_fixed = df.copy()
# Đếm số lượng cần fix
invalid_mask = (
(df_fixed['high'] < df_fixed['low']) |
(df_fixed['high'] < df_fixed['open']) |
(df_fixed['high'] < df_fixed['close']) |
(df_fixed['low'] > df_fixed['open']) |
(df_fixed['low'] > df_fixed['close'])
)
print(f"Số nến cần fix: {invalid_mask.sum()}")
# Fix: Đặt high = max(open, close, previous_high)
# và low = min(open, close, previous_low)
for idx in df_fixed[invalid_mask].index:
row = df_fixed.loc[idx]
# Tính giá trị hợp lệ
high_values = [row['open'], row['close']]
low_values = [row['open'], row['close']]
# Tham khảo giá trị trước đó nếu có
if idx > 0:
prev_row = df_fixed.loc[idx-1]
high_values.append(prev_row['high'] * 1.01) # +1% buffer
low_values.append(prev_row['low'] * 0.99) # -1% buffer
df_fixed.loc[idx, 'high'] = max(high_values)
df_fixed.loc[idx, 'low'] = min(low_values)
return df_fixed
@staticmethod
def fix_negative_prices(df: pd.DataFrame) -> pd.DataFrame:
"""Thay thế giá âm bằng giá trị hợp lệ gần nhất"""
df_fixed = df.copy()
price_cols = ['open', 'high', 'low', 'close']
for col in price_cols:
if col in df_fixed.columns:
negative_count = (df_fixed[col] < 0).sum()
if negative_count > 0:
print(f"Cột {col}: {negative_count} giá trị âm")
df_fixed[col] = df_fixed[col].apply(
lambda x: x if x > 0 else np.nan
)
# Forward fill các giá trị NaN
df_fixed[col] = df_fixed[col].fillna(method='ffill')
return df_fixed
Sử dụng
validator = OHLCValidator()
Fix invalid OHLC
df_fixed = validator.fix_invalid_ohlc(df_clean)
Fix negative prices
df_fixed = validator.fix_negative_prices(df_fixed)
4. Lỗi Timestamp Timezone và Format
# VẤN ĐỀ: Timestamp không nhất quán giữa các sàn
Một số dùng UTC, một số dùng local time
class TimestampNormalizer:
"""Chuẩn hóa timestamp về UTC"""
SUPPORTED_FORMATS = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%d %H:%M:%S.%f',
'%Y/%m/%d %H:%M:%S',
]
@staticmethod
def parse_timestamp(
value: any,
unit: str = 'ms'
) -> pd.Timestamp:
"""
Parse timestamp từ nhiều định dạng khác nhau
Args:
value: Giá trị timestamp (int, str, datetime)
unit: 's' (seconds) hoặc 'ms' (milliseconds)
"""
if pd.isna(value):
return pd.NaT
if isinstance(value, (int, float)):
# Convert sang milliseconds nếu cần
val = int(value)
if unit == 's':
val = val * 1000
return pd.to_datetime(val, unit='ms', utc=True)
if isinstance(value, str):
for fmt in TimestampNormalizer.SUPPORTED_FORMATS:
try:
return pd.to_datetime(value, format=fmt, utc=True)
except ValueError:
continue
# Thử parse tự động
return pd.to_datetime(value, utc=True)
if isinstance(value, (datetime, pd.Timestamp)):
return pd.Timestamp(value, tz='UTC')
return pd.NaT
@staticmethod
def normalize_dataframe_timestamps(
df: pd.DataFrame,
time_columns: List[str],
unit: str = 'ms'
) -> pd.DataFrame:
"""Chuẩn hóa tất cả các cột timestamp trong DataFrame"""
df_norm = df.copy()
for col in time_columns:
if col in df_norm.columns:
print(f"Đang chuẩn hóa cột: {col}")
df_norm[col] = df_norm[col].apply(
lambda x: TimestampNormalizer.parse_timestamp(x, unit)
)
# Chuyển về UTC và loại bỏ timezone info
df_norm[col] = df_norm[col].dt.tz_convert('UTC').dt.tz_localize(None)
return df_norm
Sử dụng
normalizer = TimestampNormalizer()
Chuẩn hóa các cột thời gian
df_normalized = normalizer.normalize_dataframe_timestamps(
df_fixed,
time_columns=['open_time', 'close_time'],
unit='ms'
)
Lưu trữ và xuất dữ liệu
from sqlalchemy import create_engine
import pyarrow as pa
import pyarrow.parquet as pq
class DataWarehouse:
"""Lưu trữ dữ liệu đã xử lý"""
def __init__(self, connection_string: str = None):
if connection_string:
self.engine = create_engine(connection_string)
self.blob_storage = {} # Local storage fallback
def save_to_postgresql(
self,
df: pd.DataFrame,
table_name: str,
if_exists: str = 'append',
index: bool = False
):
"""Lưu vào PostgreSQL với partitioned tables"""
df.to_sql(
name=table_name,
con=self.engine,
if_exists=if_exists,
index=index,
method='multi',
chunksize=1000
)
print(f"Đã lưu {len(df)} records vào bảng {table_name}")
def save_to_parquet(
self,
df: pd.DataFrame,
filepath: str,
partition_cols: List[str] = None
):
"""Lưu thành Parquet với partitioning"""
table = pa.Table.from_pandas(df)
pq.write_to_dataset(
table,
root_path=filepath,
partition_cols=partition_cols,
compression='snappy'
)
print(f"Đã lưu Parquet vào {filepath}")
def save_incremental(
self,
df: pd.Data