Vous souhaitez développer une stratégie d'arbitrage statistique sur les crypto-monnaies mais vous butez sur la récupération des données historiques complètes ? Ce guide technique vous explique comment obtenir des datasets fiables, comparer les solutions disponibles, et intégrer efficacement ces données dans vos modèles de trading algorithmique.

Auteur : Équipe HolySheep AI — Experts en infrastructures de données pour le trading quantitatif

Tableau Comparatif : HolySheep vs API Officielles vs Services Relais

Critère HolySheep AI API Binance Officielle Services Relais (CCXT)
Latence moyenne <50ms ✓ 80-150ms 200-500ms
Historique disponible 5 ans+ OHLCV 3 ans (limité) Variable selon exchange
Formats supportés JSON, CSV, Parquet JSON uniquement JSON
Taux de change ¥1 = $1 (économie 85%+) Prix officiel USD Variable
Paiement WeChat Pay, Alipay, Carte USD uniquement Limité
Crédits gratuits ✓ Inclus Non Variable
Support webhook ✓ Natif Non Partiel
Prix indicatif DeepSeek V3.2 : $0.42/MTok Gratuit (limité) $20-200/mois

Qu'est-ce que l'Arbitrage Statistique en Crypto-Monnaies ?

L'arbitrage statistique est une technique de trading qui exploite les inefficiences de prix entre différents marchés ou instruments financiers. Contrairement à l'arbitrage déterministe (achat et vente simultanés), l'arbitrage statistique utilise des modèles probabilistes pour identifier des opportunités basées sur des corrélations historiques et des écarts statistiques.

Dans le contexte des crypto-monnaies, cette stratégie implique généralement :

Pourquoi les Données Historiques sont Cruciales

Une stratégie d'arbitrage statistique repose entièrement sur la qualité et la quantité des données historiques disponibles. Voici pourquoi :

Architecture de la Solution HolySheep pour l'Arbitrage Crypto

HolySheep AI propose une infrastructure complète pour récupérer et traiter les données historiques nécessaires à vos stratégies d'arbitrage statistique.

Schéma d'Architecture

+------------------------------------------+
|         Sources de Données                |
|  (Binance, Coinbase, Kraken, FTX)        |
+------------------------------------------+
                    |
                    v
+------------------------------------------+
|       HolySheep API Gateway              |
|    https://api.holysheep.ai/v1           |
|         Latence <50ms                     |
+------------------------------------------+
                    |
          +---------+---------+
          |                   |
          v                   v
+--------------------+ +--------------------+
|  Historical Data   | |  Real-time Stream  |
|  Batch Retrieval   | |  WebSocket Feed    |
+--------------------+ +--------------------+
          |                   |
          +---------+---------+
                    |
                    v
+------------------------------------------+
|      Processing Layer (Python)           |
|   - Pandas, NumPy, Statsmodels          |
|   - Cointegration Analysis              |
|   - Z-Score Calculation                  |
+------------------------------------------+
                    |
                    v
+------------------------------------------+
|      Trading Execution Engine            |
|      (Binance API, FTX API)             |
+------------------------------------------+

Guide d'Implémentation : Récupération des Données OHLCV

Prérequis

Code 1 : Récupération basique des données historiques

#!/usr/bin/env python3
"""
Récupération des données OHLCV historiques pour l'arbitrage statistique
Solution HolySheep AI - https://www.holysheep.ai
"""

import requests
import pandas as pd
from datetime import datetime, timedelta

Configuration HolySheep

