ในโลกของการเทรดคริปโตและการวิเคราะห์ข้อมูลตลาด การจัดการข้อมูลที่มีประสิทธิภาพเป็นหัวใจสำคัญ โดยเฉพาะเมื่อต้องทำงานกับไฟล์ CSV ขนาดใหญ่จาก Binance และ OKX ซึ่งมักมีปัญหาเรื่องรูปแบบข้อมูลที่ไม่ตรงกัน แถวว่างเปล่า หรือประเภทข้อมูลที่ไม่ถูกต้อง

บทความนี้จะพาคุณสร้าง Pipeline สำหรับทำความสะอาดข้อมูล CSV และแปลงเป็น Parquet อย่างมืออาชีพ โดยใช้ Python ร่วมกับ AI Assistant จาก HolySheep AI ที่มีความหน่วงต่ำกว่า 50 มิลลิวินาที ช่วยเขียนโค้ดและแก้ไขปัญหาได้อย่างรวดเร็ว

ทำไมต้องแปลง CSV เป็น Parquet?

ก่อนจะเข้าสู่ขั้นตอนการทำงาน มาดูกันว่าทำไมรูปแบบ Parquet ถึงเหมาะกับการวิเคราะห์ข้อมูลเทรดมากกว่า CSV ธรรมดา

เตรียม Environment และ Dependencies

ขั้นตอนแรก ติดตั้ง Package ที่จำเป็นสำหรับการทำงาน

pip install pandas pyarrow fastparquet python-dotenv requests

สำหรับการใช้งานกับ HolySheep AI ในการช่วยเขียนโค้ด Data Cleaning เราจะใช้ API ดังนี้

import os
import requests

ตั้งค่า HolySheep AI API

BASE_URL = "https://api.holysheep.ai/v1" API_KEY = "YOUR_HOLYSHEEP_API_KEY" # แทนที่ด้วย API Key ของคุณ def call_holysheep(prompt: str) -> str: """เรียกใช้ HolySheep AI สำหรับช่วยสร้างโค้ด Data Cleaning""" headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } payload = { "model": "gpt-4.1", # $8/MTok - เหมาะสำหรับงานเขียนโค้ด "messages": [ {"role": "system", "content": "You are an expert Python data engineer specializing in financial data processing."}, {"role": "user", "content": prompt} ], "temperature": 0.3 } response = requests.post( f"{BASE_URL}/chat/completions", headers=headers, json=payload, timeout=30 ) response.raise_for_status() return response.json()["choices"][0]["message"]["content"]

การดาวน์โหลดและโหลดข้อมูล CSV จาก Exchange

จากประสบการณ์การทำงานจริงกับข้อมูลจาก Binance และ OKX พบว่าแต่ละ Exchange มีรูปแบบ CSV ที่แตกต่างกัน ดังนี้

โครงสร้างข้อมูลจาก Binance

import pandas as pd
from dataclasses import dataclass
from typing import Optional
import os

@dataclass
class ExchangeConfig:
    exchange: str
    date_col: str
    price_cols: list[str]
    volume_col: str
    datetime_format: str

BINANCE_CONFIG = ExchangeConfig(
    exchange="Binance",
    date_col="Date",
    price_cols=["Open", "High", "Low", "Close"],
    volume_col="Volume",
    datetime_format="%Y-%m-%d %H:%M:%S"
)

OKX_CONFIG = ExchangeConfig(
    exchange="OKX",
    date_col="ts",
    price_cols=["open", "high", "low", "close"],
    volume_col="vol",
    datetime_format="%Y-%m-%dD%H:%M:%S:%f"
)

