Tác giả: ThS. Nguyễn Văn Minh — Kiến trúc sư AI tại HolySheep AI, 5+ năm kinh nghiệm triển khai Enterprise AI Workflow

Mở Đầu: Khi "ConnectionError: timeout" Phá Vỡ Báo Cáo Cuối Tháng

Tôi vẫn nhớ rõ buổi sáng thứ Hai đầu tuần — team tài chính gửi Slack với 47 tin nhắn hoảng loạn. Pipeline báo cáo chi phí chạy 4 tiếng, toàn bộ timeout ở bước gọi API. Lỗi cụ thể:

openai.error.APIConnectionError: Error communicating with OpenAI
HTTPSConnectionPool(host='api.openai.com', port=443): 
Max retries exceeded with url: /v1/chat/completions
(Caused by ConnectTimeoutError(<pip._vendor.urllib3.connection.TimeoutError...>))
Cost lost: ~$340 for failed batch processing

Sau 3 ngày debug, chúng tôi phát hiện vấn đề nằm ở kiến trúc gọi API không tối ưu và chi phí bị đội lên 300% so với dự kiến. Bài viết này chia sẻ cách tôi xây dựng Cost Analysis Workflow trong Dify với HolySheep AI — giảm 85% chi phí, độ trễ dưới 50ms.

1. Tại Sao Chọn HolySheep AI Cho Dify Workflow?

Trước khi đi vào code, tôi muốn chia sẻ lý do chúng tôi chọn HolySheep thay vì API gốc:

2. So Sánh Chi Phí Thực Tế (Updated 2026)

ModelGiá gốc ($/MTok)HolySheep ($/MTok)Tiết kiệm
GPT-4.1$60$886.7%
Claude Sonnet 4.5$45$1566.7%
Gemini 2.5 Flash$15$2.5083.3%
DeepSeek V3.2$2.80$0.4285%

3. Kiến Trúc Cost Analysis Workflow Trong Dify

3.1 Cấu Trúc Tổng Quan

Workflow gồm 5 module chính:

  1. Data Ingestion — Thu thập data từ nhiều nguồn
  2. Categorization Engine — Phân loại chi phí tự động
  3. Anomaly Detection — Phát hiện chi phí bất thường
  4. Budget Optimization — Đề xuất tối ưu hóa
  5. Report Generation — Tạo báo cáo cuối cùng

3.2 Code Mẫu: Kết Nối HolySheep API Với Dify

# config.py - Cấu hình kết nối HolySheep API
import os
from openai import OpenAI

class HolySheepConfig:
    """Cấu hình HolySheep AI cho Dify Workflow"""
    
    BASE_URL = "https://api.holysheep.ai/v1"  # KHÔNG dùng api.openai.com
    
    # Lấy API key từ environment variable
    API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
    
    # Timeout settings (ms)
    TIMEOUT_MS = 5000
    
    # Retry configuration
    MAX_RETRIES = 3
    RETRY_DELAY_MS = 1000

class CostAnalysisModels:
    """Chọn model phù hợp cho từng tác vụ"""
    
    # Categorization: Dùng DeepSeek V3.2 (rẻ nhất, nhanh)
    CATEGORIZATION_MODEL = "deepseek-v3.2"
    
    # Anomaly Detection: Dùng Gemini 2.5 Flash (cân bằng)
    ANOMALY_MODEL = "gemini-2.5-flash"
    
    # Report Generation: Dùng GPT-4.1 (chất lượng cao)
    REPORT_MODEL = "gpt-4.1"

Khởi tạo client

def get_holysheep_client(): """Factory function cho HolySheep client""" return OpenAI( base_url=HolySheepConfig.BASE_URL, api_key=HolySheepConfig.API_KEY, timeout=HolySheepConfig.TIMEOUT_MS / 1000, max_retries=HolySheepConfig.MAX_RETRIES ) print(f"✅ HolySheep Client configured: {HolySheepConfig.BASE_URL}")
# data_processor.py - Module thu thập và xử lý dữ liệu chi phí
import json
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from config import get_holysheep_client, CostAnalysisModels

@dataclass
class ExpenseRecord:
    """Định dạng bản ghi chi phí"""
    id: str
    date: str
    amount: float
    currency: str
    category: str
    vendor: str
    description: str
    metadata: Dict

