ในฐานะวิศวกรที่ต้องเข้าร่วมประชุมวีดีโอคอลหลายสิบชั่วโมงต่อสัปดาห์ ผมเชื่อว่าหลายคนคงประสบปัญหาเดียวกัน — จดบันทึกไม่ทัน พลาดรายละเอียดสำคัญ และลืม follow-up task ที่มอบหมายในที่ประชุม ในบทความนี้ผมจะแบ่งปันประสบการณ์ตรงในการสร้าง AI Meeting Assistant ที่ทำทั้ง transcribe เสียงแบบ real-time, สรุปประเด็นสำคัญ และแยก action items ออกมาอัตโนมัติ
สถาปัตยกรรมระบบโดยรวม
ระบบที่ผมออกแบบประกอบด้วย 3 ส่วนหลัก:
- Audio Streaming Module — รับเสียงจาก WebRTC/Microphone และส่งต่อให้ Whisper API
- Real-time Processing Pipeline — xử lý transcript ต่อเนื่องด้วย streaming response
- AI Analysis Engine — ใช้ LLM สรุปและแยก tasks ออกจาก conversation
สถาปัตยกรรมนี้ใช้งานได้กับทั้ง Zoom, Google Meet, Teams หรือแม้แต่การประชุมออฟไลน์ผ่านไมค์โฟน
การตั้งค่า Audio Streaming
เริ่มต้นด้วยการตั้งค่า audio capture module ที่ทำงานแบบ non-blocking เพื่อไม่ให้ block main thread ขณะที่รอ API response
# audio_streamer.py
import asyncio
import numpy as np
import speech_recognition as sr
from typing import AsyncGenerator, Optional
import websockets
import json
class AudioStreamer:
"""
Real-time audio streaming module สำหรับ meeting transcription
รองรับทั้ง microphone input และ WebRTC stream
"""
def __init__(self,
sample_rate: int = 16000,
chunk_duration: float = 1.0,
language: str = "th"):
self.sample_rate = sample_rate
self.chunk_duration = chunk_duration
self.chunk_size = int(sample_rate * chunk_duration)
self.language = language
self.recognizer = sr.Recognizer()
self.recognizer.energy_threshold = 300
self.recognizer.pause_threshold = 0.8
async def capture_microphone(self) -> AsyncGenerator[np.ndarray, None]:
"""Capture audio from microphone เป็น async generator"""
import pyaudio
audio = pyaudio.PyAudio()
stream = audio.open(
format=pyaudio.paInt16,
channels=1,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size
)
try:
while True:
try:
# อ่าน audio chunk
data = stream.read(self.chunk_size, exception_on_overflow=False)
audio_chunk = np.frombuffer(data, dtype=np.int16)
# แปลงเป็น float32 สำหรับ processing
audio_float = audio_chunk.astype(np.float32) / 32768.0
yield audio_float
except Exception as e:
print(f"Audio capture error: {e}")
await asyncio.sleep(0.1)
finally:
stream.stop_stream()
stream.close()
audio.terminate()
async def capture_from_buffer(self,
audio_queue: asyncio.Queue) -> AsyncGenerator[str, None]:
"""
อ่าน audio จาก queue และ transcribe ด้วย WebSocket
สำหรับกรณี integrate กับ WebRTC
"""
accumulated_text = []
while True:
try:
# รอ audio data จาก queue
audio_data = await asyncio.wait_for(
audio_queue.get(),
timeout=5.0
)
# Transcribe ด้วย streaming approach
text = await self._transcribe_streaming(audio_data)
if text:
accumulated_text.append(text)
yield " ".join(accumulated_text)
except asyncio.TimeoutError:
# Timeout = จบ utterance
if accumulated_text:
final_text = " ".join(accumulated_text)
accumulated_text = []
yield f"[END] {final_text}"
async def _transcribe_streaming(self, audio_data: bytes) -> Optional[str]:
"""เรียก Whisper API แบบ streaming ผ่าน WebSocket"""
try:
async with websockets.connect(
"wss://api.holysheep.ai/v1/audio/transcriptions/ws"
) as ws:
await ws.send(json.dumps({
"model": "whisper-1",
"language": self.language,
"format": "text"
}))
await ws.send(audio_data)
response = await ws.recv()
result = json.loads(response)
return result.get("text", "")
except Exception as e:
print(f"Transcription error: {e}")
return None
AI Summary & Task Extraction Engine
หลังจากได้ transcript แล้ว ขั้นตอนสำคัญคือการสรุปและแยก tasks ออกมา ผมใช้ HolySheep AI เพราะมี latency เฉลี่ยต่ำกว่า 50ms และราคาถูกกว่า OpenAI ถึง 85% ทำให้สามารถ process การประชุมยาวได้โดยไม่ต้องกังวลเรื่องค่าใช้จ่าย
# meeting_processor.py
import httpx
import asyncio
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class MeetingSummary:
"""Structured meeting summary output"""
overview: str
key_points: List[str]
decisions: List[str]
action_items: List[Dict[str, str]] # {task, assignee, deadline, priority}
sentiment: str
next_meeting: Optional[str]
class MeetingProcessor:
"""
AI-powered meeting processor สำหรับ summarize และ extract tasks
ใช้ HolySheep AI API สำหรับ LLM inference
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.client = httpx.AsyncClient(timeout=60.0)
async def process_meeting(self,
transcript: str,
meeting_title: str = "",
participants: List[str] = []) -> MeetingSummary:
"""
Process complete meeting transcript
ใช้ parallel calls สำหรับ summarize และ task extraction
"""
# Prompt สำหรับสรุปประเด็นหลัก
summary_prompt = f"""คุณคือ AI meeting assistant ที่ช่วยสรุปการประชุม
Transcript การประชุม:
{transcript}
กรุณาสรุปเป็น JSON format ดังนี้:
{{
"overview": "สรุปภาพรวมการประชุม 2-3 ประโยค",
"key_points": ["ประเด็นสำคัญที่ 1", "ประเด็นสำคัญที่ 2", ...],
"decisions": ["การตัดสินใจที่ 1", "การตัดสินใจที่ 2", ...],
"sentiment": "positive/neutral/negative",
"next_meeting": "กำหนดการประชุมครั้งต่อไป (ถ้ามี)"
}}
Output เป็น JSON อย่างเดียว ไม่ต้องมี markdown"""
# Prompt สำหรับแยก action items
tasks_prompt = f"""จาก transcript การประชุมด้านล่าง ให้แยก tasks ที่ต้องทำ:
Transcript:
{transcript}
ส่งออกเป็น JSON array:
[
{{
"task": "รายละเอียด task",
"assignee": "ชื่อคนรับผิดชอบ (ถ้าระบุ)",
"deadline": "กำหนดส่ง (ถ้ามี)",
"priority": "high/medium/low"
}}
]
ถ้าไม่มี tasks ให้ส่ง empty array []
Output เป็น JSON อย่างเดียว:"""
# Execute both calls in parallel
summary_task = self._call_llm(summary_prompt, model="deepseek-chat")
tasks_task = self._call_llm(tasks_prompt, model="deepseek-chat")
summary_response, tasks_response = await asyncio.gather(
summary_task, tasks_task
)
# Parse responses
summary_data = json.loads(summary_response)
action_items = json.loads(tasks_response)
return MeetingSummary(
overview=summary_data.get("overview", ""),
key_points=summary_data.get("key_points", []),
decisions=summary_data.get("decisions", []),
action_items=action_items,
sentiment=summary_data.get("sentiment", "neutral"),
next_meeting=summary_data.get("next_meeting")
)
async def _call_llm(self, prompt: str, model: str = "deepseek-chat") -> str:
"""เรียก HolySheep AI API สำหรับ LLM inference"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": [
{"role": "user", "content": prompt}
],
"temperature": 0.3, # Low temperature for consistent structured output
"max_tokens": 2000
}
response = await self.client.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
async def process_streaming(self,
partial_transcript: str) -> Dict[str, any]:
"""
Real-time processing สำหรับ interim summaries
ใช้สำหรับแสดงผลระหว่างประชุม
"""
prompt = f"""สรุปสิ่งที่พูดคุยกันในการประชุมตอนนี้ (interim summary):
{partial_transcript}
ให้สรุปสั้นๆ 2-3 ประโยค และ list action items ที่เห็น:"""
response = await self._call_llm(prompt, model="gpt-4o-mini") # Fast model for interim
return {"interim_summary": response}
async def generate_minutes(self, summary: MeetingSummary) -> str:
"""Generate formal meeting minutes"""
prompt = f"""สร้างรายงานการประชุมอย่างเป็นทางการ:
ภาพรวม: {summary.overview}
ประเด็นสำคัญ:
{chr(10).join(f"- {p}" for p in summary.key_points)}
การตัดสินใจ:
{chr(10).join(f"- {d}" for d in summary.decisions)}
Tasks:
{chr(10).join(f"- [{item['priority'].upper()}] {item['task']} | ผู้รับ: {item['assignee']} | กำหนด: {item['deadline']}"
for item in summary.action_items)}
กรุณาจัด format เป็น official meeting minutes:"""
return await self._call_llm(prompt, model="deepseek-chat")
async def close(self):
await self.client.aclose()
WebSocket Server สำหรับ Real-time Updates
เพื่อให้ frontend รับ updates แบบ real-time ผมสร้าง WebSocket server ที่ทำหน้าที่เป็น bridge ระหว่าง audio stream และ AI processing
# ws_server.py
import asyncio
import websockets
import json
import logging
from typing import Set, Dict
from audio_streamer import AudioStreamer
from meeting_processor import MeetingProcessor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MeetingWebSocketServer:
"""
WebSocket server สำหรับ real-time meeting assistant
รองรับ multiple concurrent meetings
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.active_connections: Set[websockets.WebSocketServerProtocol] = set()
self.meetings: Dict[str, dict] = {}
self.audio_streamer = AudioStreamer()
self.processor = MeetingProcessor(api_key)
async def register(self, websocket: websockets.WebSocketServerProtocol):
"""Register new WebSocket connection"""
self.active_connections.add(websocket)
logger.info(f"Client connected. Total: {len(self.active_connections)}")
async def unregister(self, websocket: websockets.WebSocketServerProtocol):
"""Unregister WebSocket connection"""
self.active_connections.discard(websocket)
logger.info(f"Client disconnected. Total: {len(self.active_connections)}")
async def handle_meeting(self, websocket: websockets.WebSocketServerProtocol):
"""Handle meeting session"""
meeting_id = None
try:
async for message in websocket:
data = json.loads(message)
msg_type = data.get("type")
if msg_type == "start":
# เริ่ม meeting session ใหม่
meeting_id = data.get("meeting_id", f"meeting_{id(websocket)}")
self.meetings[meeting_id] = {
"transcript": [],
"start_time": asyncio.get_event_loop().time(),
"participants": data.get("participants", [])
}
await websocket.send(json.dumps({
"type": "started",
"meeting_id": meeting_id
}))
logger.info(f"Meeting started: {meeting_id}")
elif msg_type == "audio":
# รับ audio chunk และ transcribe
if meeting_id:
transcript = await self._process_audio(data["audio"])
if transcript:
self.meetings[meeting_id]["transcript"].append(transcript)
# Broadcast to all participants
await self._broadcast(websocket, {
"type": "transcript",
"text": transcript,
"timestamp": asyncio.get_event_loop().time()
})
# Auto-generate interim summary ทุก 5 utterances
if len(self.meetings[meeting_id]["transcript"]) % 5 == 0:
interim = await self.processor.process_streaming(
" ".join(self.meetings[meeting_id]["transcript"])
)
await self._broadcast(websocket, {
"type": "interim_summary",
"data": interim
})
elif msg_type == "end":
# จบ meeting และ generate final summary
if meeting_id:
final_summary = await self._generate_final_summary(meeting_id)
await websocket.send(json.dumps({
"type": "summary",
"data": {
"overview": final_summary.overview,
"key_points": final_summary.key_points,
"decisions": final_summary.decisions,
"action_items": final_summary.action_items,
"sentiment": final_summary.sentiment,
"minutes": await self.processor.generate_minutes(final_summary)
}
}))
del self.meetings[meeting_id]
meeting_id = None
logger.info("Meeting ended and summary generated")
elif msg_type == "get_summary":
# Request current summary (on-demand)
if meeting_id and meeting_id in self.meetings:
summary = await self.processor.process_meeting(
" ".join(self.meetings[meeting_id]["transcript"])
)
await websocket.send(json.dumps({
"type": "summary",
"data": {
"overview": summary.overview,
"key_points": summary.key_points,
"action_items": summary.action_items
}
}))
except websockets.exceptions.ConnectionClosed:
logger.info("Connection closed by client")
finally:
if meeting_id and meeting_id in self.meetings:
del self.meetings[meeting_id]
await self.unregister(websocket)
async def _process_audio(self, audio_data: str) -> str:
"""Process audio data through Whisper"""
# ใช้ AudioStreamer สำหรับ transcription
# ใน production อาจใช้ dedicated Whisper service
return audio_data # Simplified for demo
async def _broadcast(self,
sender: websockets.WebSocketServerProtocol,
message: