暗号資産トレードにおいて、高品質な履歴データはあらゆる分析基盤の生命線です。本稿では、取引所APIから取得した生データを分析可能な状態に変換するETL(Extract, Transform, Load)プロセスを、筆者の実務経験を交えながら詳しく解説します。

暗号資産ETLとは何か

ETLとは、Extract(抽出)、Transform(変換)、Load(読み込み)の頭文字です。暗号資産の文脈では、取引所APIからリアルタイムまたは履歴データを抽出し欠損値を処理して正規化し、データウェアハウスに保存する一連のプロセスを意味します。私は以前、月間100GB以上の取引データを扱うプロジェクトで、このプロセスの設計・実装を担当していました。

主流取引所APIのデータ構造と課題

主要な暗号資産取引所(Binance、Coinbase、Krakenなど)のAPIは、それぞれ固有のデータ形式を持っています。以下に私が実際に遭遇した課題と их 解決策を示します。

Binance Klines データの特徴

# Binance API からKlines(ローソク足)データを取得する例
import requests
import pandas as pd
from datetime import datetime

BASE_URL = "https://api.binance.com/api/v3"

def fetch_binance_klines(symbol="BTCUSDT", interval="1h", limit=1000):
    """
    Binanceから履歴ローソク足データを取得
    APIエンドポイント: GET /api/v3/klines
    """
    endpoint = f"{BASE_URL}/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit
    }
    
    response = requests.get(endpoint, params=params)
    response.raise_for_status()
    
    raw_data = response.json()
    
    # 生データ構造: [open_time, open, high, low, close, volume, close_time, ...]
    df = pd.DataFrame(raw_data, columns=[
        "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades", "taker_buy_base",
        "taker_buy_quote", "ignore"
    ])
    
    # データ型変換
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = df[col].astype(float)
    
    # タイムスタンプをdatetimeに変換
    df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
    df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
    
    return df

使用例

df = fetch_binance_klines(symbol="BTCUSDT", interval="1h", limit=1000) print(f"取得データ件数: {len(df)}") print(f"期間: {df['open_time'].min()} ~ {df['open_time'].max()}")

データ清洗の核心処理

import numpy as np
from typing import List, Dict, Tuple
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CryptoDataCleaner:
    """
    暗号資産履歴データの清洗処理クラス
    筆者が実務で使った基盤クラス
    """
    
    def __init__(self, symbol: str, timeframe: str):
        self.symbol = symbol
        self.timeframe = timeframe
        self.issues_log = []
    
    def detect_anomalies(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        異常値検出: 価格が0、未来時刻、極端なボラティリティを検出
        """
        df = df.copy()
        df["anomaly_flags"] = ""
        
        # 1. 価格0または負の値
        zero_price_mask = (df["close"] <= 0) | (df["open"] <= 0)
        df.loc[zero_price_mask, "anomaly_flags"] += "zero_price;"
        self.issues_log.append({
            "type": "zero_price",
            "count": zero_price_mask.sum(),
            "indices": df[zero_price_mask].index.tolist()
        })
        
        # 2. 未来時刻(サーバー時刻より後のタイムスタンプ)
        now = pd.Timestamp.now(tz="UTC")
        future_mask = df["open_time"] > now
        df.loc[future_mask, "anomaly_flags"] += "future_time;"
        self.issues_log.append({
            "type": "future_time", 
            "count": future_mask.sum(),
            "indices": df[future_mask].index.tolist()
        })
        
        # 3. 高値<安値(物理的に不可能)
        invalid_range_mask = df["high"] < df["low"]
        df.loc[invalid_range_mask, "anomaly_flags"] += "invalid_range;"
        self.issues_log.append({
            "type": "invalid_range",
            "count": invalid_range_mask.sum(),
            "indices": df[invalid_range_mask].index.tolist()
        })
        
        # 4. 極端な価格変動(24時間平均の5σ超)
        price_pct_change = df["close"].pct_change().abs()
        mean_change = price_pct_change.mean()
        std_change = price_pct_change.std()
        extreme_mask = price_pct_change > (mean_change + 5 * std_change)
        df.loc[extreme_mask, "anomaly_flags"] += "extreme_volatility;"
        
        logger.info(f"異常値検出完了: {len(df[df['anomaly_flags'] != ''])}件の問題を検出")
        return df
    
    def handle_missing_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        欠損データ処理: 前方補間と外挿の使い分け
        """
        df = df.copy()
        original_len = len(df)
        
        # open_timeでソート
        df = df.sort_values("open_time").reset_index(drop=True)
        
        # 連続性チェック: 期待される時間間隔との差分
        expected_intervals = {
            "1m": pd.Timedelta(minutes=1),
            "5m": pd.Timedelta(minutes=5),
            "15m": pd.Timedelta(minutes=15),
            "1h": pd.Timedelta(hours=1),
            "4h": pd.Timedelta(hours=4),
            "1d": pd.Timedelta(days=1)
        }
        
        expected = expected_intervals.get(self.timeframe, pd.Timedelta(hours=1))
        time_diffs = df["open_time"].diff()
        
        # 欠損しているバーを検出
        missing_mask = time_diffs > expected
        missing_count = missing_mask.sum()
        
        if missing_count > 0:
            logger.warning(f"欠損バー検出: {missing_count}件")
            
            # 挿入すべきタイムスタンプを生成
            complete_index = pd.date_range(
                start=df["open_time"].min(),
                end=df["open_time"].max(),
                freq=expected
            )
            
            # 元のデータに存在しないインデックスを特定
            existing_times = set(df["open_time"])
            missing_times = [t for t in complete_index if t not in existing_times]
            
            # 欠損データフレームを作成(前方補間用のプレースホルダー)
            missing_df = pd.DataFrame({
                "open_time": missing_times,
                "open": np.nan, "high": np.nan, "low": np.nan,
                "close": np.nan, "volume": np.nan,
                "anomaly_flags": "missing_original"
            })
            
            # マージして再ソート
            df = pd.concat([df, missing_df], ignore_index=True)
            df = df.sort_values("open_time").reset_index(drop=True)
            
            # 前方補間(直前の確定バーから値を引き継ぐ)
            price_cols = ["open", "high", "low", "close"]
            df[price_cols] = df