ในโลกของการเทรดคริปโตและการวิเคราะห์ข้อมูลตลาด การจัดการข้อมูลที่มีประสิทธิภาพเป็นหัวใจสำคัญ โดยเฉพาะเมื่อต้องทำงานกับไฟล์ CSV ขนาดใหญ่จาก Binance และ OKX ซึ่งมักมีปัญหาเรื่องรูปแบบข้อมูลที่ไม่ตรงกัน แถวว่างเปล่า หรือประเภทข้อมูลที่ไม่ถูกต้อง
บทความนี้จะพาคุณสร้าง Pipeline สำหรับทำความสะอาดข้อมูล CSV และแปลงเป็น Parquet อย่างมืออาชีพ โดยใช้ Python ร่วมกับ AI Assistant จาก HolySheep AI ที่มีความหน่วงต่ำกว่า 50 มิลลิวินาที ช่วยเขียนโค้ดและแก้ไขปัญหาได้อย่างรวดเร็ว
ทำไมต้องแปลง CSV เป็น Parquet?
ก่อนจะเข้าสู่ขั้นตอนการทำงาน มาดูกันว่าทำไมรูปแบบ Parquet ถึงเหมาะกับการวิเคราะห์ข้อมูลเทรดมากกว่า CSV ธรรมดา
- ขนาดไฟล์เล็กลง 50-80% — Parquet ใช้การบีบอัดแบบ Columnar ทำให้ประหยัดพื้นที่จัดเก็บอย่างมาก
- อ่านข้อมูลเร็วขึ้น 10-100 เท่า — เหมาะกับการ Query ข้อมูลจำนวนมากโดยเฉพาะใน Spark หรือ Pandas
- รองรับ Schema ที่ชัดเจน — มี Type enforcement ช่วยลดข้อผิดพลาดจากข้อมูลที่ไม่ตรง format
- Compatible กับ Big Data Tools — Hive, Spark, Presto, DuckDB ล้วนรองรับ Native
เตรียม Environment และ Dependencies
ขั้นตอนแรก ติดตั้ง Package ที่จำเป็นสำหรับการทำงาน
pip install pandas pyarrow fastparquet python-dotenv requests
สำหรับการใช้งานกับ HolySheep AI ในการช่วยเขียนโค้ด Data Cleaning เราจะใช้ API ดังนี้
import os
import requests
ตั้งค่า HolySheep AI API
BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = "YOUR_HOLYSHEEP_API_KEY" # แทนที่ด้วย API Key ของคุณ
def call_holysheep(prompt: str) -> str:
"""เรียกใช้ HolySheep AI สำหรับช่วยสร้างโค้ด Data Cleaning"""
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "gpt-4.1", # $8/MTok - เหมาะสำหรับงานเขียนโค้ด
"messages": [
{"role": "system", "content": "You are an expert Python data engineer specializing in financial data processing."},
{"role": "user", "content": prompt}
],
"temperature": 0.3
}
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=headers,
json=payload,
timeout=30
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
การดาวน์โหลดและโหลดข้อมูล CSV จาก Exchange
จากประสบการณ์การทำงานจริงกับข้อมูลจาก Binance และ OKX พบว่าแต่ละ Exchange มีรูปแบบ CSV ที่แตกต่างกัน ดังนี้
โครงสร้างข้อมูลจาก Binance
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import os
@dataclass
class ExchangeConfig:
exchange: str
date_col: str
price_cols: list[str]
volume_col: str
datetime_format: str
BINANCE_CONFIG = ExchangeConfig(
exchange="Binance",
date_col="Date",
price_cols=["Open", "High", "Low", "Close"],
volume_col="Volume",
datetime_format="%Y-%m-%d %H:%M:%S"
)
OKX_CONFIG = ExchangeConfig(
exchange="OKX",
date_col="ts",
price_cols=["open", "high", "low", "close"],
volume_col="vol",
datetime_format="%Y-%m-%dD%H:%M:%S:%f"
)
def load_trade_csv(filepath: str, config: ExchangeConfig) -> pd.DataFrame:
"""
โหลดไฟล์ CSV จาก Exchange โดยตรง
รองรับทั้ง Binance และ OKX format
"""
# ตรวจสอบว่าไฟล์มีอยู่จริง
if not os.path.exists(filepath):
raise FileNotFoundError(f"ไม่พบไฟล์: {filepath}")
df = pd.read_csv(filepath)
print(f"✅ โหลดข้อมูลสำเร็จ: {len(df):,} rows")
print(f"📋 Columns: {list(df.columns)}")
print(f"💾 Memory: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
return df
ตัวอย่างการใช้งาน
df_binance = load_trade_csv("data/binance_trades.csv", BINANCE_CONFIG)
df_okx = load_trade_csv("data/okx_trades.csv", OKX_CONFIG)
Data Cleaning Pipeline สำหรับข้อมูลเทรด
นี่คือหัวใจหลักของบทความ การสร้าง Pipeline ที่ทำความสะอาดข้อมูลอย่างเป็นระบบ
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Optional
class TradeDataCleaner:
"""คลาสสำหรับทำความสะอาดข้อมูลเทรดจาก Exchange ต่างๆ"""
def __init__(self, df: pd.DataFrame, config: ExchangeConfig):
self.df = df.copy()
self.config = config
self.cleaning_log = []
def remove_duplicates(self) -> 'TradeDataCleaner':
"""ลบแถวที่ซ้ำกัน"""
before = len(self.df)
self.df = self.df.drop_duplicates()
after = len(self.df)
removed = before - after
if removed > 0:
self.cleaning_log.append(f"🗑️ ลบแถวซ้ำ: {removed:,} rows")
return self
def handle_missing_values(self, strategy: str = "drop") -> 'TradeDataCleaner':
"""
จัดการค่าที่หายไป
strategy: 'drop', 'forward_fill', 'interpolate'
"""
missing_before = self.df.isnull().sum().sum()
if strategy == "drop":
self.df = self.df.dropna()
self.cleaning_log.append(f"🗑️ ลบแถวที่มีค่าหาย: {missing_before:,} cells")
elif strategy == "forward_fill":
# ใช้สำหรับข้อมูล OHLCV ที่ต้องการความต่อเนื่อง
for col in self.config.price_cols + [self.config.volume_col]:
if col in self.df.columns:
self.df[col] = self.df[col].fillna(method='ffill')
self.cleaning_log.append(f"🔧 Forward fill ค่าหาย: {missing_before:,} cells")
return self
def parse_datetime(self, date_column: Optional[str] = None) -> 'TradeDataCleaner':
"""แปลงคอลัมน์วันที่ให้เป็น datetime format"""
col = date_column or self.config.date_col
if col not in self.df.columns:
raise ValueError(f"ไม่พบคอลัมน์ '{col}' ในข้อมูล")
# ลบ Timezone ออกถ้ามี
self.df[col] = self.df[col].astype(str).str.replace('Z', '', regex=False)
# พยายาม parse ด้วย format ที่กำหนด ถ้าไม่ได้ใช้ infer
try:
self.df[col] = pd.to_datetime(
self.df[col],
format=self.config.datetime_format,
errors='coerce'
)
except:
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
# ลบแถวที่ parse ไม่ได้
invalid_dates = self.df[col].isnull().sum()
if invalid_dates > 0:
self.df = self.df.dropna(subset=[col])
self.cleaning_log.append(f"⚠️ ลบแถวที่ parse วันที่ไม่ได้: {invalid_dates:,} rows")
# Sort ให้เป็นลำดับเวลา
self.df = self.df.sort_values(col).reset_index(drop=True)
self.cleaning_log.append(f"✅ แปลงวันที่สำเร็จ: {col} → datetime64")
return self
def standardize_columns(self) -> 'TradeDataCleaner':
"""มาตรฐานชื่อคอลัมน์ให้เป็น lowercase พร้อม underscore"""
rename_map = {}
for col in self.df.columns:
new_name = col.lower().strip().replace(' ', '_')
# Standardize ชื่อคอลัมน์ OHLCV
if col.lower() in ['open', 'o']:
new_name = 'open'
elif col.lower() in ['high', 'h']:
new_name = 'high'
elif col.lower() in ['low', 'l']:
new_name = 'low'
elif col.lower() in ['close', 'c']:
new_name = 'close'
elif col.lower() in ['volume', 'vol', 'v']:
new_name = 'volume'
rename_map[col] = new_name
self.df = self.df.rename(columns=rename_map)
self.cleaning_log.append(f"🔄 มาตรฐานคอลัมน์: {len(rename_map)} columns renamed")
return self
def validate_ohlcv(self) -> 'TradeDataCleaner':
"""ตรวจสอบความถูกต้องของข้อมูล OHLCV"""
ohlcv_cols = ['open', 'high', 'low', 'close', 'volume']
# ตรวจสอบว่ามีคอลัมน์ครบ
missing_cols = [c for c in ohlcv_cols if c not in self.df.columns]
if missing_cols:
self.cleaning_log.append(f"⚠️ คอลัมน์ที่หาย: {missing_cols}")
return self
# High ต้อง >= Open, Close, Low
invalid_high = (self.df['high'] < self.df['open']) | \
(self.df['high'] < self.df['close']) | \
(self.df['high'] < self.df['low'])
# Low ต้อง <= Open, Close, High
invalid_low = (self.df['low'] > self.df['open']) | \
(self.df['low'] > self.df['close']) | \
(self.df['low'] > self.df['high'])
# Volume ต้อง >= 0
invalid_volume = self.df['volume'] < 0
total_invalid = invalid_high.sum() + invalid_low.sum() + invalid_volume.sum()
if total_invalid > 0:
self.df = self.df[~invalid_high & ~invalid_low & ~invalid_volume]
self.cleaning_log.append(f"⚠️ ลบแถวที่ข้อมูล OHLCV ไม่ถูกต้อง: {total_invalid:,} rows")
self.cleaning_log.append(f"✅ ตรวจสอบ OHLCV ผ่าน: {len(self.df):,} rows คงเหลือ")
return self
def add_derived_features(self) -> 'TradeDataCleaner':
"""เพิ่ม Features ที่มีประโยชน์สำหรับการวิเคราะห์"""
# คำนวณ VWAP
self.df['vwap'] = (self.df['high'] + self.df['low'] + self.df['close']) / 3
# คำนวณ Spread
self.df['spread'] = self.df['high'] - self.df['low']
# คำนวณ Body Size
self.df['body_size'] = abs(self.df['close'] - self.df['open'])
# คำนวณ % Change
self.df['pct_change'] = self.df['close'].pct_change() * 100
self.cleaning_log.append("📊 เพิ่ม derived features: vwap, spread, body_size, pct_change")
return self
def get_cleaned_data(self) -> pd.DataFrame:
"""ส่งคืน DataFrame ที่ทำความสะอาดแล้ว"""
return self.df
def print_report(self):
"""พิมพ์รายงานการทำความสะอาด"""
print("\n" + "="*60)
print("📋 DATA CLEANING REPORT")
print("="*60)
for log in self.cleaning_log:
print(log)
print("-"*60)
print(f"📦 Final Shape: {self.df.shape[0]:,} rows × {self.df.shape[1]} columns")
print(f"💾 Final Memory: {self.df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print("="*60 + "\n")
การแปลงเป็น Parquet
หลังจากทำความสะอาดข้อมูลเรียบร้อยแล้ว ขั้นตอนสุดท้ายคือการแปลงเป็น Parquet format
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
def save_to_parquet(
df: pd.DataFrame,
output_path: str,
compression: str = "snappy",
partition_by: Optional[str] = None
) -> str:
"""
บันทึก DataFrame เป็น Parquet format
Parameters:
df: DataFrame ที่ต้องการบันทึก
output_path: ที่อยู่ไฟล์ output
compression: วิธีการบีบอัด ('snappy', 'gzip', 'brotli', 'none')
partition_by: คอลัมน์ที่ต้องการ partition (เช่น 'date')
"""
# สร้าง directory ถ้ายังไม่มี
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# แปลง DataFrame เป็น PyArrow Table
table = pa.Table.from_pandas(df)
# กำหนด schema ที่ชัดเจน
schema = table.schema
# เพิ่ม metadata
metadata = {
"created_at": datetime.now().isoformat(),
"source": "Binance/OKX CSV",
"rows": len(df),
"columns": len(df.columns),
"compression": compression
}
# บันทึกไฟล์
if partition_by and partition_by in df.columns:
# Partition by column (เหมาะกับข้อมูลขนาดใหญ่)
pq.write_to_dataset(
table,
root_path=output_path,
partition_cols=[partition_by],
compression=compression
)
print(f"✅ บันทึก partitioned Parquet ไปที่: {output_path}")
else:
# บันทึกไฟล์เดียว
pq.write_table(
table,
output_path,
compression=compression,
use_dictionary=True, # บีบอัด string columns
write_statistics=True # เพิ่ม statistics สำหรับ query optimization
)
print(f"✅ บันทึก Parquet ไปที่: {output_path}")
# แสดงขนาดไฟล์
original_size = df.memory_usage(deep=True).sum()
parquet_size = Path(output_path).stat().st_size
print(f"\n📊 COMPARISON:")
print(f" Original (CSV in memory): {original_size / 1024**2:.2f} MB")
print(f" Parquet (on disk): {parquet_size / 1024**2:.2f} MB")
print(f" Compression Ratio: {original_size / parquet_size:.1f}x smaller")
return output_path
ตัวอย่างการใช้งานเต็มรูปแบบ
if __name__ == "__main__":
# 1. โหลดข้อมูล
df = load_trade_csv("data/binance_trades.csv", BINANCE_CONFIG)
# 2. ทำความสะอาด
cleaner = TradeDataCleaner(df, BINANCE_CONFIG)
df_clean = (cleaner
.remove_duplicates()
.handle_missing_values(strategy="drop")
.parse_datetime()
.standardize_columns()
.validate_ohlcv()
.add_derived_features()
.get_cleaned_data()
)
# 3. พิมพ์รายงาน
cleaner.print_report()
# 4. บันทึกเป็น Parquet
save_to_parquet(df_clean, "data/binance_trades.parquet", compression="snappy")
ใช้ AI ช่วยสร้าง Custom Cleaning Rules
หนึ่งในความสามารถที่น่าสนใจของ HolySheep AI คือการช่วยสร้าง Custom Cleaning Rules ที่เหมาะกับข้อมูลเฉพาะของคุณ
# ตัวอย่างการใช้ HolySheep AI ช่วยสร้าง cleaning logic
def generate_custom_cleaning_rules(df_sample: pd.DataFrame, target_use: str) -> str:
"""
ใช้ AI ช่วยสร้าง cleaning rules ที่เหมาะกับ use case
Parameters:
df_sample: ตัวอย่างข้อมูล 5-10 rows
target_use: วัตถุประสงค์การใช้งาน เช่น 'backtesting', 'machine_learning', 'visualization'
"""
# สร้าง sample description
sample_str = df_sample.head(5).to_string()
prompt = f"""
ฉันมีข้อมูลเทรดดังนี้:
{sample_str}
คอลัมน์: {list(df_sample.columns)}
ประเภทข้อมูล: {df_sample.dtypes.to_string()}
วัตถุประสงค์: {target_use}
กรุณาสร้าง Python code สำหรับ DataCleaner class ที่:
1. จัดการ outliers (เช่น price spike ที่ผิดปกติ)
2. กรองข้อมูลที่ไม่เกี่ยวข้อง
3. เพิ่ม derived features ที่เหมาะสมกับ {target_use}
ให้คืนเป็น complete Python class ที่รันได้ทันที
"""
return call_holysheep(prompt)
ตัวอย่างการใช้งาน
sample_df = load_trade_csv("data/sample.csv", BINANCE_CONFIG)
custom_code = generate_custom_cleaning_rules(
sample_df,
target_use="machine learning feature engineering"
)
print(custom_code)
เปรียบเทียบประสิทธิภาพ: CSV vs Parquet
| เกณฑ์ | CSV | Parquet | ผู้ชนะ |
|---|---|---|---|
| ขนาดไฟล์ (1M rows) | ~250 MB | ~35 MB | Parquet ✓ |
| เวลาโหลด (Pandas) | ~12 วินาที | ~1.5 วินาที | Parquet ✓ |
| Query เฉพาะ columns | ต้องอ่านทั้งไฟล์ | อ่านเฉพาะ column ที่ต้องการ | Parquet ✓ |
| Compatibility | ทุก tool | ทุก modern tool | เท่ากัน |
| Schema Enforcement | ไม่มี | มี | Parquet ✓ |
ราคาและ ROI
การลงทุนในระบบ Data Pipeline ที่ดีนั้นคุ้มค่าอย่างยิ่ง โดยเฉพาะเมื่อต้องทำงานกับข้อมูลจำนวนมากเป็นประจำ หากคุณใช้ AI ช่วยเขียนโค้ด ค่าใช้จ่ายต่อเดือนจะอยู่ที่ประมาณนี้
| โมเดล | ราคา/MTok | ใช้งานต่อเดือน | ค่าใช้จ่ายโดยประมาณ |
|---|---|---|---|
| GPT-4.1 | $8.00 | ~2 MTok | $16 |
| Claude Sonnet 4.5 | $15.00 | ~1 MTok | $15 |
| Gemini 2.5 Flash | $2.50 | ~5 MTok | $12.50 |
| DeepSeek V3.2 | $0.42 | ~10 MTok | $4.20 |
ROI ที่คาดหวัง:
- ประหยัดเวลา 70%+ — การใช้ AI ช่วยเขียน cleaning logic แทนการเขียนเองใช้เวลาน้อยลงมาก
- ลดข้อผิดพลาด 80%+ — AI ช่วยจัดการ edge cases ที่มนุษย์อาจมองข้าม
- ประหยัดค่า Storage 70%+ — การใช้ Parquet แทน CSV ลดพื้นที่จัดเก็บอย่างมาก
เหมาะกับใคร / ไม่เหมาะกับใคร
✅ เหมาะกับ
- นักพัฒนา Quant Trading — ต้องการ Pipeline ที่รวดเร็วสำหรับ Backtesting
- Data Engineer — ต้องจัดการข้อมูลจากหลาย Exchange
- นักวิเคราะห์ Data — ต้องการ Query ข้อมูลขนาดใหญ่อย่างมีประสิทธิภาพ
- ทีม ML/AI — ต้องเตรียมข้อมูลสำหรับ Training
- ผู้ที่ต้องการลดค่าใช้จ่าย — HolySheep มีอัตรา ¥1=$1 ประหยัดกว่า 85%+
❌ ไม่เหมาะกับ
- ผู้ที่มีข้อมูลน้อยมาก — ไม่คุ้มค่ากับการตั้ง Pipeline
- ผู้ที่ต้องการ Export เป็น Excel — Parquet ไม่เปิดด้วย Excel โดยตรง
- ผู้ที่ไม่ถนัดเขียนโค้ด — ต้องมีพื้นฐาน Python บ้าง
ทำไมต้องเลือก HolySheep
- ค