실제 개발 현장에서 만나는 문제: "ConnectionError: timeout connecting to M-Pesa sandbox — 예상 응답 시간 30초 초과"

케냐의 M-Pesa는 아프리카 7개국 5,000만 명 이상이 사용하는 모바일 머니 서비스입니다. 그러나 M-Pesa API와 AI 챗봇을 직접 연동하면 다음과 같은 벽에 부딪힙니다:

저는 3개월간 M-Pesa + AI 챗봇 프로젝트를 진행하며 수십 개의 버그를 직접 겪었습니다. 이 가이드에서는 HolySheep AI를 사용하여 신뢰할 수 있는 글로벌 연결로这些问题를 해결하는 방법을 단계별로 설명합니다.

아프리카 모바일 결제 시장과 M-Pesa 이해

M-Pesa 기본 개념

M-Pesa(Mobile Pesa)는 사파리콤(Safaricom)이 2007년 케냐에서 시작한 모바일 금융 서비스입니다. 은행 계좌 없이도 휴대폰으로 송금, 결제, 대출이 가능합니다.

# M-Pesa API 엔드포인트 구조
M-PESA_API_BASE = "https://api.safaricom.co.ke"
M-PESA_SANDBOX = "https://sandbox.safaricom.co.ke"

주요 API 엔드포인트

ENDPOINTS = { "oauth": "/oauth/v1/generate?grant_type=client_credentials", "register_url": "/mpesa/c2b/v1/registerurl", "simulate": "/mpesa/c2b/v1/simulate", "stk_push": "/mpesa/stkpush/v1/processrequest", "stk_query": "/mpesa/stkpush/v1/query", "b2c": "/mpesa/b2c/v1/paymentrequest", "reversal": "/mpesa/b2c/v1/paymentrequest", "transaction_status": "/mpesa/transactionstatus/v1/query", "balance": "/mpesa/accountbalance/v1/query" }

M-Pesa 결제 흐름 아키텍처

# M-Pesa STK Push 결제 흐름

1. 사용자: 챗봇에 "1000 원 결제" 입력

2. AI 챗봇: HolySheep AI로 결제 의도 분석

3. 서버: M-Pesa OAuth 토큰 발급 요청

4. M-Pesa: Access Token 반환 (1시간 유효)

5. 서버: STK Push 요청 (Phone, Amount, Account)

6. M-Pesa: CheckoutRequestID 반환

7. 사용자 폰: M-Pesa 팝업 → PIN 입력

8. 서버: 폴링으로 결제 결과 확인

9. AI 챗봇: 결제 완료 메시지 전송

import asyncio from dataclasses import dataclass from typing import Optional from enum import Enum class PaymentStatus(Enum): PENDING = "pending" COMPLETED = "completed" FAILED = "failed" TIMEOUT = "timeout" CANCELLED = "cancelled" @dataclass class MpesaPayment: checkout_request_id: str merchant_request_id: str customer_message: str response_code: str response_description: str status: PaymentStatus = PaymentStatus.PENDING

프로젝트 구조와 환경 설정

디렉토리 구조

# 프로젝트 디렉토리 구조
mpesa-ai-chatbot/
├── config/
│   ├── __init__.py
│   ├── settings.py          # 환경 변수 설정
│   └── mpesa_config.py      # M-Pesa API 키/시크릿
├── services/
│   ├── __init__.py
│   ├── mpesa_service.py     # M-Pesa API 통신
│   ├── ai_service.py        # HolySheep AI 연동
│   └── payment_processor.py # 결제 처리 로직
├── models/
│   ├── __init__.py
│   ├── database.py          # SQLite/PostgreSQL
│   └── schemas.py           # Pydantic 스키마
├── api/
│   ├── __init__.py
│   ├── webhooks.py          # M-Pesa 콜백 핸들러
│   └── routes.py            # FastAPI 라우트
├── tests/
│   ├── test_mpesa.py
│   └── test_ai_integration.py
├── main.py                  # FastAPI 앱 진입점
├── requirements.txt
└── .env

requirements.txt

# 핵심 의존성
fastapi==0.115.0
uvicorn==0.32.0
httpx==0.27.2
python-dotenv==1.0.1
pydantic==2.9.2
pydantic-settings==2.5.2
aiosqlite==0.20.0
python-jose==3.3.0
passlib==1.7.4
loguru==0.7.2

환경 변수 설정 (.env)

