En 2026, la gestion des données historiques de cryptomonnaies représente un défi technique et financier majeur pour les développeurs, les analystes financiers et les chercheurs en blockchain. Avec des milliers de transactions par seconde sur les principales blockchains et des années d'historique à conserver, le choix d'une stratégie de stockage adaptée devient crucial pour la performance et la rentabilité de vos applications.

Cet article présente une approche complète de l'archivage des données cryptographiques, depuis les principes fondamentaux du stockage hiérarchisé jusqu'à l'implémentation d'APIs performantes, en passant par une analyse comparative des coûts de traitement IA pour l'analyse de ces données massives.

Comparatif des coûts de traitement IA en 2026

Avant d'aborder les stratégies de stockage, examinons les tarifs actuels des principaux fournisseurs d'IA pour le traitement de vos données historiques. Ces chiffres vous permettront d'estimer le coût total de votre pipeline d'analyse.

Modèle IA Prix output ($/MTok) Latence moyenne Coût pour 10M tokens/mois
GPT-4.1 8,00 $ ~85ms 80 $
Claude Sonnet 4.5 15,00 $ ~120ms 150 $
Gemini 2.5 Flash 2,50 $ ~45ms 25 $
DeepSeek V3.2 0,42 $ ~38ms 4,20 $

Comme le montre ce tableau, le choix du modèle peut représenter une différence de coût de 35x entre la solution la plus chère et la plus économique. Pour une entreprise traitant 10 millions de tokens par mois, cela représente une économie potentielle de près de 146 $ mensuels en optant pour DeepSeek V3.2 plutôt que Claude Sonnet 4.5.

Comprendre l'archivage des données de cryptomonnaies

Pourquoi archiver les données historiques ?

Les données historiques de cryptomonnaies constituent une ressource inestimable pour plusieurs cas d'usage :

Les défis spécifiques aux données blockchain

Contrairement aux données financières traditionnelles, les données de cryptomonnaies présentent des caractéristiques uniques qui compliquent leur archivage :

Architecture de stockage hiérarchisé en 3 niveaux

La clé d'une stratégie d'archivage efficace réside dans le principe du stockage hiérarchisé, qui adapte le niveau de performance et le coût de conservation aux caractéristiques d'utilisation de chaque type de données.

Niveau 1 : Stockage à chaud (Hot Storage)

Ce niveau accueille les données récemment générées et les informations fréquemment consultées, nécessitant les temps d'accès les plus rapides.

Niveau 2 : Stockage tiède (Warm Storage)

Ce niveau intermédiaire stocke les données consultées occasionnellement mais nécessitant toujours un accès relativement rapide.

Niveau 3 : Stockage à froid (Cold Storage)

Ce niveau conserve l'archive complète et historique avec un accès infrequent mais une durabilité maximale.

Implémentation de l'API d'accès aux données archivées

Maintenant que nous avons établi l'architecture de stockage, voyons comment construire une API robuste pour accéder à ces données. L'intégration avec des services d'IA comme ceux proposés par HolySheep AI permet d'enrichir automatiquement les données brutes avec des analyses contextuelles.

Configuration de base de l'API HolySheep


"""
Configuration du client HolySheep AI pour l'analyse de données cryptographiques
Documentation : https://docs.holysheep.ai
"""

import requests
import json
from typing import Dict, List, Optional
from datetime import datetime, timedelta