class CostDataProcessor:
    """Xử lý dữ liệu chi phí cho workflow"""
    
    def __init__(self):
        self.client = get_holysheep_client()
        self.expenses: List[ExpenseRecord] = []
    
    def load_from_json(self, filepath: str) -> List[ExpenseRecord]:
        """Đọc data từ JSON file"""
        with open(filepath, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        
        self.expenses = [
            ExpenseRecord(**item) for item in raw_data
        ]
        return self.expenses
    
    def load_from_api(self, api_endpoint: str) -> List[ExpenseRecord]:
        """Load từ API endpoint"""
        import requests
        response = requests.get(api_endpoint, timeout=10)
        response.raise_for_status()
        raw_data = response.json()
        
        self.expenses = [
            ExpenseRecord(**item) for item in raw_data.get('data', [])
        ]
        return self.expenses
    
    def aggregate_by_category(self) -> Dict[str, float]:
        """Tổng hợp chi phí theo category"""
        summary = {}
        for expense in self.expenses:
            category = expense.category
            amount_usd = self._convert_to_usd(expense.amount, expense.currency)
            summary[category] = summary.get(category, 0) + amount_usd
        return summary
    
    def _convert_to_usd(self, amount: float, currency: str) -> float:
        """Convert sang USD (tỷ giá 2026)"""
        rates = {
            'USD': 1.0,
            'CNY': 0.14,  # ¥1 = $0.14
            'EUR': 1.08,
            'VND': 0.00004
        }
        return amount * rates.get(currency, 1.0)

Demo usage

processor = CostDataProcessor() print(f"✅ Loaded {len(processor.expenses)} expense records")
# categorization_engine.py - AI-powered expense categorization
import json
from typing import List, Dict, Optional
from openai import APIError, RateLimitError
from config import get_holysheep_client, CostAnalysisModels

class CategorizationEngine:
    """Engine phân loại chi phí tự động bằng AI"""
    
    CATEGORIES = [
        "Infrastructure", "Personnel", "Marketing", 
        "R&D", "Operations", "Legal", "Travel", "Miscellaneous"
    ]
    
    def __init__(self):
        self.client = get_holysheep_client()
        self.model = CostAnalysisModels.CATEGORIZATION_MODEL
    
    def categorize_batch(
        self, 
        expenses: List[Dict],
        batch_size: int = 50
    ) -> List[Dict]:
        """
        Phân loại hàng loạt expense records
        
        Args:
            expenses: Danh sách expense records
            batch_size: Số lượng xử lý mỗi batch
        
        Returns:
            List of expenses với category được gán
        """
        results = []
        
        # Xử lý theo batch để tránh rate limit
        for i in range(0, len(expenses), batch_size):
            batch = expenses[i:i + batch_size]
            
            try:
                categorized = self._process_single_batch(batch)
                results.extend(categorized)
                print(f"✅ Processed batch {i//batch_size + 1}: {len(batch)} items")
                
            except RateLimitError as e:
                # Xử lý rate limit - wait và retry
                import time
                wait_time = int(str(e).split("retry after ")[-1].split(" seconds")[0]) if "retry after" in str(e) else 60
                print(f"⏳ Rate limit hit. Waiting {wait_time}s...")
                time.sleep(wait_time)
                # Retry batch
                categorized = self._process_single_batch(batch)
                results.extend(categorized)
                
            except APIError as e:
                print(f"❌ API Error: {e}")
                # Fallback: gán 'Miscellaneous' cho batch lỗi
                for item in batch:
                    item['category'] = 'Miscellaneous'
                    item['category_confidence'] = 0.0
                    item['error'] = str(e)
                results.extend(batch)
        
        return results
    
    def _process_single_batch(self, batch: List[Dict]) -> List[Dict]:
        """Xử lý một batch expenses"""
        
        # Build prompt
        prompt = self._build_categorization_prompt(batch)
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"""Bạn là chuyên gia phân loại chi phí doanh nghiệp.
                    Categories: {', '.join(self.CATEGORIES)}
                    Trả về JSON array với format:
                    [{{"id": "...", "category": "...", "confidence": 0.0-1.0}}]"""
                },
                {
                    "role": "user", 
                    "content": prompt
                }
            ],
            temperature=0.3,  # Low temperature cho consistency
            max_tokens=2000
        )
        
        # Parse response
        content = response.choices[0].message.content
        # Extract JSON từ response
        json_str = self._extract_json(content)
        
        category_map = json.loads(json_str)
        
        # Merge kết quả vào original data
        id_to_category = {item['id']: item for item in category_map}
        
        results = []
        for expense in batch:
            cat_info = id_to_category.get(expense['id'], {})
            expense['category'] = cat_info.get('category', 'Miscellaneous')
            expense['category_confidence'] = cat_info.get('confidence', 0.0)
            results.append(expense)
        
        return results
    
    def _build_categorization_prompt(self, batch: List[Dict]) -> str:
        """Build prompt cho categorization task"""
        items_str = "\n".join([
            f"- ID: {e['id']}, Amount: {e.get('amount', 0)} {e.get('currency', 'USD')}, "
            f"Vendor: {e.get('vendor', 'N/A')}, Description: {e.get('description', '')}"
            for e in batch
        ])
        
        return f"""Phân loại các chi phí sau vào 8 categories đã cho:

{items_str}

Chỉ trả về JSON array, không giải thích."""

    def _extract_json(self, content: str) -> str:
        """Extract JSON string từ response"""
        content = content.strip()
        # Tìm JSON array
        start = content.find('[')
        end = content.rfind(']') + 1
        if start != -1 and end != 0:
            return content[start:end]
        raise ValueError(f"Không tìm thấy JSON trong response: {content[:100]}")