# M-Pesa Developer Portal에서 발급받은 자격 증명
MPESA_CONSUMER_KEY=your_consumer_key_here
MPESA_CONSUMER_SECRET=your_consumer_secret_here
MPESA_SHORTCODE=174379
MPESA_PASSKEY=your_passkey_here
MPESA_CALLBACK_URL=https://yourdomain.com/api/webhooks/mpesa

HolySheep AI API 설정

HOLYSHEEP_API_KEY=YOUR_HOLYSHEEP_API_KEY HOLYSHEEP_BASE_URL=https://api.holysheep.ai/v1

애플리케이션 설정

APP_ENV=development # development, staging, production LOG_LEVEL=INFO DATABASE_URL=sqlite:///./mpesa_ai.db

결제 설정

PAYMENT_TIMEOUT_SECONDS=90 PAYMENT_POLL_INTERVAL_SECONDS=2 MAX_PAYMENT_RETRY=3

M-Pesa API 서비스 구현

M-Pesa 서비스 클래스

# services/mpesa_service.py
import httpx
import base64
import time
import asyncio
from datetime import datetime
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from loguru import logger
from config.settings import settings

@dataclass
class MpesaToken:
    access_token: str
    expires_at: float
    token_type: str = "Bearer"
    
    @property
    def is_expired(self) -> bool:
        # 토큰 만료 5분 전부터 갱신
        return time.time() >= (self.expires_at - 300)

