Der Betrieb einer Knowledge Base ist kein statisches Unterfangen. In der Produktionsumgebung unseres Unternehmens standen wir kürzlich vor einem kritischen Systemausfall: Die Knowledge Base lieferte veraltete Antworten, weil ConnectionError: timeout bei der Vollindexierung auftrat und dadurch der gesamte Sync-Prozess hängte. Die Benutzer erhielten Informationen aus dem Jahr 2023, obwohl wir bereits Q1/2026 erreichten. Dieser Artikel zeigt, wie Sie mit inkrementeller Indizierung und intelligenter Dokumentenablaufverwaltung solche Szenarien vermeiden.

Warum Vollindizierung problematisch ist

Bei großen Wissensdatenbanken mit 10.000+ Dokumenten dauert eine vollständige Neuindizierung Minuten bis Stunden. Während dieser Zeit sind Ihre Nutzer mit veralteten Daten blockiert. Die Lösung liegt in der inkrementellen Indizierung: Nur neue oder geänderte Dokumente werden aktualisiert, während unveränderte Einträge ihre Indexierung behalten.

HolySheep AI bietet hier einen entscheidenden Vorteil: Unsere API erreicht konsistent <50ms Latenz pro Indexierungsanfrage, was selbst bei mehreren tausend Änderungen pro Tag zu keiner merklichen Verzögerung führt. Im Vergleich zu Konkurrenzprodukten sparen Sie mit unseren transparenten Preisen ab ¥1 pro Dollar über 85% der Kosten.

Architektur der inkrementellen Aktualisierung

Das Herzstück bildet ein Change-Tracking-System, das Documente anhand von Metadaten oder Hash-Werten vergleicht:

import hashlib
import time
from datetime import datetime, timedelta

class IncrementalIndexManager:
    """Verwaltet inkrementelle Index-Updates für die HolySheep AI API"""
    
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.known_hashes = {}  # document_id -> content_hash
        self.expired_docs = []  # Dokument-IDs mit Ablaufdatum
    
    def compute_content_hash(self, content: str) -> str:
        """Berechnet SHA-256 Hash für Inhaltsänderungserkennung"""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()
    
    def detect_changes(self, documents: list) -> tuple:
        """
        Vergleicht Dokumente mit dem letzten Indexierungsstand.
        Returns: (new_or_updated_docs, deleted_doc_ids)
        """
        new_or_updated = []
        current_ids = set()
        
        for doc in documents:
            doc_id = doc['id']
            content_hash = self.compute_content_hash(doc['content'])
            current_ids.add(doc_id)
            
            # Prüfe ob Dokument neu oder geändert ist
            if doc_id not in self.known_hashes:
                new_or_updated.append(doc)
            elif self.known_hashes[doc_id] != content_hash:
                doc['update_timestamp'] = int(time.time())
                new_or_updated.append(doc)
        
        # Identifiziere gelöschte Dokumente
        deleted_ids = set(self.known_hashes.keys()) - current_ids
        
        return new_or_updated, list(deleted_ids)
    
    def check_expiration(self, doc: dict, ttl_days: int = 90) -> bool:
        """Prüft ob Dokument das Ablaufdatum überschritten hat"""
        if 'created_at' not in doc:
            return False
        created = datetime.fromisoformat(doc['created_at'])
        expiry = created + timedelta(days=ttl_days)
        return datetime.now() > expiry

Integration mit der HolySheep AI Knowledge API

Die HolySheep API unterstützt sowohl die Stapelverarbeitung für neue Dokumente als auch das selektive Löschen abgelaufener Einträge. Nachfolgend ein vollständiges Sync-Skript:

import requests
import json
from typing import List, Dict, Optional

class HolySheepKnowledgeSync:
    """Synchronisiert Knowledge Base mit HolySheep AI über inkrementelle Updates"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.index_manager = IncrementalIndexManager(api_key)
    
    def upsert_documents(self, documents: List[Dict]) -> Dict:
        """
        Fügt neue oder aktualisierte Dokumente zum Index hinzu.
        Nutzt Batch-Upload für Effizienz (max 100 pro Request).
        """
        new_docs, _ = self.index_manager.detect_changes(documents)
        
        if not new_docs:
            return {"status": "no_changes", "processed": 0}
        
        # Stapelverarbeitung in Blöcken von 100
        results = {"added": 0, "updated": 0, "errors": []}
        
        for i in range(0, len(new_docs), 100):
            batch = new_docs[i:i+100]
            payload = {
                "documents": batch,
                "update_strategy": "incremental"  # Nur neue/änderte updaten
            }
            
            try:
                response = requests.post(
                    f"{self.base_url}/knowledge/upsert",
                    headers=self.headers,
                    json=payload,
                    timeout=30
                )
                response.raise_for_status()
                
                result = response.json()
                results["added"] += result.get("added_count", 0)
                results["updated"] += result.get("updated_count", 0)
                
                # Aktualisiere lokale Hash-Registry
                for doc in batch:
                    doc_id = doc['id']
                    content = doc['content']
                    self.index_manager.known_hashes[doc_id] = \
                        self.index_manager.compute_content_hash(content)
                        
            except requests.exceptions.Timeout:
                results["errors"].append({
                    "error": "TimeoutError",
                    "batch_start": i,
                    "retry": True
                })
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 401:
                    results["errors"].append({
                        "error": "Unauthorized",
                        "message": "API-Schlüssel prüfen"
                    })
                else:
                    results["errors"].append({
                        "error": str(e),
                        "batch_start": i
                    })
        
        return results
    
    def delete_expired_documents(self, docs_with_ttl: List[Dict]) -> Dict:
        """Entfernt Dokumente deren TTL abgelaufen ist"""
        expired_ids = []
        
        for doc in docs_with_ttl:
            if self.index_manager.check_expiration(doc, ttl_days=90):
                expired_ids.append(doc['id'])
        
        if not expired_ids:
            return {"status": "no_expired", "deleted": 0}
        
        payload = {"document_ids": expired_ids}
        
        try:
            response = requests.post(
                f"{self.base_url}/knowledge/delete",
                headers=self.headers,
                json=payload,
                timeout=