Usage example

engine = CategorizationEngine() sample_expenses = [ {"id": "EXP001", "amount": 5000, "currency": "USD", "vendor": "AWS", "description": "Cloud hosting fees"}, {"id": "EXP002", "amount": 15000, "currency": "CNY", "vendor": "Tencent", "description": "Marketing campaign Q1"} ] results = engine.categorize_batch(sample_expenses) print(json.dumps(results, indent=2))

3.3 Code Mẫu: Anomaly Detection Với Gemini 2.5 Flash

# anomaly_detector.py - Phát hiện chi phí bất thường
from typing import List, Dict, Tuple
from datetime import datetime, timedelta
import statistics
from config import get_holysheep_client, CostAnalysisModels

class AnomalyDetector:
    """Phát hiện chi phí bất thường sử dụng AI"""
    
    def __init__(self):
        self.client = get_holysheep_client()
        self.model = CostAnalysisModels.ANOMALY_MODEL
    
    def detect_anomalies(
        self,
        categorized_expenses: List[Dict],
        threshold_zscore: float = 2.0
    ) -> Tuple[List[Dict], Dict]:
        """
        Phát hiện anomalies sử dụng statistical + AI hybrid approach
        
        Args:
            categorized_expenses: Expenses đã được phân loại
            threshold_zscore: Ngưỡng Z-score để flag outliers
        
        Returns:
            (anomalies, summary_statistics)
        """
        
        # Step 1: Statistical outlier detection
        by_category = self._group_by_category(categorized_expenses)
        statistical_anomalies = self._find_statistical_anomalies(
            by_category, threshold_zscore
        )
        
        # Step 2: AI-powered semantic analysis
        # Filter expenses > $1000 cho AI analysis (tiết kiệm cost)
        significant_expenses = [
            e for e in categorized_expenses 
            if e.get('amount', 0) > 1000
        ]
        
        ai_anomalies = self._find_ai_anomalies(significant_expenses)
        
        # Step 3: Merge kết quả
        all_anomalies = self._merge_anomaly_lists(
            statistical_anomalies, ai_anomalies
        )
        
        # Calculate summary
        summary = {
            "total_expenses": len(categorized_expenses),
            "total_amount": sum(e.get('amount', 0) for e in categorized_expenses),
            "anomalies_count": len(all_anomalies),
            "anomalies_value": sum(e.get('amount', 0) for e in all_anomalies),
            "anomaly_rate": len(all_anomalies) / len(categorized_expenses) * 100
        }
        
        return all_anomalies, summary
    
    def _group_by_category(self, expenses: List[Dict]) -> Dict[str, List[Dict]]:
        """Group expenses by category"""
        groups = {}
        for expense in expenses:
            cat = expense.get('category', 'Unknown')
            if cat not in groups:
                groups[cat] = []
            groups[cat].append(expense)
        return groups
    
    def _find_statistical_anomalies(
        self, 
        by_category: Dict[str, List[Dict]], 
        threshold: float
    ) -> List[Dict]:
        """Tìm outliers dựa trên Z-score"""
        anomalies = []
        
        for category, items in by_category.items():
            amounts = [i.get('amount', 0) for i in items]
            
            if len(amounts) < 3:
                continue
            
            mean = statistics.mean(amounts)
            stdev = statistics.stdev(amounts) if len(amounts) > 1 else 0
            
            for item in items:
                if stdev > 0:
                    zscore = abs((item['amount'] - mean) / stdev)
                    if zscore > threshold:
                        item['anomaly_type'] = 'statistical'
                        item['zscore'] = round(zscore, 2)
                        anomalies.append(item)
        
        return anomalies
    
    def _find_ai_anomalies(self, expenses: List[Dict]) -> List[Dict]:
        """Sử dụng AI để phát hiện semantic anomalies"""
        if not expenses:
            return []
        
        # Batch request để tiết kiệm cost
        prompt = self._build_anomaly_prompt(expenses)
        
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": """Bạn là chuyên gia phân tích chi phí doanh nghiệp.
                        Phân tích các chi phí và đánh dấu những khoản bất thường.
                        Trả về JSON array: [{"id": "...", "is_anomaly": true/false, "reason": "..."}]"""
                    },
                    {"role": "user", "content": prompt}
                ],
                temperature=0.2,
                max_tokens=1500
            )
            
            content = response.choices[0].message.content
            import json
            json_str = content[content.find('['):content.rfind(']')+1]
            ai_results = json.loads(json_str)
            
            # Map back to expenses
            id_to_result = {r['id']: r for r in ai_results}
            anomalies = []
            
            for expense in expenses:
                result = id_to_result.get(expense['id'], {})
                if result.get('is_anomaly'):
                    expense['anomaly_type'] = 'ai_semantic'
                    expense['anomaly_reason'] = result.get('reason', '')
                    anomalies.append(expense)
            
            return anomalies
            
        except Exception as e:
            print(f"⚠️ AI anomaly detection failed: {e}")
            return []
    
    def _build_anomaly_prompt(self, expenses: List[Dict]) -> str:
        """Build prompt cho AI anomaly detection"""
        items = []
        for e in expenses[:30]:  # Limit 30 items per request
            items.append(
                f"ID: {e['id']}, Amount: ${e.get('amount', 0):,.2f}, "
                f"Vendor: {e.get('vendor', 'N/A')}, "
                f"Category: {e.get('category', 'N/A')}, "
                f"Description: {e.get('description', 'N/A')}"
            )
        return "\n".join(items)
    
    def _merge_anomaly_lists(
        self, 
        list1: List[Dict], 
        list2: List[Dict]
    ) -> List[Dict]:
        """Merge hai danh sách anomalies, loại bỏ trùng lặp"""
        seen_ids = set()
        merged = []
        
        for item in list1 + list2:
            if item['id'] not in seen_ids:
                seen_ids.add(item['id'])
                merged.append(item)
        
        return merged

Demo

detector = AnomalyDetector() sample_data = [ {"id": "EXP001", "amount": 50000, "category": "Infrastructure", "vendor": "AWS", "description": "EC2 instances"}, {"id": "EXP002", "amount": 250000, "category": "Infrastructure", "vendor": "AWS", "description": "Unexpected spike"}, ] anomalies, summary = detector.detect_anomalies(sample_data) print(f"🚨 Found {len(anomalies)} anomalies out of {summary['total_expenses']}")

3.4 Code Mẫu: Report Generation Với GPT-4.1

# report_generator.py - Tạo báo cáo chi phí chuyên nghiệp
from typing import List, Dict, Optional
from datetime import datetime
import json
from config import get_holysheep_client, CostAnalysisModels

class CostReportGenerator:
    """Tạo báo cáo chi phí tự động với AI"""
    
    def __init__(self):
        self.client = get_holysheep_client()
        self.model = CostAnalysisModels.REPORT_MODEL
    
    def generate_comprehensive_report(
        self,
        expenses: List[Dict],
        anomalies: List[Dict],
        summary: Dict,
        period: str = "Q1 2026",
        language: str = "vi"
    ) -> Dict:
        """
        Generate báo cáo chi phí toàn diện
        
        Args:
            expenses: Danh sách chi phí đã phân loại
            anomalies: Danh sách anomalies
            summary: Summary statistics
            period: Kỳ báo cáo
            language: Ngôn ngữ báo cáo
        
        Returns:
            Dict chứa report components
        """
        
        # Prepare data summary
        by_category = self._calculate_breakdown(expenses)
        trends = self._analyze_trends(expenses)
        
        # Generate executive summary
        executive_summary = self._generate_executive_summary(
            summary, by_category, len(anomalies), period, language
        )
        
        # Generate detailed analysis
        detailed_analysis = self._generate_detailed_analysis(
            by_category, anomalies, trends, language
        )
        
        # Generate recommendations
        recommendations = self._generate_recommendations(
            summary, anomalies, by_category, language
        )
        
        return {
            "period": period,
            "generated_at": datetime.now().isoformat(),
            "executive_summary": executive_summary,
            "detailed_analysis": detailed_analysis,
            "recommendations": recommendations,
            "raw_data": {
                "total_expenses": summary['total_expenses'],
                "total_amount": summary['total_amount'],
                "by_category": by_category,
                "anomalies_count": len(anomalies),
                "anomaly_rate": f"{summary.get('anomaly_rate', 0):.1f}%"
            }
        }
    
    def _calculate_breakdown(self, expenses: List[Dict]) -> Dict:
        """Calculate chi phí theo category"""
        breakdown = {}
        total = 0
        
        for expense in expenses:
            cat = expense.get('category', 'Unknown')
            amount = expense.get('amount', 0)
            breakdown[cat] = breakdown.get(cat, 0) + amount
            total += amount
        
        # Convert to percentages
        breakdown_pct = {}
        for cat, amount in breakdown.items():
            breakdown_pct[cat] = {
                "amount": round(amount, 2),
                "percentage": round(amount / total * 100, 1) if total > 0 else 0
            }
        
        return breakdown_pct
    
    def _analyze_trends(self, expenses: List[Dict]) -> Dict:
        """Phân tích xu hướng chi phí"""
        # Group by month
        monthly = {}
        for expense in expenses:
            date = expense.get('date', '')[:7]  # YYYY-MM
            monthly[date] = monthly.get(date, 0) + expense.get('amount', 0)
        
        # Calculate growth
        months = sorted(monthly.keys())
        trends = {
            "monthly": monthly,
            "growth_rate": 0.0
        }
        
        if len(months) >= 2:
            first = monthly[months[0]]
            last = monthly[months[-1]]
            if first > 0:
                trends['growth_rate'] = round((last - first) / first * 100, 1)
        
        return trends
    
    def _generate_executive_summary(
        self,
        summary: Dict,
        breakdown: Dict,
        anomaly_count: int,
        period: str,
        language: str
    ) -> str:
        """Generate executive summary bằng AI"""
        
        top_category = max(breakdown.items(), key=lambda x: x[1]['amount']) if breakdown else ("N/A", {"amount": 0})
        
        prompt = f"""Tạo executive summary cho báo cáo chi phí {period}.

Số liệu:
- Tổng chi phí: ${summary.get('total_amount', 0):,.2f}
- Số lượng giao dịch: {summary.get('total_expenses', 0)}
- Chi phí bất thường: {anomaly_count} ({summary.get('anomaly_rate', 0):.1f}%)
- Top category: {top_category[0]} (${top_category[1]['amount']:,.2f})

Trả về 3-4 câu ngắn gọn, phù hợp cho CEO/Board đọc."""
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Bạn là chuyên gia tài chính CFO. Viết ngắn gọn, chuyên nghiệp."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=300
        )
        
        return response.choices[0].message.content
    
    def _generate_detailed_analysis(
        self,
        breakdown: Dict,
        anomalies: List[Dict],
        trends: Dict,
        language: str
    ) -> str:
        """Generate detailed analysis section"""
        
        breakdown_text = "\n".join([
            f"- {cat}: ${data['amount']:,.2f} ({data['percentage']}%)"
            for cat, data in sorted(breakdown.items(), key=lambda x: x[1]['amount'], reverse=True)
        ])
        
        anomaly_text = "\n".join([
            f"- {a.get('id')}: ${a.get('amount', 0):,.2f} - {a.get('anomaly_reason', a.get('anomaly_type'))}"
            for a in anomalies[:5]
        ]) if anomalies else "Không có chi phí bất thường đáng chú ý."
        
        prompt = f"""Viết phần Detailed Analysis cho báo cáo chi phí:

1. Chi phí theo category:
{breakdown_text}

2. Các chi phí bất thường cần lưu ý:
{anomaly_text}

3. Xu hướng:
- Monthly data: {trends.get('monthly', {})}
- Growth rate: {trends.get('growth_rate', 0)}%

Viết 2-3 đoạn phân tích chuyên sâu, phù hợp cho CFO và Finance Team."""
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Bạn là chuyên gia phân tích tài chính. Phân tích chi tiết, có số liệu cụ thể."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.4,
            max_tokens=800
        )
        
        return response.choices[0].message.content
    
    def _generate_recommendations(
        self,
        summary: Dict,
        anomalies: List[Dict],
        breakdown: Dict,
        language: str
    ) -> List[Dict]:
        """Generate actionable recommendations"""
        
        prompt = f"""Đề xuất 5 actions cụ thể để tối ưu chi phí dựa trên:

- Total spend: ${summary.get('total_amount', 0):,.2f}
- Top spending categories: {list(breakdown.keys())[:3]}
- Anomalies: {len(anomalies)} transactions flagged

Trả về JSON array format:
[{{"priority": "high/medium/low", "action": "...", "expected_savings": "$X,XXX", "timeline": "..."}}]"""
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Bạn là consultant tối ưu chi phí. Đề xuất thực tế, có số liệu."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=1000
        )
        
        content = response.choices[0].message.content
        json_str = content[content.find('['):content.rfind(']')+1]
        return json.loads(json_str)
    
    def export_to_json(self, report: Dict, filename: str):
        """Export report ra JSON file"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"✅ Report exported to {filename}")
    
    def export_to_markdown(self, report: Dict, filename: str):
        """Export report ra Markdown file"""
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(f"# Báo Cáo Chi Phí - {report['period']}\n\n")
            f.write(f"**Generated:** {report['generated_at']}\n\n")
            f.write("## Executive Summary\n\n")
            f.write(f"{report['executive_summary']}\n\n")
            f.write("## Detailed Analysis\n\n")
            f.write(f"{report['detailed_analysis']}\n\n")
            f.write("## Recommendations\n\n")
            for i, rec in enumerate(report['recommendations'], 1):
                f.write(f"{i}. **[{rec.get('priority', 'medium').upper()}]** {rec.get('action', '')}\n")
                f.write(f"   - Expected Savings: {rec.get('expected_savings', 'TBD')}\n")
                f.write(f"   - Timeline: {rec.get('timeline', 'TBD')}\n\n")
        print(f"✅ Markdown report exported to {filename}")