class MpesaService:
    """M-Pesa API와 통신하는 서비스 클래스"""
    
    def __init__(self):
        self.consumer_key = settings.MPESA_CONSUMER_KEY
        self.consumer_secret = settings.MPESA_CONSUMER_SECRET
        self.shortcode = settings.MPESA_SHORTCODE
        self.passkey = settings.MPESA_PASSKEY
        self.callback_url = settings.MPESA_CALLBACK_URL
        
        # 프로덕션/샌드박스 동적 전환
        self.base_url = (
            "https://api.safaricom.co.ke" 
            if settings.APP_ENV == "production" 
            else "https://sandbox.safaricom.co.ke"
        )
        
        self._token: Optional[MpesaToken] = None
        self._client = httpx.AsyncClient(timeout=30.0)
    
    def _get_auth_header(self) -> str:
        """Basic Auth 헤더 생성 (Consumer Key:Consumer Secret)"""
        credentials = f"{self.consumer_key}:{self.consumer_secret}"
        encoded = base64.b64encode(credentials.encode()).decode()
        return f"Basic {encoded}"
    
    async def get_access_token(self) -> MpesaToken:
        """
        OAuth 2.0 토큰 발급
        토큰은 1시간 유효하며 캐싱하여 재발급 최소화
        """
        # 캐시된 토큰이 유효하면 반환
        if self._token and not self._token.is_expired:
            logger.debug("Cached M-Pesa token reused")
            return self._token
        
        url = f"{self.base_url}/oauth/v1/generate"
        params = {"grant_type": "client_credentials"}
        headers = {"Authorization": self._get_auth_header()}
        
        logger.info("Requesting new M-Pesa access token")
        
        try:
            response = await self._client.get(url, params=params, headers=headers)
            response.raise_for_status()
            
            data = response.json()
            self._token = MpesaToken(
                access_token=data["access_token"],
                expires_at=time.time() + data["expires_in"],
                token_type=data.get("token_type", "Bearer")
            )
            
            logger.success(f"M-Pesa token obtained, expires in {data['expires_in']}s")
            return self._token
            
        except httpx.HTTPStatusError as e:
            logger.error(f"M-Pesa auth failed: {e.response.status_code} - {e.response.text}")
            raise MpesaAuthError(f"Authentication failed: {e.response.text}")
        except httpx.RequestError as e:
            logger.error(f"M-Pesa connection error: {str(e)}")
            raise MpesaConnectionError(f"Connection failed: {str(e)}")
    
    async def stk_push_request(
        self,
        phone: str,
        amount: int,
        account_reference: str,
        transaction_desc: str = "Payment"
    ) -> Dict[str, Any]:
        """
        STK Push 결제 요청 (Lipa na M-Pesa Online)
        
        Args:
            phone: Kenya 형식 전화번호 (254712345678)
            amount: 금액 (KES)
            account_reference: 상점 계정 ID
            transaction_desc: 거래 설명
        
        Returns:
            M-Pesa API 응답 딕셔너리
        """
        token = await self.get_access_token()
        
        # Password = Shortcode + Passkey + Timestamp
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        password = base64.b64encode(
            f"{self.shortcode}{self.passkey}{timestamp}".encode()
        ).decode()
        
        url = f"{self.base_url}/mpesa/stkpush/v1/processrequest"
        headers = {
            "Authorization": f"{token.token_type} {token.access_token}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "BusinessShortCode": self.shortcode,
            "Password": password,
            "Timestamp": timestamp,
            "TransactionType": "CustomerPayBillOnline",
            "Amount": str(amount),
            "PartyA": phone,
            "PartyB": self.shortcode,
            "PhoneNumber": phone,
            "CallBackURL": self.callback_url,
            "AccountReference": account_reference,
            "TransactionDesc": transaction_desc
        }
        
        logger.info(f"STK Push request: Phone={phone}, Amount={amount} KES")
        
        try:
            response = await self._client.post(url, json=payload, headers=headers)
            response.raise_for_status()
            result = response.json()
            
            logger.success(f"STK Push response: {result.get('ResponseCode')}")
            return result
            
        except httpx.HTTPStatusError as e:
            logger.error(f"STK Push failed: {e.response.status_code}")
            raise MpesaAPIError(f"STK Push failed: {e.response.json()}")
    
    async def stk_push_query(self, checkout_request_id: str) -> Dict[str, Any]:
        """STK Push 결과 조회 (폴링용)"""
        token = await self.get_access_token()
        
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        password = base64.b64encode(
            f"{self.shortcode}{self.passkey}{timestamp}".encode()
        ).decode()
        
        url = f"{self.base_url}/mpesa/stkpush/v1/query"
        headers = {
            "Authorization": f"{token.token_type} {token.access_token}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "BusinessShortCode": self.shortcode,
            "Password": password,
            "Timestamp": timestamp,
            "CheckoutRequestID": checkout_request_id
        }
        
        response = await self._client.post(url, json=payload, headers=headers)
        response.raise_for_status()
        return response.json()
    
    async def wait_for_payment(
        self,
        checkout_request_id: str,
        timeout: int = 90,
        poll_interval: int = 2
    ) -> Dict[str, Any]:
        """
        결제 완료 대기 (폴링 방식)
        
        M-Pesa 팝업은 사용자가 최대 90초 내에 PIN 입력
        폴링으로 상태 확인 후 결과 반환
        """
        start_time = time.time()
        poll_count = 0
        
        logger.info(f"Starting payment poll for {checkout_request_id}")
        
        while time.time() - start_time < timeout:
            poll_count += 1
            elapsed = int(time.time() - start_time)
            
            try:
                result = await self.stk_push_query(checkout_request_id)
                result_code = result.get("ResultCode")
                
                logger.debug(f"Poll #{poll_count} at {elapsed}s: ResultCode={result_code}")
                
                if result_code == "0":
                    # 결제 성공
                    return {
                        "status": "completed",
                        "checkout_request_id": checkout_request_id,
                        "result_code": result_code,
                        "result_desc": result.get("ResultDesc", "Payment successful"),
                        "metadata": result.get("CallbackMetadata", {}),
                        "poll_attempts": poll_count
                    }
                elif result_code == "1032":
                    # 사용자 취소
                    return {
                        "status": "cancelled",
                        "checkout_request_id": checkout_request_id,
                        "result_code": result_code,
                        "result_desc": "Payment cancelled by user"
                    }
                elif result_code in ["2017", "2001"]:
                    # 무효한 요청
                    return {
                        "status": "failed",
                        "checkout_request_id": checkout_request_id,
                        "result_code": result_code,
                        "result_desc": result.get("ResultDesc", "Invalid request")
                    }
                    
            except Exception as e:
                logger.warning(f"Poll error at {elapsed}s: {str(e)}")
            
            await asyncio.sleep(poll_interval)
        
        # 타임아웃
        logger.warning(f"Payment timeout after {timeout}s ({poll_count} polls)")
        return {
            "status": "timeout",
            "checkout_request_id": checkout_request_id,
            "result_desc": "Payment timed out",
            "poll_attempts": poll_count
        }
    
    async def close(self):
        """HTTP 클라이언트 정리"""
        await self._client.aclose()

class MpesaAuthError(Exception):
    """M-Pesa 인증 오류"""
    pass