BASE_URL = "https://api.holysheep.ai/v1" API_KEY = "YOUR_HOLYSHEEP_API_KEY" def get_historical_ohlcv( symbol: str, interval: str = "1h", start_time: int = None, end_time: int = None, limit: int = 1000 ) -> pd.DataFrame: """ Récupère les données OHLCV historiques depuis HolySheep API. Args: symbol: Symbole de trading (ex: "BTCUSDT") interval: Intervalle de temps ("1m", "5m", "1h", "1d") start_time: Timestamp de début en millisecondes end_time: Timestamp de fin en millisecondes limit: Nombre maximum de bougies (max 1000) Returns: DataFrame pandas avec les données OHLCV """ endpoint = f"{BASE_URL}/historical/ohlcv" headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } params = { "symbol": symbol, "interval": interval, "limit": limit } if start_time: params["start_time"] = start_time if end_time: params["end_time"] = end_time try: response = requests.get(endpoint, headers=headers, params=params, timeout=30) response.raise_for_status() data = response.json() # Transformation en DataFrame df = pd.DataFrame(data["data"], columns=[ "open_time", "open", "high", "low", "close", "volume", "close_time" ]) # Conversion des timestamps df["open_time"] = pd.to_datetime(df["open_time"], unit="ms") df["close_time"] = pd.to_datetime(df["close_time"], unit="ms") # Conversion des types numériques for col in ["open", "high", "low", "close", "volume"]: df[col] = pd.to_numeric(df[col]) return df except requests.exceptions.RequestException as e: print(f"Erreur de connexion: {e}") return pd.DataFrame() def get_multi_pair_data(pairs: list, days: int = 365) -> dict: """ Récupère les données historiques pour plusieurs paires de trading. Optimisé pour les stratégies d'arbitrage multi-paires. Args: pairs: Liste des symboles (ex: ["BTCUSDT", "ETHUSDT", "BNBUSDT"]) days: Nombre de jours d'historique à récupérer Returns: Dict avec les DataFrames pour chaque paire """ end_time = int(datetime.now().timestamp() * 1000) start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000) results = {} for pair in pairs: print(f"Récupération des données pour {pair}...") # Récupération par batches de 1000 bougies batch_size = 1000 all_data = [] current_start = start_time while current_start < end_time: batch = get_historical_ohlcv( symbol=pair, interval="1h", start_time=current_start, end_time=end_time, limit=batch_size ) if batch.empty: break all_data.append(batch) current_start = int(batch["open_time"].max().timestamp() * 1000) + 1 if all_data: results[pair] = pd.concat(all_data, ignore_index=True) print(f" ✓ {len(results[pair])} bougies récupérées") return results

Exemple d'utilisation

if __name__ == "__main__": # Configuration des paires pour arbitrage BTC/ETH PAIRS = ["BTCUSDT", "ETHUSDT", "BNBUSDT"] # Récupération des données (365 jours) data = get_multi_pair_data(PAIRS, days=365) # Sauvegarde en CSV for pair, df in data.items(): filename = f"{pair.lower()}_historical.csv" df.to_csv(filename, index=False) print(f"Sauvegarde: {filename}")

Code 2 : Analyse de Cointégration et Détection d'Opportunités

#!/usr/bin/env python3
"""
Module d'analyse statistique pour l'arbitrage crypto
Implémente: Cointegration, Z-Score, Stationnarité
Solution HolySheep AI - https://www.holysheep.ai
"""

import pandas as pd
import numpy as np
from typing import Tuple, Dict
from statsmodels.tsa.stattools import coint, adfuller
from statsmodels.regression.linear_model import OLS
import warnings
warnings.filterwarnings('ignore')

