作为企业 AI 基础设施负责人,我最近接到一个硬核需求:公司要在两个月内上线一个支持 10 万+ 文档的智能问答系统,覆盖 PDF、Word、Excel、PPT、HTML 等 12 种格式。当时团队只有我和另一位后端工程师,时间紧、格式杂、延迟要求高(P99 < 500ms)。这篇文章复盘我们如何用 Unstructured + LangChain + HolySheep API 搭起这套 RAG 系统,以及踩过的那些坑。
一、为什么选择 Unstructured + LangChain
选型阶段对比了三套方案:自己写解析器、Cloudflare D1 + Workers AI、Unstructured + LangChain。最终选择后者的核心理由:
- Unstructured 支持 30+ 文档格式,自动识别表格、图像、标题层级,一个 SDK 解决所有解析问题
- LangChain 的 LCEL(LangChain Expression Language)让向量存储、检索、生成的流程编排变得清晰可控
- 配合 HolySheep AI 的国内直连优势(延迟 < 50ms)和 ¥1=$1 的无损汇率,我们可以把省下的成本投入模型微调
二、环境搭建
# Python 3.10+ 环境
pip install "unstructured[all-pkgs]" langchain langchain-community \
langchain-huggingface pymupdf python-docx openpyxl \
python-pptx playwright chromadb
如需处理加密 PDF
pip install pikepdf
Chrome 驱动(处理动态网页)
playwright install chromium
三、Unstructured 文档解析核心代码
文档解析是整个流水线的瓶颈点。我们的策略是:本地能用 Unstructured Local 处理的先处理,复杂布局再调用远程 API。
from unstructured.partition.api import partition_via_api, partition_text
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.docx import partition_docx
from unstructured.chunking.title import chunk_by_title
from typing import List, Dict, Any
import hashlib
class DocumentProcessor:
"""统一文档处理流水线"""
def __init__(self, api_key: str):
# HolySheep API 配置 - 国内直连,延迟 < 50ms
self.api_key = api_key
self.base_url = "https://api.holysheep.ai/v1"
def process_pdf(self, file_path: str) -> List[Dict[str, Any]]:
"""处理 PDF 文件,提取文本和表格"""
elements = partition_pdf(
filename=file_path,
strategy="hi_res", # 高分辨率策略,保留布局
extract_images_in_pdf=True,
image_output_dir_path="./images"
)
# 按标题分块
chunks = chunk_by_title(elements, max_characters=1000)
return [{"content": str(el), "type": el.category} for el in chunks]
def process_docx(self, file_path: str) -> List[Dict[str, Any]]:
"""处理 Word 文档"""
elements = partition_docx(filename=file_path)
chunks = chunk_by_title(elements, max_characters=800)
return [{"content": str(el), "type": el.category} for el in chunks]
def process_batch_via_api(self, file_paths: List[str]) -> List[Dict]:
"""批量处理 - 通过 HolySheep API 调用远程解析服务"""
import requests
# 批量上传文件
files = []
for path in file_paths:
with open(path, "rb") as f:
files.append(("files", (path.split("/")[-1], f.read())))
# 调用 Unstructured API(可通过 HolySheep 中转优化网络)
response = requests.post(
f"{self.base_url}/unstructured/batch",
headers={"Authorization": f"Bearer {self.api_key}"},
files=files,
timeout=120
)
if response.status_code != 200:
raise RuntimeError(f"Unstructured API Error: {response.text}")
return response.json()["documents"]
def generate_doc_id(self, file_path: str) -> str:
"""生成文档唯一 ID"""
return hashlib.md5(file_path.encode()).hexdigest()[:12]
初始化处理器
processor = DocumentProcessor(api_key="YOUR_HOLYSHEEP_API_KEY")
四、LangChain RAG 流水线
解析后的文本块需要向量化存入向量数据库。我们用 Chroma(本地部署,零成本),配合 HolySheep 的 embedding 模型。
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate
from typing import List
class RAGPipeline:
"""检索增强生成流水线"""
def __init__(self, api_key: str, persist_dir: str = "./chroma_db"):
self.embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={"device": "cpu"}
)
# HolySheep ChatGPT 接口 - 汇率 ¥1=$1,DeepSeek V3.2 仅 $0.42/MTok
self.llm = ChatOpenAI(
base_url="https://api.holysheep.ai/v1",
api_key=api_key,
model="gpt-4.1", # 也可选 claude-sonnet-4.5、deepseek-v3.2
temperature=0.3,
timeout=30
)
self.vectorstore = None
self.persist_dir = persist_dir
def index_documents(self, chunks: List[Dict], namespace: str = "default"):
"""将文档块索引到向量数据库"""
docs = [
Document(
page_content=chunk["content"],
metadata={
"type": chunk.get("type", "text"),
"source": chunk.get("source", "unknown")
}
)
for chunk in chunks
]
# 批量写入,batch_size 控制内存
self.vectorstore = Chroma.from_documents(
documents=docs,
embedding=self.embeddings,
persist_directory=self.persist_dir,
collection_name=namespace
)
self.vectorstore.persist()
print(f"已索引 {len(docs)} 个文档块")
def query(self, question: str, top_k: int = 4) -> str:
"""执行 RAG 查询"""
if not self.vectorstore:
raise ValueError("请先调用 index_documents")
# 检索
retriever = self.vectorstore.as_retriever(
search_kwargs={"k": top_k, "filter": {"type": {"$ne": "Image"}}}
)
# 提示词模板
template = """基于以下参考资料回答问题。如果资料不足,直接说不知道。
参考资料:
{context}
问题:{question}
回答:"""
prompt = ChatPromptTemplate.from_template(template)
# 构建 LCEL 链
chain = (
{"context": retriever | self._format_docs, "question": RunnablePassthrough()}
| prompt
| self.llm
| StrOutputParser()
)
return chain.invoke(question)
@staticmethod
def _format_docs(docs: List[Document]) -> str:
return "\n\n".join([f"[来源 {i+1}] {doc.page_content}" for i, doc in enumerate(docs)])
使用示例
rag = RAGPipeline(api_key="YOUR_HOLYSHEEP_API_KEY")
rag.index_documents(your_chunks)
answer = rag.query("公司年假政策是什么?")
print(answer)
五、生产环境部署注意事项
- 并发控制:使用 Semaphore 限制同时解析的文档数(建议 5-10),避免 OOM
- 异步处理:大文件用 Celery + Redis 异步队列,小文件走同步 FastAPI
- 缓存策略:已解析的文档计算 MD5,命中缓存则跳过解析步骤
- 成本优化:文本类文档用
strategy="fast",仅表格和图片用"hi_res"
六、实战成本对比
以处理 10 万份文档(平均 10 页/份)为例,对比 HolySheep 与官方 API 的成本:
| 项目 | HolySheep | OpenAI 官方 |
|---|---|---|
| 汇率 | ¥1 = $1(节省 85%+) | ¥7.3 = $1 |
| Embedding 费用 | 免费(本地模型) | $0.0001/1K tokens |
| GPT-4.1 生成 | $8/MTok | $60/MTok |
| DeepSeek V3.2 | $0.42/MTok(性价比最高) | 无 |
| 充值方式 | 微信/支付宝直连 | 需 Visa 卡 |
实测用 DeepSeek V3.2 处理问答,响应延迟 < 200ms,P99 < 400ms,完全满足需求。
常见报错排查
错误 1:PDF 解析内存溢出(OOM)
# 错误信息
MemoryError: Unable to allocate array with shape (..., ...) and data type uint8
解决方案:限制图像分辨率,启用分页加载
elements = partition_pdf(
filename=file_path,
strategy="fast", # 改用快速策略
infer_table_structure=False, # 禁用表格识别以节省内存
max_partition=500, # 单次最大分区数
chunking_strategy="basic"
)
或使用流式处理
from unstructured.partition.pdf import partition_pdf_or_html
for element in partition_pdf_or_html(filename=file_path, max_characters=1000):
yield element
错误 2:Unstructured API 超时
# 错误信息
requests.exceptions.ReadTimeout: HTTPSConnectionPool(...)
解决方案:增加超时时间 + 重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def process_with_retry(file_path: str):
elements = partition_via_api(
filename=file_path,
api_key="YOUR_HOLYSHEEP_API_KEY",
api_url="https://api.unstructured.io/general/v0/general",
timeout=180, # 增加到 180 秒
max_retries=3
)
return elements
错误 3:LangChain 向量检索结果为空
# 错误信息
Empty response from vectorstore
排查步骤:
1. 检查向量数据库是否正确持久化
import os
print(f"DB 路径存在: {os.path.exists('./chroma_db')}")
2. 验证 embedding 维度
test_embedding = embeddings.embed_query("测试文本")
print(f"Embedding 维度: {len(test_embedding)}") # 应该是 384
3. 检查 Chroma 连接
from langchain_community.vectorstores import Chroma
db = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
count = db._collection.count()
print(f"向量数量: {count}")
4. 扩大搜索范围
retriever = db.as_retriever(
search_kwargs={"k": 10} # 增加 top_k
)
错误 4:HolySheep API 认证失败
# 错误信息
AuthenticationError: Incorrect API key provided
排查清单:
1. 确认 key 格式正确(以 sk- 开头)
import os
api_key = os.getenv("HOLYSHEEP_API_KEY")
if not api_key or not api_key.startswith("sk-"):
raise ValueError("请检查 API Key 格式")
2. 验证 base_url 是否正确
print(f"请求地址: {base_url}") # 应该是 https://api.holysheep.ai/v1
3. 测试连通性
import requests
resp = requests.get(
"https://api.holysheep.ai/v1/models",
headers={"Authorization": f"Bearer {api_key}"},
timeout=10
)
print(f"可用模型: {resp.json()}")
总结
从零搭建这套系统,我最大的感悟是:文档解析是 RAG 的地基,地基不稳,后面的检索和生成都会出问题。Unstructured 把 80% 的格式问题封装好了,但剩余 20% 的边界情况(加密 PDF、水印图片、嵌套表格)仍需针对性优化。
如果你也在做类似的项目,建议先用小批量文档跑通全流程,验证延迟和成本后再大规模部署。HolySheep 的 ¥1=$1 汇率和国内直连特性,确实能显著降低开发和运营成本,值得一试。