class MpesaConnectionError(Exception):
    """M-Pesa 연결 오류"""
    pass

class MpesaAPIError(Exception):
    """M-Pesa API 오류"""
    pass

HolySheep AI 서비스 연동

AI 서비스 클래스 구현

# services/ai_service.py
import json
import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from loguru import logger
import httpx
from config.settings import settings

@dataclass
class ChatMessage:
    role: str  # system, user, assistant
    content: str

@dataclass
class ChatCompletionResponse:
    content: str
    model: str
    usage: Dict[str, int]
    finish_reason: str

class HolySheepAIService:
    """
    HolySheep AI API를 사용하여 M-Pesa 결제 관련
    AI 챗봇 서비스 제공
    """
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or settings.HOLYSHEEP_API_KEY
        self.base_url = settings.HOLYSHEEP_BASE_URL
        self._client = httpx.AsyncClient(timeout=60.0)
        
        # 시스템 프롬프트 - M-Pesa 결제 특화
        self.system_prompt = """당신은 아프리카에서 M-Pesa 결제를 도와주는 AI 어시스턴트입니다.

핵심 기능:
1. M-Pesa STK Push 결제를 위한 금액, 전화번호 수집
2. 결제 상태 확인 및 결과 안내
3. 일반 고객 문의 응대

결제 관련 규칙:
- 금액은 항상 KES (케냐 시링) 단위
- 전화번호는 254로 시작하는 Kenya 형식
- 최소 결제 금액: 1 KES
- 최대 결제 금액: 150,000 KES

응답 형식:
- 결제 요청 시: 확인 메시지 + 결제 진행 버튼
- 결제 완료 시: 영수증 정보 포함 축하 메시지
- 오류 시: 명확한 오류 설명 + 재시도 안내

친절하고 전문적인 케냐 영어/스왈릴리어 혼합어로 응답합니다."""
    
    async def chat(
        self,
        messages: List[ChatMessage],
        model: str = "gpt-4o",
        temperature: float = 0.7,
        max_tokens: int = 500
    ) -> ChatCompletionResponse:
        """
        HolySheep AI 채팅 완료 API 호출
        
        Args:
            messages: 대화 메시지 목록
            model: 사용할 모델 (gpt-4o, claude-sonnet-4-20250514, gemini-2.5-flash 등)
            temperature: 창의성 온도 (0~2)
            max_tokens: 최대 토큰 수
        
        Returns:
            AI 응답
        """
        # OpenAI 호환 형식으로 변환
        formatted_messages = []
        
        # 시스템 프롬프트 추가
        if not any(m.role == "system" for m in messages):
            formatted_messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        
        for msg in messages:
            formatted_messages.append({
                "role": msg.role,
                "content": msg.content
            })
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": formatted_messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        logger.debug(f"Calling HolySheep AI: model={model}")
        
        try:
            response = await self._client.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                headers=headers
            )
            
            response.raise_for_status()
            data = response.json()
            
            choice = data["choices"][0]
            usage = data.get("usage", {})
            
            logger.debug(f"AI response: {len(choice['message']['content'])} chars")
            
            return ChatCompletionResponse(
                content=choice["message"]["content"],
                model=data.get("model", model),
                usage={
                    "prompt_tokens": usage.get("prompt_tokens", 0),
                    "completion_tokens": usage.get("completion_tokens", 0),
                    "total_tokens": usage.get("total_tokens", 0)
                },
                finish_reason=choice.get("finish_reason", "stop")
            )
            
        except httpx.HTTPStatusError as e:
            logger.error(f"AI API error: {e.response.status_code}")
            if e.response.status_code == 401:
                raise AIAuthError("Invalid API key. Please check your HolySheep API key.")
            elif e.response.status_code == 429:
                raise AIRateLimitError("Rate limit exceeded. Please wait and retry.")
            raise AIAPIError(f"API error: {e.response.text}")
        except httpx.RequestError as e:
            logger.error(f"AI connection error: {str(e)}")
            raise AIConnectionError(f"Connection failed: {str(e)}")
    
    async def analyze_payment_intent(
        self,
        user_message: str
    ) -> Dict[str, Any]:
        """
        사용자 메시지에서 결제 의도 분석
        
        Returns:
            {
                "intent": "payment_request" | "payment_status" | "general_inquiry",
                "amount": int | None,
                "phone": str | None,
                "confidence": float
            }
        """
        prompt = f"""Analyze this message and extract payment information:

Message: "{user_message}"

Respond in JSON format:
{{
    "intent": "payment_request" | "payment_status_check" | "balance_inquiry" | "general_inquiry",
    "amount": number or null (in KES, extract only if explicitly mentioned),
    "phone": "254XXXXXXXXX" format or null,
    "confidence": 0.0 to 1.0,
    "entities": {{}}
}}"""
        
        response = await self.chat(
            messages=[ChatMessage(role="user", content=prompt)],
            model="gpt-4o-mini",
            temperature=0.3,
            max_tokens=200
        )
        
        try:
            result = json.loads(response.content)
            logger.info(f"Intent analysis: {result['intent']}, confidence={result['confidence']}")
            return result
        except json.JSONDecodeError:
            logger.warning(f"Failed to parse intent: {response.content}")
            return {
                "intent": "general_inquiry",
                "amount": None,
                "phone": None,
                "confidence": 0.0
            }
    
    async def close(self):
        """HTTP 클라이언트 정리"""
        await self._client.aclose()

