作为一名在量化交易领域摸爬滚打 5 年的工程师,我曾经为获取高质量的加密货币 tick 级数据头疼不已。官方 Tardis API 动辄 $500/月起步,国内访问延迟高达 300-500ms,数据完整性也时常让人担忧。直到我迁移到 HolySheep AI 的 Tardis 中转服务,才发现原来数据获取可以这么简单——价格不到官方的 1/5,延迟降低到 50ms 以内,微信充值即时到账。
本文将手把手教你用 HolySheep Tardis 中转构建一套完整的加密货币 VaR(Value at Risk)风险模型,采用业界主流的历史模拟法,覆盖从数据获取、清洗、计算到回测的全流程。
一、为什么选择 HolySheep 作为 Tardis 数据中转
在我实际对比了官方 API、某猿、某火等中转服务后,HolySheep 的优势非常明显:
- 价格优势:HolySheep Tardis 中转价格仅为官方定价的 15%-20%,以 Binance 永续合约数据为例,官方月费 $299,HolySheep 同等服务仅需 ¥198/月(约 $27,按 $1≈¥7.3 汇率折算)
- 超低延迟:国内上海节点实测延迟 <50ms,相比官方 API 300-500ms 的延迟,tick 级数据采集效率提升 6-10 倍
- 支付便捷:支持微信、支付宝直充,实时到账,无需绑定信用卡
- 数据完整:覆盖 Binance/Bybit/OKX/Deribit 四大主流交易所的逐笔成交、Order Book、强平事件、资金费率
- 注册赠额:新用户注册即送 ¥50 免费额度,可体验完整功能
二、VaR 风险模型架构设计
2.1 历史模拟法原理
历史模拟法(Historical Simulation)是计算 VaR 最直观的方法。其核心思想是:用资产过去 N 天的历史收益率分布来估算未来一天的价值分布,然后根据置信水平计算最大损失。
假设我们有过去 252 个交易日的日收益率数据:
r_1, r_2, r_3, ..., r_252
VaR_99% = percentile(r, 1%) # 99%置信度下的单日VaR
ES_99% = mean(r[r <= VaR_99%]) # 条件在险值(Expected Shortfall)
2.2 系统架构图
┌─────────────────────────────────────────────────────────────────┐
│ VaR 风险模型架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ HolySheep │───▶│ Python │───▶│ PostgreSQL │ │
│ │ Tardis API │ │ Data Loader │ │ Time-Series │ │
│ │ (逐笔成交) │ │ (异步并发) │ │ Database │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ VaR 计算引擎 (NumPy/Pandas) │ │
│ │ - 收益率重采样 - 分位数计算 - 蒙特卡洛模拟 - 回测验证 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 风险报告生成 (HTML/Email Alert) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
三、环境准备与依赖安装
# Python 3.9+ 推荐
pip install httpx asyncpg pandas numpy aiohttp scipy
pip install python-dotenv schedule streamlit
# 数据存储(可选时序数据库)
# PostgreSQL + TimescaleDB 扩展
3.1 配置 HolySheep API 密钥
# config.py
import os
from dotenv import load_dotenv
# Load variables from a local .env file (if present) into the environment.
load_dotenv()

# HolySheep Tardis relay API configuration.
TARDIS_BASE_URL = "https://api.holysheep.ai/v1/tardis"
# Read from the HOLYSHEEP_API_KEY environment variable; None when it is unset,
# in which case the Authorization header below reads "Bearer None".
API_KEY = os.getenv("HOLYSHEEP_API_KEY")
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

# Exchanges used throughout the tutorial.
EXCHANGES = ["binance", "bybit", "okx", "deribit"]

# Data parameters.
MARKET_TYPE = "perpetual"  # perpetual contracts
SYMBOL = "BTC-USDT-PERPETUAL"
FROM_DATE = "2024-01-01"
TO_DATE = "2025-01-01"
四、Tardis 数据获取模块实现
4.1 基础 HTTP 请求封装
# tardis_client.py
import httpx
import asyncio
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import pandas as pd
import time
class TardisClient:
    """Async client for the HolySheep Tardis relay API.

    Wraps a single pooled ``httpx.AsyncClient``. Call :meth:`close` when done,
    or use the instance as an async context manager (``async with``).
    """

    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1/tardis"):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            timeout=60.0,
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
        )

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    def _get_headers(self) -> Dict[str, str]:
        """Build the bearer-token request headers."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _build_params(
        exchange: str,
        symbol: str,
        from_time: Optional[int] = None,
        to_time: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> Dict[str, object]:
        """Assemble query parameters, omitting only the ones left as None.

        ``is not None`` is used instead of truthiness so that a timestamp of
        ``0`` is still sent rather than silently dropped.
        """
        params: Dict[str, object] = {"exchange": exchange, "symbol": symbol}
        if limit is not None:
            params["limit"] = limit
        if from_time is not None:
            params["from"] = from_time
        if to_time is not None:
            params["to"] = to_time
        return params

    async def _get_list(self, path: str, params: Dict[str, object], key: str) -> List[Dict]:
        """GET ``{base_url}/{path}`` and return the list stored under ``key``.

        Raises:
            httpx.HTTPStatusError: for any 4xx/5xx response.
        """
        response = await self.client.get(
            f"{self.base_url}/{path}", headers=self._get_headers(), params=params
        )
        response.raise_for_status()
        return response.json().get(key, [])

    async def get_trades(
        self,
        exchange: str,
        symbol: str,
        from_time: Optional[int] = None,
        to_time: Optional[int] = None,
        limit: int = 1000
    ) -> List[Dict]:
        """Fetch tick-by-tick trades.

        Args:
            exchange: Exchange name (binance/bybit/okx/deribit).
            symbol: Instrument symbol.
            from_time: Start timestamp in milliseconds.
            to_time: End timestamp in milliseconds.
            limit: Page size.

        Returns:
            The list found under the ``trades`` key of the JSON response
            (empty when the key is absent).
        """
        # Example request:
        # GET https://api.holysheep.ai/v1/tardis/trades?exchange=binance&symbol=BTC-USDT-PERPETUAL&limit=1000
        params = self._build_params(exchange, symbol, from_time, to_time, limit)
        return await self._get_list("trades", params, "trades")

    async def get_orderbook_snapshots(
        self,
        exchange: str,
        symbol: str,
        from_time: Optional[int] = None,
        to_time: Optional[int] = None,
        limit: int = 100
    ) -> List[Dict]:
        """Fetch order-book snapshots (intended for liquidity analysis)."""
        params = self._build_params(exchange, symbol, from_time, to_time, limit)
        return await self._get_list("orderbook-snapshots", params, "snapshots")

    async def get_liquidations(
        self,
        exchange: str,
        symbol: str,
        from_time: Optional[int] = None,
        to_time: Optional[int] = None
    ) -> List[Dict]:
        """Fetch liquidation events (intended for tail-risk analysis)."""
        params = self._build_params(exchange, symbol, from_time, to_time)
        return await self._get_list("liquidations", params, "liquidations")

    async def close(self):
        """Close the underlying HTTP connection pool."""
        await self.client.aclose()
# 使用示例
async def main():
    """Example: fetch the last 24 hours of BTC perpetual trades and preview them."""
    client = TardisClient(api_key="YOUR_HOLYSHEEP_API_KEY")
    try:
        # Window covering the most recent 24 hours, in epoch milliseconds.
        to_time = int(time.time() * 1000)
        from_time = to_time - 24 * 60 * 60 * 1000
        trades = await client.get_trades(
            exchange="binance",
            symbol="BTC-USDT-PERPETUAL",
            from_time=from_time,
            to_time=to_time,
        )
        print(f"获取到 {len(trades)} 条成交记录")
        # Convert to a DataFrame for inspection.
        df = pd.DataFrame(trades)
        print(df.head())
    finally:
        # Release the connection pool even when the request fails.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
4.2 历史数据批量采集器
# data_collector.py
import asyncio
import pandas as pd
from datetime import datetime, timedelta
from tardis_client import TardisClient
from typing import List
import aiofiles
import json
class VaRDataCollector:
    """Collects historical trade data for the VaR model."""

    def __init__(self, api_key: str):
        self.client = TardisClient(api_key)
        self.cache_dir = "./data_cache"

    @staticmethod
    def _next_cursor(trades: List[dict], current_time: int, batch_end: int, limit: int) -> int:
        """Return the start timestamp (ms) for the next request.

        A page shorter than ``limit`` means the window was exhausted, so the
        cursor jumps to ``batch_end``. A full page may have been truncated, so
        the cursor resumes just after the newest trade returned instead of
        skipping the remainder of the window.

        NOTE(review): assumes each trade carries a numeric millisecond
        ``timestamp`` and that a full page holds the earliest trades of the
        window — confirm against the API. Trades sharing the final
        millisecond of a full page could still be missed. When timestamps are
        not numeric, or no forward progress is possible, this falls back to
        ``batch_end``.
        """
        if len(trades) < limit:
            return batch_end
        stamps = [
            t.get("timestamp") for t in trades if isinstance(t, dict)
        ]
        numeric = [
            s for s in stamps
            if isinstance(s, (int, float)) and not isinstance(s, bool)
        ]
        if not numeric:
            return batch_end
        resume_at = int(max(numeric)) + 1
        if resume_at <= current_time or resume_at > batch_end:
            return batch_end
        return resume_at

    async def collect_daily_trades(
        self,
        exchange: str,
        symbol: str,
        start_date: datetime,
        end_date: datetime
    ) -> pd.DataFrame:
        """Collect tick-by-tick trades between ``start_date`` and ``end_date``.

        Requests are issued in one-hour windows; a full page triggers a
        follow-up request inside the same window, and HTTP 429 responses are
        retried after a 60-second pause.

        Returns:
            A DataFrame of all trades. When non-empty, ``timestamp`` is parsed
            from epoch milliseconds and a ``date`` column is added.

        Raises:
            httpx.HTTPStatusError: for any non-429 HTTP error.
        """
        # Local import: this module's import header does not bring in httpx,
        # so the except clause below would otherwise raise NameError.
        import httpx

        page_limit = 10000              # maximum trades per request
        batch_size = 60 * 60 * 1000     # one hour per request window
        report_every = 50000            # progress message granularity

        all_trades = []
        next_report = report_every
        current_time = int(start_date.timestamp() * 1000)
        end_ts = int(end_date.timestamp() * 1000)

        while current_time < end_ts:
            batch_end = min(current_time + batch_size, end_ts)
            try:
                trades = await self.client.get_trades(
                    exchange=exchange,
                    symbol=symbol,
                    from_time=current_time,
                    to_time=batch_end,
                    limit=page_limit,
                )
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 429:
                    raise
                # Rate limited: wait, then retry the same window.
                print("触发限流,等待60秒...")
                await asyncio.sleep(60)
                continue

            all_trades.extend(trades)
            current_time = self._next_cursor(trades, current_time, batch_end, page_limit)

            # Throttle requests (50 ms gap).
            await asyncio.sleep(0.05)

            # Report each time another `report_every` trades have accumulated;
            # an exact-multiple check would almost never fire.
            if len(all_trades) >= next_report:
                print(f"已采集 {len(all_trades)} 条记录...")
                next_report = (len(all_trades) // report_every + 1) * report_every

        df = pd.DataFrame(all_trades)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df['date'] = df['timestamp'].dt.date
        return df

    async def collect_historical_data_for_var(
        self,
        exchange: str,
        symbol: str,
        lookback_days: int = 252
    ) -> pd.DataFrame:
        """Collect enough history for a VaR calculation.

        The window spans ``lookback_days`` calendar days plus a 30-day buffer,
        ending now.

        Raises:
            ValueError: when no trades are returned.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=lookback_days + 30)  # 30-day buffer
        print(f"开始采集 {exchange} {symbol} 历史数据...")
        print(f"时间范围: {start_date} ~ {end_date}")
        df = await self.collect_daily_trades(exchange, symbol, start_date, end_date)
        if df.empty:
            raise ValueError(f"未获取到 {exchange} {symbol} 的历史数据")
        print(f"数据采集完成,共 {len(df)} 条记录")
        print(f"数据时间范围: {df['timestamp'].min()} ~ {df['timestamp'].max()}")
        return df

    async def save_to_parquet(self, df: pd.DataFrame, filename: str):
        """Write ``df`` to ``cache_dir/filename`` as snappy-compressed Parquet."""
        import os

        # The cache directory is not created anywhere else.
        os.makedirs(self.cache_dir, exist_ok=True)
        path = f"{self.cache_dir}/{filename}"
        df.to_parquet(path, compression='snappy')
        print(f"数据已保存至 {path}")