class CryptoDataAnalyzer:
    """Classe pour analyser les données historiques de cryptomonnaies via l'API HolySheep"""
    
    def __init__(self, api_key: str):
        """
        Initialise le client avec votre clé API HolySheep.
        
        Args:
            api_key: Votre clé API disponible sur https://www.holysheep.ai/register
        """
        self.base_url = "https://api.holysheep.ai/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.model_prices = {
            "gpt-4.1": 8.00,        # $ par million de tokens
            "claude-sonnet-4.5": 15.00,
            "gemini-2.5-flash": 2.50,
            "deepseek-v3.2": 0.42   # Économie de 97% vs Claude Sonnet
        }
    
    def analyze_transaction_pattern(
        self, 
        transaction_data: List[Dict],
        model: str = "deepseek-v3.2"
    ) -> Dict:
        """
        Analyse les patterns de transactions avec un modèle IA.
        
        Args:
            transaction_data: Liste de dictionnaires contenant les données de transactions
            model: Modèle à utiliser (défaut: deepseek-v3.2 pour экономия)
        
        Returns:
            Dict contenant l'analyse et les métadonnées de coût
        """
        prompt = self._build_analysis_prompt(transaction_data)
        
        payload = {
            "model": model,
            "messages": [
                {
                    "role": "system",
                    "content": "Vous êtes un analyste expert en données de blockchain. "
                              "Analysez les patterns de transactions et identifiez les anomalies."
                },
                {
                    "role": "user", 
                    "content": prompt
                }
            ],
            "temperature": 0.3,
            "max_tokens": 2000
        }
        
        start_time = datetime.now()
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=self.headers,
            json=payload,
            timeout=30
        )
        latency_ms = (datetime.now() - start_time).total_seconds() * 1000
        
        if response.status_code != 200:
            raise APIError(f"Erreur API: {response.status_code} - {response.text}")
        
        result = response.json()
        
        # Calcul du coût réel
        tokens_used = result.get('usage', {}).get('total_tokens', 0)
        cost = (tokens_used / 1_000_000) * self.model_prices.get(model, 0)
        
        return {
            "analysis": result['choices'][0]['message']['content'],
            "model_used": model,
            "tokens_used": tokens_used,
            "latency_ms": round(latency_ms, 2),
            "estimated_cost_usd": round(cost, 4),
            "timestamp": datetime.now().isoformat()
        }
    
    def _build_analysis_prompt(self, transactions: List[Dict]) -> str:
        """Construit le prompt pour l'analyse des transactions."""
        return f"""
        Analysez le following jeu de données de transactions blockchain:
        
        Total des transactions: {len(transactions)}
        Somme totale: {sum(t.get('value', 0) for t in transactions)} satoshis/wweis
        Période: {transactions[0].get('timestamp', 'N/A')} à {transactions[-1].get('timestamp', 'N/A')}
        
        Échantillon de données (5 premières transactions):
        {json.dumps(transactions[:5], indent=2)}
        
        Veuillez fournir:
        1. Identification des patterns récurrents
        2. Détection d'éventuelles anomalies
        3. Recommandations d'analyse supplémentaire
        """


class APIError(Exception):
    """Exception personnalisée pour les erreurs API."""
    pass


Exemple d'utilisation

if __name__ == "__main__": API_KEY = "YOUR_HOLYSHEEP_API_KEY" # Remplacez par votre clé analyzer = CryptoDataAnalyzer(API_KEY) # Données de démonstration sample_transactions = [ {"hash": "0xabc123", "value": 1500000, "timestamp": "2026-01-15T10:30:00Z", "from": "0xwallet1"}, {"hash": "0xdef456", "value": 2300000, "timestamp": "2026-01-15T11:45:00Z", "from": "0xwallet2"}, {"hash": "0xghi789", "value": 750000, "timestamp": "2026-01-15T12:15:00Z", "from": "0xwallet1"}, {"hash": "0xjkl012", "value": 4200000, "timestamp": "2026-01-15T14:20:00Z", "from": "0xwallet3"}, {"hash": "0xmno345", "value": 1800000, "timestamp": "2026-01-15T15:00:00Z", "from": "0xwallet2"}, ] # Analyse avec DeepSeek V3.2 (le plus économique) result = analyzer.analyze_transaction_pattern( sample_transactions, model="deepseek-v3.2" ) print(f"Analyse terminée en {result['latency_ms']}ms") print(f"Tokens utilisés: {result['tokens_used']}") print(f"Coût estimé: ${result['estimated_cost_usd']}")

Service de migration vers le stockage froid


"""
Service d'archivage automatique avec stockage hiérarchisé
Gère automatiquement la migration des données entre niveaux de stockage
"""

import boto3
from botocore.config import Config
from datetime import datetime, timedelta
import json
import hashlib

