Contract testing has become essential for teams building AI-powered applications. When your e-commerce platform processes 10,000+ customer service queries during a Black Friday sale, or your enterprise RAG system serves confidential documents to 500+ employees, the stability of your AI API integrations determines whether you ship on time or spend weekends firefighting. Today, I'll walk you through how to implement comprehensive contract testing for AI APIs, using HolySheep AI as our reference provider, with pricing of ¥1 per $1 of API credit (85%+ savings versus the ¥7.3 exchange rate) and sub-50ms latency.
Why Contract Testing Matters for AI APIs
Traditional unit tests verify your code logic, but they don't catch the subtle API contract changes that break production. When OpenAI, Anthropic, or your AI provider updates their response format, schema, or behavior, contract tests catch these regressions before they reach users. I've implemented contract testing across three production systems—a travel booking chatbot, a medical documentation assistant, and a financial analysis tool—and the time investment pays for itself the first time it prevents an incident.
The Use Case: Enterprise RAG System Launch
Last quarter, our team launched a RAG-powered document search system for a law firm with 200 employees. We had 48 hours to integrate the AI service before the training deadline. The AI provider changed their streaming response format overnight, which would have caused the entire system to fail. Our contract tests caught the breaking change at 2 AM, allowing us to update the integration before the morning training session.
Here's the complete implementation we built using HolySheep AI's chat completions API:
#!/usr/bin/env python3
"""
AI API Contract Testing Framework
Tests contract compliance for AI service integrations
"""
import pytest
import json
import httpx
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
import hashlib

# HolySheep AI configuration
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
HOLYSHEEP_API_KEY = "YOUR_HOLYSHEEP_API_KEY"


@dataclass
class Message:
    role: str
    content: str


@dataclass
class ChatCompletionRequest:
    model: str
    messages: List[Message]
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    stream: bool = False


@dataclass
class ChatCompletionChoice:
    index: int
    message: Dict[str, str]
    finish_reason: str


@dataclass
class ChatCompletionResponse:
    id: str
    object: str
    created: int
    model: str
    choices: List[ChatCompletionChoice]
    usage: Dict[str, int]


class AIAPIContract:
    """Contract specification for AI API compliance"""
    REQUIRED_RESPONSE_FIELDS = [
        "id", "object", "created", "model",
        "choices", "usage"
    ]
    REQUIRED_CHOICE_FIELDS = [
        "index", "message", "finish_reason"
    ]
    REQUIRED_MESSAGE_FIELDS = [
        "role", "content"
    ]
    VALID_ROLES = ["system", "user", "assistant"]
    # "tool_calls" is included because OpenAI-compatible APIs report it as the
    # finish reason when the model invokes a tool (see the function calling tests).
    VALID_FINISH_REASONS = ["stop", "length", "content_filter", "tool_calls"]
    MAX_RESPONSE_TIME_MS = 5000
    MIN_CONTENT_LENGTH = 0


class ContractTestClient:
    """Test client for AI API contract validation"""

    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    async def send_request(
        self,
        request: ChatCompletionRequest
    ) -> httpx.Response:
        """Send chat completion request"""
        # Serialize the full dataclass (including subclass fields such as
        # `tools`) and drop None values so optional parameters are omitted.
        payload = {k: v for k, v in asdict(request).items() if v is not None}
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=payload
            )
            return response

    def validate_response_schema(
        self,
        response_data: Dict[str, Any]
    ) -> List[str]:
        """Validate response against contract specification"""
        errors = []
        # Check required top-level fields
        for field in AIAPIContract.REQUIRED_RESPONSE_FIELDS:
            if field not in response_data:
                errors.append(f"Missing required field: {field}")
        # Validate choices array
        if "choices" in response_data:
            if not isinstance(response_data["choices"], list):
                errors.append("'choices' must be an array")
            else:
                for i, choice in enumerate(response_data["choices"]):
                    for field in AIAPIContract.REQUIRED_CHOICE_FIELDS:
                        if field not in choice:
                            errors.append(
                                f"Choice {i}: Missing field '{field}'"
                            )
                    # Validate message structure
                    if "message" in choice:
                        msg = choice["message"]
                        for field in AIAPIContract.REQUIRED_MESSAGE_FIELDS:
                            if field not in msg:
                                errors.append(
                                    f"Choice {i} message: Missing '{field}'"
                                )
                        if "role" in msg and msg["role"] not in AIAPIContract.VALID_ROLES:
                            errors.append(
                                f"Choice {i}: Invalid role '{msg['role']}'"
                            )
        # Validate usage object
        if "usage" in response_data:
            usage = response_data["usage"]
            required_usage = ["prompt_tokens", "completion_tokens", "total_tokens"]
            for field in required_usage:
                if field not in usage:
                    errors.append(f"Usage missing field: {field}")
                elif not isinstance(usage[field], int):
                    errors.append(f"Usage '{field}' must be integer")
        return errors


# Pytest integration
@pytest.fixture
def api_client():
    return ContractTestClient(HOLYSHEEP_BASE_URL, HOLYSHEEP_API_KEY)


@pytest.fixture
def sample_request():
    return ChatCompletionRequest(
        model="gpt-4.1",
        messages=[
            Message(role="system", content="You are a helpful assistant."),
            Message(role="user", content="What is the capital of France?")
        ],
        temperature=0.7,
        max_tokens=100
    )


@pytest.mark.asyncio
async def test_contract_basic_completion(api_client, sample_request):
    """Test basic non-streaming completion contract"""
    response = await api_client.send_request(sample_request)
    assert response.status_code == 200, f"Expected 200, got {response.status_code}"
    data = response.json()
    errors = api_client.validate_response_schema(data)
    assert len(errors) == 0, f"Contract violations: {errors}"
    assert len(data["choices"]) > 0, "No choices returned"
    assert data["choices"][0]["finish_reason"] in AIAPIContract.VALID_FINISH_REASONS


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
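A pattern worth layering on top of field-by-field checks is schema fingerprinting: hash the key structure of a known-good response and fail the suite the moment the shape drifts. This is my own addition rather than part of the framework above, and the KNOWN_GOOD_FINGERPRINT constant is a hypothetical value you would record from a verified response:

import hashlib
import json

def schema_fingerprint(response_data: dict) -> str:
    """Hash the key structure (not the values) of a response for drift detection."""
    def shape(obj):
        if isinstance(obj, dict):
            return {k: shape(v) for k, v in sorted(obj.items())}
        if isinstance(obj, list):
            return [shape(obj[0])] if obj else []
        return type(obj).__name__
    canonical = json.dumps(shape(response_data), sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()

# In a test, compare against a fingerprint recorded from a verified response:
# assert schema_fingerprint(data) == KNOWN_GOOD_FINGERPRINT  # hypothetical constant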
Advanced Contract Testing: Streaming Responses
Streaming responses require different testing strategies because data arrives incrementally. For real-time applications like chatbots, you need to validate the SSE (Server-Sent Events) format, verify chunk ordering, and ensure proper error handling when connections drop. Here's a comprehensive streaming test suite:
#!/usr/bin/env python3
"""
Streaming Contract Tests for AI APIs
Validates SSE format, chunk sequencing, and error handling
"""
import json
import pytest
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import uvicorn  # lets you serve the mock locally: uvicorn streaming_contract_tests:app

# ChatCompletionRequest and Message come from the framework above; the module
# name is assumed, so adjust it to your layout. The api_client fixture is
# assumed to be shared via a conftest.py.
from contract_tests import ChatCompletionRequest, Message

# Streaming contract specifications
STREAMING_CONTRACT = {
    "event_types": ["message", "error", "done"],
    "required_fields": ["id", "object", "created", "model"],
    "chunk_required_fields": ["choices"],
    "delta_required_fields": ["content"],
    "max_chunk_size_bytes": 65536,
    "min_inter_chunk_delay_ms": 0,
    "completion_marker": "[DONE]"
}


class StreamingContractValidator:
    """Validates streaming response contracts"""

    def __init__(self):
        self.chunks_received = []
        self.saw_done = False
        self.start_time = None
        self.last_chunk_time = None

    def validate_sse_format(self, raw_chunk: str) -> tuple[bool, list]:
        """Validate individual SSE chunk format"""
        errors = []
        # Raw SSE chunks end with a double newline; skip this check for single
        # lines, because aiter_lines() strips the newlines before we see them.
        if "\n" in raw_chunk and not raw_chunk.endswith("\n\n"):
            errors.append("Chunk must end with double newline")
        # Parse SSE fields
        lines = raw_chunk.strip().split("\n")
        event_type = None
        data_content = None
        for line in lines:
            if line.startswith("event:"):
                event_type = line[6:].strip()
            elif line.startswith("data:"):
                data_content = line[5:].strip()
        # Validate event type
        if event_type and event_type not in STREAMING_CONTRACT["event_types"]:
            errors.append(f"Invalid event type: {event_type}")
        # Validate data content is valid JSON (or [DONE])
        if data_content and data_content != STREAMING_CONTRACT["completion_marker"]:
            try:
                json.loads(data_content)
            except json.JSONDecodeError as e:
                errors.append(f"Invalid JSON in data field: {e}")
        return len(errors) == 0, errors

    def validate_chunk_sequence(self, chunk_data: dict, index: int) -> list:
        """Validate chunk ordering and content progression"""
        errors = []
        # Check required fields in chunk
        for field in STREAMING_CONTRACT["chunk_required_fields"]:
            if field not in chunk_data:
                errors.append(f"Chunk {index}: Missing '{field}'")
        # Validate choices structure
        if "choices" in chunk_data:
            for choice in chunk_data["choices"]:
                if "delta" not in choice:
                    errors.append(f"Chunk {index}: Missing 'delta' in choice")
                elif "content" not in choice["delta"]:
                    errors.append(f"Chunk {index}: Missing 'content' in delta")
        return errors

    async def stream_and_validate(
        self,
        client,
        request: ChatCompletionRequest
    ) -> dict:
        """Stream response while validating contract"""
        request.stream = True
        response = await client.send_request(request)
        assert response.status_code == 200, f"Stream request failed: {response.status_code}"
        all_errors = []
        chunk_count = 0
        # Note: send_request reads the body eagerly, so aiter_lines() replays
        # the buffered content line by line rather than truly streaming.
        async for line in response.aiter_lines():
            if line.strip():
                is_valid, errors = self.validate_sse_format(line)
                if not is_valid:
                    all_errors.extend([f"Chunk {chunk_count}: {e}" for e in errors])
                if line.startswith("data:"):
                    data_str = line[5:].strip()
                    if data_str == STREAMING_CONTRACT["completion_marker"]:
                        self.saw_done = True
                    else:
                        try:
                            chunk_data = json.loads(data_str)
                            chunk_errors = self.validate_chunk_sequence(
                                chunk_data, chunk_count
                            )
                            all_errors.extend(chunk_errors)
                            self.chunks_received.append(chunk_data)
                        except json.JSONDecodeError:
                            all_errors.append(f"Chunk {chunk_count}: JSON parse error")
                chunk_count += 1
        return {
            "total_chunks": chunk_count,
            "errors": all_errors,
            "has_content": any(
                c.get("choices", [{}])[0].get("delta", {}).get("content")
                for c in self.chunks_received
            )
        }


# FastAPI test server for streaming
app = FastAPI()


@app.post("/v1/chat/completions")
async def chat_completions(request: dict):
    """Mock endpoint for testing"""
    async def event_generator():
        # Simulate streaming response
        chunks = [
            {"id": "test-1", "object": "chat.completion.chunk",
             "created": 1234567890, "model": "gpt-4.1",
             "choices": [{"index": 0, "delta": {"content": "The"}, "finish_reason": None}]},
            {"id": "test-1", "object": "chat.completion.chunk",
             "created": 1234567890, "model": "gpt-4.1",
             "choices": [{"index": 0, "delta": {"content": " capital"}, "finish_reason": None}]},
            {"id": "test-1", "object": "chat.completion.chunk",
             "created": 1234567890, "model": "gpt-4.1",
             "choices": [{"index": 0, "delta": {"content": " of France"}, "finish_reason": None}]},
        ]
        for chunk in chunks:
            yield {
                "event": "message",
                "data": json.dumps(chunk)
            }
        yield {"event": "done", "data": "[DONE]"}
    return EventSourceResponse(event_generator())


# Test suite
@pytest.fixture
def streaming_validator():
    return StreamingContractValidator()


@pytest.mark.asyncio
async def test_streaming_contract_basic(streaming_validator, api_client):
    """Test basic streaming contract compliance"""
    request = ChatCompletionRequest(
        model="gpt-4.1",
        messages=[Message(role="user", content="Hello")],
        stream=True
    )
    result = await streaming_validator.stream_and_validate(api_client, request)
    assert result["total_chunks"] > 0, "No chunks received"
    assert len(result["errors"]) == 0, f"Contract errors: {result['errors']}"
    assert result["has_content"], "No content in stream"


@pytest.mark.asyncio
async def test_streaming_completion_marker(streaming_validator, api_client):
    """Test [DONE] marker is properly sent"""
    request = ChatCompletionRequest(
        model="gpt-4.1",
        messages=[Message(role="user", content="Say 'finished'")],
        stream=True
    )
    result = await streaming_validator.stream_and_validate(api_client, request)
    # Verify content arrived and the stream ended with the [DONE] marker
    assert len(streaming_validator.chunks_received) > 0
    assert streaming_validator.saw_done, "Stream did not end with [DONE]"


if __name__ == "__main__":
    # Run with: pytest streaming_contract_tests.py -v
    pytest.main([__file__, "-v", "--tb=short"])
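The suite above covers format and sequencing but not the dropped-connection case mentioned earlier. Here's a minimal sketch of that test, under the assumption that an aggressive client-side timeout is an acceptable stand-in for a dropped connection (HOLYSHEEP_BASE_URL and HOLYSHEEP_API_KEY are the constants from the framework above):

import httpx
import pytest

@pytest.mark.asyncio
async def test_stream_drop_surfaces_cleanly():
    # A 1ms timeout forces the failure path; we assert it surfaces as a
    # typed exception rather than a silently truncated stream.
    async with httpx.AsyncClient(timeout=httpx.Timeout(0.001)) as client:
        with pytest.raises(httpx.TimeoutException):
            await client.post(
                f"{HOLYSHEEP_BASE_URL}/chat/completions",
                headers={"Authorization": f"Bearer {HOLYSHEEP_API_KEY}"},
                json={"model": "gpt-4.1",
                      "messages": [{"role": "user", "content": "Hello"}],
                      "stream": True},
            )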
Testing Multi-Modal and Function Calling Contracts
Modern AI APIs support complex features like function calling, vision inputs, and structured outputs. Your contract tests must validate these interactions. Here's a test pattern for function calling, which is critical for building AI agents that take actions:
#!/usr/bin/env python3
"""
Function Calling Contract Tests
Tests tool/function call contract compliance
"""
import json
import pytest
from typing import Optional
from dataclasses import dataclass

# Reuses the framework above; adjust the module name to match your layout.
from contract_tests import ChatCompletionRequest, Message


@dataclass
class FunctionDefinition:
    name: str
    description: str
    parameters: dict


@dataclass
class ToolCall:
    id: str
    type: str
    function: dict


class FunctionCallingContract:
    """Contract for function calling feature"""
    REQUIRED_TOOL_FIELDS = ["id", "type", "function"]
    REQUIRED_FUNCTION_FIELDS = ["name", "arguments"]
    VALID_TOOL_TYPES = ["function"]
    # Tool call must be in choices with index
    TOOL_CALL_CHOICE_PATTERN = {
        "index": int,
        "message": {
            "role": str,
            "tool_calls": list
        },
        "finish_reason": str
    }


@dataclass
class FunctionCallRequest(ChatCompletionRequest):
    tools: Optional[list] = None
    tool_choice: Optional[str] = None


def validate_tool_call_structure(tool_call: dict) -> list:
    """Validate individual tool call structure"""
    errors = []
    for field in FunctionCallingContract.REQUIRED_TOOL_FIELDS:
        if field not in tool_call:
            errors.append(f"Missing tool_call field: {field}")
    if "function" in tool_call:
        func = tool_call["function"]
        for field in FunctionCallingContract.REQUIRED_FUNCTION_FIELDS:
            if field not in func:
                errors.append(f"Missing function field: {field}")
        # Validate arguments is valid JSON string
        if "arguments" in func:
            try:
                json.loads(func["arguments"])
            except json.JSONDecodeError:
                errors.append("Function 'arguments' must be valid JSON string")
    return errors


@pytest.mark.asyncio
async def test_function_calling_contract(api_client):
    """Test function calling response contract"""
    # Define available tools
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Get current weather for a location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "City name"
                        },
                        "unit": {
                            "type": "string",
                            "enum": ["celsius", "fahrenheit"]
                        }
                    },
                    "required": ["location"]
                }
            }
        }
    ]
    request = FunctionCallRequest(
        model="gpt-4.1",
        messages=[
            Message(role="user", content="What's the weather in Tokyo?")
        ],
        tools=tools,
        tool_choice="auto"
    )
    response = await api_client.send_request(request)
    assert response.status_code == 200
    data = response.json()
    # Validate basic structure
    errors = api_client.validate_response_schema(data)
    assert len(errors) == 0
    # Validate tool call if present
    choice = data["choices"][0]
    if "tool_calls" in choice.get("message", {}):
        tool_calls = choice["message"]["tool_calls"]
        assert len(tool_calls) > 0, "No tool calls returned"
        for i, tool_call in enumerate(tool_calls):
            call_errors = validate_tool_call_structure(tool_call)
            assert len(call_errors) == 0, f"Tool call {i} errors: {call_errors}"
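        # An extra, hedged check (provider behavior can vary): OpenAI-compatible
        # APIs conventionally report finish_reason == "tool_calls" when the
        # model invokes a tool.
        assert choice.get("finish_reason") == "tool_calls", \
            f"Expected finish_reason 'tool_calls', got {choice.get('finish_reason')!r}"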

# Schema validation for structured outputs
def validate_json_schema_compliance(
    response_content: str,
    expected_schema: dict
) -> tuple[bool, list]:
    """Validate response matches expected JSON schema"""
    from jsonschema import validate, ValidationError
    errors = []
    try:
        response_json = json.loads(response_content)
        validate(instance=response_json, schema=expected_schema)
    except json.JSONDecodeError as e:
        errors.append(f"Invalid JSON: {e}")
    except ValidationError as e:
        errors.append(f"Schema violation: {e.message}")
    return len(errors) == 0, errors


@pytest.mark.asyncio
async def test_structured_output_contract(api_client):
    """Test JSON mode/structured output contract"""
    request = ChatCompletionRequest(
        model="gpt-4.1",
        messages=[
            Message(role="system", content=(
                "You must respond with valid JSON. Return a JSON object with "
                "fields: name (string), age (number), skills (array of strings)"
            )),
            Message(role="user", content="Create a developer profile for John, age 30, with Python and JavaScript skills")
        ],
        max_tokens=500
    )
    response = await api_client.send_request(request)
    data = response.json()
    content = data["choices"][0]["message"]["content"]
    # Strip markdown code fences if the model wrapped its JSON in them
    if content.startswith("```json"):
        content = content[7:]
    if content.startswith("```"):
        content = content[3:]
    if content.endswith("```"):
        content = content[:-3]
    expected_schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "age": {"type": "number"},
            "skills": {"type": "array", "items": {"type": "string"}}
        },
        "required": ["name", "age", "skills"]
    }
    is_valid, errors = validate_json_schema_compliance(content.strip(), expected_schema)
    assert is_valid, f"Schema validation failed: {errors}"
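Before trusting validate_json_schema_compliance in CI, it pays to unit-test the validator itself against a known-bad input so a silent pass can't hide a bug:

def test_schema_validator_rejects_non_json():
    # Prose instead of JSON must produce a failure, not a silent pass.
    ok, errors = validate_json_schema_compliance("not json at all", {"type": "object"})
    assert not ok and len(errors) > 0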
Performance and Rate Limiting Contract Tests
Beyond schema validation, contract tests should verify rate limits, latency guarantees, and error handling. HolySheep AI advertises sub-50ms latency and supports WeChat/Alipay payments at ¥1 per $1 of credit (85%+ savings versus the ¥7.3 exchange rate), making it well suited to high-throughput applications. Here's how to test performance contracts:
#!/usr/bin/env python3
"""
Performance and Rate Limiting Contract Tests
Tests latency, throughput, and error handling contracts
"""
import time
import json
import asyncio
import pytest
import httpx
from typing import Optional
from dataclasses import dataclass

# Reuses the framework above; adjust the module name to match your layout.
from contract_tests import ContractTestClient, ChatCompletionRequest, Message


class RateLimitContract:
    """Rate limiting behavior contracts"""
    REQUESTS_PER_MINUTE = 60
    TOKENS_PER_MINUTE = 150000
    BURST_LIMIT = 20
    COOLDOWN_SECONDS = 60


class PerformanceContract:
    """Performance requirement contracts"""
    MAX_LATENCY_P50_MS = 100
    MAX_LATENCY_P95_MS = 500
    MAX_LATENCY_P99_MS = 2000
    MIN_THROUGHPUT_RPS = 10


@dataclass
class RequestMetrics:
    latency_ms: float
    status_code: int
    tokens_used: int
    timestamp: float


class PerformanceTestClient:
    """Client for performance contract testing"""

    def __init__(self, client: ContractTestClient):
        self.client = client
        self.metrics: list[RequestMetrics] = []

    async def measure_request(self, request: ChatCompletionRequest) -> RequestMetrics:
        """Send request and measure performance"""
        start = time.perf_counter()
        response = await self.client.send_request(request)
        end = time.perf_counter()
        latency_ms = (end - start) * 1000
        data = response.json() if response.status_code == 200 else {}
        tokens = data.get("usage", {}).get("total_tokens", 0)
        return RequestMetrics(
            latency_ms=latency_ms,
            status_code=response.status_code,
            tokens_used=tokens,
            timestamp=time.time()
        )

    async def load_test(
        self,
        request: ChatCompletionRequest,
        concurrent_requests: int,
        total_requests: int
    ) -> dict:
        """Run load test and collect metrics"""
        semaphore = asyncio.Semaphore(concurrent_requests)

        async def bounded_request():
            async with semaphore:
                return await self.measure_request(request)

        tasks = [bounded_request() for _ in range(total_requests)]
        self.metrics = await asyncio.gather(*tasks)
        return self.compute_statistics()

    def compute_statistics(self) -> dict:
        """Compute performance statistics from metrics"""
        latencies = [m.latency_ms for m in self.metrics]
        successes = [m for m in self.metrics if m.status_code == 200]
        failures = [m for m in self.metrics if m.status_code != 200]
        latencies.sort()
        n = len(latencies)
        # Wall-clock span between first and last completion, for throughput
        duration = (
            max(m.timestamp for m in self.metrics) -
            min(m.timestamp for m in self.metrics)
        ) if len(self.metrics) > 1 else 0
        return {
            "total_requests": len(self.metrics),
            "successful": len(successes),
            "failed": len(failures),
            "failure_rate": len(failures) / len(self.metrics) if self.metrics else 0,
            "latency": {
                "min_ms": min(latencies) if latencies else 0,
                "max_ms": max(latencies) if latencies else 0,
                "mean_ms": sum(latencies) / n if n else 0,
                "p50_ms": latencies[int(n * 0.5)] if n > 0 else 0,
                "p95_ms": latencies[int(n * 0.95)] if n > 0 else 0,
                "p99_ms": latencies[int(n * 0.99)] if n > 0 else 0,
            },
            "throughput_rps": len(successes) / duration if duration > 0 else 0
        }


def validate_rate_limit_handling(
    response: httpx.Response,
    retry_after: Optional[int] = None
) -> list:
    """Validate rate limit response contract"""
    errors = []
    if response.status_code == 429:
        # Check for Retry-After header
        retry_after_header = response.headers.get("Retry-After")
        if not retry_after_header:
            errors.append("429 response missing Retry-After header")
        elif not retry_after_header.isdigit():
            errors.append(f"Invalid Retry-After value: {retry_after_header}")
        # Check for error message in body
        try:
            error_data = response.json()
            if "error" not in error_data:
                errors.append("429 response missing 'error' field")
        except json.JSONDecodeError:
            errors.append("429 response body is not valid JSON")
    return errors


@pytest.mark.asyncio
async def test_latency_contract(api_client):
    """Test latency meets contract requirements"""
    perf_client = PerformanceTestClient(api_client)
    request = ChatCompletionRequest(
        model="gpt-4.1",
        messages=[Message(role="user", content="Hello")],
        max_tokens=50
    )
    # Run 100 requests to get statistically meaningful latency numbers
    stats = await perf_client.load_test(request, concurrent_requests=5, total_requests=100)
    # Validate P50 latency
    assert stats["latency"]["p50_ms"] <= PerformanceContract.MAX_LATENCY_P50_MS, \
        f"P50 latency {stats['latency']['p50_ms']}ms exceeds {PerformanceContract.MAX_LATENCY_P50_MS}ms"
    # Validate P95 latency
    assert stats["latency"]["p95_ms"] <= PerformanceContract.MAX_LATENCY_P95_MS, \
        f"P95 latency {stats['latency']['p95_ms']}ms exceeds {PerformanceContract.MAX_LATENCY_P95_MS}ms"


@pytest.mark.asyncio
async def test_rate_limit_enforcement(api_client):
    """Test rate limiting is properly enforced"""
    request = ChatCompletionRequest(
        model="gpt-4.1",
        messages=[Message(role="user", content="Test")],
        max_tokens=10
    )
    # Send burst requests beyond limit
    results = []
    for _ in range(RateLimitContract.BURST_LIMIT + 10):
        try:
            response = await api_client.send_request(request)
            results.append(response)
            await asyncio.sleep(0.1)  # Small delay between requests
        except Exception as e:
            results.append(e)
    # Check that requests completed (either succeeding or being rate limited)
    status_codes = [r.status_code if hasattr(r, 'status_code') else 0 for r in results]
    assert 429 in status_codes or 200 in status_codes, "No responses received"
    # Validate rate limit response format
    rate_limited = [r for r in results if hasattr(r, 'status_code') and r.status_code == 429]
    if rate_limited:
        for resp in rate_limited:
            errors = validate_rate_limit_handling(resp)
            assert len(errors) == 0, f"Rate limit response errors: {errors}"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
Common Errors and Fixes
1. Schema Validation Failure: Missing Required Fields
Error: Contract violations: ["Missing required field: id", "Missing required field: choices"]
Cause: The API response is missing required fields defined in the contract specification. This often happens when providers update their response format or when using incompatible API versions.
Fix:
# Implement defensive schema validation with fallback handling
import time
import uuid


def validate_with_fallbacks(response_data: dict) -> dict:
    """Validate with backward-compatible fallback values"""
    validated = {}
    defaults = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": response_data.get("model", "unknown"),
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": ""},
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0
        }
    }
    for key, default in defaults.items():
        validated[key] = response_data.get(key, default)
    return validated
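One caveat on this fix: fallbacks belong in production code paths, not in the contract tests themselves, where the whole point is to surface the missing field. A usage sketch of that split:

# Production path: degrade gracefully so downstream code keeps working
data = validate_with_fallbacks(response.json())
reply = data["choices"][0]["message"]["content"]

# Test path: still assert on the raw payload so the regression stays visible
assert not client.validate_response_schema(response.json())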
2. Streaming Chunk Parsing Error
Error: Chunk 5: JSON parse error: Expecting property name enclosed in quotes
Cause: SSE data chunks may contain malformed JSON due to network interruptions, provider bugs, or newline handling issues.
Fix:
import json
import re
from typing import Optional


def parse_sse_chunk(raw_chunk: str) -> Optional[dict]:
    """Parse SSE chunk with robust error handling"""
    # Extract data field
    data_match = re.search(r'data:\s*(.+)', raw_chunk)
    if not data_match:
        return None
    data_str = data_match.group(1).strip()
    # Handle [DONE] marker
    if data_str == "[DONE]":
        return {"_done": True}
    # Try the raw payload first, then heuristic repairs for common issues.
    # The regexes are best-effort and can mangle string values, so they are
    # only reached after a clean parse fails.
    candidates = [
        data_str,
        re.sub(r'(\w+):', r'"\1":', data_str),    # Issue 1: unquoted keys
        re.sub(r',(\s*[}\]])', r'\1', data_str),  # Issue 2: trailing commas
        data_str.replace("'", '"'),               # Issue 3: single quotes
    ]
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    return None
3. Rate Limit Handling Without Exponential Backoff
Error: 429 Too Many Requests - Retries exhausted after initial attempt
Cause: Test client doesn't implement proper exponential backoff, causing all retries to fail.
Fix:
import asyncio
import random

import httpx


async def retry_with_exponential_backoff(
    client: ContractTestClient,
    request: ChatCompletionRequest,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> httpx.Response:
    """Retry request with exponential backoff and jitter"""
    last_exception = None
    for attempt in range(max_retries):
        try:
            response = await client.send_request(request)
            if response.status_code != 429:
                return response
            # Extract Retry-After if available
            retry_after = response.headers.get("Retry-After")
            if retry_after and retry_after.isdigit():
                delay = int(retry_after)
            else:
                # Exponential backoff with jitter
                delay = min(base_delay * (2 ** attempt), max_delay)
                delay *= (0.5 + random.random() * 0.5)  # Add jitter
            print(f"Rate limited. Retrying in {delay:.2f}s (attempt {attempt + 1}/{max_retries})")
            await asyncio.sleep(delay)
        except httpx.TimeoutException as e:
            last_exception = e
            delay = min(base_delay * (2 ** attempt), max_delay)
            await asyncio.sleep(delay)
    raise RuntimeError(f"Max retries ({max_retries}) exceeded") from last_exception
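Usage is a drop-in replacement for the bare send_request call in any test:

response = await retry_with_exponential_backoff(api_client, sample_request)
assert response.status_code == 200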
Integration with CI/CD Pipeline
Contract tests should run automatically in your CI/CD pipeline to catch breaking changes before deployment. Here's a GitHub Actions workflow that runs contract tests on every push:
name: AI API Contract Tests
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  contract-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install pytest pytest-asyncio httpx jsonschema pytest-benchmark
      - name: Run contract tests
        env:
          HOLYSHEEP_API_KEY: ${{ secrets.HOLYSHEEP_API_KEY }}
        run: |
          pytest tests/ -v --tb=short --junitxml=results.xml
      - name: Upload test results
        uses: actions/upload-artifact@v4
        with:
          name: contract-test-results
          path: results.xml
      - name: Run performance benchmarks
        env:
          HOLYSHEEP_API_KEY: ${{ secrets.HOLYSHEEP_API_KEY }}
        run: |
          pytest tests/test_performance.py -v --benchmark-json=benchmark.json
      - name: Comment benchmark results
        if: github.event_name == 'pull_request'  # issue comments only exist on PRs
        uses: actions/github-script@v7
        with:
          script: |
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: 'Contract tests passed! Performance benchmarks attached.'
            })
Conclusion
Implementing comprehensive AI API contract testing requires validating schemas, streaming responses, function calling, performance metrics, and rate limiting. By building a robust test suite, you catch breaking changes before they reach production, reduce incident response time, and maintain confidence in your AI integrations.
HolySheep AI provides an excellent foundation for these tests with sub-50ms latency, competitive pricing (¥1 per $1 of credit, an 85%+ saving versus the ¥7.3 exchange rate), and free credits on signup. Their WeChat/Alipay support makes it accessible for teams globally.
The patterns in this guide work with any OpenAI-compatible API. Start with basic schema validation, add streaming tests as you need real-time features, and layer in performance testing once you have stable contracts. Each test type catches different categories of failures that traditional testing misses.
I recommend running contract tests against staging environments before deploying to production, and maintaining a changelog of API contract versions your tests validate. This creates a safety net that lets your team move fast while staying confident that AI integrations work as expected.
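The contract-version changelog can be as simple as a dict checked into the test package; the dates and entries below are hypothetical placeholders for illustration:

# Changelog of contract versions these tests validate (entries are illustrative)
CONTRACT_CHANGELOG = {
    "2025-06-01": {"added": ["usage.total_tokens required"], "removed": []},
    "2025-09-15": {"added": ["choices[].message.tool_calls"], "removed": []},
}
PINNED_CONTRACT_VERSION = "2025-09-15"  # bump deliberately after reviewing a diff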
👉 Sign up for HolySheep AI — free credits on registration