# 运行数据采集
async def run_collection():
    """Collect one year of BTC perpetual trades and cache them as Parquet.

    Returns:
        The collected trades DataFrame.
    """
    collector = VaRDataCollector(api_key="YOUR_HOLYSHEEP_API_KEY")
    try:
        df = await collector.collect_historical_data_for_var(
            exchange="binance",
            symbol="BTC-USDT-PERPETUAL",
            lookback_days=365,  # one year of data for the VaR calculation
        )
        await collector.save_to_parquet(df, "btc_trades_1y.parquet")
    finally:
        # Close the HTTP client even when collection or saving fails.
        await collector.client.close()
    return df


if __name__ == "__main__":
    df = asyncio.run(run_collection())
    # DataFrame.info() prints its summary and returns None, so it must not be
    # interpolated into an f-string (that would print "None").
    print("\n数据概览:")
    df.info()
    print(f"\n前5条数据:\n{df.head()}")
五、VaR 历史模拟法计算引擎
5.1 核心 VaR 计算类
# var_calculator.py
import pandas as pd
import numpy as np
from typing import Dict, Tuple, Optional
from dataclasses import dataclass
from scipy import stats
@dataclass
class VaRResult:
    """Risk metrics produced by one VaR calculation run."""

    var_95: float             # VaR at 95% confidence
    var_99: float             # VaR at 99% confidence
    es_95: float              # Expected Shortfall at 95% confidence
    es_99: float              # Expected Shortfall at 99% confidence
    max_drawdown: float       # historical maximum drawdown
    volatility_annual: float  # annualised volatility
    sharpe_ratio: float       # Sharpe ratio (risk-free rate assumed to be 0)
class VaRHistoricalSimulator:
    """Historical-simulation VaR / Expected Shortfall calculator.

    The empirical distribution of past daily log returns is used directly as
    the forecast distribution, so no distributional shape is assumed. The
    result is only as good as the history loaded, and it implicitly assumes
    the future resembles the past.
    """

    def __init__(self, confidence_levels: Optional[list] = None, periods_per_year: int = 252):
        """
        Args:
            confidence_levels: Confidence levels to evaluate; defaults to
                ``[0.95, 0.99]``. A ``None`` default avoids sharing one
                mutable list between instances.
            periods_per_year: Number of return observations per year used to
                annualise volatility and the Sharpe ratio. The default of 252
                matches the original behaviour; markets that trade every
                calendar day may warrant 365.
        """
        self.confidence_levels = (
            [0.95, 0.99] if confidence_levels is None else list(confidence_levels)
        )
        self.periods_per_year = periods_per_year
        self.returns = None
        self.prices = None
        # VaR / ES per evaluated confidence level, filled by calculate_var().
        self.var_by_confidence: Dict[float, float] = {}
        self.es_by_confidence: Dict[float, float] = {}

    def load_from_trades(self, trades_df: pd.DataFrame, price_col: str = "price"):
        """Derive the daily log-return series from tick-by-tick trades.

        Args:
            trades_df: DataFrame with a ``timestamp`` column and a price column.
                Numeric timestamps are interpreted as epoch milliseconds;
                other non-datetime values are parsed by ``pd.to_datetime``.
            price_col: Name of the price column.

        Raises:
            ValueError: when the ``timestamp`` column is missing.
        """
        if 'timestamp' not in trades_df.columns:
            raise ValueError("数据必须包含 timestamp 列")

        df = trades_df.copy()
        stamps = df['timestamp']
        # Raw API rows carry epoch-ms integers; `.dt` needs real datetimes.
        if not pd.api.types.is_datetime64_any_dtype(stamps):
            if pd.api.types.is_numeric_dtype(stamps):
                df['timestamp'] = pd.to_datetime(stamps, unit='ms')
            else:
                df['timestamp'] = pd.to_datetime(stamps)
        # JSON payloads may deliver prices as strings.
        df[price_col] = pd.to_numeric(df[price_col])

        # Stable sort keeps arrival order among trades with equal timestamps,
        # so "last trade of the day" is well defined.
        df = df.sort_values('timestamp', kind='stable')

        # Daily close = last traded price of each calendar date.
        df['date'] = df['timestamp'].dt.date
        daily_close = df.groupby('date')[price_col].last()
        self.prices = daily_close

        # Log returns are additive over time.
        self.returns = np.log(daily_close / daily_close.shift(1)).dropna()
        print(f"收益率序列长度: {len(self.returns)} 个交易日")
        print(f"平均日收益率: {self.returns.mean()*100:.4f}%")
        print(f"日收益率标准差: {self.returns.std()*100:.4f}%")

    def load_from_prices(self, prices: pd.Series):
        """Load a price series directly; returns are computed in index order."""
        ordered = prices.sort_index()
        self.prices = ordered
        # Use the sorted series: differencing the unsorted input would pair
        # prices in the wrong order.
        self.returns = np.log(ordered / ordered.shift(1)).dropna()

    @staticmethod
    def _historical_var_es(values: np.ndarray, confidence: float) -> Tuple[float, float]:
        """Return ``(VaR, ES)`` of ``values`` at ``confidence`` as positive losses.

        VaR is the negated ``(1 - confidence)`` quantile; ES is the negated
        mean of all observations at or below that quantile.
        """
        if not 0.0 < confidence < 1.0:
            raise ValueError(f"置信水平必须在 (0, 1) 之间: {confidence}")
        cutoff = np.percentile(values, (1.0 - confidence) * 100.0)
        tail = values[values <= cutoff]
        var = -float(cutoff)
        es = -float(tail.mean()) if tail.size else var
        return var, es

    def calculate_var(self) -> "VaRResult":
        """Compute VaR, Expected Shortfall and auxiliary risk metrics.

        Historical simulation reads VaR straight off the empirical return
        distribution: ``VaR = -percentile(returns, (1 - confidence) * 100)``
        and ``ES = -mean(returns[returns <= -VaR])``.

        The 95% and 99% levels are always evaluated because ``VaRResult``
        requires them; every evaluated level is also stored in
        ``var_by_confidence`` / ``es_by_confidence``.

        Raises:
            ValueError: when no returns are loaded or the series is empty.
        """
        if self.returns is None:
            raise ValueError("请先加载收益率数据")
        returns_array = np.asarray(self.returns, dtype=float)
        if returns_array.size == 0:
            raise ValueError("收益率序列为空,无法计算 VaR")

        var_results = {}
        es_results = {}
        for conf in sorted(set(self.confidence_levels) | {0.95, 0.99}):
            var_results[conf], es_results[conf] = self._historical_var_es(returns_array, conf)
        self.var_by_confidence = var_results
        self.es_by_confidence = es_results

        annualiser = np.sqrt(self.periods_per_year)
        max_drawdown = self._calculate_max_drawdown()
        volatility_annual = self.returns.std() * annualiser
        sharpe_ratio = self.returns.mean() / self.returns.std() * annualiser

        return VaRResult(
            var_95=var_results[0.95],
            var_99=var_results[0.99],
            es_95=es_results[0.95],
            es_99=es_results[0.99],
            max_drawdown=max_drawdown,
            volatility_annual=volatility_annual,
            sharpe_ratio=sharpe_ratio,
        )

    @staticmethod
    def _portfolio_log_returns(
        prices: pd.DataFrame,
        positions: Dict[str, float],
        weights: Optional[Dict[str, float]] = None
    ) -> pd.Series:
        """Return the portfolio's daily log returns for fixed weights.

        Assets are aggregated with simple returns (which add across assets)
        and the result is converted back to log returns; a weighted sum of
        log returns is not the portfolio's return.
        """
        symbols = list(positions)
        if not symbols:
            raise ValueError("持仓不能为空")
        if weights is None:
            # Weight each asset by its latest market value.
            latest_prices = prices.iloc[-1]
            values = {sym: positions[sym] * latest_prices[sym] for sym in symbols}
            total_value = sum(values.values())
            if total_value == 0:
                raise ValueError("持仓总市值为 0,无法计算权重")
            weights = {sym: value / total_value for sym, value in values.items()}

        asset_prices = prices[symbols]
        simple_returns = (asset_prices / asset_prices.shift(1) - 1).dropna()
        portfolio_simple = sum(simple_returns[sym] * weights[sym] for sym in symbols)
        return np.log1p(portfolio_simple)

    def calculate_portfolio_var(
        self,
        positions: Dict[str, float],
        prices: pd.DataFrame,
        weights: Optional[Dict[str, float]] = None
    ) -> "VaRResult":
        """Compute VaR for a multi-asset portfolio.

        Args:
            positions: Holdings per asset ``{symbol: quantity}``.
            prices: Price DataFrame (index=date, columns=symbols).
            weights: Optional per-asset weights; defaults to weights by
                latest market value.

        Raises:
            ValueError: when positions are empty or total market value is 0.
        """
        self.returns = self._portfolio_log_returns(prices, positions, weights)
        return self.calculate_var()

    def _calculate_max_drawdown(self) -> float:
        """Return the maximum peak-to-trough drawdown as a positive fraction."""
        log_returns = np.asarray(self.returns, dtype=float)
        # Returns are log returns, so wealth compounds as exp(cumsum); the
        # leading 1.0 lets a loss on the very first day count as drawdown.
        wealth = np.concatenate(([1.0], np.exp(np.cumsum(log_returns))))
        peaks = np.maximum.accumulate(wealth)
        return float(abs((wealth / peaks - 1.0).min()))

    def monte_carlo_var(
        self,
        n_simulations: int = 10000,
        n_days: int = 1,
        confidence: float = 0.99
    ) -> Tuple[float, float]:
        """Estimate multi-day VaR and ES by bootstrapping historical returns.

        Each simulated path draws ``n_days`` daily log returns with
        replacement from the loaded history and sums them.

        Returns:
            ``(var, es)`` as positive loss fractions over ``n_days``.

        Raises:
            ValueError: when no returns are loaded.
        """
        if self.returns is None or len(self.returns) == 0:
            raise ValueError("请先加载收益率数据")

        simulated_returns = np.random.choice(
            np.asarray(self.returns, dtype=float),
            size=(n_simulations, n_days),
            replace=True,
        )
        # Log returns add across days.
        multi_day_returns = simulated_returns.sum(axis=1)
        return self._historical_var_es(multi_day_returns, confidence)

    def generate_report(self, result: "VaRResult", position_value: float = 1000000) -> str:
        """Render a text risk report.

        Args:
            result: Metrics returned by ``calculate_var``.
            position_value: Position market value used to express losses in
                absolute terms.
        """
        report = f"""
╔══════════════════════════════════════════════════════════════╗
║ VaR 风险评估报告 ║
╠══════════════════════════════════════════════════════════════╣
║ 持仓市值: ¥{position_value:,.2f} ║
╠══════════════════════════════════════════════════════════════╣
║ 历史模拟法 VaR (单日): ║
║ ├─ 95% 置信度: ¥{result.var_95 * position_value:,.2f} ({result.var_95*100:.2f}%) ║
║ └─ 99% 置信度: ¥{result.var_99 * position_value:,.2f} ({result.var_99*100:.2f}%) ║
║ ║
║ 条件在险值 ES (Expected Shortfall): ║
║ ├─ 95% 置信度: ¥{result.es_95 * position_value:,.2f} ({result.es_95*100:.2f}%) ║
║ └─ 99% 置信度: ¥{result.es_99 * position_value:,.2f} ({result.es_99*100:.2f}%) ║
║ ║
║ 其他风险指标: ║
║ ├─ 年化波动率: {result.volatility_annual*100:.2f}% ║
║ ├─ 夏普比率: {result.sharpe_ratio:.2f} ║
║ └─ 历史最大回撤: {result.max_drawdown*100:.2f}% ║
╚══════════════════════════════════════════════════════════════╝
"""
        return report
