Tác giả: ThS. Nguyễn Văn Minh — Kiến trúc sư AI tại HolySheep AI, 5+ năm kinh nghiệm triển khai Enterprise AI Workflow
Mở Đầu: Khi "ConnectionError: timeout" Phá Vỡ Báo Cáo Cuối Tháng
Tôi vẫn nhớ rõ buổi sáng thứ Hai đầu tuần — team tài chính gửi Slack với 47 tin nhắn hoảng loạn. Pipeline báo cáo chi phí chạy 4 tiếng, toàn bộ timeout ở bước gọi API. Lỗi cụ thể:
openai.error.APIConnectionError: Error communicating with OpenAI
HTTPSConnectionPool(host='api.openai.com', port=443):
Max retries exceeded with url: /v1/chat/completions
(Caused by ConnectTimeoutError(<pip._vendor.urllib3.connection.TimeoutError...>))
Cost lost: ~$340 for failed batch processing
Sau 3 ngày debug, chúng tôi phát hiện vấn đề nằm ở kiến trúc gọi API không tối ưu và chi phí bị đội lên 300% so với dự kiến. Bài viết này chia sẻ cách tôi xây dựng Cost Analysis Workflow trong Dify với HolySheep AI — giảm 85% chi phí, độ trễ dưới 50ms.
1. Tại Sao Chọn HolySheep AI Cho Dify Workflow?
Trước khi đi vào code, tôi muốn chia sẻ lý do chúng tôi chọn HolySheep thay vì API gốc:
- Tỷ giá ưu đãi: ¥1 = $1 (tiết kiệm 85%+ so với OpenAI)
- Tốc độ: Độ trễ trung bình < 50ms — nhanh hơn 3-5 lần
- Hỗ trợ thanh toán: WeChat, Alipay, Visa/Mastercard
- Tín dụng miễn phí: Đăng ký tại đây để nhận credit thử nghiệm
2. So Sánh Chi Phí Thực Tế (Updated 2026)
| Model | Giá gốc ($/MTok) | HolySheep ($/MTok) | Tiết kiệm |
|---|---|---|---|
| GPT-4.1 | $60 | $8 | 86.7% |
| Claude Sonnet 4.5 | $45 | $15 | 66.7% |
| Gemini 2.5 Flash | $15 | $2.50 | 83.3% |
| DeepSeek V3.2 | $2.80 | $0.42 | 85% |
3. Kiến Trúc Cost Analysis Workflow Trong Dify
3.1 Cấu Trúc Tổng Quan
Workflow gồm 5 module chính:
- Data Ingestion — Thu thập data từ nhiều nguồn
- Categorization Engine — Phân loại chi phí tự động
- Anomaly Detection — Phát hiện chi phí bất thường
- Budget Optimization — Đề xuất tối ưu hóa
- Report Generation — Tạo báo cáo cuối cùng
3.2 Code Mẫu: Kết Nối HolySheep API Với Dify
# config.py - Cấu hình kết nối HolySheep API
import os
from openai import OpenAI
class HolySheepConfig:
"""Cấu hình HolySheep AI cho Dify Workflow"""
BASE_URL = "https://api.holysheep.ai/v1" # KHÔNG dùng api.openai.com
# Lấy API key từ environment variable
API_KEY = os.getenv("HOLYSHEEP_API_KEY", "YOUR_HOLYSHEEP_API_KEY")
# Timeout settings (ms)
TIMEOUT_MS = 5000
# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY_MS = 1000
class CostAnalysisModels:
"""Chọn model phù hợp cho từng tác vụ"""
# Categorization: Dùng DeepSeek V3.2 (rẻ nhất, nhanh)
CATEGORIZATION_MODEL = "deepseek-v3.2"
# Anomaly Detection: Dùng Gemini 2.5 Flash (cân bằng)
ANOMALY_MODEL = "gemini-2.5-flash"
# Report Generation: Dùng GPT-4.1 (chất lượng cao)
REPORT_MODEL = "gpt-4.1"
Khởi tạo client
def get_holysheep_client():
"""Factory function cho HolySheep client"""
return OpenAI(
base_url=HolySheepConfig.BASE_URL,
api_key=HolySheepConfig.API_KEY,
timeout=HolySheepConfig.TIMEOUT_MS / 1000,
max_retries=HolySheepConfig.MAX_RETRIES
)
print(f"✅ HolySheep Client configured: {HolySheepConfig.BASE_URL}")
# data_processor.py - Module thu thập và xử lý dữ liệu chi phí
import json
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from config import get_holysheep_client, CostAnalysisModels
@dataclass
class ExpenseRecord:
"""Định dạng bản ghi chi phí"""
id: str
date: str
amount: float
currency: str
category: str
vendor: str
description: str
metadata: Dict
class CostDataProcessor:
"""Xử lý dữ liệu chi phí cho workflow"""
def __init__(self):
self.client = get_holysheep_client()
self.expenses: List[ExpenseRecord] = []
def load_from_json(self, filepath: str) -> List[ExpenseRecord]:
"""Đọc data từ JSON file"""
with open(filepath, 'r', encoding='utf-8') as f:
raw_data = json.load(f)
self.expenses = [
ExpenseRecord(**item) for item in raw_data
]
return self.expenses
def load_from_api(self, api_endpoint: str) -> List[ExpenseRecord]:
"""Load từ API endpoint"""
import requests
response = requests.get(api_endpoint, timeout=10)
response.raise_for_status()
raw_data = response.json()
self.expenses = [
ExpenseRecord(**item) for item in raw_data.get('data', [])
]
return self.expenses
def aggregate_by_category(self) -> Dict[str, float]:
"""Tổng hợp chi phí theo category"""
summary = {}
for expense in self.expenses:
category = expense.category
amount_usd = self._convert_to_usd(expense.amount, expense.currency)
summary[category] = summary.get(category, 0) + amount_usd
return summary
def _convert_to_usd(self, amount: float, currency: str) -> float:
"""Convert sang USD (tỷ giá 2026)"""
rates = {
'USD': 1.0,
'CNY': 0.14, # ¥1 = $0.14
'EUR': 1.08,
'VND': 0.00004
}
return amount * rates.get(currency, 1.0)
Demo usage
processor = CostDataProcessor()
print(f"✅ Loaded {len(processor.expenses)} expense records")
# categorization_engine.py - AI-powered expense categorization
import json
from typing import List, Dict, Optional
from openai import APIError, RateLimitError
from config import get_holysheep_client, CostAnalysisModels
class CategorizationEngine:
"""Engine phân loại chi phí tự động bằng AI"""
CATEGORIES = [
"Infrastructure", "Personnel", "Marketing",
"R&D", "Operations", "Legal", "Travel", "Miscellaneous"
]
def __init__(self):
self.client = get_holysheep_client()
self.model = CostAnalysisModels.CATEGORIZATION_MODEL
def categorize_batch(
self,
expenses: List[Dict],
batch_size: int = 50
) -> List[Dict]:
"""
Phân loại hàng loạt expense records
Args:
expenses: Danh sách expense records
batch_size: Số lượng xử lý mỗi batch
Returns:
List of expenses với category được gán
"""
results = []
# Xử lý theo batch để tránh rate limit
for i in range(0, len(expenses), batch_size):
batch = expenses[i:i + batch_size]
try:
categorized = self._process_single_batch(batch)
results.extend(categorized)
print(f"✅ Processed batch {i//batch_size + 1}: {len(batch)} items")
except RateLimitError as e:
# Xử lý rate limit - wait và retry
import time
wait_time = int(str(e).split("retry after ")[-1].split(" seconds")[0]) if "retry after" in str(e) else 60
print(f"⏳ Rate limit hit. Waiting {wait_time}s...")
time.sleep(wait_time)
# Retry batch
categorized = self._process_single_batch(batch)
results.extend(categorized)
except APIError as e:
print(f"❌ API Error: {e}")
# Fallback: gán 'Miscellaneous' cho batch lỗi
for item in batch:
item['category'] = 'Miscellaneous'
item['category_confidence'] = 0.0
item['error'] = str(e)
results.extend(batch)
return results
def _process_single_batch(self, batch: List[Dict]) -> List[Dict]:
"""Xử lý một batch expenses"""
# Build prompt
prompt = self._build_categorization_prompt(batch)
response = self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": f"""Bạn là chuyên gia phân loại chi phí doanh nghiệp.
Categories: {', '.join(self.CATEGORIES)}
Trả về JSON array với format:
[{{"id": "...", "category": "...", "confidence": 0.0-1.0}}]"""
},
{
"role": "user",
"content": prompt
}
],
temperature=0.3, # Low temperature cho consistency
max_tokens=2000
)
# Parse response
content = response.choices[0].message.content
# Extract JSON từ response
json_str = self._extract_json(content)
category_map = json.loads(json_str)
# Merge kết quả vào original data
id_to_category = {item['id']: item for item in category_map}
results = []
for expense in batch:
cat_info = id_to_category.get(expense['id'], {})
expense['category'] = cat_info.get('category', 'Miscellaneous')
expense['category_confidence'] = cat_info.get('confidence', 0.0)
results.append(expense)
return results
def _build_categorization_prompt(self, batch: List[Dict]) -> str:
"""Build prompt cho categorization task"""
items_str = "\n".join([
f"- ID: {e['id']}, Amount: {e.get('amount', 0)} {e.get('currency', 'USD')}, "
f"Vendor: {e.get('vendor', 'N/A')}, Description: {e.get('description', '')}"
for e in batch
])
return f"""Phân loại các chi phí sau vào 8 categories đã cho:
{items_str}
Chỉ trả về JSON array, không giải thích."""
def _extract_json(self, content: str) -> str:
"""Extract JSON string từ response"""
content = content.strip()
# Tìm JSON array
start = content.find('[')
end = content.rfind(']') + 1
if start != -1 and end != 0:
return content[start:end]
raise ValueError(f"Không tìm thấy JSON trong response: {content[:100]}")
Usage example
engine = CategorizationEngine()
sample_expenses = [
{"id": "EXP001", "amount": 5000, "currency": "USD",
"vendor": "AWS", "description": "Cloud hosting fees"},
{"id": "EXP002", "amount": 15000, "currency": "CNY",
"vendor": "Tencent", "description": "Marketing campaign Q1"}
]
results = engine.categorize_batch(sample_expenses)
print(json.dumps(results, indent=2))
3.3 Code Mẫu: Anomaly Detection Với Gemini 2.5 Flash
# anomaly_detector.py - Phát hiện chi phí bất thường
from typing import List, Dict, Tuple
from datetime import datetime, timedelta
import statistics
from config import get_holysheep_client, CostAnalysisModels
class AnomalyDetector:
"""Phát hiện chi phí bất thường sử dụng AI"""
def __init__(self):
self.client = get_holysheep_client()
self.model = CostAnalysisModels.ANOMALY_MODEL
def detect_anomalies(
self,
categorized_expenses: List[Dict],
threshold_zscore: float = 2.0
) -> Tuple[List[Dict], Dict]:
"""
Phát hiện anomalies sử dụng statistical + AI hybrid approach
Args:
categorized_expenses: Expenses đã được phân loại
threshold_zscore: Ngưỡng Z-score để flag outliers
Returns:
(anomalies, summary_statistics)
"""
# Step 1: Statistical outlier detection
by_category = self._group_by_category(categorized_expenses)
statistical_anomalies = self._find_statistical_anomalies(
by_category, threshold_zscore
)
# Step 2: AI-powered semantic analysis
# Filter expenses > $1000 cho AI analysis (tiết kiệm cost)
significant_expenses = [
e for e in categorized_expenses
if e.get('amount', 0) > 1000
]
ai_anomalies = self._find_ai_anomalies(significant_expenses)
# Step 3: Merge kết quả
all_anomalies = self._merge_anomaly_lists(
statistical_anomalies, ai_anomalies
)
# Calculate summary
summary = {
"total_expenses": len(categorized_expenses),
"total_amount": sum(e.get('amount', 0) for e in categorized_expenses),
"anomalies_count": len(all_anomalies),
"anomalies_value": sum(e.get('amount', 0) for e in all_anomalies),
"anomaly_rate": len(all_anomalies) / len(categorized_expenses) * 100
}
return all_anomalies, summary
def _group_by_category(self, expenses: List[Dict]) -> Dict[str, List[Dict]]:
"""Group expenses by category"""
groups = {}
for expense in expenses:
cat = expense.get('category', 'Unknown')
if cat not in groups:
groups[cat] = []
groups[cat].append(expense)
return groups
def _find_statistical_anomalies(
self,
by_category: Dict[str, List[Dict]],
threshold: float
) -> List[Dict]:
"""Tìm outliers dựa trên Z-score"""
anomalies = []
for category, items in by_category.items():
amounts = [i.get('amount', 0) for i in items]
if len(amounts) < 3:
continue
mean = statistics.mean(amounts)
stdev = statistics.stdev(amounts) if len(amounts) > 1 else 0
for item in items:
if stdev > 0:
zscore = abs((item['amount'] - mean) / stdev)
if zscore > threshold:
item['anomaly_type'] = 'statistical'
item['zscore'] = round(zscore, 2)
anomalies.append(item)
return anomalies
def _find_ai_anomalies(self, expenses: List[Dict]) -> List[Dict]:
"""Sử dụng AI để phát hiện semantic anomalies"""
if not expenses:
return []
# Batch request để tiết kiệm cost
prompt = self._build_anomaly_prompt(expenses)
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": """Bạn là chuyên gia phân tích chi phí doanh nghiệp.
Phân tích các chi phí và đánh dấu những khoản bất thường.
Trả về JSON array: [{"id": "...", "is_anomaly": true/false, "reason": "..."}]"""
},
{"role": "user", "content": prompt}
],
temperature=0.2,
max_tokens=1500
)
content = response.choices[0].message.content
import json
json_str = content[content.find('['):content.rfind(']')+1]
ai_results = json.loads(json_str)
# Map back to expenses
id_to_result = {r['id']: r for r in ai_results}
anomalies = []
for expense in expenses:
result = id_to_result.get(expense['id'], {})
if result.get('is_anomaly'):
expense['anomaly_type'] = 'ai_semantic'
expense['anomaly_reason'] = result.get('reason', '')
anomalies.append(expense)
return anomalies
except Exception as e:
print(f"⚠️ AI anomaly detection failed: {e}")
return []
def _build_anomaly_prompt(self, expenses: List[Dict]) -> str:
"""Build prompt cho AI anomaly detection"""
items = []
for e in expenses[:30]: # Limit 30 items per request
items.append(
f"ID: {e['id']}, Amount: ${e.get('amount', 0):,.2f}, "
f"Vendor: {e.get('vendor', 'N/A')}, "
f"Category: {e.get('category', 'N/A')}, "
f"Description: {e.get('description', 'N/A')}"
)
return "\n".join(items)
def _merge_anomaly_lists(
self,
list1: List[Dict],
list2: List[Dict]
) -> List[Dict]:
"""Merge hai danh sách anomalies, loại bỏ trùng lặp"""
seen_ids = set()
merged = []
for item in list1 + list2:
if item['id'] not in seen_ids:
seen_ids.add(item['id'])
merged.append(item)
return merged
Demo
detector = AnomalyDetector()
sample_data = [
{"id": "EXP001", "amount": 50000, "category": "Infrastructure",
"vendor": "AWS", "description": "EC2 instances"},
{"id": "EXP002", "amount": 250000, "category": "Infrastructure",
"vendor": "AWS", "description": "Unexpected spike"},
]
anomalies, summary = detector.detect_anomalies(sample_data)
print(f"🚨 Found {len(anomalies)} anomalies out of {summary['total_expenses']}")
3.4 Code Mẫu: Report Generation Với GPT-4.1
# report_generator.py - Tạo báo cáo chi phí chuyên nghiệp
from typing import List, Dict, Optional
from datetime import datetime
import json
from config import get_holysheep_client, CostAnalysisModels
class CostReportGenerator:
"""Tạo báo cáo chi phí tự động với AI"""
def __init__(self):
self.client = get_holysheep_client()
self.model = CostAnalysisModels.REPORT_MODEL
def generate_comprehensive_report(
self,
expenses: List[Dict],
anomalies: List[Dict],
summary: Dict,
period: str = "Q1 2026",
language: str = "vi"
) -> Dict:
"""
Generate báo cáo chi phí toàn diện
Args:
expenses: Danh sách chi phí đã phân loại
anomalies: Danh sách anomalies
summary: Summary statistics
period: Kỳ báo cáo
language: Ngôn ngữ báo cáo
Returns:
Dict chứa report components
"""
# Prepare data summary
by_category = self._calculate_breakdown(expenses)
trends = self._analyze_trends(expenses)
# Generate executive summary
executive_summary = self._generate_executive_summary(
summary, by_category, len(anomalies), period, language
)
# Generate detailed analysis
detailed_analysis = self._generate_detailed_analysis(
by_category, anomalies, trends, language
)
# Generate recommendations
recommendations = self._generate_recommendations(
summary, anomalies, by_category, language
)
return {
"period": period,
"generated_at": datetime.now().isoformat(),
"executive_summary": executive_summary,
"detailed_analysis": detailed_analysis,
"recommendations": recommendations,
"raw_data": {
"total_expenses": summary['total_expenses'],
"total_amount": summary['total_amount'],
"by_category": by_category,
"anomalies_count": len(anomalies),
"anomaly_rate": f"{summary.get('anomaly_rate', 0):.1f}%"
}
}
def _calculate_breakdown(self, expenses: List[Dict]) -> Dict:
"""Calculate chi phí theo category"""
breakdown = {}
total = 0
for expense in expenses:
cat = expense.get('category', 'Unknown')
amount = expense.get('amount', 0)
breakdown[cat] = breakdown.get(cat, 0) + amount
total += amount
# Convert to percentages
breakdown_pct = {}
for cat, amount in breakdown.items():
breakdown_pct[cat] = {
"amount": round(amount, 2),
"percentage": round(amount / total * 100, 1) if total > 0 else 0
}
return breakdown_pct
def _analyze_trends(self, expenses: List[Dict]) -> Dict:
"""Phân tích xu hướng chi phí"""
# Group by month
monthly = {}
for expense in expenses:
date = expense.get('date', '')[:7] # YYYY-MM
monthly[date] = monthly.get(date, 0) + expense.get('amount', 0)
# Calculate growth
months = sorted(monthly.keys())
trends = {
"monthly": monthly,
"growth_rate": 0.0
}
if len(months) >= 2:
first = monthly[months[0]]
last = monthly[months[-1]]
if first > 0:
trends['growth_rate'] = round((last - first) / first * 100, 1)
return trends
def _generate_executive_summary(
self,
summary: Dict,
breakdown: Dict,
anomaly_count: int,
period: str,
language: str
) -> str:
"""Generate executive summary bằng AI"""
top_category = max(breakdown.items(), key=lambda x: x[1]['amount']) if breakdown else ("N/A", {"amount": 0})
prompt = f"""Tạo executive summary cho báo cáo chi phí {period}.
Số liệu:
- Tổng chi phí: ${summary.get('total_amount', 0):,.2f}
- Số lượng giao dịch: {summary.get('total_expenses', 0)}
- Chi phí bất thường: {anomaly_count} ({summary.get('anomaly_rate', 0):.1f}%)
- Top category: {top_category[0]} (${top_category[1]['amount']:,.2f})
Trả về 3-4 câu ngắn gọn, phù hợp cho CEO/Board đọc."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "Bạn là chuyên gia tài chính CFO. Viết ngắn gọn, chuyên nghiệp."},
{"role": "user", "content": prompt}
],
temperature=0.3,
max_tokens=300
)
return response.choices[0].message.content
def _generate_detailed_analysis(
self,
breakdown: Dict,
anomalies: List[Dict],
trends: Dict,
language: str
) -> str:
"""Generate detailed analysis section"""
breakdown_text = "\n".join([
f"- {cat}: ${data['amount']:,.2f} ({data['percentage']}%)"
for cat, data in sorted(breakdown.items(), key=lambda x: x[1]['amount'], reverse=True)
])
anomaly_text = "\n".join([
f"- {a.get('id')}: ${a.get('amount', 0):,.2f} - {a.get('anomaly_reason', a.get('anomaly_type'))}"
for a in anomalies[:5]
]) if anomalies else "Không có chi phí bất thường đáng chú ý."
prompt = f"""Viết phần Detailed Analysis cho báo cáo chi phí:
1. Chi phí theo category:
{breakdown_text}
2. Các chi phí bất thường cần lưu ý:
{anomaly_text}
3. Xu hướng:
- Monthly data: {trends.get('monthly', {})}
- Growth rate: {trends.get('growth_rate', 0)}%
Viết 2-3 đoạn phân tích chuyên sâu, phù hợp cho CFO và Finance Team."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "Bạn là chuyên gia phân tích tài chính. Phân tích chi tiết, có số liệu cụ thể."},
{"role": "user", "content": prompt}
],
temperature=0.4,
max_tokens=800
)
return response.choices[0].message.content
def _generate_recommendations(
self,
summary: Dict,
anomalies: List[Dict],
breakdown: Dict,
language: str
) -> List[Dict]:
"""Generate actionable recommendations"""
prompt = f"""Đề xuất 5 actions cụ thể để tối ưu chi phí dựa trên:
- Total spend: ${summary.get('total_amount', 0):,.2f}
- Top spending categories: {list(breakdown.keys())[:3]}
- Anomalies: {len(anomalies)} transactions flagged
Trả về JSON array format:
[{{"priority": "high/medium/low", "action": "...", "expected_savings": "$X,XXX", "timeline": "..."}}]"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "Bạn là consultant tối ưu chi phí. Đề xuất thực tế, có số liệu."},
{"role": "user", "content": prompt}
],
temperature=0.3,
max_tokens=1000
)
content = response.choices[0].message.content
json_str = content[content.find('['):content.rfind(']')+1]
return json.loads(json_str)
def export_to_json(self, report: Dict, filename: str):
"""Export report ra JSON file"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"✅ Report exported to {filename}")
def export_to_markdown(self, report: Dict, filename: str):
"""Export report ra Markdown file"""
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"# Báo Cáo Chi Phí - {report['period']}\n\n")
f.write(f"**Generated:** {report['generated_at']}\n\n")
f.write("## Executive Summary\n\n")
f.write(f"{report['executive_summary']}\n\n")
f.write("## Detailed Analysis\n\n")
f.write(f"{report['detailed_analysis']}\n\n")
f.write("## Recommendations\n\n")
for i, rec in enumerate(report['recommendations'], 1):
f.write(f"{i}. **[{rec.get('priority', 'medium').upper()}]** {rec.get('action', '')}\n")
f.write(f" - Expected Savings: {rec.get('expected_savings', 'TBD')}\n")
f.write(f" - Timeline: {rec.get('timeline', 'TBD')}\n\n")
print(f"✅ Markdown report exported to {filename}")
Demo usage
generator = CostReportGenerator()
sample_report = generator.generate_comprehensive_report(
expenses=[
{"id": "EXP001", "amount": 15000, "category": "Infrastructure",
"vendor": "AWS", "date": "2026-01-15"},
{"id": "EXP002", "amount": 8000, "category": "Marketing",
"vendor": "Google", "date": "2026-01-20"}
],
anomalies=[],
summary={"total_expenses": 2, "total_amount": 23000, "anomaly_rate": 0},
period="Q1 2026"
)
generator.export_to_json(sample_report, "cost_report_q1_2026.json")
4. Triển Khai Hoàn Chỉnh Trong Dify
4.1 Dify Workflow Configuration
Để tích hợp vào Dify, tạo Custom Python Node với code sau:
# dify_cost_analysis_node.py - Dify Custom Node
"""
Dify Custom Node: Cost Analysis Workflow
Gắn vào Dify Workflow Editor → Add Custom Node → Paste code này
"""
import json
import sys
from typing import Dict, Any, List
Import các modules đã định nghĩa
Trong Dify, paste toàn bộ code từ các file trên vào đây
class DifyCostAnalysisNode:
"""Main node handler cho Dify workflow"""
def __init__(self, inputs: Dict[str, Any]):
self.inputs = inputs
self.results = {}
def execute(self) -> Dict[str, Any]:
"""
Main execution method - Dify sẽ gọi method này
Returns:
Dict chứa output để pass sang node tiếp theo
"""
# 1. Load expenses data
expense_source = self.inputs.get('expense_source', 'api')
if expense_source == 'json':
from data_processor import CostDataProcessor
processor = CostDataProcessor()
expenses = processor.load_from_json(
self.inputs.get('json_file_path', 'expenses.json')
)
else:
from data_processor import CostDataProcessor
processor = CostDataProcessor()
expenses = processor.load_from_api(
self.inputs.get('api_endpoint')
)
# 2. Categorize expenses
from categorization_engine import CategorizationEngine
categorizer = CategorizationEngine()
categorized = categorizer.categorize_batch(
[e.__dict__ for e in expenses],
batch_size=int(self.inputs.get('batch_size', 50))
)
# 3. Detect anomalies
from anomaly_detector import AnomalyDetector
detector = AnomalyDetector()
anomalies, summary = detector.detect_anomalies(categorized)
# 4. Generate report
from report_generator import CostReportGenerator
generator = CostReportGenerator()
report = generator.generate_comprehensive_report(
expenses