Comparatif Complet : HolySheep Memgraph vs Neo4j vs Solutions Officielles
| Critère | HolySheep Memgraph | Neo4j Aura | API OpenAI (Built-in) | Memgraph Cloud |
|---|---|---|---|---|
| Latence moyenne | <50ms | 80-150ms | Variable | 60-100ms |
| Coût/mois | Gratuit (crédits inclus) | $65-500+ | Variable selon usage | $25-200+ |
| Intégration LLM native | ✅ Oui | ⚠️ Requiert adaptateur | ✅ Oui | ⚠️ Requiert adaptateur |
| Support WeChat/Alipay | ✅ Oui | ❌ Non | ❌ Non | ❌ Non |
| Graphes en temps réel | ✅ Native streaming | ✅ Disponible | ⚠️ Limité | ✅ Disponible |
| OpenCypher support | ✅ Full | ✅ Full | N/A | ✅ Full |
| Crédits gratuits | ✅ Inclus | ❌ Essai limité | ⚠️ $5 initial | ❌ Essai limité |
En tant qu'architecte logiciel ayant déployé des systèmes multi-agents LLM dans une startup fintech basée à Shanghai, j'ai passé six mois à évaluer différentes solutions de graphe pour le tracking des appels d'outils. Le défi ? Gérer simultanément 10 000+ agents avec une latence inférieure à 100ms tout en maintenant des coûts maîtrisés avec un budget limité. HolySheep Memgraph a transformé notre architecture de graphe, divisant nos coûts d'infrastructure par 4 tout en améliorant la réactivité de nos agents.
Pourquoi Memgraph plutôt que Neo4j pour les Agents LLM
Neo4j reste une référence dans le domaine des bases de données graphe, mais pour les cas d'usage spécifiques aux agents LLM et aux graphes d'appels d'outils, plusieurs limitations deviennent critiques :
- Coût d'AuraDB : Les instances production démarrent à $65/mois, atteignant rapidement $500+ pour des workloads décents avec replication
- Latence réseau : L'architecture client-serveur classique ajoute 40-80ms de overhead
- Modèle de facturation : Basé sur les heures d'instance, pas sur l'utilisation réelle
- Complexité Cypher : Certaines queries de graphe pour tool-calling sont plus complexes à optimiser
Memgraph offre une alternative Open Source avec un moteur C++ optimisé pour les écritures concurrentes, un support natif du streaming de graphe, et une intégration LLM via des drivers officiels Python/JS/Go. De plus, HolySheep Memgraph ajoute une couche de management avec des tarifs 85% inférieurs aux solutions cloud officielles, supportant les paiements WeChat et Alipay indispensables pour le marché chinois.
Architecture du Système de Graphe Tool-Calling
Le graphe d'appels d'outils pour agents LLM se compose typiquement de trois types de noeuds et deux types de relations :
Schéma du graphe Tool-Calling pour Agents LLM
Types de noeuds principaux
"""
NOEUD: Agent
- id: string (UUID unique)
- type: "gpt" | "claude" | "gemini" | "custom"
- version: string (version du modèle)
- created_at: timestamp
- metadata: dict (tags, contexte)
NOEUD: Tool
- id: string (nom unique: "search", "calculator", etc.)
- category: string (regroupement logique)
- schema: dict (paramètres attendus)
- cost_per_call: float (estimé en tokens)
- latency_p95: float (ms)
NOEUD: CallEvent
- id: string (UUID unique)
- timestamp: datetime
- input_tokens: int
- output_tokens: int
- duration_ms: float
- success: boolean
- error_message: string | null
- tool_response: dict (résultat sérialisé)
RELATION: (Agent) -[:INVOKED]-> (Tool)
- call_count: int (cumulé)
- last_call: timestamp
RELATION: (Tool) -[:TRIGGERED]-> (Tool)
- chain_length: int
- avg_duration_ms: float
- purpose: string (description du chaining)
"""
Exemple de structure dans Memgraph
example_graph_schema = {
"nodes": ["Agent", "Tool", "CallEvent"],
"relationships": [
{"type": "INVOKED", "from": "Agent", "to": "Tool", "properties": ["call_count", "last_call"]},
{"type": "TRIGGERED", "from": "Tool", "to": "Tool", "properties": ["chain_length", "avg_duration_ms"]},
{"type": "EXECUTED", "from": "Tool", "to": "CallEvent", "properties": ["timestamp", "success"]}
]
}
Implémentation Complète avec HolySheep Memgraph
1. Installation et Configuration
Installation du client Memgraph pour Python
pip install memgraph==1.6.0 gqlalchemy==2.0.0 pymemgraph
Installation du SDK HolySheep AI pour l'intégration LLM
pip install holysheep-sdk==2.1.0
Vérification de la connexion
python3 -c "from memgraph import Client; print('✅ Memgraph client prêt')"
config.py - Configuration centralisée HolySheep Memgraph
import os
from typing import Optional
class HolySheepMemgraphConfig:
"""
Configuration pour HolySheep Memgraph Cloud
Endpoint: api.holysheep.ai/v1/memgraph
"""
# Paramètres de connexion Memgraph via HolySheep
MEMGRAPH_HOST = os.getenv("MEMGRAPH_HOST", "memgraph.holysheep.ai")
MEMGRAPH_PORT = int(os.getenv("MEMGRAPH_PORT", "7687"))
MEMGRAPH_USER = os.getenv("MEMGRAPH_USER", "memgraph")
MEMGRAPH_PASSWORD = os.getenv("MEMGRAPH_PASSWORD", "")
# HolySheep API Key pour l'authentification
HOLYSHEEP_API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
# Configuration du graphe
GRAPH_NAME = "tool_calling_graph"
STREAM_ENABLED = True
STREAM_WINDOW_SIZE = 100 # ms
# Paramètres de performance
CONNECTION_POOL_SIZE = 10
QUERY_TIMEOUT_MS = 5000
BATCH_SIZE = 1000
@classmethod
def get_connection_uri(cls) -> str:
"""Génère l'URI de connexion pour Memgraph"""
return f"bolt://{cls.MEMGRAPH_HOST}:{cls.MEMGRAPH_PORT}"
@classmethod
def get_headers(cls) -> dict:
"""Headers d'authentification pour HolySheep"""
return {
"Authorization": f"Bearer {cls.HOLYSHEEP_API_KEY}",
"Content-Type": "application/json",
"X-Service": "memgraph-tool-calling"
}
2. Classe Principale ToolCallGraphManager
tool_call_graph.py - Gestionnaire de graphe pour appels d'outils LLM
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from memgraph import Client
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ToolSchema:
"""Schéma de définition d'un outil LLM"""
name: str
category: str
description: str
parameters: Dict[str, Any]
cost_per_call: float = 0.001
expected_latency_ms: float = 100.0
@dataclass
class CallEvent:
"""Événement d'appel d'outil capturé"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.utcnow)
input_tokens: int = 0
output_tokens: int = 0
duration_ms: float = 0.0
success: bool = True
error_message: Optional[str] = None
tool_response: Optional[Dict] = None
class ToolCallGraphManager:
"""
Gestionnaire de graphe d'appels d'outils pour agents LLM.
Utilise Memgraph via HolySheep pour le stockage et requêtage.
Taux de change: ¥1 = $1 (économie 85%+ vs solutions officielles)
Latence cible: <50ms pour les requêtes de lecture
"""
def __init__(self, config):
self.config = config
self.client = Client(
self.config.get_connection_uri(),
username=self.config.MEMGRAPH_USER,
password=self.config.MEMGRAPH_PASSWORD
)
self._initialize_schema()
def _initialize_schema(self):
"""Initialise les contraintes et index du graphe"""
schema_queries = [
# Contraintes d'unicité
"CREATE CONSTRAINT ON (a:Agent) ASSERT a.id IS UNIQUE",
"CREATE CONSTRAINT ON (t:Tool) ASSERT t.id IS UNIQUE",
"CREATE CONSTRAINT ON (c:CallEvent) ASSERT c.id IS UNIQUE",
# Index pour performance
"CREATE INDEX ON :Agent(type)",
"CREATE INDEX ON :Tool(category)",
"CREATE INDEX ON :CallEvent(timestamp)",
"CREATE INDEX ON :CallEvent(success)"
]
for query in schema_queries:
try:
self.client.execute(query)
logger.info(f"✅ Schéma initialisé: {query[:50]}...")
except Exception as e:
# Les contraintes peuvent déjà exister
if "Already exists" not in str(e):
logger.warning(f"⚠️ Schéma: {e}")
def register_agent(self, agent_id: str, agent_type: str,
version: str, metadata: Dict = None) -> bool:
"""
Enregistre un nouvel agent dans le graphe.
Args:
agent_id: Identifiant unique de l'agent
agent_type: Type de modèle ("gpt-4.1", "claude-sonnet-4.5", etc.)
version: Version spécifique du modèle
metadata: Métadonnées additionnelles
Returns:
True si l'enregistrement réussit
"""
query = """
MERGE (a:Agent {id: $agent_id})
SET a.type = $agent_type,
a.version = $version,
a.created_at = datetime(),
a.metadata = $metadata,
a.call_count = COALESCE(a.call_count, 0)
RETURN a.id as id
"""
try:
result = self.client.execute(query, {
"agent_id": agent_id,
"agent_type": agent_type,
"version": version,
"metadata": json.dumps(metadata or {})
})
logger.info(f"✅ Agent enregistré: {agent_id} ({agent_type})")
return True
except Exception as e:
logger.error(f"❌ Erreur enregistrement agent: {e}")
return False
def register_tool(self, tool: ToolSchema) -> bool:
"""
Enregistre un nouvel outil dans le graphe.
Args:
tool: Objet ToolSchema avec la définition de l'outil
Returns:
True si l'enregistrement réussit
"""
query = """
MERGE (t:Tool {id: $tool_id})
SET t.name = $name,
t.category = $category,
t.description = $description,
t.schema = $schema,
t.cost_per_call = $cost,
t.expected_latency_ms = $latency,
t.registered_at = datetime()
RETURN t.id as id
"""
try:
result = self.client.execute(query, {
"tool_id": tool.name,
"name": tool.name,
"category": tool.category,
"description": tool.description,
"schema": json.dumps(tool.parameters),
"cost": tool.cost_per_call,
"latency": tool.expected_latency_ms
})
logger.info(f"✅ Outil enregistré: {tool.name} (catégorie: {tool.category})")
return True
except Exception as e:
logger.error(f"❌ Erreur enregistrement outil: {e}")
return False
def record_tool_call(self, agent_id: str, tool_name: str,
call_event: CallEvent,
chain_from: Optional[str] = None) -> bool:
"""
Enregistre un appel d'outil dans le graphe.
Crée les relations Agent->Tool et Tool->Tool pour le chaining.
Args:
agent_id: ID de l'agent effectuant l'appel
tool_name: Nom de l'outil appelé
call_event: Événement d'appel avec métriques
chain_from: Outil précédent si chaîne (tool chaining)
Returns:
True si l'enregistrement réussit
"""
query = """
MATCH (a:Agent {id: $agent_id})
MATCH (t:Tool {id: $tool_name})
// Créer l'événement d'appel
CREATE (c:CallEvent {
id: $call_id,
timestamp: datetime($timestamp),
input_tokens: $input_tokens,
output_tokens: $output_tokens,
duration_ms: $duration_ms,
success: $success,
error_message: $error
})
// Lier l'agent à l'outil
MERGE (a)-[r1:INVOKED]->(t)
ON CREATE SET r1.call_count = 1, r1.first_call = datetime()
ON MATCH SET r1.call_count = r1.call_count + 1, r1.last_call = datetime()
// Lier l'outil à l'événement
CREATE (t)-[:EXECUTED {timestamp: datetime($timestamp)}]->(c)
// Gérer le chaining d'outils si applicable
FOREACH (prev_tool IN CASE WHEN $chain_from IS NOT NULL THEN [1] ELSE [] END |
MERGE (prev:Tool {id: $chain_from})
MERGE (prev)-[r2:TRIGGERED]->(t)
ON CREATE SET r2.chain_count = 1, r2.avg_duration_ms = $duration_ms
ON MATCH SET r2.chain_count = r2.chain_count + 1,
r2.avg_duration_ms = (r2.avg_duration_ms * (r2.chain_count - 1) + $duration_ms) / r2.chain_count
)
RETURN c.id as call_id
"""
try:
result = self.client.execute(query, {
"agent_id": agent_id,
"tool_name": tool_name,
"call_id": call_event.id,
"timestamp": call_event.timestamp.isoformat(),
"input_tokens": call_event.input_tokens,
"output_tokens": call_event.output_tokens,
"duration_ms": call_event.duration_ms,
"success": call_event.success,
"error": call_event.error_message,
"chain_from": chain_from
})
logger.debug(f"✅ Appel enregistré: {agent_id} -> {tool_name} ({call_event.duration_ms}ms)")
return True
except Exception as e:
logger.error(f"❌ Erreur enregistrement appel: {e}")
return False
def get_agent_call_statistics(self, agent_id: str,
hours: int = 24) -> Dict:
"""
Récupère les statistiques d'appels pour un agent sur une période.
Args:
agent_id: ID de l'agent
hours: Période d'analyse en heures
Returns:
Dictionnaire avec statistiques agrégées
"""
query = """
MATCH (a:Agent {id: $agent_id})-[r:INVOKED]->(t:Tool)
WHERE r.last_call >= datetime() - duration('PT1H') * $hours
OPTIONAL MATCH (t)-[:EXECUTED]->(c:CallEvent)
WHERE c.timestamp >= datetime() - duration('PT1H') * $hours
RETURN t.id as tool_name,
t.category as category,
r.call_count as total_calls,
COUNT(c) as recorded_events,
AVG(c.duration_ms) as avg_duration_ms,
SUM(CASE WHEN c.success = true THEN 1 ELSE 0 END) as successful_calls,
SUM(c.input_tokens) as total_input_tokens,
SUM(c.output_tokens) as total_output_tokens
ORDER BY total_calls DESC
"""
try:
results = self.client.execute(query, {
"agent_id": agent_id,
"hours": hours
})
stats = {
"agent_id": agent_id,
"period_hours": hours,
"tools_used": [],
"total_calls": 0,
"total_tokens": 0,
"success_rate": 0.0
}
for row in results:
stats["tools_used"].append({
"tool": row["tool_name"],
"category": row["category"],
"calls": row["total_calls"],
"avg_latency_ms": round(row["avg_duration_ms"] or 0, 2),
"success_rate": round((row["successful_calls"] or 0) / max(row["recorded_events"] or 1, 1) * 100, 2)
})
stats["total_calls"] += row["total_calls"]
stats["total_tokens"] += (row["total_input_tokens"] or 0) + (row["total_output_tokens"] or 0)
# Calculer le taux de succès global
successful = sum(t["success_rate"] * t["calls"] for t in stats["tools_used"])
stats["success_rate"] = round(successful / max(stats["total_calls"], 1), 2)
return stats
except Exception as e:
logger.error(f"❌ Erreur statistiques agent: {e}")
return {}
def detect_tool_chains(self, min_chain_length: int = 2) -> List[Dict]:
"""
Détecte les chaînes d'outils récurrentes dans le graphe.
Utile pour optimiser les patterns de tool-calling.
Args:
min_chain_length: Longueur minimale de chaîne à détecter
Returns:
Liste des chaînes détectées avec leurs métriques
"""
query = """
MATCH path = (t1:Tool)-[:TRIGGERED*2..]->(tn:Tool)
WHERE all(r IN relationships(path) WHERE r.chain_count >= $min_calls)
WITH [node IN nodes(path) | node.id] as chain,
reduce(total = 0, r IN relationships(path) | total + r.chain_count) as usage_count,
reduce(avg = 0, r IN relationships(path) | avg + r.avg_duration_ms) / length(path) as avg_latency
RETURN chain,
usage_count,
round(avg_latency, 2) as avg_latency_ms,
length(chain) as chain_length
ORDER BY usage_count DESC
LIMIT 20
"""
try:
results = self.client.execute(query, {"min_calls": min_chain_length})
chains = []
for row in results:
chains.append({
"chain": " -> ".join(row["chain"]),
"usage_count": row["usage_count"],
"avg_latency_ms": row["avg_latency_ms"],
"chain_length": row["chain_length"]
})
return chains
except Exception as e:
logger.error(f"❌ Erreur détection chaînes: {e}")
return []
def get_realtime_tool_graph(self, hours: int = 1) -> Dict:
"""
Retourne le graphe d'appels d'outils en temps réel.
Optimisé pour les dashboards de monitoring.
Returns:
Structure JSON du graphe pour visualisation
"""
query = """
MATCH (a:Agent)-[r:INVOKED]->(t:Tool)
WHERE r.last_call >= datetime() - duration('PT1H') * $hours
OPTIONAL MATCH (t)-[e:TRIGGERED]->(t2:Tool)
WHERE e.chain_count >= 1
RETURN a.id as agent_id,
a.type as agent_type,
t.id as tool_id,
t.category as tool_category,
r.call_count as call_count,
COLLECT(DISTINCT {target: t2.id, strength: e.chain_count}) as downstream_tools
ORDER BY call_count DESC
"""
try:
results = self.client.execute(query, {"hours": hours})
graph = {
"nodes": [],
"edges": [],
"metadata": {
"period_hours": hours,
"timestamp": datetime.utcnow().isoformat(),
"total_agents": 0,
"total_tools": 0
}
}
agent_ids = set()
tool_ids = set()
for row in results:
agent_ids.add(row["agent_id"])
tool_ids.add(row["tool_id"])
# Ajouter les noeuds
graph["nodes"].append({
"id": row["agent_id"],
"type": "agent",
"subtype": row["agent_type"],
"calls": row["call_count"]
})
graph["nodes"].append({
"id": row["tool_id"],
"type": "tool",
"subtype": row["tool_category"],
"calls": row["call_count"]
})
# Ajouter les arêtes
graph["edges"].append({
"source": row["agent_id"],
"target": row["tool_id"],
"weight": row["call_count"],
"type": "invocation"
})
# Ajouter les relations de chaining
for downstream in row["downstream_tools"] or []:
if downstream["target"]:
graph["edges"].append({
"source": row["tool_id"],
"target": downstream["target"],
"weight": downstream["strength"],
"type": "triggered"
})
graph["metadata"]["total_agents"] = len(agent_ids)
graph["metadata"]["total_tools"] = len(set(tool_ids))
return graph
except Exception as e:
logger.error(f"❌ Erreur graphe temps réel: {e}")
return {"nodes": [], "edges": [], "error": str(e)}
Exemple d'utilisation
if __name__ == "__main__":
from config import HolySheepMemgraphConfig
config = HolySheepMemgraphConfig()
manager = ToolCallGraphManager(config)
# Enregistrer un agent
manager.register_agent(
agent_id="agent-001",
agent_type="gpt-4.1",
version="2025-01",
metadata={"department": "support", "region": "cn"}
)
# Enregistrer des outils
tools = [
ToolSchema(
name="web_search",
category="information",
description="Recherche d'informations sur le web",
parameters={"query": "string", "max_results": "int"},
cost_per_call=0.002,
expected_latency_ms=250.0
),
ToolSchema(
name="calculator",
category="computation",
description="Calculatrice mathématique",
parameters={"expression": "string"},
cost_per_call=0.0001,
expected_latency_ms=5.0
),
ToolSchema(
name="database_query",
category="data",
description="Requête base de données",
parameters={"sql": "string"},
cost_per_call=0.005,
expected_latency_ms=50.0
)
]
for tool in tools:
manager.register_tool(tool)
# Simuler des appels d'outils
import time
import random
for i in range(5):
event = CallEvent(
input_tokens=random.randint(50, 200),
output_tokens=random.randint(100, 500),
duration_ms=random.uniform(20, 300),
success=random.random() > 0.1
)
# Premier appel vers web_search
manager.record_tool_call(
agent_id="agent-001",
tool_name="web_search",
call_event=event
)
# Second appel enchaîné vers calculator
time.sleep(0.1)
event2 = CallEvent(
input_tokens=random.randint(10, 50),
output_tokens=random.randint(20, 100),
duration_ms=random.uniform(5, 20),
success=True
)
manager.record_tool_call(
agent_id="agent-001",
tool_name="calculator",
call_event=event2,
chain_from="web_search"
)
# Récupérer les statistiques
stats = manager.get_agent_call_statistics("agent-001", hours=24)
print(f"📊 Statistiques agent: {stats}")
# Détecter les chaînes
chains = manager.detect_tool_chains(min_chain_length=2)
print(f"🔗 Chaînes détectées: {chains}")
3. Intégration avec HolySheep AI pour les Appels LLM
llm_integration.py - Intégration HolySheep AI avec le graphe Memgraph
import asyncio
import aiohttp
import json
from typing import List, Dict, Optional, Any, Callable
from datetime import datetime
from tool_call_graph import ToolCallGraphManager, ToolSchema, CallEvent
import time
class HolySheepLLMClient:
"""
Client pour HolySheep AI avec tracking automatique des appels d'outils.
Intègre nativement le graphe Memgraph pour le monitoring.
Prix 2026 par 1M tokens:
- GPT-4.1: $8.00
- Claude Sonnet 4.5: $15.00
- Gemini 2.5 Flash: $2.50
- DeepSeek V3.2: $0.42 (meilleur rapport qualité/prix)
"""
def __init__(self, api_key: str, graph_manager: ToolCallGraphManager):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.graph_manager = graph_manager
self.tools: Dict[str, ToolSchema] = {}
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
"""Obtient ou crée une session HTTP"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
)
return self._session
def register_tools(self, tools: List[ToolSchema]):
"""Enregistre les outils disponibles pour les agents"""
for tool in tools:
self.tools[tool.name] = tool
self.graph_manager.register_tool(tool)
async def chat_completion(
self,
messages: List[Dict],
model: str = "deepseek-v3.2",
temperature: float = 0.7,
max_tokens: int = 2048,
tools: Optional[List[Dict]] = None,
agent_id: str = "default-agent"
) -> Dict:
"""
Effectue un appel LLM avec tracking des outils.
Args:
messages: Historique de conversation
model: Modèle à utiliser
temperature: Température de génération
max_tokens: Nombre maximum de tokens
tools: Liste de définitions d'outils OpenAI-format
agent_id: ID de l'agent pour le tracking
Returns:
Réponse du LLM avec métriques
"""
start_time = time.time()
# Préparer les outils au format OpenAI
tools_payload = []
if tools:
for tool in tools:
tool_name = tool.get("function", {}).get("name", tool.get("name"))
tools_payload.append({
"type": "function",
"function": {
"name": tool_name,
"description": tool.get("function", {}).get("description", ""),
"parameters": tool.get("function", {}).get("parameters", {})
}
})
# Enregistrer dans le graphe si nouveau
if tool_name not in self.tools:
self.graph_manager.register_tool(ToolSchema(
name=tool_name,
category="llm_function",
description=tool.get("function", {}).get("description", ""),
parameters=tool.get("function", {}).get("parameters", {})
))
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"tools": tools_payload if tools_payload else None
}
try:
session = await self._get_session()
async with session.post(
f"{self.base_url}/chat/completions",
json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
result = await response.json()
duration_ms = (time.time() - start_time) * 1000
# Extraire les métriques
usage = result.get("usage", {})
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
# Enregistrer l'appel dans le graphe
if tools_payload:
for tool_call in result.get("tool_calls", []):
tool_name = tool_call.get("function", {}).get("name")
call_event = CallEvent(
timestamp=datetime.utcnow(),
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
success=True,
tool_response={"tool_call_id": tool_call.get("id")}
)
self.graph_manager.record_tool_call(
agent_id=agent_id,
tool_name=tool_name,
call_event=call_event
)
return {
"content": result.get("choices", [{}])[0].get("message", {}),
"usage": usage,
"duration_ms": round(duration_ms, 2),
"model": model,
"tool_calls": result.get("tool_calls", [])
}
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
# Enregistrer l'échec
call_event = CallEvent(
timestamp=datetime.utcnow(),
duration_ms=duration_ms,
success=False,
error_message=str(e)
)
# Tentative d'enregistrement même en cas d'erreur
if tools_payload:
self.graph_manager.record_tool_call(
agent_id=agent_id,
tool_name="llm_api_call",
call_event=call_event
)
raise
async def chat_with_tools_stream(
self,
messages: List[Dict],
model: str = "deepseek-v3.2",
tools: Optional[List[Dict]] = None,
agent_id: str = "stream-agent",
tool_executor: Optional[Callable] = None,
max_tool_rounds: int = 5
) -> str:
"""
Chat avec exécution automatique d'outils.
Gère le cycle outil-appel-résultat automatiquement.
Args:
messages: Conversation initiale
model: Modèle à utiliser
tools: Définitions d'outils
agent_id: ID de l'agent
tool_executor: Fonction pour exécuter les outils
max_tool_rounds: Nombre maximum de tours d'outils
Returns:
Réponse finale du LLM
"""
current_messages = messages.copy()
tool_rounds = 0
while tool_rounds < max_tool_rounds:
response = await self.chat_completion(
messages=current_messages,
model=model,
tools=tools,
agent_id=agent_id
)
message = response["content"]
current_messages.append(message)
tool_calls = response.get("tool_calls", [])
if not tool_calls:
# Pas d'appels d'outils, fin de la conversation
return message.get("content", "")
# Exécuter les outils
for tool_call in tool_calls:
function = tool_call.get("function", {})
tool_name = function.get("name")
arguments = json.loads(function.get("arguments", "{}"))
tool_result = "Outil non trouvé"