def load_trade_csv(filepath: str, config: ExchangeConfig) -> pd.DataFrame:
    """
    โหลดไฟล์ CSV จาก Exchange โดยตรง
    รองรับทั้ง Binance และ OKX format
    """
    # ตรวจสอบว่าไฟล์มีอยู่จริง
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"ไม่พบไฟล์: {filepath}")
    
    df = pd.read_csv(filepath)
    
    print(f"✅ โหลดข้อมูลสำเร็จ: {len(df):,} rows")
    print(f"📋 Columns: {list(df.columns)}")
    print(f"💾 Memory: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    
    return df

ตัวอย่างการใช้งาน

df_binance = load_trade_csv("data/binance_trades.csv", BINANCE_CONFIG) df_okx = load_trade_csv("data/okx_trades.csv", OKX_CONFIG)

Data Cleaning Pipeline สำหรับข้อมูลเทรด

นี่คือหัวใจหลักของบทความ การสร้าง Pipeline ที่ทำความสะอาดข้อมูลอย่างเป็นระบบ

import pandas as pd
import numpy as np
from datetime import datetime
from typing import Optional

class TradeDataCleaner:
    """คลาสสำหรับทำความสะอาดข้อมูลเทรดจาก Exchange ต่างๆ"""
    
    def __init__(self, df: pd.DataFrame, config: ExchangeConfig):
        self.df = df.copy()
        self.config = config
        self.cleaning_log = []
    
    def remove_duplicates(self) -> 'TradeDataCleaner':
        """ลบแถวที่ซ้ำกัน"""
        before = len(self.df)
        self.df = self.df.drop_duplicates()
        after = len(self.df)
        removed = before - after
        
        if removed > 0:
            self.cleaning_log.append(f"🗑️ ลบแถวซ้ำ: {removed:,} rows")
        
        return self
    
    def handle_missing_values(self, strategy: str = "drop") -> 'TradeDataCleaner':
        """
        จัดการค่าที่หายไป
        strategy: 'drop', 'forward_fill', 'interpolate'
        """
        missing_before = self.df.isnull().sum().sum()
        
        if strategy == "drop":
            self.df = self.df.dropna()
            self.cleaning_log.append(f"🗑️ ลบแถวที่มีค่าหาย: {missing_before:,} cells")
        
        elif strategy == "forward_fill":
            # ใช้สำหรับข้อมูล OHLCV ที่ต้องการความต่อเนื่อง
            for col in self.config.price_cols + [self.config.volume_col]:
                if col in self.df.columns:
                    self.df[col] = self.df[col].fillna(method='ffill')
            self.cleaning_log.append(f"🔧 Forward fill ค่าหาย: {missing_before:,} cells")
        
        return self
    
    def parse_datetime(self, date_column: Optional[str] = None) -> 'TradeDataCleaner':
        """แปลงคอลัมน์วันที่ให้เป็น datetime format"""
        col = date_column or self.config.date_col
        
        if col not in self.df.columns:
            raise ValueError(f"ไม่พบคอลัมน์ '{col}' ในข้อมูล")
        
        # ลบ Timezone ออกถ้ามี
        self.df[col] = self.df[col].astype(str).str.replace('Z', '', regex=False)
        
        # พยายาม parse ด้วย format ที่กำหนด ถ้าไม่ได้ใช้ infer
        try:
            self.df[col] = pd.to_datetime(
                self.df[col], 
                format=self.config.datetime_format,
                errors='coerce'
            )
        except:
            self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
        
        # ลบแถวที่ parse ไม่ได้
        invalid_dates = self.df[col].isnull().sum()
        if invalid_dates > 0:
            self.df = self.df.dropna(subset=[col])
            self.cleaning_log.append(f"⚠️ ลบแถวที่ parse วันที่ไม่ได้: {invalid_dates:,} rows")
        
        # Sort ให้เป็นลำดับเวลา
        self.df = self.df.sort_values(col).reset_index(drop=True)
        
        self.cleaning_log.append(f"✅ แปลงวันที่สำเร็จ: {col} → datetime64")
        
        return self
    
    def standardize_columns(self) -> 'TradeDataCleaner':
        """มาตรฐานชื่อคอลัมน์ให้เป็น lowercase พร้อม underscore"""
        rename_map = {}
        
        for col in self.df.columns:
            new_name = col.lower().strip().replace(' ', '_')
            # Standardize ชื่อคอลัมน์ OHLCV
            if col.lower() in ['open', 'o']:
                new_name = 'open'
            elif col.lower() in ['high', 'h']:
                new_name = 'high'
            elif col.lower() in ['low', 'l']:
                new_name = 'low'
            elif col.lower() in ['close', 'c']:
                new_name = 'close'
            elif col.lower() in ['volume', 'vol', 'v']:
                new_name = 'volume'
            
            rename_map[col] = new_name
        
        self.df = self.df.rename(columns=rename_map)
        self.cleaning_log.append(f"🔄 มาตรฐานคอลัมน์: {len(rename_map)} columns renamed")
        
        return self
    
    def validate_ohlcv(self) -> 'TradeDataCleaner':
        """ตรวจสอบความถูกต้องของข้อมูล OHLCV"""
        ohlcv_cols = ['open', 'high', 'low', 'close', 'volume']
        
        # ตรวจสอบว่ามีคอลัมน์ครบ
        missing_cols = [c for c in ohlcv_cols if c not in self.df.columns]
        if missing_cols:
            self.cleaning_log.append(f"⚠️ คอลัมน์ที่หาย: {missing_cols}")
            return self
        
        # High ต้อง >= Open, Close, Low
        invalid_high = (self.df['high'] < self.df['open']) | \
                       (self.df['high'] < self.df['close']) | \
                       (self.df['high'] < self.df['low'])
        
        # Low ต้อง <= Open, Close, High  
        invalid_low = (self.df['low'] > self.df['open']) | \
                      (self.df['low'] > self.df['close']) | \
                      (self.df['low'] > self.df['high'])
        
        # Volume ต้อง >= 0
        invalid_volume = self.df['volume'] < 0
        
        total_invalid = invalid_high.sum() + invalid_low.sum() + invalid_volume.sum()
        
        if total_invalid > 0:
            self.df = self.df[~invalid_high & ~invalid_low & ~invalid_volume]
            self.cleaning_log.append(f"⚠️ ลบแถวที่ข้อมูล OHLCV ไม่ถูกต้อง: {total_invalid:,} rows")
        
        self.cleaning_log.append(f"✅ ตรวจสอบ OHLCV ผ่าน: {len(self.df):,} rows คงเหลือ")
        
        return self
    
    def add_derived_features(self) -> 'TradeDataCleaner':
        """เพิ่ม Features ที่มีประโยชน์สำหรับการวิเคราะห์"""
        # คำนวณ VWAP
        self.df['vwap'] = (self.df['high'] + self.df['low'] + self.df['close']) / 3
        
        # คำนวณ Spread
        self.df['spread'] = self.df['high'] - self.df['low']
        
        # คำนวณ Body Size
        self.df['body_size'] = abs(self.df['close'] - self.df['open'])
        
        # คำนวณ % Change
        self.df['pct_change'] = self.df['close'].pct_change() * 100
        
        self.cleaning_log.append("📊 เพิ่ม derived features: vwap, spread, body_size, pct_change")
        
        return self
    
    def get_cleaned_data(self) -> pd.DataFrame:
        """ส่งคืน DataFrame ที่ทำความสะอาดแล้ว"""
        return self.df
    
    def print_report(self):
        """พิมพ์รายงานการทำความสะอาด"""
        print("\n" + "="*60)
        print("📋 DATA CLEANING REPORT")
        print("="*60)
        for log in self.cleaning_log:
            print(log)
        print("-"*60)
        print(f"📦 Final Shape: {self.df.shape[0]:,} rows × {self.df.shape[1]} columns")
        print(f"💾 Final Memory: {self.df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
        print("="*60 + "\n")

การแปลงเป็น Parquet

หลังจากทำความสะอาดข้อมูลเรียบร้อยแล้ว ขั้นตอนสุดท้ายคือการแปลงเป็น Parquet format

import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path

def save_to_parquet(
    df: pd.DataFrame, 
    output_path: str, 
    compression: str = "snappy",
    partition_by: Optional[str] = None
) -> str:
    """
    บันทึก DataFrame เป็น Parquet format
    
    Parameters:
        df: DataFrame ที่ต้องการบันทึก
        output_path: ที่อยู่ไฟล์ output
        compression: วิธีการบีบอัด ('snappy', 'gzip', 'brotli', 'none')
        partition_by: คอลัมน์ที่ต้องการ partition (เช่น 'date')
    """
    # สร้าง directory ถ้ายังไม่มี
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    
    # แปลง DataFrame เป็น PyArrow Table
    table = pa.Table.from_pandas(df)
    
    # กำหนด schema ที่ชัดเจน
    schema = table.schema
    
    # เพิ่ม metadata
    metadata = {
        "created_at": datetime.now().isoformat(),
        "source": "Binance/OKX CSV",
        "rows": len(df),
        "columns": len(df.columns),
        "compression": compression
    }
    
    # บันทึกไฟล์
    if partition_by and partition_by in df.columns:
        # Partition by column (เหมาะกับข้อมูลขนาดใหญ่)
        pq.write_to_dataset(
            table,
            root_path=output_path,
            partition_cols=[partition_by],
            compression=compression
        )
        print(f"✅ บันทึก partitioned Parquet ไปที่: {output_path}")
    else:
        # บันทึกไฟล์เดียว
        pq.write_table(
            table, 
            output_path, 
            compression=compression,
            use_dictionary=True,  # บีบอัด string columns
            write_statistics=True  # เพิ่ม statistics สำหรับ query optimization
        )
        print(f"✅ บันทึก Parquet ไปที่: {output_path}")
    
    # แสดงขนาดไฟล์
    original_size = df.memory_usage(deep=True).sum()
    parquet_size = Path(output_path).stat().st_size
    
    print(f"\n📊 COMPARISON:")
    print(f"   Original (CSV in memory): {original_size / 1024**2:.2f} MB")
    print(f"   Parquet (on disk):        {parquet_size / 1024**2:.2f} MB")
    print(f"   Compression Ratio:        {original_size / parquet_size:.1f}x smaller")
    
    return output_path

ตัวอย่างการใช้งานเต็มรูปแบบ

if __name__ == "__main__": # 1. โหลดข้อมูล df = load_trade_csv("data/binance_trades.csv", BINANCE_CONFIG) # 2. ทำความสะอาด cleaner = TradeDataCleaner(df, BINANCE_CONFIG) df_clean = (cleaner .remove_duplicates() .handle_missing_values(strategy="drop") .parse_datetime() .standardize_columns() .validate_ohlcv() .add_derived_features() .get_cleaned_data() ) # 3. พิมพ์รายงาน cleaner.print_report() # 4. บันทึกเป็น Parquet save_to_parquet(df_clean, "data/binance_trades.parquet", compression="snappy")

ใช้ AI ช่วยสร้าง Custom Cleaning Rules

หนึ่งในความสามารถที่น่าสนใจของ HolySheep AI คือการช่วยสร้าง Custom Cleaning Rules ที่เหมาะกับข้อมูลเฉพาะของคุณ

# ตัวอย่างการใช้ HolySheep AI ช่วยสร้าง cleaning logic
def generate_custom_cleaning_rules(df_sample: pd.DataFrame, target_use: str) -> str:
    """
    ใช้ AI ช่วยสร้าง cleaning rules ที่เหมาะกับ use case
    
    Parameters:
        df_sample: ตัวอย่างข้อมูล 5-10 rows
        target_use: วัตถุประสงค์การใช้งาน เช่น 'backtesting', 'machine_learning', 'visualization'
    """
    # สร้าง sample description
    sample_str = df_sample.head(5).to_string()
    
    prompt = f"""
ฉันมีข้อมูลเทรดดังนี้:
{sample_str}

คอลัมน์: {list(df_sample.columns)}
ประเภทข้อมูล: {df_sample.dtypes.to_string()}

วัตถุประสงค์: {target_use}

กรุณาสร้าง Python code สำหรับ DataCleaner class ที่:
1. จัดการ outliers (เช่น price spike ที่ผิดปกติ)
2. กรองข้อมูลที่ไม่เกี่ยวข้อง
3. เพิ่ม derived features ที่เหมาะสมกับ {target_use}

ให้คืนเป็น complete Python class ที่รันได้ทันที
    """
    
    return call_holysheep(prompt)

ตัวอย่างการใช้งาน

sample_df = load_trade_csv("data/sample.csv", BINANCE_CONFIG) custom_code = generate_custom_cleaning_rules( sample_df, target_use="machine learning feature engineering" ) print(custom_code)

เปรียบเทียบประสิทธิภาพ: CSV vs Parquet

เกณฑ์CSVParquetผู้ชนะ
ขนาดไฟล์ (1M rows)~250 MB~35 MBParquet ✓
เวลาโหลด (Pandas)~12 วินาที~1.5 วินาทีParquet ✓
Query เฉพาะ columnsต้องอ่านทั้งไฟล์อ่านเฉพาะ column ที่ต้องการParquet ✓
Compatibilityทุก toolทุก modern toolเท่ากัน
Schema Enforcementไม่มีมีParquet ✓

ราคาและ ROI

การลงทุนในระบบ Data Pipeline ที่ดีนั้นคุ้มค่าอย่างยิ่ง โดยเฉพาะเมื่อต้องทำงานกับข้อมูลจำนวนมากเป็นประจำ หากคุณใช้ AI ช่วยเขียนโค้ด ค่าใช้จ่ายต่อเดือนจะอยู่ที่ประมาณนี้

โมเดลราคา/MTokใช้งานต่อเดือนค่าใช้จ่ายโดยประมาณ
GPT-4.1$8.00~2 MTok$16
Claude Sonnet 4.5$15.00~1 MTok$15
Gemini 2.5 Flash$2.50~5 MTok$12.50
DeepSeek V3.2$0.42~10 MTok$4.20

ROI ที่คาดหวัง:

เหมาะกับใคร / ไม่เหมาะกับใคร

✅ เหมาะกับ

❌ ไม่เหมาะกับ

ทำไมต้องเลือก HolySheep