ในฐานะวิศวกรที่ต้องเข้าร่วมประชุมวีดีโอคอลหลายสิบชั่วโมงต่อสัปดาห์ ผมเชื่อว่าหลายคนคงประสบปัญหาเดียวกัน — จดบันทึกไม่ทัน พลาดรายละเอียดสำคัญ และลืม follow-up task ที่มอบหมายในที่ประชุม ในบทความนี้ผมจะแบ่งปันประสบการณ์ตรงในการสร้าง AI Meeting Assistant ที่ทำทั้ง transcribe เสียงแบบ real-time, สรุปประเด็นสำคัญ และแยก action items ออกมาอัตโนมัติ

สถาปัตยกรรมระบบโดยรวม

ระบบที่ผมออกแบบประกอบด้วย 3 ส่วนหลัก:

สถาปัตยกรรมนี้ใช้งานได้กับทั้ง Zoom, Google Meet, Teams หรือแม้แต่การประชุมออฟไลน์ผ่านไมค์โฟน

การตั้งค่า Audio Streaming

เริ่มต้นด้วยการตั้งค่า audio capture module ที่ทำงานแบบ non-blocking เพื่อไม่ให้ block main thread ขณะที่รอ API response

# audio_streamer.py
import asyncio
import numpy as np
import speech_recognition as sr
from typing import AsyncGenerator, Optional
import websockets
import json

class AudioStreamer:
    """
    Real-time audio streaming module สำหรับ meeting transcription
    รองรับทั้ง microphone input และ WebRTC stream
    """
    
    def __init__(self, 
                 sample_rate: int = 16000,
                 chunk_duration: float = 1.0,
                 language: str = "th"):
        self.sample_rate = sample_rate
        self.chunk_duration = chunk_duration
        self.chunk_size = int(sample_rate * chunk_duration)
        self.language = language
        self.recognizer = sr.Recognizer()
        self.recognizer.energy_threshold = 300
        self.recognizer.pause_threshold = 0.8
        
    async def capture_microphone(self) -> AsyncGenerator[np.ndarray, None]:
        """Capture audio from microphone เป็น async generator"""
        import pyaudio
        
        audio = pyaudio.PyAudio()
        stream = audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )
        
        try:
            while True:
                try:
                    # อ่าน audio chunk
                    data = stream.read(self.chunk_size, exception_on_overflow=False)
                    audio_chunk = np.frombuffer(data, dtype=np.int16)
                    
                    # แปลงเป็น float32 สำหรับ processing
                    audio_float = audio_chunk.astype(np.float32) / 32768.0
                    
                    yield audio_float
                    
                except Exception as e:
                    print(f"Audio capture error: {e}")
                    await asyncio.sleep(0.1)
                    
        finally:
            stream.stop_stream()
            stream.close()
            audio.terminate()
    
    async def capture_from_buffer(self, 
                                   audio_queue: asyncio.Queue) -> AsyncGenerator[str, None]:
        """
        อ่าน audio จาก queue และ transcribe ด้วย WebSocket
        สำหรับกรณี integrate กับ WebRTC
        """
        accumulated_text = []
        
        while True:
            try:
                # รอ audio data จาก queue
                audio_data = await asyncio.wait_for(
                    audio_queue.get(), 
                    timeout=5.0
                )
                
                # Transcribe ด้วย streaming approach
                text = await self._transcribe_streaming(audio_data)
                
                if text:
                    accumulated_text.append(text)
                    yield " ".join(accumulated_text)
                    
            except asyncio.TimeoutError:
                # Timeout = จบ utterance
                if accumulated_text:
                    final_text = " ".join(accumulated_text)
                    accumulated_text = []
                    yield f"[END] {final_text}"
                    
    async def _transcribe_streaming(self, audio_data: bytes) -> Optional[str]:
        """เรียก Whisper API แบบ streaming ผ่าน WebSocket"""
        try:
            async with websockets.connect(
                "wss://api.holysheep.ai/v1/audio/transcriptions/ws"
            ) as ws:
                await ws.send(json.dumps({
                    "model": "whisper-1",
                    "language": self.language,
                    "format": "text"
                }))
                await ws.send(audio_data)
                
                response = await ws.recv()
                result = json.loads(response)
                return result.get("text", "")
                
        except Exception as e:
            print(f"Transcription error: {e}")
            return None

AI Summary & Task Extraction Engine

หลังจากได้ transcript แล้ว ขั้นตอนสำคัญคือการสรุปและแยก tasks ออกมา ผมใช้ HolySheep AI เพราะมี latency เฉลี่ยต่ำกว่า 50ms และราคาถูกกว่า OpenAI ถึง 85% ทำให้สามารถ process การประชุมยาวได้โดยไม่ต้องกังวลเรื่องค่าใช้จ่าย

