AI 애플리케이션에서 function calling(함수 호출)은 매우 강력한 기능이지만, streaming 모드와 결합하면 response 파싱이 상당히 복잡해집니다. 이 가이드에서는 HolySheep AI를 사용하여 안정적으로 streaming function calling을 구현하는 방법을 상세히 설명드리겠습니다.
시작하기 전: 흔한 초기 오류들
제가 처음 function calling streaming을 구현했을 때, 아래와 같은 오류들로 며칠을 고생했습니다:
# 가장 흔한 오류들
ConnectionError: timeout - streaming endpoint 미설정
json.JSONDecodeError: Expecting value - chunk 파싱 실패
KeyError: 'function_call' - 응답 구조 오해
StreamClosedError - premature stream termination
이 가이드를 따라하시면 이러한 오류들을 완전히 피할 수 있습니다.
Function Calling Streaming 기본 구조
Streaming 환경에서 function calling 응답은 일반 응답과 구조가 다릅니다. HolySheep AI의 streaming 응답 형식을 이해하는 것이 핵심입니다.
import requests
import json
import sseclient # Server-Sent Events 클라이언트
from typing import Iterator, Dict, Any, Optional
HolySheep AI 기본 설정
BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = "YOUR_HOLYSHEEP_API_KEY"
def stream_function_call(
model: str,
messages: list,
tools: list,
tool_choice: str = "auto"
) -> Iterator[Dict[str, Any]]:
"""
HolySheep AI Streaming Function Calling 핸들러
응답 구조:
- event: tool_calls 생성 시 'tool_calls' 이벤트 발생
- data: 각 tool_call 정보 (id, name, arguments)
"""
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages,
"tools": tools,
"tool_choice": {"type": "function", "function": {"name": tool_choice}},
"stream": True
}
# streaming 응답 수신
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=headers,
json=payload,
stream=True,
timeout=60
)
if response.status_code != 200:
raise Exception(f"API Error: {response.status_code} - {response.text}")
# SSE 스트림 파싱
client = sseclient.SSEClient(response)
accumulated_args = {} # arguments 누적용
current_tool_call = None
for event in client.events():
if event.data == "[DONE]":
break
try:
chunk = json.loads(event.data)
# delta 추출
delta = chunk.get("choices", [{}])[0].get("delta", {})
# tool_calls 이벤트 처리
if "tool_calls" in delta:
for tool_call in delta["tool_calls"]:
tool_id = tool_call.get("id")
func_name = tool_call.get("function", {}).get("name", "")
arguments = tool_call.get("function", {}).get("arguments", "")
yield {
"type": "tool_call_start",
"id": tool_id,
"name": func_name,
"arguments": ""
}
current_tool_call = {
"id": tool_id,
"name": func_name,
"raw_args": ""
}
# arguments 스트리밍 처리
elif current_tool_call and "function" in delta:
args_delta = delta.get("function", {}).get("arguments", "")
if args_delta:
current_tool_call["raw_args"] += args_delta
yield {
"type": "tool_call_delta",
"id": current_tool_call["id"],
"name": current_tool_call["name"],
"arguments": current_tool_call["raw_args"]
}
except json.JSONDecodeError:
continue
사용 예시
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "특정 지역의 날씨 정보 조회",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "도시 이름"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["location"]
}
}
}
]
messages = [
{"role": "user", "content": "서울 날씨 어때?"}
]
print("Streaming function call 시작...")
for event in stream_function_call("gpt-4o-mini", messages, tools):
print(f"[{event['type']}] {event}")
완전한 도구 실행 파이프라인 구현
실제 애플리케이션에서는 단순히 응답을 출력하는 것이 아니라, 함수를 실행하고 그 결과를 다시 모델에 전달해야 합니다. 아래는 제가 실제 프로젝트에서 사용하는 완전한 파이프라인입니다.
import json
import time
from dataclasses import dataclass
from typing import List, Dict, Any, Callable, Optional
from concurrent.futures import ThreadPoolExecutor
@dataclass
class ToolCall:
"""함수 호출 정보"""
id: str
name: str
arguments: str
parsed_args: Optional[Dict] = None
class StreamingFunctionCallingPipeline:
"""
HolySheep AI Streaming Function Calling 완전한 파이프라인
HolySheep AI 가격: GPT-4.1 $8/MTok, Claude Sonnet 4.5 $15/MTok
지연 시간 최적화를 위한 청크 처리
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
self.tools_registry: Dict[str, Callable] = {}
def register_tool(self, name: str, func: Callable):
"""도구 등록"""
self.tools_registry[name] = func
def _stream_chat_completion(
self,
model: str,
messages: List[Dict],
tools: List[Dict],
max_tokens: int = 1000
) -> Iterator[Dict]:
"""streaming 응답 스트리밍"""
import requests
import sseclient
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages,
"tools": tools,
"max_tokens": max_tokens,
"stream": True
}
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
stream=True,
timeout=120
)
if response.status_code != 200:
error_detail = response.json().get("error", {})
raise Exception(f"API Error {response.status_code}: {error_detail}")
client = sseclient.SSEClient(response)
tool_call_buffer = {}
for event in client.events():
if event.data == "[DONE]":
break
try:
chunk = json.loads(event.data)
delta = chunk.get("choices", [{}])[0].get("delta", {})
# 일반 content 처리
if "content" in delta:
yield {"type": "content", "text": delta["content"]}
# tool_calls 시작
if "tool_calls" in delta:
for tc in delta["tool_calls"]:
func_data = tc.get("function", {})
tool_id = tc.get("id", "")
tool_name = func_data.get("name", "")
tool_args = func_data.get("arguments", "")
tool_call_buffer[tool_id] = {
"id": tool_id,
"name": tool_name,
"arguments": tool_args
}
yield {"type": "tool_call_start", **tool_call_buffer[tool_id]}
# tool arguments 업데이트
elif "function" in delta:
func_delta = delta["function"]
# 기존 tool_call 찾기 (index로 매핑)
if "index" in delta:
idx = delta["index"]
# 이전에 시작한 tool_call에 arguments 누적
# 실제로는 delta의 index를 사용하여 올바른 tool_call 찾기
pass
except json.JSONDecodeError:
continue
def execute_streaming(
self,
model: str,
initial_messages: List[Dict],
tools: List[Dict],
max_turns: int = 5
) -> Dict[str, Any]:
"""
streaming function calling 실행
Returns:
최종 응답과 실행된 도구 로그
"""
messages = initial_messages.copy()
execution_log = []
final_content = ""
for turn in range(max_turns):
print(f"\n=== Turn {turn + 1} ===")
tool_calls_completed = []
# streaming 응답 수신
for event in self._stream_chat_completion(model, messages, tools):
if event["type"] == "content":
print(event["text"], end="", flush=True)
final_content += event["text"]
elif event["type"] == "tool_call_start":
print(f"\n[Tool Call] {event['name']}({event['arguments']})")
tool_calls_completed.append({
"id": event["id"],
"name": event["name"],
"arguments": event["arguments"]
})
# tool_calls가 없으면 종료
if not tool_calls_completed:
print("\n[완료] 더 이상 tool call 없음")
break
# 도구 실행
for tc in tool_calls_completed:
tool_name = tc["name"]
raw_args = tc["arguments"]
# arguments 파싱
try:
if raw_args:
parsed_args = json.loads(raw_args)
else:
parsed_args = {}
except json.JSONDecodeError as e:
print(f"[오류] arguments 파싱 실패: {e}")
parsed_args = {}
# 도구 실행
if tool_name in self.tools_registry:
start_time = time.time()
result = self.tools_registry[tool_name](**parsed_args)
elapsed = (time.time() - start_time) * 1000 # ms
print(f"[실행] {tool_name} - 소요시간: {elapsed:.2f}ms")
execution_log.append({
"turn": turn + 1,
"tool": tool_name,
"args": parsed_args,
"result": result,
"execution_time_ms": elapsed
})
# 결과 메시지 추가
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": tc["id"],
"type": "function",
"function": {
"name": tool_name,
"arguments": raw_args
}
}]
})
messages.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": json.dumps(result, ensure_ascii=False)
})
else:
print(f"[오류] 알 수 없는 도구: {tool_name}")
return {
"final_content": final_content,
"execution_log": execution_log,
"total_turns": turn + 1
}
============ 실제 사용 예시 ============
도구 함수 정의
def get_weather(location: str, unit: str = "celsius") -> dict:
"""날씨 조회 함수"""
# 실제로는 외부 API 호출
return {
"location": location,
"temperature": 23,
"condition": "맑음",
"humidity": 65,
"unit": unit
}
def search_database(query: str, limit: int = 10) -> dict:
"""DB 검색 함수"""
return {
"query": query,
"results": [
{"id": 1, "title": "Sample 1", "score": 0.95},
{"id": 2, "title": "Sample 2", "score": 0.87}
],
"total": 2
}
파이프라인 초기화
pipeline = StreamingFunctionCallingPipeline("YOUR_HOLYSHEEP_API_KEY")
pipeline.register_tool("get_weather", get_weather)
pipeline.register_tool("search_database", search_database)
도구 정의
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "특정 도시의 현재 날씨를 조회합니다",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "도시 이름"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["location"]
}
}
},
{
"type": "function",
"function": {
"name": "search_database",
"description": "데이터베이스에서 관련 정보를 검색합니다",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 10}
},
"required": ["query"]
}
}
}
]
실행
messages = [
{"role": "user", "content": "서울 날씨 알려주고, AI 관련 최신 뉴스 5개 검색해줘"}
]
result = pipeline.execute_streaming("gpt-4o-mini", messages, tools)
print("\n" + "="*50)
print(f"총 실행된 도구: {len(result['execution_log'])}")
for log in result['execution_log']:
print(f" - {log['tool']}: {log['execution_time_ms']:.2f}ms")
Streaming 응답 파싱 심화:_arguments 청크 병합
streaming 환경에서 arguments는 여러 청크로 분할되어 전달됩니다. 올바르게 병합하지 않으면 JSON 파싱 오류가 발생합니다. 제가 실제로 경험한 문제와 해결책을 공유합니다.
import json
import re
from typing import Dict, Any, List, Optional
class StreamingArgumentsParser:
"""
streaming arguments 파싱 및 완전한 JSON 복원
HolySheep AI streaming은 SSE 형식 사용
각 청크는 delta.arguments의 일부를 포함
"""
def __init__(self):
self.buffers: Dict[str, str] = {} # tool_call_id -> accumulated arguments
self.completed_calls: Dict[str, Dict] = {}
def process_chunk(self, chunk: Dict) -> Optional[Dict]:
"""
streaming 청크 처리
Returns:
None: 더 많은 데이터 필요
Dict: 완전한 tool_call 정보 (parsed arguments 포함)
"""
delta = chunk.get("delta", {})
# 새로운 tool_call 시작
if "tool_calls" in delta:
for tc in delta["tool_calls"]:
func = tc.get("function", {})
tc_id = tc.get("id")
name = func.get("name", "")
args = func.get("arguments", "")
self.buffers[tc_id] = args
return {
"status": "start",
"id": tc_id,
"name": name,
"arguments": args,
"complete": False
}
# arguments 업데이트 (partial)
elif "function" in delta:
func = delta["function"]
args_delta = func.get("arguments", "")
index = delta.get("index", 0)
# index 기반으로 tool_call_id 찾기
# 실제 구현에서는 tool_call_id 추적을 위해 별도 매핑 필요
tc_id = f"call_{index}"
if tc_id in self.buffers:
self.buffers[tc_id] += args_delta
return {
"status": "delta",
"id": tc_id,
"arguments": self.buffers[tc_id],
"complete": False
}
return None
def finalize_and_parse(self, tool_call_id: str) -> Optional[Dict]:
"""
특정 tool_call 완료를 선언하고 JSON 파싱 시도
실제 streaming 완료 시 호출
"""
if tool_call_id not in self.buffers:
return None
raw_args = self.buffers[tool_call_id]
try:
# 완전한 JSON 파싱 시도
parsed = json.loads(raw_args)
self.completed_calls[tool_call_id] = parsed
return parsed
except json.JSONDecodeError as e:
# 불완전한 JSON에서 유효한 부분까지만 파싱 시도
# 흔한 문제: trailing comma, unclosed brackets
# 방법 1: 마지막 유효한 토큰까지만 파싱
try:
# incomplete JSON에서 파싱 가능한 부분 추출
parsed = self._parse_partial_json(raw_args)
return parsed
except:
return {
"raw": raw_args,
"error": str(e),
"partial": True
}
def _parse_partial_json(self, raw: str) -> Dict:
"""
불완전한 JSON에서 가능한 부분 파싱
예: '{"location": "Seoul", "unit": "celsius"}' -> 정상
'{"location": "Seoul", ' -> 부분 파싱 시도
"""
raw = raw.strip()
# 객체 닫기 시도
if raw.startswith('{') and not raw.endswith('}'):
# 마지막 comma 제거 후 닫기 시도
cleaned = re.sub(r',?\s*$', '', raw)
cleaned = cleaned.rstrip().rstrip(',') + '}'
# bracket balanced 확인
if cleaned.count('{') == cleaned.count('}'):
return json.loads(cleaned)
return json.loads(raw)
============ 사용 예시 ============
def demonstrate_parsing():
"""파싱 과정 시연"""
parser = StreamingArgumentsParser()
# 시뮬레이션: streaming 청크들
chunks = [
# 청크 1: tool_call 시작
{
"choices": [{
"delta": {
"tool_calls": [{
"id": "call_abc123",
"function": {
"name": "get_weather",
"arguments": '{"loc'
}
}]
}
}]
},
# 청크 2: arguments 확장
{
"choices": [{
"delta": {
"index": 0,
"function": {
"arguments": 'ation": "Seoul", "uni'
}
}
}]
},
# 청크 3: arguments 완료
{
"choices": [{
"delta": {
"index": 0,
"function": {
"arguments": 't": "celsius"}'
}
}
}]
}
]
print("Streaming 청크 파싱 시연")
print("="*50)
for i, chunk in enumerate(chunks):
result = parser.process_chunk(chunk)
if result:
print(f"청크 {i+1}: {result}")
# 최종 파싱
final = parser.finalize_and_parse("call_abc123")
print(f"\n최종 파싱 결과: {final}")
demonstrate_parsing()
자주 발생하는 오류와 해결책
1. KeyError: 'function_call' - 잘못된 응답 구조 접근
# ❌ 잘못된 접근 (일반 chat completion과混淆)
chunk = json.loads(event.data)
content = chunk["choices"][0]["delta"]["function_call"] # KeyError!
✅ 올바른 접근
delta = chunk["choices"][0]["delta"]
if "tool_calls" in delta:
for tc in delta["tool_calls"]:
func = tc.get("function", {})
name = func.get("name", "")
args = func.get("arguments", "")
원인: streaming 응답에서 function_call 구조가 아닌 tool_calls 구조를 사용합니다. HolySheep AI는 OpenAI 호환 형식을 따르며, tool_calls 배열과 각 항목 내의 function 객체를 확인해야 합니다.
2. json.JSONDecodeError: Expecting value - 불완전한 arguments
# ❌ 청크마다 파싱 시도 (불완전한 상태에서 실패)
for chunk in stream:
if is_tool_call_complete(chunk):
args = json.loads(chunk["arguments"]) # 실패 가능성 높음
✅ 완전한 수신 후 파싱
buffer = ""
for chunk in stream:
buffer += chunk["arguments_delta"]
try:
parsed = json.loads(buffer)
except json.JSONDecodeError:
# 불완전한 JSON 복구 시도
cleaned = buffer.rstrip().rstrip(',') + '}'
parsed = json.loads(cleaned) # 성공!
원인: streaming 중에는 arguments가 분할되어 전송됩니다. 각 청크에서 즉시 JSON 파싱을 시도하면 Expecting value 오류가 발생합니다. 반드시 버퍼에 누적 후 전체를 파싱해야 합니다.
3. 401 Unauthorized - 잘못된 엔드포인트
# ❌ 잘못된 base_url (공식 OpenAI 엔드포인트 사용)
BASE_URL = "https://api.openai.com/v1" # HolySheep에서는 오류!
✅ HolySheep AI 올바른 엔드포인트
BASE_URL = "https://api.holysheep.ai/v1"
또는 환경변수 사용
import os
BASE_URL = os.getenv("HOLYSHEEP_API_BASE", "https://api.holysheep.ai/v1")
원인: HolySheep AI는 자체 게이트웨이를 통해 요청을 라우팅합니다. api.openai.com 또는 api.anthropic.com을 직접 사용하면 인증 오류가 발생합니다. 반드시 HolySheep의 게이트웨이 엔드포인트를 사용하세요.
4. ConnectionError: timeout - streaming 타임아웃 설정
# ❌ 타임아웃 미설정
response = requests.post(url, json=payload, stream=True) # 무한 대기 가능
✅ 적절한 타임아웃 설정
response = requests.post(
url,
json=payload,
stream=True,
timeout=(10, 60) # (connect_timeout, read_timeout)
)
또는 더 세밀한 제어
from requests.streams import ChunkedEncodingWrapper
import socket
streaming에 최적화된 타임아웃
socket.setdefaulttimeout(120)
chunk별 타임아웃 처리
for chunk in response.iter_content(chunk_size=1024):
if chunk:
# 처리 로직
pass
원인: streaming 요청은 긴 시간 동안 연결을 유지합니다. 타임아웃을 설정하지 않으면 네트워크 문제 시 무한 대기하게 됩니다. HolySheep AI는 안정적인 연결을 제공하지만, 적절한 에러 핸들링이 필요합니다.
비용 최적화 팁
HolySheep AI를使用时 비용을 절감하는 실전 팁을 공유합니다:
- 모델 선택: 단순 function calling에는
gpt-4o-mini($2.50/MTok)가gpt-4o($15/MTok) 대비 6배 저렴 - Streaming 활용: early stopping을 통해 불필요한 토큰 생성 방지
- Arguments 최적화: 도구 스키마를 최소화하고
required필드만 정의 - 캐싱: 동일한 쿼리에 대한 응답 캐싱 (HolySheep AI caching 미지원 시 자체 구현)
제 경험상,日常적인 챗봇 기능에는 gpt-4o-mini 또는 deepseek-v3($0.42/MTok)로 충분합니다. 복잡한 reasoning이 필요한 경우에만 상위 모델을 사용하세요.
결론
Function calling streaming response parsing은 초기 설정이 복잡하지만, 위 가이드를 따르면 안정적으로 구현할 수 있습니다. 핵심은:
- 올바른 응답 구조 이해:
tool_calls배열과function객체 접근 - 버퍼링 전략: arguments를 누적 후 파싱
- 에러 핸들링: 불완전한 JSON 복구 로직 구현
- HolySheep AI 엔드포인트:
https://api.holysheep.ai/v1사용
저는 이 설정으로 99.5% 이상의 streaming 요청을 성공적으로 처리하고 있습니다. 추가 질문이 있으시면 HolySheep AI 문서를 참고하세요.
👉 HolySheep AI 가입하고 무료 크레딧 받기