Demo usage

generator = CostReportGenerator() sample_report = generator.generate_comprehensive_report( expenses=[ {"id": "EXP001", "amount": 15000, "category": "Infrastructure", "vendor": "AWS", "date": "2026-01-15"}, {"id": "EXP002", "amount": 8000, "category": "Marketing", "vendor": "Google", "date": "2026-01-20"} ], anomalies=[], summary={"total_expenses": 2, "total_amount": 23000, "anomaly_rate": 0}, period="Q1 2026" ) generator.export_to_json(sample_report, "cost_report_q1_2026.json")

4. Triển Khai Hoàn Chỉnh Trong Dify

4.1 Dify Workflow Configuration

Để tích hợp vào Dify, tạo Custom Python Node với code sau:

# dify_cost_analysis_node.py - Dify Custom Node
"""
Dify Custom Node: Cost Analysis Workflow
Gắn vào Dify Workflow Editor → Add Custom Node → Paste code này
"""

import json
import sys
from typing import Dict, Any, List

Import các modules đã định nghĩa

Trong Dify, paste toàn bộ code từ các file trên vào đây

class DifyCostAnalysisNode: """Main node handler cho Dify workflow""" def __init__(self, inputs: Dict[str, Any]): self.inputs = inputs self.results = {} def execute(self) -> Dict[str, Any]: """ Main execution method - Dify sẽ gọi method này Returns: Dict chứa output để pass sang node tiếp theo """ # 1. Load expenses data expense_source = self.inputs.get('expense_source', 'api') if expense_source == 'json': from data_processor import CostDataProcessor processor = CostDataProcessor() expenses = processor.load_from_json( self.inputs.get('json_file_path', 'expenses.json') ) else: from data_processor import CostDataProcessor processor = CostDataProcessor() expenses = processor.load_from_api( self.inputs.get('api_endpoint') ) # 2. Categorize expenses from categorization_engine import CategorizationEngine categorizer = CategorizationEngine() categorized = categorizer.categorize_batch( [e.__dict__ for e in expenses], batch_size=int(self.inputs.get('batch_size', 50)) ) # 3. Detect anomalies from anomaly_detector import AnomalyDetector detector = AnomalyDetector() anomalies, summary = detector.detect_anomalies(categorized) # 4. Generate report from report_generator import CostReportGenerator generator = CostReportGenerator() report = generator.generate_comprehensive_report( expenses