# meeting_processor.py
import httpx
import asyncio
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class MeetingSummary:
    """Structured meeting summary output"""
    overview: str
    key_points: List[str]
    decisions: List[str]
    action_items: List[Dict[str, str]]  # {task, assignee, deadline, priority}
    sentiment: str
    next_meeting: Optional[str]

class MeetingProcessor:
    """
    AI-powered meeting processor สำหรับ summarize และ extract tasks
    ใช้ HolySheep AI API สำหรับ LLM inference
    """
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.holysheep.ai/v1"
        self.client = httpx.AsyncClient(timeout=60.0)
        
    async def process_meeting(self, 
                               transcript: str,
                               meeting_title: str = "",
                               participants: List[str] = []) -> MeetingSummary:
        """
        Process complete meeting transcript
        ใช้ parallel calls สำหรับ summarize และ task extraction
        """
        
        # Prompt สำหรับสรุปประเด็นหลัก
        summary_prompt = f"""คุณคือ AI meeting assistant ที่ช่วยสรุปการประชุม
        
Transcript การประชุม:
{transcript}

กรุณาสรุปเป็น JSON format ดังนี้:
{{
    "overview": "สรุปภาพรวมการประชุม 2-3 ประโยค",
    "key_points": ["ประเด็นสำคัญที่ 1", "ประเด็นสำคัญที่ 2", ...],
    "decisions": ["การตัดสินใจที่ 1", "การตัดสินใจที่ 2", ...],
    "sentiment": "positive/neutral/negative",
    "next_meeting": "กำหนดการประชุมครั้งต่อไป (ถ้ามี)"
}}

Output เป็น JSON อย่างเดียว ไม่ต้องมี markdown"""
        
        # Prompt สำหรับแยก action items
        tasks_prompt = f"""จาก transcript การประชุมด้านล่าง ให้แยก tasks ที่ต้องทำ:

Transcript:
{transcript}

ส่งออกเป็น JSON array:
[
    {{
        "task": "รายละเอียด task",
        "assignee": "ชื่อคนรับผิดชอบ (ถ้าระบุ)",
        "deadline": "กำหนดส่ง (ถ้ามี)",
        "priority": "high/medium/low"
    }}
]

ถ้าไม่มี tasks ให้ส่ง empty array []

Output เป็น JSON อย่างเดียว:"""

        # Execute both calls in parallel
        summary_task = self._call_llm(summary_prompt, model="deepseek-chat")
        tasks_task = self._call_llm(tasks_prompt, model="deepseek-chat")
        
        summary_response, tasks_response = await asyncio.gather(
            summary_task, tasks_task
        )
        
        # Parse responses
        summary_data = json.loads(summary_response)
        action_items = json.loads(tasks_response)
        
        return MeetingSummary(
            overview=summary_data.get("overview", ""),
            key_points=summary_data.get("key_points", []),
            decisions=summary_data.get("decisions", []),
            action_items=action_items,
            sentiment=summary_data.get("sentiment", "neutral"),
            next_meeting=summary_data.get("next_meeting")
        )
    
    async def _call_llm(self, prompt: str, model: str = "deepseek-chat") -> str:
        """เรียก HolySheep AI API สำหรับ LLM inference"""
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.3,  # Low temperature for consistent structured output
            "max_tokens": 2000
        }
        
        response = await self.client.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=payload
        )
        
        response.raise_for_status()
        data = response.json()
        
        return data["choices"][0]["message"]["content"]
    
    async def process_streaming(self, 
                                 partial_transcript: str) -> Dict[str, any]:
        """
        Real-time processing สำหรับ interim summaries
        ใช้สำหรับแสดงผลระหว่างประชุม
        """
        
        prompt = f"""สรุปสิ่งที่พูดคุยกันในการประชุมตอนนี้ (interim summary):

{partial_transcript}

ให้สรุปสั้นๆ 2-3 ประโยค และ list action items ที่เห็น:"""
        
        response = await self._call_llm(prompt, model="gpt-4o-mini")  # Fast model for interim
        
        return {"interim_summary": response}
    
    async def generate_minutes(self, summary: MeetingSummary) -> str:
        """Generate formal meeting minutes"""
        
        prompt = f"""สร้างรายงานการประชุมอย่างเป็นทางการ:

ภาพรวม: {summary.overview}

ประเด็นสำคัญ:
{chr(10).join(f"- {p}" for p in summary.key_points)}

การตัดสินใจ:
{chr(10).join(f"- {d}" for d in summary.decisions)}

Tasks:
{chr(10).join(f"- [{item['priority'].upper()}] {item['task']} | ผู้รับ: {item['assignee']} | กำหนด: {item['deadline']}" 
             for item in summary.action_items)}

กรุณาจัด format เป็น official meeting minutes:"""
        
        return await self._call_llm(prompt, model="deepseek-chat")

    async def close(self):
        await self.client.aclose()