class AIError(Exception):
    """AI 서비스 기본 오류"""
    pass

class AIAuthError(AIError):
    """AI 인증 오류"""
    pass

class AIRateLimitError(AIError):
    """AI rate limit 초과"""
    pass

class AIConnectionError(AIError):
    """AI 연결 오류"""
    pass

class AIAPIError(AIError):
    """AI API 오류"""
    pass

M-Pesa + AI 통합 결제 처리기

# services/payment_processor.py
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from loguru import logger
from services.mpesa_service import MpesaService, MpesaAuthError
from services.ai_service import HolySheepAIService, ChatMessage, AIError

@dataclass
class PaymentSession:
    """결제 세션 관리"""
    session_id: str
    user_phone: str
    amount: Optional[int] = None
    checkout_request_id: Optional[str] = None
    status: str = "initiated"
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    error_message: Optional[str] = None

class PaymentProcessor:
    """
    M-Pesa 결제 + AI 챗봇 통합 처리기
    
    결제 프로세스:
    1. AI가 사용자 의도 파악
    2. 결제 정보(금액, 전화번호) 수집
    3. M-Pesa STK Push 요청
    4. AI가 결제 진행 상황 안내
    5. M-Pesa 콜백 또는 폴링으로 결과 수신
    6. AI가 최종 결과 전달
    """
    
    def __init__(self):
        self.mpesa = MpesaService()
        self.ai = HolySheepAIService()
        self._sessions: Dict[str, PaymentSession] = {}
    
    def _validate_phone(self, phone: str) -> bool:
        """케냐 전화번호 유효성 검증"""
        cleaned = phone.replace("+", "").replace(" ", "").replace("-", "")
        
        # Kenya 형식: 254XXXXXXXXX (12자리)
        if cleaned.startswith("254") and len(cleaned) == 12:
            return True
        # 07XXXXXXXX (10자리) → 254 변환
        if cleaned.startswith("07") and len(cleaned) == 10:
            return True
        # 7XXXXXXXX (9자리) → 254 변환
        if cleaned.startswith("7") and len(cleaned) == 9:
            return True
        
        return False
    
    def _normalize_phone(self, phone: str) -> str:
        """전화번호를 Kenya 표준 형식으로 변환"""
        cleaned = phone.replace("+", "").replace(" ", "").replace("-", "")
        
        if cleaned.startswith("254"):
            return cleaned
        if cleaned.startswith("07"):
            return "254" + cleaned[1:]
        if cleaned.startswith("7"):
            return "254" + cleaned
        
        return cleaned
    
    async def process_message(
        self,
        session_id: str,
        user_phone: str,
        user_message: str
    ) -> Dict[str, Any]:
        """
        통합 메시지 처리: AI 분석 → 결제 실행 → 응답 반환
        
        Returns:
            {
                "ai_response": str,
                "action_taken": str,
                "session_data": dict
            }
        """
        session = self._sessions.get(session_id)
        if not session:
            session = PaymentSession(
                session_id=session_id,
                user_phone=self._normalize_phone(user_phone)
            )
            self._sessions[session_id] = session
        
        logger.info(f"Processing message for session {session_id}")
        
        # 1단계: AI로 의도 분석
        intent_result = await self.ai.analyze_payment_intent(user_message)
        intent = intent_result["intent"]
        
        # 2단계: 의도에 따른 처리
        if intent == "payment_request" and intent_result.get("amount"):
            # 자동 결제 진행
            amount = intent_result["amount"]
            return await self._initiate_payment(
                session, 
                amount, 
                intent_result.get("phone") or session.user_phone
            )
        
        elif intent == "payment_status_check" and session.checkout_request_id:
            # 결제 상태 확인
            return await self._check_payment_status(session)
        
        elif session.amount and session.checkout_request_id and session.status == "awaiting_confirmation":
            # 금액 확인 후 결제 진행
            if "yes" in user_message.lower() or "confirm" in user_message.lower():
                return await self._initiate_payment(
                    session,
                    session.amount,
                    session.user_phone
                )
        
        # 일반 대화: AI 응답만 반환
        ai_response = await self.ai.chat(
            messages=[
                ChatMessage(role="user", content=user_message)
            ]
        )
        
        return {
            "ai_response": ai_response.content,
            "action_taken": "conversation",
            "session_data": self._session_to_dict(session)
        }
    
    async def _initiate_payment(
        self,
        session: PaymentSession,
        amount: int,
        phone: str
    ) -> Dict[str, Any]:
        """결제 세션 시작"""
        normalized_phone = self._normalize_phone(phone)
        
        if not self._validate_phone(normalized_phone):
            error_msg = "Invalid phone number. Please provide a valid Kenya number (254XXXXXXXXX)."
            return {
                "ai_response": error_msg,
                "action_taken": "validation_error",
                "error": "invalid_phone"
            }
        
        if amount < 1 or amount > 150000:
            return {
                "ai_response": "Amount must be between 1 and 150,000 KES.",
                "action_taken": "validation_error",
                "error": "invalid_amount"
            }
        
        session.amount = amount
        session.user_phone = normalized_phone
        session.status = "processing"
        session.updated_at = datetime.now()
        
        logger.info(f"Initiating payment: {amount} KES to {normalized_phone}")
        
        try:
            # M-Pesa STK Push 요청
            mpesa_result = await self.mpesa.stk_push_request(
                phone=normalized_phone,
                amount=amount,
                account_reference=f"ORDER-{session.session_id[:8]}",
                transaction_desc=f"AI Chatbot Payment - {session.session_id}"
            )
            
            session.checkout_request_id = mpesa_result.get("CheckoutRequestID")
            session.status = "awaiting_payment"
            session.updated_at = datetime.now()
            
            response_msg = (
                f"✅ Payment request sent!\n\n"
                f"📱 Check your M-Pesa phone ({normalized_phone})\n"
                f"💰 Amount: {amount} KES\n\n"
                f"Please enter your M-Pesa PIN on your phone to confirm.\n"
                f"I'll confirm once the payment is complete."
            )
            
            return {
                "ai_response": response_msg,
                "action_taken": "payment_initiated",
                "checkout_request_id": session.checkout_request_id,
                "session_data": self._session_to_dict(session)
            }
            
        except MpesaAuthError as e:
            session.status = "failed"
            session.error_message = f"Auth error: {str(e)}"
            logger.error(f"M-Pesa auth error: {str(e)}")
            return {
                "ai_response": "Payment service temporarily unavailable. Please try again later.",
                "action_taken": "payment_failed",
                "error": "auth_error"
            }
        except Exception as e:
            session.status = "failed"
            session.error_message = str(e)
            logger.error(f"Payment error: {str(e)}")
            return {
                "ai_response": f"Payment failed: {str(e)}. Please try again.",
                "action_taken": "payment_failed",
                "error": str(e)
            }
    
    async def _check_payment_status(self, session: PaymentSession) -> Dict[str, Any]:
        """결제 상태 확인 (폴링)"""
        if not session.checkout_request_id:
            return {
                "ai_response": "No active payment found for this session.",
                "action_taken": "no_payment"
            }
        
        session.status = "checking"
        session.updated_at = datetime.now()
        
        logger.info(f"Checking payment status: {session.checkout_request_id}")
        
        result = await self.mpesa.wait_for_payment(
            checkout_request_id=session.checkout_request_id,
            timeout=90,
            poll_interval=2
        )
        
        if result["status"] == "completed":
            session.status = "completed"
            session.updated_at = datetime.now()
            
            receipt = result.get("metadata", {}).get("Item", [])
            receipt_number = next(
                (item.get("Value") for item in receipt if item.get("Name") == "MpesaReceiptNumber"),
                "N/A"
            )
            
            response = (
                f"🎉 **Payment Successful!**\n\n"
                f"📋 Receipt: {receipt_number}\n"
                f"💵 Amount: {session.amount} KES\n"
                f"✅ Status: Completed\n\n"
                f"Thank you for your payment!"
            )
            
        elif result["status"] == "cancelled":
            session.status = "cancelled"
            session.updated_at = datetime.now()
            response = "❌ Payment was cancelled. Use /pay to start a new payment."
            
        elif result["status"] == "timeout":
            session.status = "timeout"
            session.updated_at = datetime.now()
            response = (
                "⏰ Payment timed out. The M-Pesa request expired.\n"
                "Please try again with /pay."
            )
            
        else:
            session.status = "failed"
            session.updated_at = datetime.now()
            response = f"❌ Payment failed: {result.get('result_desc', 'Unknown error')}"
        
        return {
            "ai_response": response,
            "action_taken": f"payment_{result['status']}",
            "session_data": self._session_to_dict(session)
        }
    
    def _session_to_dict(self, session: PaymentSession) -> Dict[str, Any]:
        """세션 데이터를 딕셔너리로 변환"""
        return {
            "session_id": session.session_id,
            "phone": session.user_phone,
            "amount": session.amount,
            "checkout_request_id": session.checkout_request_id,
            "status": session.status,
            "created_at": session.created_at.isoformat(),
            "updated_at": session.updated_at.isoformat(),
            "error": session.error_message
        }
    
    async def handle_mpesa_callback(self, callback_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        M-Pesa 콜백 처리 (웹훅 엔드포인트)
        
        M-Pesa가 결제가 완료되면 이 엔드포인트를 호출합니다.
        """
        logger.info(f"M-Pesa callback received: {callback_data}")
        
        try:
            result = callback_data.get("Body", {}).get("stkCallback", {})
            result_code = result.get("ResultCode")
            checkout_request_id = result.get("CheckoutRequestID")
            
            # 세션查找
            session = None
            for s in self._sessions.values():
                if s.checkout_request_id == checkout_request_id:
                    session = s
                    break
            
            if not session:
                logger.warning(f"Session not found for callback: {checkout_request_id}")
                return {"status": "ignored", "message": "Session not found"}
            
            if result_code == 0:
                # 결제 성공
                metadata = result.get("CallbackMetadata", {}).get("Item", [])
                
                amount = next(
                    (item.get("Value") for item in metadata if item.get("Name") == "Amount"),
                    0
                )
                receipt = next(
                    (item.get("Value") for item in metadata if item.get("Name") == "MpesaReceiptNumber"),
                    ""
                )
                timestamp = next(
                    (item.get("Value") for item in metadata if item.get("Name") == "TransactionDate"),
                    0
                )
                
                session.status = "completed"
                session.updated_at = datetime.now()
                
                logger.success(f"Payment completed: Receipt={receipt}, Amount={amount}")
                
                return {
                    "status": "success",
                    "receipt": receipt,
                    "amount": amount,
                    "transaction_date": timestamp
                }
            else:
                # 결제 실패
                result_desc = result.get("ResultDesc", "Unknown error")
                session.status = "failed"
                session.error_message = result_desc
                session.updated_at = datetime.now()
                
                logger.error(f"Payment failed: {result_desc}")
                
                return {
                    "status": "failed",
                    "error": result_desc
                }
                
        except Exception as e:
            logger.error(f"Callback processing error: {str(e)}")
            return {"status": "error", "message": str(e)}
    
    async def close(self):
        """모든 서비스 연결 정리"""
        await self.mpesa.close()
        await self.ai.close()

FastAPI 웹 애플리케이션

# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, List
from loguru import logger
import uuid
from contextlib import asynccontextmanager

from services.payment_processor import PaymentProcessor
from config.settings import settings

전역 프로세서 인스턴스

processor: Optional[PaymentProcessor] = None @asynccontextmanager async def lifespan(app: FastAPI): """애플리케이션 라이프사이클 관리""" global processor processor = PaymentProcessor() logger.info("Payment processor initialized") yield await processor.close() logger.info("Payment processor closed") app = FastAPI( title="M-Pesa AI Chatbot API", description="아프리카 M-Pesa 결제를 AI 챗봇과 통합하는 API", version="1.0.0",