class HierarchicalArchiveManager:
    """Gestionnaire de stockage hiérarchisé pour données blockchain"""
    
    def __init__(self, hot_endpoint: str, warm_endpoint: str, cold_bucket: str):
        """
        Configure les connexions aux différents niveaux de stockage.
        
        Args:
            hot_endpoint: URL du serveur Redis/PostgreSQL pour données chaudes
            warm_endpoint: URL du cluster pour données tièdes
            cold_bucket: Nom du bucket S3/equivalent pour archive froide
        """
        self.hot_store = self._init_hot_storage(hot_endpoint)
        self.warm_store = self._init_warm_storage(warm_endpoint)
        self.cold_client = boto3.client('s3', 
            config=Config(signature_version='s3v4'))
        self.cold_bucket = cold_bucket
        
        # Politique de rétention (en jours)
        self.retention_policy = {
            'hot': 7,      # 7 jours en stockage chaud
            'warm': 90,    # 90 jours en stockage tiède
            'cold': 365    # 1 an minimum en archive froide
        }
        
        # Métadonnées de stockage
        self.storage_stats = {
            'hot': {'count': 0, 'size_bytes': 0},
            'warm': {'count': 0, 'size_bytes': 0},
            'cold': {'count': 0, 'size_bytes': 0}
        }
    
    def store_transaction(
        self, 
        transaction: dict, 
        store_in_hot: bool = True
    ) -> dict:
        """
        Stocke une transaction dans le niveau approprié.
        
        Args:
            transaction: Données de transaction à archiver
            store_in_hot: Si True, force le stockage en niveau chaud
        
        Returns:
            Métadonnées de stockage incluant la localisation
        """
        tx_hash = transaction.get('hash', transaction.get('txid'))
        timestamp = transaction.get('timestamp', datetime.now().isoformat())
        
        # Génère la clé de stockage
        storage_key = self._generate_storage_key(tx_hash, timestamp)
        storage_level = 'hot' if store_in_hot else self._determine_level(timestamp)
        
        metadata = {
            'tx_hash': tx_hash,
            'storage_key': storage_key,
            'storage_level': storage_level,
            'created_at': datetime.now().isoformat(),
            'original_timestamp': timestamp,
            'size_bytes': len(json.dumps(transaction)),
            'checksum': hashlib.sha256(
                json.dumps(transaction, sort_keys=True).encode()
            ).hexdigest()
        }
        
        if storage_level == 'hot':
            self._write_to_hot(storage_key, transaction)
        elif storage_level == 'warm':
            self._write_to_warm(storage_key, transaction)
        else:
            self._write_to_cold(storage_key, transaction)
        
        self._update_stats(storage_level, metadata['size_bytes'])
        
        return metadata
    
    def retrieve_transaction(self, tx_hash: str) -> dict:
        """
        Récupère une transaction depuis le niveau de stockage approprié.
        Vérifie d'abord le stockage chaud, puis tiède, puis froide.
        
        Args:
            tx_hash: Hash de la transaction à récupérer
        
        Returns:
            Données de transaction ou None si non trouvée
        """
        # Recherche dans le stockage chaud
        for days_back in range(self.retention_policy['hot'] + 1):
            date = datetime.now() - timedelta(days=days_back)
            key = self._generate_storage_key(tx_hash, date.isoformat())
            
            result = self._read_from_hot(key)
            if result:
                return {'data': result, 'level': 'hot', 'retrieval_time_ms': 5}
            
            result = self._read_from_warm(key)
            if result:
                return {'data': result, 'level': 'warm', 'retrieval_time_ms': 150}
        
        # Recherche dans l'archive froide
        result = self._search_cold_archive(tx_hash)
        if result:
            return {'data': result, 'level': 'cold', 'retrieval_time_ms': 3000}
        
        return None
    
    def run_maintenance(self) -> dict:
        """
        Exécute les tâches de maintenance:
        - Migration des données chaudes vers tièdes
        - Migration des données tièdes vers froides
        - Nettoyage des doublons
        
        Returns:
            Rapport de maintenance
        """
        report = {
            'executed_at': datetime.now().isoformat(),
            'migrations': {'hot_to_warm': 0, 'warm_to_cold': 0},
            'archived_bytes': 0,
            'errors': []
        }
        
        # Migration hot -> warm
        hot_items = self._scan_hot_storage()
        for item in hot_items:
            age_days = (datetime.now() - datetime.fromisoformat(
                item['created_at']
            )).days
            
            if age_days >= self.retention_policy['hot']:
                self._migrate_hot_to_warm(item)
                report['migrations']['hot_to_warm'] += 1
                report['archived_bytes'] += item['size_bytes']
        
        # Migration warm -> cold
        warm_items = self._scan_warm_storage()
        for item in warm_items:
            age_days = (datetime.now() - datetime.fromisoformat(
                item['created_at']
            )).days
            
            if age_days >= self.retention_policy['warm']:
                self._migrate_warm_to_cold(item)
                report['migrations']['warm_to_cold'] += 1
                report['archived_bytes'] += item['size_bytes']
        
        return report
    
    def get_storage_report(self) -> dict:
        """Génère un rapport complet de l'utilisation du stockage."""
        return {
            'timestamp': datetime.now().isoformat(),
            'total_transactions': sum(s['count'] for s in self.storage_stats.values()),
            'total_size_gb': sum(s['size_bytes'] for s in self.storage_stats.values()) / (1024**3),
            'by_level': {
                level: {
                    'count': stats['count'],
                    'size_gb': stats['size_bytes'] / (1024**3),
                    'retention_days': self.retention_policy[level]
                }
                for level, stats in self.storage_stats.items()
            }
        }
    
    def _generate_storage_key(self, tx_hash: str, timestamp: str) -> str:
        """Génère une clé de stockage structurée."""
        dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        return f"tx/{dt.year}/{dt.month:02d}/{dt.day:02d}/{tx_hash}"
    
    def _determine_level(self, timestamp: str) -> str:
        """Détermine le niveau de stockage approprié selon l'ancienneté."""
        dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        age_days = (datetime.now() - dt).days
        
        if age_days < self.retention_policy['hot']:
            return 'hot'
        elif age_days < self.retention_policy['warm']:
            return 'warm'
        return 'cold'
    
    # Méthodes privées d'implémentation (exemple simplifié)
    def _init_hot_storage(self, endpoint): return {'endpoint': endpoint}
    def _init_warm_storage(self, endpoint): return {'endpoint': endpoint}
    def _write_to_hot(self, key, data): pass
    def _write_to_warm(self, key, data): pass
    def _write_to_cold(self, key, data): pass
    def _read_from_hot(self, key): return None
    def _read_from_warm(self, key): return None
    def _search_cold_archive(self, tx_hash): return None
    def _scan_hot_storage(self): return []
    def _scan_warm_storage(self): return []
    def _migrate_hot_to_warm(self, item): pass
    def _migrate_warm_to_cold(self, item): pass
    def _update_stats(self, level, size_bytes):
        self.storage_stats[level]['count'] += 1
        self.storage_stats[level]['size_bytes'] += size_bytes


Exemple d'utilisation

if __name__ == "__main__": manager = HierarchicalArchiveManager( hot_endpoint="redis://localhost:6379", warm_endpoint="postgresql://localhost:5432/crypto", cold_bucket="crypto-archive-2026" ) # Stockage d'une transaction tx = { 'hash': '0xabcd1234efgh5678', 'value': 1000000000000000000, 'from': '0x742d35Cc6634C0532925a3b844Bc9e7595f', 'to': '0x853d955aCEf822Db058eb8505911ED77F175b99e', 'timestamp': datetime.now().isoformat(), 'block_number': 18500000, 'gas_used': 21000 } metadata = manager.store_transaction(tx) print(f"Transaction archivée dans le niveau: {metadata['storage_level']}") print(f"Clé de stockage: {metadata['storage_key']}") # Rapport d'utilisation report = manager.get_storage_report() print(f"\nTotal stocké: {report['total_size_gb']:.2f} Go")

Intégration avec analyse IA pour enrichissement automatique


"""
Pipeline complet d'ingestion et d'analyse de données cryptographiques
Combine archivage hiérarchisé et analyse IA en temps réel
"""

import asyncio
import aiohttp
from typing import List, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor

@dataclass
class PipelineConfig:
    """Configuration du pipeline d'analyse"""
    api_key: str
    base_url: str = "https://api.holysheep.ai/v1"
    max_concurrent_requests: int = 5
    batch_size: int = 100
    preferred_model: str = "deepseek-v3.2"  # Plus économique
    
    # Seuils de coût
    max_cost_per_analysis: float = 0.01  # 0.01$ par lot
    monthly_budget: float = 100.0  # Budget mensuel max

class CryptoAnalysisPipeline:
    """
    Pipeline complet pour l'ingestion, l'archivage et l'analyse
    des données historiques de cryptomonnaies.
    """
    
    def __init__(self, config: PipelineConfig, archive_manager):
        self.config = config
        self.archive = archive_manager
        self.session: Optional[aiohttp.ClientSession] = None
        self.executor = ThreadPoolExecutor(max_workers=config.max_concurrent_requests)
        
        # Compteurs de coût
        self.total_cost = 0.0
        self.total_tokens = 0
        self.analysis_count = 0
        
        # Cache des analyses récentes
        self.analysis_cache: Dict[str, Dict] = {}
        self.cache_ttl_seconds = 3600  # 1 heure
    
    async def initialize(self):
        """Initialise la session HTTP asynchrone."""
        self.session = aiohttp.ClientSession(
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            },
            timeout=aiohttp.ClientTimeout(total=30)
        )
    
    async def close(self):
        """Ferme les ressources proprement."""
        if self.session:
            await self.session.close()
        self.executor.shutdown(wait=True)
    
    async def process_transaction_batch(
        self, 
        transactions: List[Dict],
        enable_ai_analysis: bool = True
    ) -> Dict:
        """
        Traite un lot de transactions:
        1. Archive dans le stockage approprié
        2. Enrichit avec analyse IA si activé
        
        Args:
            transactions: Liste des transactions à traiter
            enable_ai_analysis: Activer l'analyse IA (coûteux)
        
        Returns:
            Rapport de traitement avec métadonnées de coût
        """
        results = {
            'processed': 0,
            'archived': 0,
            'analyzed': 0,
            'errors': [],
            'total_cost_usd': 0.0,
            'total_latency_ms': 0.0
        }
        
        for tx in transactions:
            try:
                # Étape 1: Archivage
                archive_meta = self.archive.store_transaction(tx)
                results['archived'] += 1
                
                # Étape 2: Analyse IA (si activée et budget disponible)
                if enable_ai_analysis and self._can_afford_analysis():
                    analysis_result = await self._analyze_transaction(tx)
                    
                    if analysis_result:
                        results['analyzed'] += 1
                        results['total_cost_usd'] += analysis_result['cost']
                        results['total_latency_ms'] += analysis_result['latency']
                        self._update_cost_tracking(analysis_result['cost'])
                
                results['processed'] += 1
                
            except Exception as e:
                results['errors'].append({
                    'tx_hash': tx.get('hash', 'unknown'),
                    'error': str(e)
                })
        
        return results
    
    async def _analyze_transaction(self, transaction: Dict) -> Optional[Dict]:
        """
        Effectue une analyse IA de la transaction via HolySheep.
        Inclut la gestion des erreurs et la mise en cache.
        """
        tx_hash = transaction.get('hash', '')
        
        # Vérifie le cache
        if tx_hash in self.analysis_cache:
            cached = self.analysis_cache[tx_hash]
            age = (datetime.now() - cached['timestamp']).total_seconds()
            if age < self.cache_ttl_seconds:
                return cached
        
        # Vérifie le budget
        if self.total_cost >= self.config.monthly_budget:
            return None
        
        prompt = self._build_transaction_prompt(transaction)
        
        payload = {
            "model": self.config.preferred_model,
            "messages": [
                {
                    "role": "system",
                    "content": "Vous êtes un analyste blockchain expert. "
                              "Analysez cette transaction et fournissez des insights."
                },
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.2,
            "max_tokens": 500
        }
        
        start_time = asyncio.get_event_loop().time()
        
        try:
            async with self.session.post(
                f"{self.config.base_url}/chat/completions",
                json=payload
            ) as response:
                
                if response.status != 200:
                    error_text = await response.text()
                    print(f"Erreur API: {response.status} - {error_text}")
                    return None
                
                result = await response.json()
                latency_ms = (asyncio.get_event_loop().time() - start_time) * 1000
                
                # Extrait les métriques d'utilisation
                usage = result.get('usage', {})
                tokens = usage.get('total_tokens', 0)
                
                # Calcule le coût (prix par million de tokens)
                model_prices = {
                    "deepseek-v3.2": 0.42,
                    "gemini-2.5-flash": 2.50,
                    "gpt-4.1": 8.00,
                    "claude-sonnet-4.5": 15.00
                }
                cost = (tokens / 1_000_000) * model_prices.get(
                    self.config.preferred_model, 0.42
                )
                
                analysis_result = {
                    'tx_hash': tx_hash,
                    'analysis': result['choices'][0]['message']['content'],
                    'tokens': tokens,
                    'cost': cost,
                    'latency_ms': round(latency_ms, 2),
                    'timestamp': datetime.now()
                }
                
                # Met en cache
                self.analysis_cache[tx_hash] = analysis_result
                
                return analysis_result
                
        except asyncio.TimeoutError:
            print(f"Timeout pour la transaction {tx_hash}")
            return None
        except Exception as e:
            print(f"Erreur lors de l'analyse: {e}")
            return None
    
    def _build_transaction_prompt(self, tx: Dict) -> str:
        """Construit un prompt optimisé pour l'analyse."""
        return f"""
        Analysez cette transaction blockchain:
        
        Hash: {tx.get('hash')}
        De: {tx.get('from', 'N/A')}
        Vers: {tx.get('to', 'N/A')}
        Valeur: {tx.get('value', 0)}
        Frais: {tx.get('gas_used', 0)}
        Bloc: {tx.get('block_number', 'N/A')}
        
        Fournissez en 3 points maximum:
        1. Type de transaction (transfert, contrat, swap, etc.)
        2. Risque estimé (bas/moyen/élevé)
        3. Contexte possible ( whale, DEX, bridge, etc.)
        """
    
    def _can_afford_analysis(self) -> bool:
        """Vérifie si le budget permet une nouvelle analyse."""
        return (
            self.total_cost < self.config.monthly_budget and
            self.total_cost + self.config.max_cost_per_analysis <= self.config.monthly_budget
        )
    
    def _update_cost_tracking(self, cost: float):
        """Met à jour les compteurs de coût."""
        self.total_cost += cost
        self.total_tokens += 1
        self.analysis_count += 1
    
    def get_cost_report(self) -> Dict:
        """Génère un rapport détaillé des coûts."""
        return {
            'period': 'monthly',
            'total_cost_usd': round(self.total_cost, 4),
            'total_analyses': self.analysis_count,
            'budget_limit': self.config.monthly_budget,
            'budget_used_percent': round(
                (self.total_cost / self.config.monthly_budget) * 100, 2
            ),
            'average_cost_per_analysis': round(
                self.total_cost / self.analysis_count if self.analysis_count > 0 else 0, 4
            ),
            'cache_hit_rate': self._calculate_cache_hit_rate()
        }
    
    def _calculate_cache_hit_rate(self) -> float:
        """Calcule le taux de命中率 du cache."""
        if not self.analysis_cache:
            return 0.0
        # Simplifié: en pratique, trackez les hits/misses séparément
        return 0.0


Exemple d'utilisation complète

async def main(): from datetime import datetime # Configuration avec HolySheep config = PipelineConfig( api_key="YOUR_HOLYSHEEP_API_KEY", preferred_model="deepseek-v3.2", # Choix économique max_concurrent_requests=3, batch_size=50, monthly_budget=50.0 # Budget de 50$/mois ) # Initialise l'archiveur archive = HierarchicalArchiveManager( hot_endpoint="redis://localhost:6379", warm_endpoint="postgresql://localhost:5432/crypto", cold_bucket="crypto-archive-2026" ) # Crée le pipeline pipeline = CryptoAnalysisPipeline(config, archive) try: await pipeline.initialize() # Génère des données de test test_transactions = [ { 'hash': f'0x{"".join([hex(i)[2:] for i in range(32)])}', 'from': f'0x{"".join([hex(i)[2:] * 4 for i in range(10)])}', 'to': f'0x{"".join([hex(i)[2:] * 4 for i in range(10, 20)])}', 'value': 1000000000000000000, 'gas_used': 21000, 'block_number': 19000000 + i, 'timestamp': datetime.now().isoformat() } for i in range(10) ] # Traite le lot results = await pipeline.process_transaction_batch( test_transactions, enable_ai_analysis=True ) print(f"Transactions traitées: {results['processed']}") print(f"Transactions archivées: {results['archived']}") print(f"Analyses IA effectuées: {results['analyzed']}") print(f"Coût total: ${results['total_cost_usd']:.4f}") # Affiche le rapport de coût cost_report = pipeline.get_cost_report() print(f"\n=== Rapport de coût ===") print(f"Budget utilisé: {cost_report['budget_used_percent']}%") print(f"Coût moyen par analyse: ${cost_report['average_cost_per_analysis']}") finally: await pipeline.close() if __name__ == "__main__": asyncio.run(main())

Pour qui / pour qui ce n'est pas fait

Ressources connexes

Articles connexes