WebSocket Server สำหรับ Real-time Updates

เพื่อให้ frontend รับ updates แบบ real-time ผมสร้าง WebSocket server ที่ทำหน้าที่เป็น bridge ระหว่าง audio stream และ AI processing

# ws_server.py
import asyncio
import websockets
import json
import logging
from typing import Set, Dict
from audio_streamer import AudioStreamer
from meeting_processor import MeetingProcessor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MeetingWebSocketServer:
    """
    WebSocket server สำหรับ real-time meeting assistant
    รองรับ multiple concurrent meetings
    """
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.active_connections: Set[websockets.WebSocketServerProtocol] = set()
        self.meetings: Dict[str, dict] = {}
        self.audio_streamer = AudioStreamer()
        self.processor = MeetingProcessor(api_key)
        
    async def register(self, websocket: websockets.WebSocketServerProtocol):
        """Register new WebSocket connection"""
        self.active_connections.add(websocket)
        logger.info(f"Client connected. Total: {len(self.active_connections)}")
        
    async def unregister(self, websocket: websockets.WebSocketServerProtocol):
        """Unregister WebSocket connection"""
        self.active_connections.discard(websocket)
        logger.info(f"Client disconnected. Total: {len(self.active_connections)}")
        
    async def handle_meeting(self, websocket: websockets.WebSocketServerProtocol):
        """Handle meeting session"""
        meeting_id = None
        
        try:
            async for message in websocket:
                data = json.loads(message)
                msg_type = data.get("type")
                
                if msg_type == "start":
                    # เริ่ม meeting session ใหม่
                    meeting_id = data.get("meeting_id", f"meeting_{id(websocket)}")
                    self.meetings[meeting_id] = {
                        "transcript": [],
                        "start_time": asyncio.get_event_loop().time(),
                        "participants": data.get("participants", [])
                    }
                    
                    await websocket.send(json.dumps({
                        "type": "started",
                        "meeting_id": meeting_id
                    }))
                    
                    logger.info(f"Meeting started: {meeting_id}")
                    
                elif msg_type == "audio":
                    # รับ audio chunk และ transcribe
                    if meeting_id:
                        transcript = await self._process_audio(data["audio"])
                        
                        if transcript:
                            self.meetings[meeting_id]["transcript"].append(transcript)
                            
                            # Broadcast to all participants
                            await self._broadcast(websocket, {
                                "type": "transcript",
                                "text": transcript,
                                "timestamp": asyncio.get_event_loop().time()
                            })
                            
                            # Auto-generate interim summary ทุก 5 utterances
                            if len(self.meetings[meeting_id]["transcript"]) % 5 == 0:
                                interim = await self.processor.process_streaming(
                                    " ".join(self.meetings[meeting_id]["transcript"])
                                )
                                await self._broadcast(websocket, {
                                    "type": "interim_summary",
                                    "data": interim
                                })
                                
                elif msg_type == "end":
                    # จบ meeting และ generate final summary
                    if meeting_id:
                        final_summary = await self._generate_final_summary(meeting_id)
                        
                        await websocket.send(json.dumps({
                            "type": "summary",
                            "data": {
                                "overview": final_summary.overview,
                                "key_points": final_summary.key_points,
                                "decisions": final_summary.decisions,
                                "action_items": final_summary.action_items,
                                "sentiment": final_summary.sentiment,
                                "minutes": await self.processor.generate_minutes(final_summary)
                            }
                        }))
                        
                        del self.meetings[meeting_id]
                        meeting_id = None
                        logger.info("Meeting ended and summary generated")
                        
                elif msg_type == "get_summary":
                    # Request current summary (on-demand)
                    if meeting_id and meeting_id in self.meetings:
                        summary = await self.processor.process_meeting(
                            " ".join(self.meetings[meeting_id]["transcript"])
                        )
                        await websocket.send(json.dumps({
                            "type": "summary",
                            "data": {
                                "overview": summary.overview,
                                "key_points": summary.key_points,
                                "action_items": summary.action_items
                            }
                        }))
                        
        except websockets.exceptions.ConnectionClosed:
            logger.info("Connection closed by client")
        finally:
            if meeting_id and meeting_id in self.meetings:
                del self.meetings[meeting_id]
            await self.unregister(websocket)
            
    async def _process_audio(self, audio_data: str) -> str:
        """Process audio data through Whisper"""
        # ใช้ AudioStreamer สำหรับ transcription
        # ใน production อาจใช้ dedicated Whisper service
        return audio_data  # Simplified for demo
        
    async def _broadcast(self, 
                         sender: websockets.WebSocketServerProtocol,
                         message: