Mở đầu: Câu chuyện thực tế từ dự án thương mại điện tử quy mô triệu người dùng
Tôi vẫn nhớ rõ ngày đó — tuần trước Black Friday 2025, hệ thống chatbot chăm sóc khách hàng của một sàn thương mại điện tử lớn tại Việt Nam bắt đầu trả về timeout liên tục. Độ trễ trung bình tăng từ 800ms lên 12 giây. Đội dev gọi nhau lúc 2h sáng. Khách hàng than phiền trên fanpage. Và tất cả chỉ vì một thay đổi nhỏ trong code đã được deploy lên production mà không qua kiểm thử. Kể từ sau incident đó, tôi bắt đầu xây dựng một hệ thống AB testing và gray release hoàn chỉnh cho các API AI. Và công cụ trung tâm mà tôi chọn là HolySheep API中转站 — nền tảng không chỉ giúp tiết kiệm 85%+ chi phí API mà còn cung cấp hạ tầng để triển khai deployment chiến lược an toàn. Bài viết này là toàn bộ kiến thức tôi đã đúc kết sau 8 tháng vận hành thực tế, bao gồm source code có thể chạy ngay, các con số benchmark chính xác, và những bài học xương máu khi triển khai gray release cho hệ thống AI.AB分流 là gì và tại sao cần thiết cho API AI
AB分流 (A/B Split Testing) là kỹ thuật phân chia lưu lượng traffic giữa hai hoặc nhiều phiên bản khác nhau của hệ thống. Trong bối cảnh API AI, AB分流 cho phép bạn:
- Kiểm thử model mới mà không ảnh hưởng toàn bộ người dùng
- So sánh hiệu suất giữa các nhà cung cấp (OpenAI, Anthropic, Google)
- Triển khai feature mới với rủi ro thấp nhất
- Tối ưu chi phí bằng cách route request đến provider rẻ hơn
Kiến trúc hệ thống HolySheep Gray Release
Trước khi đi vào code, hãy hiểu rõ kiến trúc tổng thể. Hệ thống gồm 4 layer chính:
- Load Balancer Layer: Phân phối request theo tỷ lệ A/B
- Feature Flag Service: Toggle feature on/off theo user segment
- API Gateway: Route đến HolySheep endpoint với logic validation
- Telemetry Layer: Thu thập metrics, latency, error rate
Triển khai AB Router với HolySheep
1. Cấu hình cơ bản với Python SDK
# holy_sheep_ab_router.py
Cấu hình AB分流 cơ bản với HolySheep API Gateway
Author: HolySheep AI Engineering Team
import hashlib
import time
import httpx
import json
from typing import Literal, Optional
from dataclasses import dataclass
from enum import Enum
class ABGroup(Enum):
CONTROL = "control" # Phiên bản cũ
VARIANT = "variant" # Phiên bản mới
CANARY = "canary" # Canary release (10% traffic)
@dataclass
class ABConfig:
"""Cấu hình AB Test"""
experiment_id: str
control_provider: Literal["openai", "anthropic", "google", "deepseek"]
variant_provider: Literal["openai", "anthropic", "google", "deepseek"]
canary_percentage: float = 0.1 # 10% đi canary
variant_percentage: float = 0.3 # 30% đi variant
@dataclass
class RequestMetrics:
"""Metrics cho một request"""
request_id: str
latency_ms: float
token_used: int
cost_usd: float
error: Optional[str]
group: ABGroup
class HolySheepABRouter:
"""AB Router sử dụng HolySheep API Gateway"""
BASE_URL = "https://api.holysheep.ai/v1"
# Pricing 2026 (USD per 1M tokens)
PRICING = {
"openai": {
"gpt-4.1": {"input": 8.0, "output": 32.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
},
"anthropic": {
"claude-sonnet-4.5": {"input": 15.0, "output": 75.0},
"claude-3-5-haiku": {"input": 0.80, "output": 4.0},
},
"google": {
"gemini-2.5-flash": {"input": 2.50, "output": 10.0},
"gemini-2.5-pro": {"input": 15.0, "output": 60.0},
},
"deepseek": {
"deepseek-v3.2": {"input": 0.42, "output": 1.68},
}
}
PROVIDER_MODELS = {
"openai": "gpt-4.1",
"anthropic": "claude-sonnet-4.5",
"google": "gemini-2.5-flash",
"deepseek": "deepseek-v3.2"
}
def __init__(self, api_key: str, config: ABConfig):
self.api_key = api_key
self.config = config
self.metrics: list[RequestMetrics] = []
self.client = httpx.Client(
base_url=self.BASE_URL,
timeout=30.0,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
)
def _hash_user_id(self, user_id: str) -> float:
"""Tạo hash deterministic từ user_id để ensure consistency"""
hash_input = f"{self.config.experiment_id}:{user_id}"
hash_value = hashlib.md5(hash_input.encode()).hexdigest()
return int(hash_value, 16) / (16 ** 32)
def _determine_group(self, user_id: str) -> ABGroup:
"""Xác định group cho user dựa trên hash (đảm bảo stable)"""
bucket = self._hash_user_id(user_id)
if bucket < self.config.canary_percentage:
return ABGroup.CANARY
elif bucket < self.config.canary_percentage + self.config.variant_percentage:
return ABGroup.VARIANT
else:
return ABGroup.CONTROL
def _get_provider_for_group(self, group: ABGroup) -> str:
"""Map group sang provider cụ thể"""
if group == ABGroup.CONTROL:
return self.config.control_provider
elif group == ABGroup.VARIANT:
return self.config.variant_provider
else: # CANARY - luôn dùng provider rẻ nhất cho test
return "deepseek"
def _calculate_cost(self, provider: str, model: str, input_tokens: int, output_tokens: int) -> float:
"""Tính chi phí theo pricing thực tế 2026"""
if provider not in self.PRICING:
return 0.0
model_pricing = self.PRICING[provider].get(model, {})
if not model_pricing:
return 0.0
input_cost = (input_tokens / 1_000_000) * model_pricing.get("input", 0)
output_cost = (output_tokens / 1_000_000) * model_pricing.get("output", 0)
return input_cost + output_cost
async def chat_completion(self, user_id: str, messages: list, **kwargs):
"""Gửi request qua HolySheep với AB routing"""
start_time = time.perf_counter()
request_id = f"{self.config.experiment_id}_{int(time.time() * 1000)}"
group = self._determine_group(user_id)
provider = self._get_provider_for_group(group)
model = self.PROVIDER_MODELS[provider]
try:
# Map request format cho HolySheep endpoint
response = self.client.post(
"/chat/completions",
json={
"model": model,
"messages": messages,
"temperature": kwargs.get("temperature", 0.7),
"max_tokens": kwargs.get("max_tokens", 2048),
}
)
response.raise_for_status()
result = response.json()
latency_ms = (time.perf_counter() - start_time) * 1000
tokens_used = result.get("usage", {}).get("total_tokens", 0)
cost = self._calculate_cost(
provider, model,
result.get("usage", {}).get("prompt_tokens", 0),
result.get("usage", {}).get("completion_tokens", 0)
)
self.metrics.append(RequestMetrics(
request_id=request_id,
latency_ms=latency_ms,
token_used=tokens_used,
cost_usd=cost,
error=None,
group=group
))
return {
"content": result["choices"][0]["message"]["content"],
"group": group.value,
"provider": provider,
"model": model,
"latency_ms": round(latency_ms, 2),
"cost_usd": round(cost, 6),
"tokens": tokens_used
}
except httpx.HTTPStatusError as e:
error_msg = f"HTTP {e.response.status_code}: {e.response.text}"
self.metrics.append(RequestMetrics(
request_id=request_id,
latency_ms=(time.perf_counter() - start_time) * 1000,
token_used=0,
cost_usd=0,
error=error_msg,
group=group
))
raise
def get_experiment_report(self) -> dict:
"""Generate báo cáo A/B test"""
if not self.metrics:
return {"error": "No metrics collected yet"}
report = {}
for group in ABGroup:
group_metrics = [m for m in self.metrics if m.group == group]
if group_metrics:
latencies = [m.latency_ms for m in group_metrics]
costs = [m.cost_usd for m in group_metrics]
errors = [m for m in group_metrics if m.error]
report[group.value] = {
"request_count": len(group_metrics),
"avg_latency_ms": round(sum(latencies) / len(latencies), 2),
"p50_latency_ms": round(sorted(latencies)[len(latencies) // 2], 2),
"p95_latency_ms": round(sorted(latencies)[int(len(latencies) * 0.95)], 2),
"total_cost_usd": round(sum(costs), 6),
"error_rate": round(len(errors) / len(group_metrics) * 100, 2),
"total_tokens": sum(m.token_used for m in group_metrics)
}
return report
============== USAGE EXAMPLE ==============
if __name__ == "__main__":
# Khởi tạo AB Router với config
config = ABConfig(
experiment_id="ecom_chatbot_v2",
control_provider="openai", # Control: GPT-4.1
variant_provider="anthropic", # Variant: Claude Sonnet 4.5
canary_percentage=0.05, # 5% đi deepseek
variant_percentage=0.30 # 30% đi Claude
)
router = HolySheepABRouter(
api_key="YOUR_HOLYSHEEP_API_KEY", # Thay bằng key thực tế
config=config
)
# Simulate requests
import asyncio
async def run_test():
test_users = [f"user_{i:04d}" for i in range(100)]
for user_id in test_users:
try:
result = await router.chat_completion(
user_id=user_id,
messages=[
{"role": "system", "content": "Bạn là trợ lý chăm sóc khách hàng"},
{"role": "user", "content": "Tôi muốn đổi địa chỉ giao hàng"}
],
temperature=0.7,
max_tokens=500
)
print(f"{user_id} -> {result['group']} ({result['provider']}) "
f"Latency: {result['latency_ms']}ms | Cost: ${result['cost_usd']:.6f}")
except Exception as e:
print(f"{user_id} -> ERROR: {e}")
# Print experiment report
print("\n" + "="*60)
print("EXPERIMENT REPORT")
print("="*60)
report = router.get_experiment_report()
for group, stats in report.items():
print(f"\n{group.upper()}:")
for key, value in stats.items():
print(f" {key}: {value}")
# Chạy test (comment out nếu chỉ muốn xem code)
# asyncio.run(run_test())
print("AB Router module đã được định nghĩa. Chạy run_test() để thực thi.")
2. Feature Flags và Canary Deployment
# holy_sheep_feature_flags.py
Feature Flags System cho gradual rollout với HolySheep
Hỗ trợ: percentage rollout, user segment, time-based, gradual phase
import time
import json
import redis
import hashlib
from typing import Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
class UserSegment(Enum):
ALL = "all"
FREE_TIER = "free_tier"
PREMIUM = "premium"
BETA_TESTERS = "beta_testers"
INTERNAL = "internal"
@dataclass
class FeatureConfig:
"""Cấu hình chi tiết cho một feature flag"""
flag_name: str
description: str
enabled: bool = False
# Rollout strategy
rollout_percentage: float = 0.0 # 0.0 - 100.0
rollout_start: Optional[datetime] = None
rollout_end: Optional[datetime] = None
# Targeting
target_segments: list[UserSegment] = field(default_factory=lambda: [UserSegment.ALL])
target_user_ids: set[str] = field(default_factory=set)
exclude_user_ids: set[str] = field(default_factory=set)
# Variant configuration
variants: dict[str, dict[str, Any]] = field(default_factory=dict)
default_variant: str = "control"
# Metadata
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
owner: str = ""
jira_ticket: str = ""
class FeatureFlagService:
"""Service quản lý feature flags với persistence"""
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self._redis = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True
)
self._cache: dict[str, FeatureConfig] = {}
self._cache_ttl = 60 # seconds
def _get_cache_key(self, flag_name: str) -> str:
return f"ff:{flag_name}"
def _hash_user_for_percentage(self, flag_name: str, user_id: str) -> float:
"""Deterministic hash cho percentage rollout"""
hash_input = f"{flag_name}:{user_id}"
hash_value = hashlib.md5(hash_input.encode()).hexdigest()
return int(hash_value[:8], 16) / (16 ** 8)
def register_flag(self, config: FeatureConfig) -> bool:
"""Đăng ký một feature flag mới"""
try:
self._redis.set(
self._get_cache_key(config.flag_name),
json.dumps({
"flag_name": config.flag_name,
"description": config.description,
"enabled": config.enabled,
"rollout_percentage": config.rollout_percentage,
"rollout_start": config.rollout_start.isoformat() if config.rollout_start else None,
"rollout_end": config.rollout_end.isoformat() if config.rollout_end else None,
"target_segments": [s.value for s in config.target_segments],
"target_user_ids": list(config.target_user_ids),
"exclude_user_ids": list(config.exclude_user_ids),
"variants": config.variants,
"default_variant": config.default_variant,
"created_at": config.created_at.isoformat(),
"updated_at": config.updated_at.isoformat(),
"owner": config.owner,
"jira_ticket": config.jira_ticket,
}),
ex=3600 # TTL 1 hour
)
self._cache[config.flag_name] = config
return True
except Exception as e:
print(f"Error registering flag: {e}")
return False
def get_flag(self, flag_name: str) -> Optional[FeatureConfig]:
"""Lấy cấu hình feature flag"""
# Check cache first
if flag_name in self._cache:
return self._cache[flag_name]
# Load from Redis
cached = self._redis.get(self._get_cache_key(flag_name))
if cached:
data = json.loads(cached)
config = FeatureConfig(
flag_name=data["flag_name"],
description=data["description"],
enabled=data["enabled"],
rollout_percentage=data["rollout_percentage"],
rollout_start=datetime.fromisoformat(data["rollout_start"]) if data["rollout_start"] else None,
rollout_end=datetime.fromisoformat(data["rollout_end"]) if data["rollout_end"] else None,
target_segments=[UserSegment(s) for s in data["target_segments"]],
target_user_ids=set(data["target_user_ids"]),
exclude_user_ids=set(data["exclude_user_ids"]),
variants=data["variants"],
default_variant=data["default_variant"],
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
owner=data.get("owner", ""),
jira_ticket=data.get("jira_ticket", ""),
)
self._cache[flag_name] = config
return config
return None
def is_enabled(
self,
flag_name: str,
user_id: str,
user_segment: UserSegment = UserSegment.ALL,
user_metadata: Optional[dict] = None
) -> bool:
"""Kiểm tra xem feature có được bật cho user không"""
config = self.get_flag(flag_name)
if not config:
return False
if not config.enabled:
return False
# Check time window
now = datetime.now()
if config.rollout_start and now < config.rollout_start:
return False
if config.rollout_end and now > config.rollout_end:
return False
# Check user exclusion
if user_id in config.exclude_user_ids:
return False
# Check explicit user targeting
if config.target_user_ids and user_id in config.target_user_ids:
return True
# Check segment targeting
if config.target_segments and UserSegment.ALL not in config.target_segments:
if user_segment not in config.target_segments:
return False
# Check percentage rollout
bucket = self._hash_user_for_percentage(flag_name, user_id)
return (bucket * 100) < config.rollout_percentage
def get_variant(
self,
flag_name: str,
user_id: str,
user_segment: UserSegment = UserSegment.ALL
) -> str:
"""Lấy variant mà user nhận được"""
config = self.get_flag(flag_name)
if not config or not self.is_enabled(flag_name, user_id, user_segment):
return config.default_variant if config else "control"
# Deterministic variant selection
bucket = self._hash_user_for_percentage(flag_name, f"{user_id}:variant")
variants = list(config.variants.keys()) if config.variants else ["control", "variant"]
# Weighted selection
for variant_name, variant_config in config.variants.items():
weight = variant_config.get("weight", 50)
bucket -= weight / 100
if bucket <= 0:
return variant_name
return config.default_variant
def disable_flag(self, flag_name: str) -> bool:
"""Emergency disable - rollback feature ngay lập tức"""
config = self.get_flag(flag_name)
if config:
config.enabled = False
config.updated_at = datetime.now()
return self.register_flag(config)
return False
def gradual_rollout(self, flag_name: str, target_percentage: float, step: float = 10) -> list[dict]:
"""Tăng dần rollout percentage theo từng bước"""
config = self.get_flag(flag_name)
if not config:
return [{"error": "Flag not found"}]
steps = []
current = config.rollout_percentage
while current < target_percentage:
current = min(current + step, target_percentage)
config.rollout_percentage = current
config.updated_at = datetime.now()
self.register_flag(config)
steps.append({
"timestamp": datetime.now().isoformat(),
"percentage": current,
"status": "deployed"
})
# Wait 5 minutes between steps in production
# time.sleep(300) # Uncomment in production
return steps
============== HOLYSHEEP INTEGRATION ==============
class HolySheepCanaryController:
"""Controller cho Canary Deployment với HolySheep"""
def __init__(self, feature_flags: FeatureFlagService):
self.flags = feature_flags
self.BASE_URL = "https://api.holysheep.ai/v1"
def setup_rag_canary(self, user_id: str, user_segment: UserSegment) -> dict:
"""Cấu hình RAG system với canary routing"""
# Kiểm tra feature flags
rag_enabled = self.flags.is_enabled("rag_v3", user_id, user_segment)
embedding_model = self.flags.get_variant("embedding_model", user_id, user_segment)
vector_db = self.flags.get_variant("vector_db", user_id, user_segment)
# Cấu hình HolySheep API routing
provider_map = {
"openai": {
"embedding": "text-embedding-3-large",
"llm": "gpt-4.1"
},
"cohere": {
"embedding": "embed- multilingual-v3.0",
"llm": "command-r-plus"
}
}
if embedding_model == "cohere":
config = {
"endpoint": f"{self.BASE_URL}/embeddings",
"model": provider_map["cohere"]["embedding"],
"vector_dimension": 1024,
"provider": "cohere",
"canary": rag_enabled,
"fallback": "openai"
}
else:
config = {
"endpoint": f"{self.BASE_URL}/embeddings",
"model": provider_map["openai"]["embedding"],
"vector_dimension": 3072,
"provider": "openai",
"canary": rag_enabled,
"fallback": "cohere"
}
return config
def setup_streaming_canary(self, user_id: str) -> dict:
"""Cấu hình streaming với canary percentage"""
# Streaming quality tiers
config = {
"streaming_enabled": self.flags.is_enabled("streaming_v2", user_id),
"chunk_size": 20 if self.flags.get_variant("streaming_quality", user_id) == "high" else 50,
"buffer_size": 512,
"provider": self.flags.get_variant("streaming_provider", user_id)
}
return config
============== USAGE EXAMPLE ==============
if __name__ == "__main__":
# Initialize service
ff_service = FeatureFlagService(redis_host="localhost", redis_port=6379)
canary_controller = HolySheepCanaryController(ff_service)
# Register RAG v3 feature flag
rag_config = FeatureConfig(
flag_name="rag_v3",
description="RAG system phiên bản 3 với hybrid search",
enabled=True,
rollout_percentage=25.0, # Bắt đầu 25%
rollout_start=datetime.now(),
target_segments=[UserSegment.PREMIUM, UserSegment.BETA_TESTERS],
variants={
"control": {"weight": 50, "description": "RAG v2 hiện tại"},
"variant": {"weight": 50, "description": "RAG v3 hybrid search"}
},
default_variant="control",
owner="ml-team",
jira_ticket="ML-1234"
)
ff_service.register_flag(rag_config)
# Register embedding model experiment
embedding_config = FeatureConfig(
flag_name="embedding_model",
description="So sánh OpenAI vs Cohere embeddings",
enabled=True,
rollout_percentage=50.0,
variants={
"openai": {"weight": 50},
"cohere": {"weight": 50}
},
default_variant="openai",
owner="search-team",
jira_ticket="SRCH-567"
)
ff_service.register_flag(embedding_config)
# Test user targeting
test_users = [
("user_001", UserSegment.PREMIUM),
("user_002", UserSegment.FREE_TIER),
("user_003", UserSegment.BETA_TESTERS),
("internal_dev", UserSegment.INTERNAL),
]
print("="*70)
print("FEATURE FLAG EVALUATION TEST")
print("="*70)
for user_id, segment in test_users:
rag_enabled = ff_service.is_enabled("rag_v3", user_id, segment)
rag_variant = ff_service.get_variant("rag_v3", user_id, segment)
embedding = ff_service.get_variant("embedding_model", user_id, segment)
canary_config = canary_controller.setup_rag_canary(user_id, segment)
print(f"\n👤 {user_id} ({segment.value})")
print(f" RAG v3: {'✅ ENABLED' if rag_enabled else '❌ DISABLED'} | Variant: {rag_variant}")
print(f" Embedding: {embedding}")
print(f" Provider: {canary_config['provider']} | Dimension: {canary_config['vector_dimension']}")
print("\n" + "="*70)
print("Gradual Rollout Status")
print("="*70)
# Simulate gradual rollout
for pct in [25, 50, 75, 100]:
rag_config.rollout_percentage = pct
rag_config.updated_at = datetime.now()
ff_service.register_flag(rag_config)
affected_users = sum(
1 for uid, _ in test_users
if ff_service.is_enabled("rag_v3", uid)
)
print(f"Rollout {pct}%: {affected_users}/{len(test_users)} users affected")
3. Telemetry và Monitoring Dashboard
# holy_sheep_telemetry.py
Monitoring System cho A/B Testing với Prometheus/Grafana integration
Real-time metrics: latency, error rate, token usage, cost efficiency
import time
import asyncio
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry, push_to_gateway
import json
Prometheus metrics
REGISTRY = CollectorRegistry()
Counters
REQUEST_COUNTER = Counter(
'holysheep_requests_total',
'Total requests by group and provider',
['experiment_id', 'group', 'provider', 'model'],
registry=REGISTRY
)
ERROR_COUNTER = Counter(
'holysheep_errors_total',
'Total errors by type',
['experiment_id', 'error_type', 'provider'],
registry=REGISTRY
)
Histograms
LATENCY_HISTOGRAM = Histogram(
'holysheep_request_latency_seconds',
'Request latency in seconds',
['experiment_id', 'group', 'provider'],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
registry=REGISTRY
)
TOKEN_USAGE_HISTOGRAM = Histogram(
'holysheep_tokens_used',
'Token usage per request',
['experiment_id', 'group', 'provider', 'token_type'],
buckets=[10, 50, 100, 500, 1000, 5000, 10000, 50000],
registry=REGISTRY
)
Gauges
ACTIVE_USERS_GAUGE = Gauge(
'holysheep_active_users',
'Active users in experiment',
['experiment_id', 'group'],
registry=REGISTRY
)
COST_ACCUMULATOR_GAUGE = Gauge(
'holysheep_total_cost_usd',
'Accumulated cost in USD',
['experiment_id', 'group', 'provider'],
registry=REGISTRY
)
@dataclass
class LatencyStats:
"""Thống kê latency"""
count: int = 0
sum_ms: float = 0.0
min_ms: float = float('inf')
max_ms: float = 0.0
p50_ms: float = 0.0
p95_ms: float = 0.0
p99_ms: float = 0.0
def add(self, latency_ms: float):
self.count += 1
self.sum_ms += latency_ms
self.min_ms = min(self.min_ms, latency_ms)
self.max_ms = max(self.max_ms, latency_ms)
def calculate_percentiles(self, values: list):
if not values:
return
sorted_values = sorted(values)
self.p50_ms = sorted_values[int(len(sorted_values) * 0.50)]
self.p95_ms = sorted_values[int(len(sorted_values) * 0.95)]
self.p99_ms = sorted_values[int(len(sorted_values) * 0.99)]
@property
def avg_ms(self) -> float:
return self.sum_ms / self.count if self.count > 0 else 0.0
@dataclass
class ExperimentMetrics:
"""Metrics tổng hợp cho một experiment"""
experiment_id: str
start_time: datetime
end_time: Optional[datetime] = None
# Raw data
requests: list = field(default_factory=list)
errors: list = field(default_factory=list)
# Aggregated stats
latency_by_group: dict = field(default_factory=lambda: defaultdict(LatencyStats))
latency_by_provider: dict = field(default_factory=lambda: defaultdict(LatencyStats))
token_usage: dict = field(default_factory=lambda: defaultdict(int))
cost_by_group: dict = field(default_factory=lambda: defaultdict(float))
cost_by_provider: dict = field(default_factory=lambda: defaultdict(float))
errors_by_type: dict = field(default_factory=lambda: defaultdict(int))
def finalize(self):
"""Tính toán final statistics"""
# Calculate percentiles per group
for group, stats in self.latency_by_group.items():
group_latencies = [r.latency_ms for r in self.requests if r.group == group]
stats.calculate_percentiles(group_latencies)
for provider, stats in self.latency_by_provider.items():
provider_latencies = [r.latency_ms for r in self.requests if r.provider == provider]
stats.calculate_percentiles(provider_latencies)
self.end_time = datetime.now()
def generate_report(self) -> dict:
"""Generate comprehensive A/B test report"""
self.finalize()
total_requests = len(self.requests)
total_errors = len(self.errors)
if total_requests == 0:
return {"status": "no_data", "message": "No requests recorded"}
# Group comparison
group_comparison = {}
for group in set(r.group for r in self.requests):
group_requests = [r for r in self.requests if r.group == group]
group_errors = [r