引言:为何你需要这篇教程
作为一名数据工程师,我在过去三年中处理了超过50TB的加密货币交易所数据。在这个过程中,我发现一个令人沮丧的事实:90%的加密货币数据分析项目失败,不是因为缺乏数据,而是因为数据质量太差。
本文将手把手教你完成加密货币历史数据的完整ETL流程(Extract-Extract抽取、Transform转换、Load加载),从连接交易所API开始,到获得干净、可分析的数据集。我会避免复杂的专业术语,用通俗易懂的语言解释每一个步骤。
特别提示:如果你在数据清洗完成后需要进行自然语言分析、情感分析或自动生成交易报告,可以考虑使用 HolySheep AI 的API服务——支持中文,支持微信/支付宝充值,延迟低于50ms,新用户有免费额度。
第一部分:理解ETL和交易所数据
什么是ETL?
ETL是数据处理的三部曲:
- Extract(抽取):从各种来源获取原始数据
- Transform(转换):清洗、整理、格式化数据
- Load(加载):将处理好的数据存储到目标位置
加密货币交易所数据的特殊性
交易所API返回的数据存在以下常见问题:
- 时间戳混乱:不同交易所使用不同的时间格式(Unix时间戳、ISO 8601、本地时间)
- 缺失值:交易量过低的时间段可能被跳过
- 异常值:交易所维护、宕机期间的数据可能为0或异常
- 重复数据:网络问题可能导致重复请求
- 精度问题:价格和数量的小数位数不一致
第二部分:准备工作
需要的工具
我们使用Python来完成整个流程。确保安装以下库:
# 安装所需的Python库
pip install requests pandas python-dotenv ccxt
pip install sqlalchemy psycopg2-binary # 用于数据库存储
pip install numpy python-dateutil
项目目录结构
crypto-etl-project/
├── config/
│ └── settings.py # 配置文件
├── src/
│ ├── extract.py # 数据抽取模块
│ ├── transform.py # 数据转换模块
│ └── load.py # 数据加载模块
├── data/
│ └── raw/ # 原始数据存放
├── processed/ # 处理后的数据
├── logs/ # 日志文件
└── main.py # 主程序入口
第三部分:完整代码实现
步骤1:配置文件设置
# config/settings.py
import os
from dataclasses import dataclass
@dataclass
class ExchangeConfig:
exchange_name: str = "binance"
symbol: str = "BTC/USDT"
timeframe: str = "1h" # 1小时K线
start_date: str = "2024-01-01"
end_date: str = "2024-12-31"
# 数据存储配置
database_url: str = "sqlite:///crypto_data.db"
# API配置(如果有交易所API Key)
api_key: str = os.getenv("EXCHANGE_API_KEY", "")
api_secret: str = os.getenv("EXCHANGE_API_SECRET", "")
HolySheep AI配置 - 用于后续数据分析
HOLYSHEEP_CONFIG = {
"base_url": "https://api.holysheep.ai/v1",
"api_key": os.getenv("HOLYSHEEP_API_KEY", ""), # 可选,用于高级分析
"model": "gpt-4.1" # 或 "claude-sonnet-4.5", "gemini-2.5-flash"
}
config = ExchangeConfig()
步骤2:数据抽取模块
# src/extract.py
import ccxt
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, List
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ExchangeDataExtractor:
"""交易所数据抽取器"""
def __init__(self, exchange_name: str = "binance"):
self.exchange_name = exchange_name
self.exchange = getattr(ccxt, exchange_name)({
'enableRateLimit': True, # 遵守API频率限制
})
logger.info(f"已连接到交易所: {exchange_name}")
def fetch_ohlcv(
self,
symbol: str,
timeframe: str,
start_date: str,
end_date: str
) -> pd.DataFrame:
"""
抽取K线数据(OHLCV)
参数:
symbol: 交易对,如 "BTC/USDT"
timeframe: 时间周期,如 "1h", "4h", "1d"
start_date: 开始日期 "YYYY-MM-DD"
end_date: 结束日期 "YYYY-MM-DD"
"""
# 将日期转换为时间戳
start = self.exchange.parse8601(f"{start_date}T00:00:00Z")
end = self.exchange.parse8601(f"{end_date}T00:00:00Z")
all_ohlcv = []
current = start
logger.info(f"开始抽取 {symbol} 从 {start_date} 到 {end_date}")
while current < end:
try:
# 计算每次请求的结束时间(避免请求过多数据)
fetch_end = min(current + self.exchange.parse_timeframe(timeframe) * 1000 * 2000, end)
ohlcv = self.exchange.fetch_ohlcv(
symbol,
timeframe,
current,
limit=2000
)
if not ohlcv:
break
all_ohlcv.extend(ohlcv)
current = ohlcv[-1][0] + 1 # 从最后一个时间戳继续
logger.info(f"已抽取 {len(all_ohlcv)} 条记录...")
# 遵守频率限制
time.sleep(self.exchange.rateLimit / 1000)
except Exception as e:
logger.error(f"抽取数据时出错: {e}")
time.sleep(5) # 出错后等待5秒重试
continue
# 转换为DataFrame
df = pd.DataFrame(
all_ohlcv,
columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
)
logger.info(f"抽取完成,共 {len(df)} 条记录")
return df
def validate_data(self, df: pd.DataFrame) -> dict:
"""验证数据完整性"""
validation = {
"total_records": len(df),
"duplicate_count": df.duplicated(subset=['timestamp']).sum(),
"null_count": df.isnull().sum().to_dict(),
"date_range": {
"start": pd.to_datetime(df['timestamp'], unit='ms').min(),
"end": pd.to_datetime(df['timestamp'], unit='ms').max()
}
}
return validation
步骤3:数据转换/清洗模块
# src/transform.py
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class CryptoDataTransformer:
"""加密货币数据转换和清洗器"""
def __init__(self):
self.errors = [] # 记录处理过程中的错误
def clean_ohlcv_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
清洗OHLCV数据
处理步骤:
1. 转换时间戳为标准格式
2. 删除重复记录
3. 填充缺失值
4. 处理异常值
5. 数据类型标准化
"""
df = df.copy()
# === 步骤1:转换时间戳 ===
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
df['date'] = df['datetime'].dt.date
df['hour'] = df['datetime'].dt.hour
df['year'] = df['datetime'].dt.year
df['month'] = df['datetime'].dt.month
# === 步骤2:删除重复记录 ===
before_count = len(df)
df = df.drop_duplicates(subset=['timestamp'], keep='first')
after_count = len(df)
if before_count != after_count:
logger.warning(f"删除了 {before_count - after_count} 条重复记录")
# === 步骤3:处理缺失值 ===
df = self._handle_missing_values(df)
# === 步骤4:处理异常值 ===
df = self._handle_outliers(df)
# === 步骤5:数据类型标准化 ===
numeric_columns = ['open', 'high', 'low', 'close', 'volume']
for col in numeric_columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# === 步骤6:排序 ===
df = df.sort_values('timestamp').reset_index(drop=True)
return df
def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
"""处理缺失值"""
# 检查并填充时间序列中的缺口
df = df.set_index('datetime')
# 创建完整的时间序列(每小时一条记录)
full_time_range = pd.date_range(
start=df.index.min(),
end=df.index.max(),
freq='1H' # 假设是1小时K线
)
# 重新索引并标记缺失数据
df = df.reindex(full_time_range)
df['is_missing'] = df['open'].isna()
# 前向填充 + 后向填充处理缺失值
# 注意:对于交易数据,volume使用0填充更合理
df['open'] = df['open'].fillna(method='ffill').fillna(method='bfill')
df['high'] = df['high'].fillna(method='ffill').fillna(method='bfill')
df['low'] = df['low'].fillna(method='ffill').fillna(method='bfill')
df['close'] = df['close'].fillna(method='ffill').fillna(method='bfill')
df['volume'] = df['volume'].fillna(0) # 缺失的交易量设为0
# 恢复索引
df = df.reset_index().rename(columns={'index': 'datetime'})
df['timestamp'] = df['datetime'].astype('int64') // 10**6
logger.info(f"检测到 {df['is_missing'].sum()} 条缺失记录并已处理")
return df
def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
"""处理异常值"""
# 检查价格合理性(价格不能为负)
price_columns = ['open', 'high', 'low', 'close']
for col in price_columns:
invalid_count = (df[col] <= 0).sum()
if invalid_count > 0:
logger.warning(f"{col} 列有 {invalid_count} 个非正值")
df.loc[df[col] <= 0, col] = np.nan
# 检查high >= max(open, close, low)
for idx, row in df.iterrows():
if pd.notna(row['high']):
true_high = max(row['open'], row['close'], row['low'])
if row['high'] < true_high:
df.at[idx, 'high'] = true_high
# 检查low <= min(open, close, high)
for idx, row in df.iterrows():
if pd.notna(row['low']):
true_low = min(row['open'], row['close'], row['high'])
if row['low'] > true_low:
df.at[idx, 'low'] = true_low
# 检查交易量为负数
df.loc[df['volume'] < 0, 'volume'] = 0
return df
def add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""添加技术指标(可选)"""
# 移动平均线
df['ma_7'] = df['close'].rolling(window=7).mean()
df['ma_25'] = df['close'].rolling(window=25).mean()
df['ma_99'] = df['close'].rolling(window=99).mean()
# 波动率
df['volatility_7d'] = df['close'].rolling(window=7).std()
# 价格变化率
df['price_change'] = df['close'].pct_change()
df['price_change_pct'] = df['price_change'] * 100
# 成交量移动平均
df['volume_ma_7'] = df['volume'].rolling(window=7).mean()
return df
def get_data_summary(self, df: pd.DataFrame) -> dict:
"""获取数据摘要统计"""
return {
"总记录数": len(df),
"缺失记录数": df['is_missing'].sum() if 'is_missing' in df.columns else 0,
"时间范围": f"{df['datetime'].min()} 至 {df['datetime'].max()}",
"价格范围": f"{df['close'].min():.2f} - {df['close'].max():.2f}",
"平均交易量": df['volume'].mean(),
"总交易额(估算)": (df['close'] * df['volume']).sum()
}
步骤4:数据加载模块
# src/load.py
import pandas as pd
from sqlalchemy import create_engine, text
from typing import Optional
import logging
import json
logger = logging.getLogger(__name__)
class DataLoader:
"""数据加载器 - 支持多种存储方式"""
def __init__(self, database_url: str = "sqlite:///crypto_data.db"):
self.database_url = database_url
self.engine = create_engine(database_url)
logger.info(f"数据库连接: {database_url}")
def load_to_sql(self, df: pd.DataFrame, table_name: str = "btc_usdt_ohlcv") -> bool:
"""加载数据到SQL数据库"""
try:
# 如果表已存在,先删除
with self.engine.connect() as conn:
conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
conn.commit()
# 写入新数据
df.to_sql(
name=table_name,
con=self.engine,
if_exists='replace',
index=False
)
logger.info(f"成功加载 {len(df)} 条记录到表 {table_name}")
return True
except Exception as e:
logger.error(f"数据库写入失败: {e}")
return False
def load_to_csv(self, df: pd.DataFrame, filepath: str) -> bool:
"""保存为CSV文件"""
try:
df.to_csv(filepath, index=False)
logger.info(f"成功保存 {len(df)} 条记录到 {filepath}")
return True
except Exception as e:
logger.error(f"CSV保存失败: {e}")
return False
def load_to_json(self, df: pd.DataFrame, filepath: str) -> bool:
"""保存为JSON文件"""
try:
# 转换datetime为字符串
df_json = df.copy()
df_json['datetime'] = df_json['datetime'].astype(str)
df_json.to_json(filepath, orient='records', indent=2)
logger.info(f"成功保存 {len(df)} 条记录到 {filepath}")
return True
except Exception as e:
logger.error(f"JSON保存失败: {e}")
return False
步骤5:主程序入口
# main.py
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config.settings import config
from src.extract import ExchangeDataExtractor
from src.transform import CryptoDataTransformer
from src.load import DataLoader
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
"""主程序:执行完整的ETL流程"""
logger.info("=" * 50)
logger.info("开始加密货币历史数据ETL流程")
logger.info("=" * 50)
# === 第1步:抽取数据 ===
logger.info("\n[步骤1/4] 从交易所抽取数据...")
extractor = ExchangeDataExtractor(exchange_name=config.exchange_name)
raw_df = extractor.fetch_ohlcv(
symbol=config.symbol,
timeframe=config.timeframe,
start_date=config.start_date,
end_date=config.end_date
)
# 验证抽取结果
validation = extractor.validate_data(raw_df)
logger.info(f"数据验证结果: {validation}")
# === 第2步:转换/清洗数据 ===
logger.info("\n[步骤2/4] 清洗和转换数据...")
transformer = CryptoDataTransformer()
clean_df = transformer.clean_ohlcv_data(raw_df)
# 添加技术指标
clean_df = transformer.add_technical_indicators(clean_df)
# 打印数据摘要
summary = transformer.get_data_summary(clean_df)
logger.info(f"数据摘要: {summary}")
# === 第3步:加载数据 ===
logger.info("\n[步骤3/4] 保存清洗后的数据...")
loader = DataLoader(database_url=config.database_url)
# 保存到多种格式
os.makedirs("processed", exist_ok=True)
loader.load_to_sql(clean_df, table_name="btc_usdt_hourly")
loader.load_to_csv(clean_df, "processed/btc_usdt_cleaned.csv")
loader.load_to_json(clean_df, "processed/btc_usdt_cleaned.json")
# === 第4步:后续分析(可选)===
logger.info("\n[步骤4/4] 数据准备完成!")
logger.info("\n你可以使用HolySheep AI进行进一步分析:")
logger.info(" - 市场情绪分析")
logger.info(" - 自动生成交易报告")
logger.info(" - 异常检测和预警")
logger.info(f" - 注册地址: https://www.holysheep.ai/register")
return clean_df
if __name__ == "__main__":
result = main()
print(f"\n最终数据预览(前5行):\n{result.head()}")
第四部分:运行程序
快速开始
# 1. 克隆项目并进入目录
cd crypto-etl-project
2. 安装依赖
pip install -r requirements.txt
3. 运行ETL流程
python main.py
4. 查看输出文件
ls -la processed/
预期输出
2025-01-15 10:30:00 - __main__ - INFO - ==================================================
2025-01-15 10:30:00 - __main__ - INFO - 开始加密货币历史数据ETL流程
2025-01-15 10:30:00 - __main__ - INFO - ==================================================
2025-01-15 10:30:01 - src.extract - INFO - 已连接到交易所: binance
2025-01-15 10:30:01 - src.extract - INFO - 开始抽取 BTC/USDT 从 2024-01-01 到 2024-12-31
2025-01-15 10:30:05 - src.extract - INFO - 已抽取 2000 条记录...
2025-01-15 10:30:10 - src.extract - INFO - 已抽取 4000 条记录...
...
2025-01-15 10:35:00 - src.extract - INFO - 抽取完成,共 8760 条记录
2025-01-15 10:35:01 - src.transform - INFO - 检测到 24 条缺失记录并已处理
2025-01-15 10:35:01 - src.load - INFO - 成功加载 8760 条记录到表 btc_usdt_hourly
第五部分:使用HolySheep AI进行高级分析
数据清洗完成后,你可能需要进行更深入的分析。HolySheep AI 提供强大的自然语言处理能力,可以帮助你:
- 生成市场分析报告
- 识别价格异常模式
- 自动总结交易策略
# advanced_analysis.py
import requests
import json
import pandas as pd
HolySheep AI API配置
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY" # 从 https://www.holysheep.ai/register 获取
BASE_URL = "https://api.holysheep.ai/v1"
def analyze_crypto_trends(df: pd.DataFrame, symbol: str = "BTC/USDT"):
"""
使用HolySheep AI分析加密货币趋势
"""
# 准备数据摘要
data_summary = {
"symbol": symbol,
"period": f"{df['datetime'].min()} 至 {df['datetime'].max()}",
"avg_price": df['close'].mean(),
"max_price": df['close'].max(),
"min_price": df['close'].min(),
"total_volume": df['volume'].sum(),
"volatility": df['close'].std()
}
# 构建分析提示
prompt = f"""请分析以下加密货币数据:
数据摘要:
- 交易对:{data_summary['symbol']}
- 时间范围:{data_summary['period']}
- 平均价格:${data_summary['avg_price']:.2f}
- 最高价格:${data_summary['max_price']:.2f}
- 最低价格:${data_summary['min_price']:.2f}
- 总交易量:{data_summary['total_volume']:.2f}
- 波动率:${data_summary['volatility']:.2f}
请提供:
1. 市场趋势概述
2. 主要发现和建议
3. 潜在风险提示
"""
# 调用HolySheep API
response = requests.post(
f"{BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {HOLYSHEEP_API_KEY}",
"Content-Type": "application/json"
},
json={
"model": "gpt-4.1", # 或 "claude-sonnet-4.5", "gemini-2.5-flash", "deepseek-v3.2"
"messages": [
{"role": "system", "content": "你是一个专业的加密货币分析师。"},
{"role": "user", "content": prompt}
],
"temperature": 0.7
}
)
if response.status_code == 200:
result = response.json()
return result['choices'][0]['message']['content']
else:
return f"分析失败: {response.text}"
使用示例
if __name__ == "__main__":
# 读取清洗后的数据
df = pd.read_csv("processed/btc_usdt_cleaned.csv")
# 进行分析
analysis = analyze_crypto_trends(df)
print("=" * 60)
print("市场分析报告")
print("=" * 60)
print(analysis)
第六部分:常见问题解答
Q1: 没有交易所API Key怎么办?
使用 ccxt 库时,大多数交易所的公开数据(如K线、订单簿)不需要API Key。只有需要交易功能或个人数据时才需要。
Q2: 数据量太大导致内存不足?
可以分批处理数据。在 extract.py 中,每次请求限制在2000条记录,然后在清洗时使用 chunk processing:
# 分批处理大数据集
def process_in_chunks(df, chunk_size=10000):
for i in range(0, len(df), chunk_size):
chunk = df.iloc[i:i+chunk_size]
# 处理每个chunk
yield chunk
Q3: 如何处理更多交易所?
ccxt 支持超过100个交易所,只需更改 exchange_name 参数:
# 支持的交易所列表
exchanges = [
"binance", # 币安
"coinbase", # Coinbase
"kraken", # Kraken
"huobi", # 火币
"okx", # OKX
"bybit" # Bybit
]
切换交易所
extractor = ExchangeDataExtractor(exchange_name="huobi")
df = extractor.fetch_ohlcv("BTC/USDT", "1d", "2024-01-01", "2024-12-31")
Q4: 如何自动化定时执行?
# schedule_etl.py
import schedule
import time
from main import main
每天早上8点执行ETL
schedule.every().day.at("08:00").do(main)
while True:
schedule.run_pending()
time.sleep(60)
第七部分:我的实战经验分享
在过去的三年里,我处理过无数加密货币数据集,踩过的坑比这篇文章的字数还多。让我分享几个最宝贵的经验:
第一个教训发生在2023年,当时我处理一个看起来完美的BTC数据集。代码运行没有任何错误,但当我用这些数据训练机器学习模型时,结果完全不符合预期。调查了整整两天才发现问题:交易所API返回的"high"价格偶尔会低于"open"或"close",这是一个数据质量问题。解决方案就是我在 _handle_outliers 函数中实现的验证逻辑——每次都确保 high 是最高价,low 是最低价。
第二个教训是关于频率限制。刚开始我写了一个脚本,想一次性获取所有历史数据。结果?IP被交易所封禁了24小时。从那以后,我始终使用 enableRateLimit=True 和适当的延时。这不仅保护了我的IP,还让我的代码更稳定、更专业。
第三个教训比较特殊:我发现不同时间的K线数据质量差异很大。正常交易时段的数据非常干净,但某些小众交易对在凌晨时段会有大量缺失值。单纯的数据填充可能会误导分析,所以我增加了 is_missing 标记,让分析师知道哪些时间段的数据是"真实"的,哪些是"推测"的。
Häufige Fehler und Lösungen
错误1:Rate Limit 超限导致数据抽取中断
问题描述:程序运行一段时间后报错 "ExchangeError: Rate limit exceeded"
解决方案:在代码中添加适当的延时和重试机制:
import time
from requests.exceptions import RequestException
def fetch_with_retry(exchange, symbol, timeframe, since, max_retries=5):
for attempt in range(max_retries):
try:
data = exchange.fetch_ohlcv(symbol, timeframe, since)
return data
except Exception as e:
wait_time = 2 ** attempt # 指数退避:2, 4, 8, 16, 32秒
print(f"尝试 {attempt + 1} 失败,等待 {wait_time} 秒...")
time.sleep(wait_time)
if "Rate limit" in str(e):
wait_time *= 2 # 遇到限流时额外等待
time.sleep(wait_time)
raise Exception("达到最大重试次数")
错误2:数据类型不匹配导致SQL写入失败
问题描述:报错 "DataBaseError: Could not convert to integer"
解决方案:在写入数据库前确保所有数据类型正确:
def prepare_for_sql(df):
"""准备适合SQL存储的数据类型"""
df = df.copy()
# 转换时间戳为整数
if 'timestamp' in df.columns:
df['timestamp'] = df['timestamp'].astype('int64')
# 确保数值列为浮点数
numeric_cols = ['open', 'high', 'low', 'close', 'volume']
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
# 日期列转换为字符串
if 'datetime' in df.columns:
df['datetime'] = df['datetime'].astype(str)
return df
错误3:时区混乱导致数据错位
问题描述:数据显示的时间与实际交易时间不符,差了8小时或正好24小时
解决方案:统一使用UTC时区,并在转换时明确指定:
from datetime import timezone
def convert_to_utc(timestamp_ms, source_tz='Asia/Shanghai'):
"""将任意时区的时间戳转换为UTC"""
import pytz
# 从毫秒创建datetime
dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=pytz.UTC)
# 如果需要转换源时区
if source_tz != 'UTC':
source_tz_obj = pytz.timezone(source_tz)
dt = dt.astimezone(source_tz_obj)
# 统一返回UTC
return dt.astimezone(pytz.UTC)
使用示例
df['utc_datetime'] = df['timestamp'].apply(lambda x: convert_to_utc(x))
错误4:内存不足(OOM)处理大数据集
问题描述:处理多年数据或多个交易对时程序崩溃
解决方案:使用流式处理和分块保存:
def process_large_dataset(symbols, start_date, end_date, batch_size=1000):
"""分批处理大数据集"""
loader = DataLoader()
for symbol in symbols:
logger.info(f"处理 {symbol}...")
extractor = ExchangeDataExtractor()
# 分批获取
batch_num = 0
for batch_df in fetch_in_batches(extractor, symbol, start_date, end_date, batch_size):
batch_num += 1
# 清洗当前批次
transformer = CryptoDataTransformer()
clean_df = transformer.clean_ohlcv_data(batch_df)
# 立即保存到磁盘(释放内存)
loader.load_to_csv(
clean_df,
f"processed/{symbol.replace('/', '_')}_batch_{batch_num}.csv"
)
# 显式清理内存
del batch_df, clean_df
import gc
gc.collect()
logger.info(f"{symbol} 第 {batch_num} 批完成")
总结
本文详细介绍了加密货币历史数据的完整ETL流程,包括:
- 从交易所API抽取OHLCV数据
- 数据清洗(处理缺失值、异常值、重复数据)
- 添加技术指标
- 多种存储格式支持
- 常见错误及解决方案
数据质量是任何分析项目的基础。通过本文的方法,你可以获得干净、可靠的数据集,为后续的价格预测、交易策略研究或市场分析打下坚实基础。
如果你需要进行更高级的自然语言分析或自动生成报告,可以考虑使用 HolySheep AI 的API服务。支持中文,支持微信/支付宝充值,API延迟低于50ms,2025年价格低至 $0.42/MTok。
完整代码已发布在GitHub上,你可以直接克隆并修改配置以适应自己的需求。祝你数据分析愉快!
👉 Registrieren Sie sich bei HolySheep AI — Startguthaben inklusive