บทความนี้จะพาคุณเจาะลึกการใช้งาน ChatCompletion API อย่างเป็นระบบ ตั้งแต่โครงสร้างคำขอ การแยกวิเคราะห์การตอบกลับ ไปจนถึงการเขียนโค้ดระดับ Production พร้อม Benchmark จริง สำหรับวิศวกรที่ต้องการสร้างระบบที่เชื่อถือได้และประหยัดต้นทุน
ทำความเข้าใจ ChatCompletion API Endpoint
ChatCompletion API ทำงานผ่าน REST endpoint ด้วย HTTP POST method โดยมีโครงสร้างพื้นฐานดังนี้:
- Method: POST
- Content-Type: application/json
- Authentication: Bearer Token ใน Authorization Header
สำหรับการใช้งานจริงใน Production ฉันแนะนำให้ใช้ HolySheep AI ซึ่งให้อัตรา ¥1=$1 ประหยัดได้ถึง 85% พร้อม latency เฉลี่ยต่ำกว่า 50ms
โครงสร้าง Request Object เชิงลึก
2.1 Required Parameters
ส่วนที่จำเป็นต้องมีในทุกคำขอคือ model และ messages array:
import requests
import json
def create_chat_request(model: str, messages: list) -> dict:
"""
สร้าง request payload สำหรับ ChatCompletion API
model: ชื่อโมเดล เช่น gpt-4, gpt-4-turbo, gpt-3.5-turbo
messages: list of message objects ที่มี role และ content
"""
return {
"model": model,
"messages": messages,
"stream": False # ปิด streaming เพื่อรับ response ทั้งหมดในครั้งเดียว
}
ตัวอย่างการใช้งาน
payload = create_chat_request(
model="gpt-4",
messages=[
{"role": "system", "content": "คุณเป็นผู้ช่วย AI ที่เชี่ยวชาญ"},
{"role": "user", "content": "อธิบายเรื่อง REST API โดยย่อ"}
]
)
print(json.dumps(payload, indent=2, ensure_ascii=False))
2.2 Message Structure และ Roles
Message Object ประกอบด้วย 3 roles หลักที่คุณต้องเข้าใจ:
- system: กำหนดพฤติกรรมและบทบาทของ AI
- user: ข้อความจากผู้ใช้
- assistant: การตอบกลับก่อนหน้าจาก AI (ใช้ใน conversation)
# Multi-turn conversation example
messages = [
# System prompt กำหนด persona
{"role": "system", "content": "คุณเป็น senior software architect"},
# Turn 1: User asks
{"role": "user", "content": "ออกแบบ microservices architecture สำหรับ e-commerce"},
# Turn 1: Assistant responds
{"role": "assistant", "content": "สำหรับ e-commerce ผมแนะนำให้แบ่งเป็น services หลักดังนี้..."},
# Turn 2: User asks follow-up
{"role": "user", "content": "แล้ว API Gateway ควรจัดการเรื่องอะไรบ้าง?"}
]
ส่ง conversation history ทั้งหมดเพื่อให้ AI เข้าใจ context
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=HEADERS,
json={"model": "gpt-4", "messages": messages}
)
การแยกวิเคราะห์ Response Object
Response จาก ChatCompletion API มีโครงสร้างที่ค่อนข้างลึก ต้องแยกวิเคราะห์ให้ถูกต้อง:
import json
from dataclasses import dataclass
from typing import Optional
@dataclass
class ChatResponse:
"""Data class สำหรับเก็บข้อมูล response อย่างเป็นระบบ"""
content: str
model: str
completion_tokens: int
prompt_tokens: int
total_tokens: int
finish_reason: str
response_id: str
def parse_chat_completion(response: requests.Response) -> ChatResponse:
"""
แยกวิเคราะห์ response จาก ChatCompletion API
"""
data = response.json()
# Extract choices (มักมีแค่ 1 choice ในกรณีปกติ)
choice = data["choices"][0]
# Extract usage statistics สำหรับคำนวณ cost
usage = data.get("usage", {})
return ChatResponse(
content=choice["message"]["content"],
model=data["model"],
completion_tokens=usage.get("completion_tokens", 0),
prompt_tokens=usage.get("prompt_tokens", 0),
total_tokens=usage.get("total_tokens", 0),
finish_reason=choice["finish_reason"],
response_id=data["id"]
)
ตัวอย่างการใช้งาน
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=HEADERS,
json={"model": "gpt-4", "messages": messages}
)
result = parse_chat_completion(response)
print(f"Content: {result.content}")
print(f"Tokens used: {result.total_tokens}")
print(f"Cost: ${calculate_cost(result.total_tokens, result.model)}")
โค้ด Production พร้อม Error Handling และ Retry Logic
ในระบบ Production คุณต้องจัดการกับหลาย edge cases เช่น rate limiting, timeout, และ server errors:
import time
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
class ChatCompletionClient:
"""Production-ready client สำหรับ ChatCompletion API"""
def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
self.base_url = base_url
self.session = self._create_session_with_retries()
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def _create_session_with_retries(self) -> requests.Session:
"""Setup session พร้อม automatic retry สำหรับ transient errors"""
session = requests.Session()
# Retry strategy: 3 attempts, exponential backoff
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["POST", "GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def chat(
self,
model: str,
messages: list,
temperature: float = 0.7,
max_tokens: int = 2048,
timeout: int = 60
) -> ChatResponse:
"""
Send chat completion request พร้อม full error handling
"""
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
try:
response = self.session.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json=payload,
timeout=timeout
)
# Handle specific HTTP errors
if response.status_code == 401:
raise AuthenticationError("Invalid API key")
elif response.status_code == 429:
raise RateLimitError("Rate limit exceeded")
elif response.status_code >= 500:
raise ServerError(f"Server error: {response.status_code}")
response.raise_for_status()
return parse_chat_completion(response)
except requests.exceptions.Timeout:
logger.error("Request timeout after %ds", timeout)
raise TimeoutError(f"Request timed out after {timeout} seconds")
except requests.exceptions.ConnectionError as e:
logger.error("Connection error: %s", str(e))
raise ConnectionError(f"Failed to connect: {str(e)}")
Initialize client
client = ChatCompletionClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
base_url="https://api.holysheep.ai/v1"
)
การควบคุม Concurrent Requests และ Rate Limiting
สำหรับระบบที่ต้องรับ request จำนวนมาก คุณต้องจัดการ concurrency อย่างเหมาะสม:
- Semaphore: จำกัดจำนวน concurrent requests
- ThreadPoolExecutor: ประมวลผลหลาย requests พร้อมกัน
- Batch Processing: รวม requests ที่คล้ายกันเข้าด้วยกัน
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
class ConcurrentChatClient:
"""Client ที่รองรับ concurrent requests"""
def __init__(self, api_key: str, max_concurrent: int = 10):
self.client = ChatCompletionClient(api_key)
self.semaphore = asyncio.Semaphore(max_concurrent)
async def chat_async(self, model: str, messages: list, request_id: str) -> dict:
"""Async version พร้อม semaphore ควบคุม concurrency"""
async with self.semaphore:
try:
# Run sync code in thread pool
result = await asyncio.to_thread(
self.client.chat, model, messages
)
return {
"request_id": request_id,
"status": "success",
"content": result.content,
"tokens": result.total_tokens
}
except Exception as e:
return {
"request_id": request_id,
"status": "error",
"error": str(e)
}
async def batch_chat(self, requests: list[dict]) -> list[dict]:
"""ประมวลผล batch ของ requests พร้อมกัน"""
tasks = [
self.chat_async(
model=req["model"],
messages=req["messages"],
request_id=req.get("id", f"req_{i}")
)
for i, req in enumerate(requests)
]
results = await asyncio.gather(*tasks)
return results
ตัวอย่างการใช้งาน batch
async def main():
client = ConcurrentChatClient(
api_key="YOUR_HOLYSHEEP_API_KEY",
max_concurrent=10
)
requests
แหล่งข้อมูลที่เกี่ยวข้อง
บทความที่เกี่ยวข้อง