class ArbitrageAnalyzer:
    """
    Classe pour analyser les opportunités d'arbitrage statistique
    entre paires de crypto-monnaies.
    """
    
    def __init__(self, half_life_window: int = 60):
        self.half_life_window = half_life_window
        self.cointegration_results = {}
        self.spread_statistics = {}
    
    def calculate_spread(
        self, 
        price_a: pd.Series, 
        price_b: pd.Series, 
        hedge_ratio: float = None
    ) -> pd.Series:
        """
        Calcule le spread entre deux séries de prix.
        Utilise la régression OLS pour déterminer le hedge ratio optimal.
        
        Args:
            price_a: Série de prix du premier actif
            price_b: Série de prix du second actif
            hedge_ratio: Ratio de couverture (si None, calculé automatiquement)
        
        Returns:
            Série contenant le spread
        """
        # Alignement des données
        df = pd.DataFrame({"a": price_a, "b": price_b}).dropna()
        
        if hedge_ratio is None:
            # Calcul du hedge ratio via OLS
            model = OLS(df["a"], df["b"]).fit()
            hedge_ratio = model.params["b"]
        
        spread = df["a"] - hedge_ratio * df["b"]
        return spread, hedge_ratio
    
    def test_cointegration(
        self, 
        price_a: pd.Series, 
        price_b: pd.Series
    ) -> Dict:
        """
        Teste la cointégration entre deux séries de prix.
        Retourne les statistiques de Engle-Granger.
        
        Returns:
            Dict avec t-stat, p-value, et conclusion
        """
        df = pd.DataFrame({"a": price_a, "b": price_b}).dropna()
        
        # Test de cointégration
        t_stat, p_value, crit_values = coint(df["a"], df["b"])
        
        return {
            "t_statistic": t_stat,
            "p_value": p_value,
            "critical_values": {
                "1%": crit_values[0],
                "5%": crit_values[1],
                "10%": crit_values[2]
            },
            "is_cointegrated": p_value < 0.05,
            "confidence_level": "99%" if p_value < 0.01 else ("95%" if p_value < 0.05 else "Non significatif")
        }
    
    def calculate_z_score(self, spread: pd.Series, lookback: int = 20) -> pd.Series:
        """
        Calcule le Z-Score du spread.
        Le Z-Score indique combien d'écarts-types le spread est éloigné
        de sa moyenne mobile.
        
        Args:
            spread: Série du spread
            lookback: Fenêtre de calcul de la moyenne et écart-type
        
        Returns:
            Série contenant les Z-Scores
        """
        mean = spread.rolling(window=lookback).mean()
        std = spread.rolling(window=lookback).std()
        
        z_score = (spread - mean) / std
        return z_score
    
    def calculate_half_life(self, spread: pd.Series) -> float:
        """
        Calcule la demi-vie du spread (mean reversion half-life).
        C'est le temps moyen pour que le spread revienne à 50% de sa valeur.
        
        Args:
            spread: Série du spread
        
        Returns:
            Demi-vie en périodes (typiquement heures)
        """
        spread_lag = spread.shift(1)
        delta = spread - spread_lag
        
        # Régression : delta = lambda * spread_lag + epsilon
        df = pd.DataFrame({"delta": delta, "lag": spread_lag}).dropna()
        
        model = OLS(df["delta"], df["lag"]).fit()
        lambda_param = model.params["lag"]
        
        if lambda_param <= 0:
            return float('inf')
        
        half_life = -np.log(2) / lambda_param
        return half_life
    
    def generate_signals(
        self,
        price_a: pd.Series,
        price_b: pd.Series,
        z_entry: float = 2.0,
        z_exit: float = 0.5,
        lookback: int = 20
    ) -> pd.DataFrame:
        """
        Génère les signaux de trading pour la stratégie d'arbitrage.
        
        Args:
            price_a: Prix du premier actif
            price_b: Prix du second actif
            z_entry: Seuil Z-Score pour entrée (ex: 2.0 = 2 écarts-types)
            z_exit: Seuil Z-Score pour sortie
            lookback: Fenêtre de calcul du Z-Score
        
        Returns:
            DataFrame avec les signaux et positions
        """
        spread, hedge_ratio = self.calculate_spread(price_a, price_b)
        z_score = self.calculate_z_score(spread, lookback)
        
        df = pd.DataFrame({
            "price_a": price_a,
            "price_b": price_b,
            "spread": spread,
            "z_score": z_score,
            "hedge_ratio": hedge_ratio
        })
        
        # Génération des signaux
        df["signal"] = 0
        df.loc[df["z_score"] > z_entry, "signal"] = -1  # Short spread
        df.loc[df["z_score"] < -z_entry, "signal"] = 1   # Long spread
        df.loc[abs(df["z_score"]) < z_exit, "signal"] = 0  # Sortie
        
        # Forward-fill des positions
        df["position"] = df["signal"].replace(0, np.nan).ffill().fillna(0)
        
        # Calcul du P&L
        returns_a = df["price_a"].pct_change()
        returns_b = df["price_b"].pct_change()
        
        df["strategy_returns"] = (
            df["position"].shift(1) * 
            (returns_a - hedge_ratio * returns_b)
        )
        
        return df
    
    def analyze_pair(
        self, 
        price_a: pd.Series, 
        price_b: pd.Series,
        pair_name: str = "Pair"
    ) -> Dict:
        """
        Analyse complète d'une paire pour l'arbitrage.
        """
        # Test de cointégration
        coint_result = self.test_cointegration(price_a, price_b)
        
        # Calcul du spread
        spread, hedge_ratio = self.calculate_spread(price_a, price_b)
        
        # Test de stationnarité du spread
        adf_result = adfuller(spread.dropna())
        
        # Demi-vie
        half_life = self.calculate_half_life(spread)
        
        # Z-Score statistics
        z_score = self.calculate_z_score(spread)
        
        return {
            "pair_name": pair_name,
            "hedge_ratio": hedge_ratio,
            "cointegration": coint_result,
            "spread_stationarity": {
                "adf_statistic": adf_result[0],
                "p_value": adf_result[1],
                "is_stationary": adf_result[1] < 0.05
            },
            "half_life_hours": half_life if half_life != float('inf') else None,
            "z_score_stats": {
                "mean": z_score.mean(),
                "std": z_score.std(),
                "min": z_score.min(),
                "max": z_score.max()
            }
        }

Exemple d'utilisation complète

if __name__ == "__main__": # Chargement des données (exemple avec données récupérées) btc_df = pd.read_csv("btcusdt_historical.csv", parse_dates=["open_time"]) eth_df = pd.read_csv("ethusdt_historical.csv", parse_dates=["open_time"]) # Alignement temporel btc_df.set_index("open_time", inplace=True) eth_df.set_index("open_time", inplace=True) # Alignement sur la période commune common_index = btc_df.index.intersection(eth_df.index) btc_prices = btc_df.loc[common_index, "close"] eth_prices = eth_df.loc[common_index, "close"] # Analyse analyzer = ArbitrageAnalyzer() results = analyzer.analyze_pair( btc_prices, eth_prices, pair_name="BTC/ETH" ) print("=" * 50) print(f"Résultat de l'analyse: {results['pair_name']}") print("=" * 50) print(f"Hedge Ratio: {results['hedge_ratio']:.6f}") print(f"Cointégration: {results['cointegration']['is_cointegrated']}") print(f" - P-value: {results['cointegration']['p_value']:.6f}") print(f" - Niveau de confiance: {results['cointegration']['confidence_level']}") print(f"Demi-vie: {results['half_life_hours']:.1f} heures") print(f"Spread stationnaire: {results['spread_stationarity']['is_stationary']}") # Génération des signaux signals = analyzer.generate_signals( btc_prices, eth_prices, z_entry=2.0, z_exit=0.5 ) # Statistiques de la stratégie total_return = (1 + signals["strategy_returns"]).cumprod().iloc[-1] - 1 sharpe = signals["strategy_returns"].mean() / signals["strategy_returns"].std() * np.sqrt(24*365) print(f"\nPerformances simulatedes:") print(f" - Rendement total: {total_return*100:.2f}%") print(f" - Sharpe ratio: {sharpe:.2f}")

Code 3 : Backtest Complet avec HolySheep Integration

#!/usr/bin/env python3
"""
Backtest complet d'une stratégie d'arbitrage statistique
avec récupération automatique des données via HolySheep API
Solution HolySheep AI - https://www.holysheep.ai
"""

import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import json
import os

Configuration

