暗号資産トレードにおいて、高品質な履歴データはあらゆる分析基盤の生命線です。本稿では、取引所APIから取得した生データを分析可能な状態に変換するETL(Extract, Transform, Load)プロセスを、筆者の実務経験を交えながら詳しく解説します。
暗号資産ETLとは何か
ETLとは、Extract(抽出)、Transform(変換)、Load(読み込み)の頭文字です。暗号資産の文脈では、取引所APIからリアルタイムまたは履歴データを抽出し欠損値を処理して正規化し、データウェアハウスに保存する一連のプロセスを意味します。私は以前、月間100GB以上の取引データを扱うプロジェクトで、このプロセスの設計・実装を担当していました。
主流取引所APIのデータ構造と課題
主要な暗号資産取引所(Binance、Coinbase、Krakenなど)のAPIは、それぞれ固有のデータ形式を持っています。以下に私が実際に遭遇した課題と их 解決策を示します。
Binance Klines データの特徴
# Binance API からKlines(ローソク足)データを取得する例
import requests
import pandas as pd
from datetime import datetime
BASE_URL = "https://api.binance.com/api/v3"
def fetch_binance_klines(symbol="BTCUSDT", interval="1h", limit=1000):
"""
Binanceから履歴ローソク足データを取得
APIエンドポイント: GET /api/v3/klines
"""
endpoint = f"{BASE_URL}/klines"
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
response = requests.get(endpoint, params=params)
response.raise_for_status()
raw_data = response.json()
# 生データ構造: [open_time, open, high, low, close, volume, close_time, ...]
df = pd.DataFrame(raw_data, columns=[
"open_time", "open", "high", "low", "close", "volume",
"close_time", "quote_volume", "trades", "taker_buy_base",
"taker_buy_quote", "ignore"
])
# データ型変換
for col in ["open", "high", "low", "close", "volume"]:
df[col] = df[col].astype(float)
# タイムスタンプをdatetimeに変換
df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
return df
使用例
df = fetch_binance_klines(symbol="BTCUSDT", interval="1h", limit=1000)
print(f"取得データ件数: {len(df)}")
print(f"期間: {df['open_time'].min()} ~ {df['open_time'].max()}")
データ清洗の核心処理
import numpy as np
from typing import List, Dict, Tuple
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CryptoDataCleaner:
"""
暗号資産履歴データの清洗処理クラス
筆者が実務で使った基盤クラス
"""
def __init__(self, symbol: str, timeframe: str):
self.symbol = symbol
self.timeframe = timeframe
self.issues_log = []
def detect_anomalies(self, df: pd.DataFrame) -> pd.DataFrame:
"""
異常値検出: 価格が0、未来時刻、極端なボラティリティを検出
"""
df = df.copy()
df["anomaly_flags"] = ""
# 1. 価格0または負の値
zero_price_mask = (df["close"] <= 0) | (df["open"] <= 0)
df.loc[zero_price_mask, "anomaly_flags"] += "zero_price;"
self.issues_log.append({
"type": "zero_price",
"count": zero_price_mask.sum(),
"indices": df[zero_price_mask].index.tolist()
})
# 2. 未来時刻(サーバー時刻より後のタイムスタンプ)
now = pd.Timestamp.now(tz="UTC")
future_mask = df["open_time"] > now
df.loc[future_mask, "anomaly_flags"] += "future_time;"
self.issues_log.append({
"type": "future_time",
"count": future_mask.sum(),
"indices": df[future_mask].index.tolist()
})
# 3. 高値<安値(物理的に不可能)
invalid_range_mask = df["high"] < df["low"]
df.loc[invalid_range_mask, "anomaly_flags"] += "invalid_range;"
self.issues_log.append({
"type": "invalid_range",
"count": invalid_range_mask.sum(),
"indices": df[invalid_range_mask].index.tolist()
})
# 4. 極端な価格変動(24時間平均の5σ超)
price_pct_change = df["close"].pct_change().abs()
mean_change = price_pct_change.mean()
std_change = price_pct_change.std()
extreme_mask = price_pct_change > (mean_change + 5 * std_change)
df.loc[extreme_mask, "anomaly_flags"] += "extreme_volatility;"
logger.info(f"異常値検出完了: {len(df[df['anomaly_flags'] != ''])}件の問題を検出")
return df
def handle_missing_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
欠損データ処理: 前方補間と外挿の使い分け
"""
df = df.copy()
original_len = len(df)
# open_timeでソート
df = df.sort_values("open_time").reset_index(drop=True)
# 連続性チェック: 期待される時間間隔との差分
expected_intervals = {
"1m": pd.Timedelta(minutes=1),
"5m": pd.Timedelta(minutes=5),
"15m": pd.Timedelta(minutes=15),
"1h": pd.Timedelta(hours=1),
"4h": pd.Timedelta(hours=4),
"1d": pd.Timedelta(days=1)
}
expected = expected_intervals.get(self.timeframe, pd.Timedelta(hours=1))
time_diffs = df["open_time"].diff()
# 欠損しているバーを検出
missing_mask = time_diffs > expected
missing_count = missing_mask.sum()
if missing_count > 0:
logger.warning(f"欠損バー検出: {missing_count}件")
# 挿入すべきタイムスタンプを生成
complete_index = pd.date_range(
start=df["open_time"].min(),
end=df["open_time"].max(),
freq=expected
)
# 元のデータに存在しないインデックスを特定
existing_times = set(df["open_time"])
missing_times = [t for t in complete_index if t not in existing_times]
# 欠損データフレームを作成(前方補間用のプレースホルダー)
missing_df = pd.DataFrame({
"open_time": missing_times,
"open": np.nan, "high": np.nan, "low": np.nan,
"close": np.nan, "volume": np.nan,
"anomaly_flags": "missing_original"
})
# マージして再ソート
df = pd.concat([df, missing_df], ignore_index=True)
df = df.sort_values("open_time").reset_index(drop=True)
# 前方補間(直前の確定バーから値を引き継ぐ)
price_cols = ["open", "high", "low", "close"]
df[price_cols] = df