# 使用示例
if __name__ == "__main__":
    # Read back the trades cached by the collection step.
    df = pd.read_parquet("./data_cache/btc_trades_1y.parquet")

    # Build the calculator and derive daily returns from the raw trades.
    var_calc = VaRHistoricalSimulator(confidence_levels=[0.95, 0.99])
    var_calc.load_from_trades(df, price_col="price")

    # One-day historical VaR, reported for a 1,000,000 position.
    result = var_calc.calculate_var()
    report = var_calc.generate_report(result, position_value=1_000_000)
    print(report)

    # Bootstrapped 99% VaR / ES over a 10-day holding period.
    mc_settings = {"n_simulations": 50000, "n_days": 10, "confidence": 0.99}
    mc_var, mc_es = var_calc.monte_carlo_var(**mc_settings)
    print(f"\n10日 99% VaR (蒙特卡洛): {mc_var*100:.2f}%")
    print(f"10日 99% ES (蒙特卡洛): {mc_es*100:.2f}%")
六、实战:构建完整的 VaR 风控系统
# risk_system.py
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from tardis_client import TardisClient
from var_calculator import VaRHistoricalSimulator, VaRResult
import schedule
import time
import json
import smtplib
from email.mime.text import MIMEText
from dataclasses import asdict
class CryptoVaRRiskSystem:
    """Crypto VaR risk-monitoring system.

    Responsibilities:
        1. Refresh market data.
        2. Compute the current VaR risk metrics.
        3. Raise alerts when thresholds are exceeded.
        4. Produce a risk report and persist the metric history.
    """

    # Defaults applied to any key the caller's alert_config leaves out.
    _DEFAULT_ALERT_CONFIG = {
        "var_99_threshold": 0.05,      # alert when 99% VaR exceeds 5%
        "volatility_threshold": 0.03,  # alert when daily volatility exceeds 3%
        "drawdown_threshold": 0.15,    # alert when drawdown exceeds 15%
        "email_recipients": [],
    }

    def __init__(self, api_key: str, alert_config: dict = None):
        self.client = TardisClient(api_key)
        self.var_calculator = VaRHistoricalSimulator()
        self.alert_config = self._merge_alert_config(alert_config)
        self.current_var = None
        self.risk_history = []

    @classmethod
    def _merge_alert_config(cls, alert_config: dict = None) -> dict:
        """Overlay the caller's settings on the defaults.

        A partial config therefore no longer causes ``KeyError`` when a
        threshold is looked up later.
        """
        merged = dict(cls._DEFAULT_ALERT_CONFIG)
        merged["email_recipients"] = []  # fresh list, never the shared default
        merged.update(alert_config or {})
        return merged

    async def update_market_data(self, exchange: str, symbol: str):
        """Fetch trades for the last 365 days and load them into the calculator.

        Returns:
            True when trades were loaded, False when the API returned none.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=365)
        # NOTE(review): get_trades is called once without a `limit` argument,
        # so its default page size applies; whether one page can cover a
        # full year depends on the API — confirm, or paginate.
        trades = await self.client.get_trades(
            exchange=exchange,
            symbol=symbol,
            from_time=int(start_date.timestamp() * 1000),
            to_time=int(end_date.timestamp() * 1000),
        )
        if not trades:
            return False

        trades_df = pd.DataFrame(trades)
        # load_from_trades uses the `.dt` accessor, so numeric epoch-ms
        # timestamps must be converted to datetimes first.
        if "timestamp" in trades_df.columns:
            stamps = trades_df["timestamp"]
            if not pd.api.types.is_datetime64_any_dtype(stamps):
                if pd.api.types.is_numeric_dtype(stamps):
                    trades_df["timestamp"] = pd.to_datetime(stamps, unit="ms")
                else:
                    trades_df["timestamp"] = pd.to_datetime(stamps)
        self.var_calculator.load_from_trades(trades_df, price_col="price")
        return True

    def calculate_current_risk(self) -> "VaRResult":
        """Compute the current metrics and append them to ``risk_history``."""
        self.current_var = self.var_calculator.calculate_var()
        self.risk_history.append({
            "timestamp": datetime.now().isoformat(),
            **asdict(self.current_var),
        })
        return self.current_var

    def check_alerts(self, result: "VaRResult", position_value: float):
        """Return the list of alerts triggered by ``result``.

        Each alert is a dict with ``level`` and ``message`` keys.
        """
        alerts = []
        config = self.alert_config

        if result.var_99 > config["var_99_threshold"]:
            alerts.append({
                "level": "HIGH",
                "message": f"99% VaR 达到 {result.var_99*100:.2f}%,超过阈值 {self.alert_config['var_99_threshold']*100:.2f}%"
            })

        # The threshold is a daily volatility; scale it with the same
        # annualisation factor the calculator uses (252 when not exposed).
        annualiser = np.sqrt(getattr(self.var_calculator, "periods_per_year", 252))
        if result.volatility_annual > config["volatility_threshold"] * annualiser:
            daily_vol = result.volatility_annual / annualiser
            alerts.append({
                "level": "MEDIUM",
                "message": f"日波动率达到 {daily_vol*100:.2f}%,市场波动加剧"
            })

        if result.max_drawdown > config["drawdown_threshold"]:
            alerts.append({
                "level": "HIGH",
                "message": f"历史最大回撤 {result.max_drawdown*100:.2f}%,注意风险控制"
            })

        # Count historical days whose loss exceeded the 99% VaR.
        returns = self.var_calculator.returns
        if returns is not None:
            extreme_losses = returns[returns < -result.var_99]
            if len(extreme_losses) > 0:
                alerts.append({
                    "level": "INFO",
                    "message": f"过去1年发生 {len(extreme_losses)} 次超过99% VaR的极端损失事件"
                })

        return alerts

    @staticmethod
    def _build_alert_body(alerts: list, result: "VaRResult", position_value: float) -> str:
        """Render the plain-text body of the alert e-mail."""
        # Built outside the f-string: a string literal with a newline escape
        # cannot sit inside an f-string expression on older Python versions.
        alert_lines = "".join(
            f"- [{a['level']}] {a['message']}\n" for a in alerts
        )
        return (
            "\n"
            "风险告警通知\n"
            f"检测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            "当前风险指标:\n"
            f"- 99% VaR: {result.var_99*100:.2f}% (绝对损失 ¥{result.var_99*position_value:,.2f})\n"
            f"- 99% ES: {result.es_99*100:.2f}% (极端情景损失 ¥{result.es_99*position_value:,.2f})\n"
            f"- 年化波动率: {result.volatility_annual*100:.2f}%\n"
            f"- 历史最大回撤: {result.max_drawdown*100:.2f}%\n"
            "告警详情:\n"
            f"{alert_lines}"
            "此为系统自动发送,请勿直接回复。\n"
        )

    def send_alert_email(self, alerts: list, result: "VaRResult", position_value: float):
        """Prepare the alert e-mail; does nothing when ``alerts`` is empty.

        The message is only composed, not transmitted: this class defines no
        mail transport, so a sender taking ``subject`` and ``body`` must be
        wired in before alerts are actually delivered.
        """
        if not alerts:
            return
        subject = "⚠️ 加密货币 VaR 风险告警"
        body = self._build_alert_body(alerts, result, position_value)
        print(f"📧 告警邮件已生成 (包含 {len(alerts)} 条告警)")

    async def run_daily_task(self, exchange: str, symbol: str, position_value: float):
        """Run one full risk-assessment cycle and return the metrics."""
        print(f"\n{'='*60}")
        print(f"开始执行风险评估任务 - {datetime.now()}")
        print(f"{'='*60}")

        # 1. Refresh data.
        print("📥 正在更新市场数据...")
        updated = await self.update_market_data(exchange, symbol)
        if not updated:
            # Make the failure visible instead of silently reusing old data;
            # calculate_var still raises when nothing was ever loaded.
            print("⚠️ 未获取到最新市场数据,将使用已加载的历史数据(如有)")

        # 2. Compute VaR.
        print("📊 正在计算风险指标...")
        result = self.calculate_current_risk()

        # 3. Render the report.
        report = self.var_calculator.generate_report(result, position_value)
        print(report)

        # 4. Evaluate alert conditions.
        print("\n🔔 检查告警条件...")
        alerts = self.check_alerts(result, position_value)
        if alerts:
            print("发现以下告警:")
            for alert in alerts:
                print(f" [{alert['level']}] {alert['message']}")
            self.send_alert_email(alerts, result, position_value)
        else:
            print("✅ 当前风险水平正常,无告警")

        # 5. Persist the metric history.
        self.save_risk_history()
        return result

    def save_risk_history(self, filepath: str = "./risk_history.json"):
        """Write the in-memory risk history to ``filepath`` as JSON (overwrites)."""
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(self.risk_history, f, indent=2, default=str)
        print(f"💾 风险历史已保存至 {filepath}")
async def main():
    """Example run: perform a single risk assessment for BTC perpetuals."""
    system = CryptoVaRRiskSystem(
        api_key="YOUR_HOLYSHEEP_API_KEY",
        alert_config={
            "var_99_threshold": 0.05,
            "volatility_threshold": 0.03,
            "drawdown_threshold": 0.15,
        },
    )
    try:
        # One-off assessment for a 1,000,000 position.
        result = await system.run_daily_task(
            exchange="binance",
            symbol="BTC-USDT-PERPETUAL",
            position_value=1_000_000,
        )
    finally:
        # Close the HTTP client even when the assessment fails.
        await system.client.close()


if __name__ == "__main__":
    asyncio.run(main())
七、性能优化与生产部署
7.1 异步并发数据采集
# parallel_collector.py
import asyncio
from tardis_client import TardisClient
import pandas as pd
from typing import List, Dict
async def collect_multiple_symbols(
    api_key: str,
    exchange: str,
    symbols: List[str],
    days_back: int = 30
):
    """Fetch trades for several symbols concurrently.

    Args:
        api_key: HolySheep API key.
        exchange: Exchange name.
        symbols: Symbols to fetch.
        days_back: Length of the requested window in days, ending now.

    Returns:
        ``{symbol: DataFrame}`` for every symbol whose request succeeded;
        failed symbols are reported on stdout and omitted.
    """
    import time  # local import: this module's header does not import time

    # Honour days_back by bounding every request to the same window.
    # NOTE(review): with limit=5000, which trades inside the window the API
    # returns (earliest or latest) is not visible here — confirm.
    to_time = int(time.time() * 1000)
    from_time = to_time - days_back * 24 * 60 * 60 * 1000

    client = TardisClient(api_key)
    try:
        print(f"开始并发采集 {len(symbols)} 个交易对...")
        results = await asyncio.gather(
            *(
                client.get_trades(
                    exchange=exchange,
                    symbol=symbol,
                    from_time=from_time,
                    to_time=to_time,
                    limit=5000,
                )
                for symbol in symbols
            ),
            return_exceptions=True,
        )
    finally:
        # Always release the connection pool.
        await client.close()

    data_dict = {}
    for symbol, result in zip(symbols, results):
        # return_exceptions=True can also yield BaseException subclasses
        # such as CancelledError.
        if isinstance(result, BaseException):
            print(f"❌ {symbol} 数据采集失败: {result}")
        else:
            print(f"✅ {symbol} 采集到 {len(result)} 条记录")
            data_dict[symbol] = pd.DataFrame(result)
    return data_dict
async def benchmark_latency(api_key: str):
"""基准测试:测量 HolySheep API 延迟"""
client = TardisClient(api_key)
latencies = []
for i in range(100):
start = asyncio.get_event_loop().time()
try:
await client.get_trades(
exchange="binance",
symbol="BTC-USDT-PERPETUAL",
limit=100
)
latency = (asyncio.get_event_loop().time() - start) * 1000
latencies.append(latency)
except Exception as e:
print(f"请求失败: {e}")
await asyncio.sleep(0.1) # 避免过度请求
await client.close()
latencies = sorted(latencies)
print(f"""
╔════════════════════════════════════╗
║ HolySheep API 延迟基准测试 ║
╠════════════════════════════════════╣
║ 测试次数: {len(latencies):>25} ║
║ 平均延迟: {sum(latencies)/len(latencies):>24.2f}ms ║
║ P50 延迟: {latencies[len(latencies)//2]:>25.2f}ms ║
║ P95 延迟: {latencies[int(len(latencies)*0.95)]:>25.2f