BASE_URL = "https://api.holysheep.ai/v1" API_KEY = "YOUR_HOLYSHEEP_API_KEY" class HolySheepDataProvider: """ Provider pour récupérer les données depuis HolySheep API. Inclut la gestion du cache et la reprise sur erreur. """ def __init__(self, api_key: str): self.api_key = api_key self.cache_dir = "./data_cache" os.makedirs(self.cache_dir, exist_ok=True) def fetch_ohlcv( self, exchange: str, symbol: str, interval: str, start_date: datetime, end_date: datetime, use_cache: bool = True ) -> pd.DataFrame: """ Récupère les données OHLCV avec mise en cache intelligente. """ cache_file = f"{self.cache_dir}/{exchange}_{symbol}_{interval}_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.parquet" # Vérification du cache if use_cache and os.path.exists(cache_file): print(f" [CACHE] Chargement depuis {cache_file}") return pd.read_parquet(cache_file) # Paramètres de la requête start_ms = int(start_date.timestamp() * 1000) end_ms = int(end_date.timestamp() * 1000) all_data = [] current_start = start_ms while current_start < end_ms: batch_data = self._fetch_batch( exchange, symbol, interval, current_start, end_ms, limit=1000 ) if not batch_data: break all_data.extend(batch_data) last_timestamp = batch_data[-1]["open_time"] current_start = last_timestamp + 1 print(f" Batch récupéré: {len(batch_data)} bougies") if not all_data: return pd.DataFrame() # Conversion en DataFrame df = pd.DataFrame(all_data) df["open_time"] = pd.to_datetime(df["open_time"], unit="ms") df["close_time"] = pd.to_datetime(df["close_time"], unit="ms") # Sauvegarde en cache df.to_parquet(cache_file) print(f" [CACHE] Sauvegarde dans {cache_file}") return df def _fetch_batch( self, exchange: str, symbol: str, interval: str, start_ms: int, end_ms: int, limit: int ) -> List[Dict]: """ Récupère un batch de données depuis l'API. """ endpoint = f"{BASE_URL}/markets/{exchange}/ohlcv" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } params = { "symbol": symbol, "interval": interval, "start_time": start_ms, "end_time": end_ms, "limit": limit } try: response = requests.get(endpoint, headers=headers, params=params, timeout=30) if response.status_code == 429: print(" [RATE LIMIT] Attente de 60 secondes...") import time time.sleep(60) return self._fetch_batch(exchange, symbol, interval, start_ms, end_ms, limit) response.raise_for_status() data = response.json() return data.get("data", []) except requests.exceptions.RequestException as e: print(f" [ERROR] Erreur de requête: {e}") return [] class ArbitrageBacktester: """ Moteur de backtest pour stratégies d'arbitrage statistique. """ def __init__( self, initial_capital: float = 100000, commission: float = 0.001, slippage: float = 0.0005 ): self.initial_capital = initial_capital self.commission = commission self.slippage = slippage def run_backtest( self, df_a: pd.DataFrame, df_b: pd.DataFrame, pair_name: str, params: Dict ) -> Dict: """ Exécute le backtest complet. Args: df_a: DataFrame avec prix du premier actif df_b: DataFrame avec prix du second actif pair_name: Nom de la paire params: Paramètres de la stratégie - z_entry: Seuil d'entrée Z-Score - z_exit: Seuil de sortie Z-Score - lookback: Fenêtre de calcul Z-Score - hedge_ratio: Ratio de couverture (auto si None) """ # Merge des données df = pd.merge( df_a[["open_time", "close"]].rename(columns={"close": "price_a"}), df_b[["open_time", "close"]].rename(columns={"close": "price_b"}), on="open_time", how="inner" ) df.set_index("open_time", inplace=True) # Calcul du spread if params.get("hedge_ratio"): hedge_ratio = params["hedge_ratio"] else: # Calcul du hedge ratio optimal model = np.polyfit(df["price_b"], df["price_a"], 1) hedge_ratio = model[0] df["spread"] = df["price_a"] - hedge_ratio * df["price_b"] # Calcul du Z-Score lookback = params.get("lookback", 20) df["spread_mean"] = df["spread"].rolling(lookback).mean() df["spread_std"] = df["spread"].rolling(lookback).std() df["z_score"] = (df["spread"] - df["spread_mean"]) / df["spread_std"] # Signaux z_entry = params.get("z_entry", 2.0) z_exit = params.get("z_exit", 0.5) df["signal"] = 0 df.loc[df["z_score"] > z_entry, "signal"] = -1 # Short spread df.loc[df["z_score"] < -z_entry, "signal"] = 1 # Long spread # Position = signal précédent (évite look-ahead bias) df["position"] = df["signal"].shift(1).fillna(0) # Calcul des rendements df["return_a"] = df["price_a"].pct_change() df["return_b"] = df["price_b"].pct_change() df["spread_return"] = df["return_a"] - hedge_ratio * df["return_b"] # Stratégie df["strategy_return"] = df["position"] * df["spread_return"] # Application des coûts df["trades"] = df["position"].diff().fillna(0).abs() df["costs"] = df["trades"] * (self.commission + self.slippage) df["net_return"] = df["strategy_return"] - df["costs"] # Equity curve df["equity"] = self.initial_capital * (1 + df["net_return"]).cumprod() # Métriques de performance metrics = self._calculate_metrics(df, pair_name, hedge_ratio) return { "metrics": metrics, "data": df, "hedge_ratio": hedge_ratio } def _calculate_metrics(self, df: pd.DataFrame, pair_name: str, hedge_ratio: float) -> Dict: """ Calcule les métriques de performance standard. """ total_return = (df["equity"].iloc[-1] / self.initial_capital - 1) * 100 # Rendements annualisés n_days = (df.index[-1] - df.index[0]).days n_years = n_days / 365 annualized_return = ((1 + total_return/100) ** (1/n_years) - 1) * 100 if n_years > 0 else 0 # Volatilité annualisée daily_vol = df["net_return"].std() annualized_vol = daily_vol * np.sqrt(365) * 100 # Sharpe ratio risk_free_rate = 2.0 #假设无风险利率为2% sharpe = (annualized_return - risk_free_rate) / annualized_vol if annualized_vol > 0 else 0 # Maximum Drawdown df["peak"] = df["equity"].cummax() df["drawdown"] = (df["equity"] - df["peak"]) / df["peak"] max_drawdown = df["drawdown"].min() * 100 # Win rate winning_trades = (df["net_return"] > 0).sum() total_trades = (df["trades"] > 0).sum() win_rate = (winning_trades / total_trades * 100) if total_trades > 0 else 0 # Nombre de trades num_trades = df["trades"].sum() / 2 # Chaque trade = 2 mouvements return { "pair_name": pair_name, "hedge_ratio": hedge_ratio, "total_return_%": round(total_return, 2), "annualized_return_%": round(annualized_return, 2), "annualized_volatility_%": round(annualized_vol, 2), "sharpe_ratio": round(sharpe, 3), "max_drawdown_%": round(max_drawdown, 2), "win_rate_%": round(win_rate, 2), "number_of_trades": int(num_trades), "period_days": n_days, "final_equity": round(df["equity"].iloc[-1], 2) } def run_complete_backtest(): """ Exécute un backtest complet avec récupération des données. """ # Configuration PROVIDER = HolySheepDataProvider(API_KEY) BACKTESTER = ArbitrageBacktester( initial_capital=100000, commission=0.001, slippage=0.0005 ) # Paramètres END_DATE = datetime.now() START_DATE = END_DATE - timedelta(days=365) PAIRS = [ ("binance", "BTCUSDT"), ("binance", "ETHUSDT"), ] print("=" * 60) print("BACKTEST ARBITRAGE STATISTIQUE HOLYSHEEP") print("=" * 60) print(f"Période: {START_DATE.strftime('%Y-%m-%d')} à {END_DATE.strftime('%Y-%m-%d')}") print() # Récupération des données data = {} for exchange, symbol in PAIRS: print(f"Récupération {exchange}/{symbol}...") df = PROVIDER.fetch_ohlcv( exchange=exchange, symbol=symbol, interval="1h", start_date=START_DATE, end_date=END_DATE ) if not df.empty: data[symbol] = df print(f" ✓ {len(df)} bougies récupérées") if len(data) < 2: print("ERREUR: Données insuffisantes pour le backtest") return # Exécution du backtest print("\nExécution du backtest...") results = BACKTESTER.run_backtest( data["BTCUSDT"], data["ETHUSDT"], pair_name="BTC/ETH", params={ "z_entry": 2.0, "z_exit": 0.5, "lookback": 20, "hedge_ratio": None # Calcul automatique } ) # Affichage des résultats print("\n" + "=" * 60) print("RÉSULTATS DU BACKTEST") print("=" * 60) m = results["metrics"] print(f"\nPaire: {m['pair_name']}") print(f"Hedge Ratio: {m['hedge_ratio']:.6f}") print(f"\n--- Performance ---") print(f"Rendement total: {m['total_return_%']}%") print(f"Rendement annualisé: {m['annualized_return_%']}%") print(f"Volatilité annualisée: {m['annualized_volatility_%']}%") print(f"Sharpe Ratio: {m['sharpe_ratio']}") print(f"\n--- Risque ---") print(f"Drawdown maximum: {m['max_drawdown_%']}%") print(f"Nombre de trades: {m['number_of_trades']}") print(f"Win Rate: {m['win_rate_%']}%") print(f"\n--- Capital