LangChain
Concepts
- An LLM application framework.
- What it provides: a unified model interface, prompt management, chains, agents, memory, and RAG.
Covered below: creating chains and agents, and loading the model.
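A minimal sketch of what the "unified model interface" buys (the endpoint, key, and model name are placeholders; the local_model() defined in the loading script below would slot in the same way):
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# Any chat model exposes the same Runnable interface, so swapping models
# never changes the chain itself.
llm = ChatOpenAI(model="xxxx", api_key="xxxxx", base_url="xxxxx")
chain = (
    ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
    | llm
    | StrOutputParser()
)
print(chain.invoke({"text": "LangChain is a framework for building LLM applications."}))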
# Environment
python:3.13
torch:2.10.0+cu126
tokenizers:0.22.2
transformers:5.3.0
langchain:1.2.12
langchain-huggingface:1.2.1
langchain-openai:1.1.11
# load_model.py
# Script that loads the model; import it wherever an LLM is needed
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# pip install langchain-huggingface langchain-openai
from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace
from langchain_openai import ChatOpenAI
def local_model():
    # Local model directory
    model_name = "./model"
    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # AutoModelForCausalLM detects the architecture and loads the matching causal LM
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        dtype=torch.float16,
        device_map="auto"
    )
    if "cuda" in str(model.device):
        print(f"Model is running on GPU: {model.device}")
        print(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024 ** 3:.2f} GB")
    else:
        print("Model is running on CPU")
    # Build the text-generation pipeline. The model was already placed by
    # device_map="auto" above, so no device argument is passed here.
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=512,
        do_sample=True,          # temperature only takes effect when sampling
        temperature=0.2,
        repetition_penalty=1.1,
        return_full_text=False
    )
    llm_pipeline = HuggingFacePipeline(pipeline=pipe)
    return ChatHuggingFace(llm=llm_pipeline)
def remote_model():
    # Placeholders for an OpenAI-compatible endpoint
    base_url = "xxxxx"
    api_key = "xxxxx"
    model = "xxxx"
    llm = ChatOpenAI(
        model=model,
        temperature=0.2,
        top_p=0.9,
        frequency_penalty=0,
        api_key=api_key,    # current names; openai_api_key/openai_api_base are legacy aliases
        base_url=base_url,
        max_retries=2,
        timeout=60,
    )
    return llm
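A quick smoke test for the loader (a sketch; it assumes ./model exists, or that the remote placeholders above have been filled in):
# Both loaders return a chat model with the same interface.
from load_model import local_model, remote_model

llm = local_model()  # or remote_model()
print(llm.invoke("Say hello in one short sentence.").content)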
Usage demo
# pip install langchain
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from load_model import local_model

# Load the model
llm = local_model()

@tool
def get_weather(city: str) -> str:
    """Get the weather for a city."""
    print("get_weather tool")
    return f"It's sunny in {city}! 27°C" if city == "重庆" else f"It's overcast in {city}. 18°C"
# ============ Simple agent ============
def simple_agent():
    agent = create_agent(
        model=llm,
        tools=[get_weather],
        system_prompt="You are a weather forecaster.",
    )
    # Run the agent
    res = agent.invoke(
        {"messages": [{"role": "user", "content": "How's the weather in 重庆?"}]}
    )
    print(res["messages"][-1].content)
# ============ Single chain ============
def simple_chain():
    prompt = PromptTemplate.from_template(
        "Come up with a creative, humorous name for a new {product} and explain the idea behind it."
    )
    chain = prompt | llm
    result = chain.invoke({"product": "smart water bottle"})
    print(result.content)

# ============ Multi-step chain ============
def multi_chain():
    prompt1 = PromptTemplate.from_template(
        "Come up with a creative, humorous name for a new {product} and explain the idea behind it."
    )
    prompt2 = PromptTemplate.from_template(
        "Improve the following text to make it more appealing:\n{initial_output}"
    )
    chain = (
        prompt1
        | llm
        | StrOutputParser()                        # first reply -> plain string
        | (lambda text: {"initial_output": text})  # map into prompt2's input variable
        | prompt2
        | llm
    )
    result = chain.invoke({"product": "smart water bottle"})
    print(result.content)
# ============ Streaming output ============
def stream_print():
    for chunk in llm.stream("Why did Char Aznable join the Zabi family's Zeon forces?"):
        for char in chunk.content:
            print(char, end="|", flush=True)
stream_print()
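stream() is not limited to the bare model; every Runnable supports it, so a composed chain streams the same way (a sketch reusing the prompt style above):
# Streaming a composed chain: after StrOutputParser, chunks are plain strings.
def stream_chain():
    prompt = PromptTemplate.from_template(
        "Come up with a creative, humorous name for a new {product}."
    )
    chain = prompt | llm | StrOutputParser()
    for chunk in chain.stream({"product": "smart water bottle"}):
        print(chunk, end="", flush=True)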
Advanced agent
# Memory, structured output, and one agent coordinating multiple tools
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime
from dataclasses import dataclass
from load_model import local_model
from langgraph.checkpoint.memory import InMemorySaver

SYSTEM_PROMPT = """You are an expert weather forecaster who speaks in puns.

You have two tools available:
- get_weather_for_location: gets the weather for a given city.
- get_user_location: looks up the user's location.
"""
llm = local_model()
@dataclass
class Context:
    """Custom runtime context schema."""
    user_id: str

@tool
def get_weather_for_location(city: str) -> str:
    """Get the weather for a given city."""
    print("weather tool")
    return f"It's always sunny in {city}! 27°C" if city == "重庆" else f"It's pouring in {city}! 16°C"

@tool
def get_user_location(runtime: ToolRuntime[Context]) -> str:
    """Look up the user's location from the user ID."""
    user_id = runtime.context.user_id
    print("location tool")
    return "重庆" if user_id == "1" else "北京"
# Define the response format
@dataclass
class ResponseFormat:
    """Response schema for the agent."""
    # A pun-laden reply (always required)
    punny_response: str
    # Any interesting weather details, if available
    weather_conditions: str | None = None

# Set up memory
checkpointer = InMemorySaver()
# `thread_id` is the unique identifier for a given conversation.
config = {"configurable": {"thread_id": "1"}}
agent = create_agent(
    model=llm,
    tools=[get_user_location, get_weather_for_location],
    context_schema=Context,
    checkpointer=checkpointer,
    system_prompt=SYSTEM_PROMPT,
    response_format=ResponseFormat
)

# Run the agent
result = agent.invoke(
    {"messages": [{"role": "user", "content": "My name is Amuro Ray. How's the weather today?"}]},
    config=config,
    context=Context(user_id="1")
)
# print(result["messages"][-1].content)
print(result['structured_response'])

# Second turn on the same thread_id: the checkpointer restores the history.
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Thanks! What's my name, do you remember?"}]},
    config=config,
    context=Context(user_id="1")
)
# print(result["messages"][-1].content)
print(result['structured_response'])
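To verify that the agent really chained get_user_location before get_weather_for_location, the raw message list can be printed (a quick sketch):
# Inspect the trace: AIMessage tool calls are followed by ToolMessage results.
for msg in result["messages"]:
    print(type(msg).__name__, "->", str(msg.content)[:80])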
Chain with memory
# A chain that carries conversation history
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
# pip install langchain-community==0.4.1
from langchain_community.chat_message_histories import ChatMessageHistory
from load_model import local_model

llm = local_model()

# Build the runnable sequence
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an assistant. Answer questions using the conversation history."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])
runnable = prompt | llm

# In-memory store for per-session message histories
store = {}

def get_session_history(session_id: str) -> ChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

# Wrap the chain with message history
chain = RunnableWithMessageHistory(
    runnable,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history"
)

# Invoke the chain
session_id = "1"
response = chain.invoke(
    {"input": "My name is Qin Shi Huang."},
    config={"configurable": {"session_id": session_id}}
)
print(response.content)

response = chain.invoke(
    {"input": "What's my name?"},
    config={"configurable": {"session_id": session_id}}
)
print(response.content)
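Histories are keyed by session_id, so a fresh ID starts with an empty history; a quick check against the chain above:
# A new session_id gets its own empty history, so the name given in session
# "1" should not leak into session "2".
response = chain.invoke(
    {"input": "What's my name?"},
    config={"configurable": {"session_id": "2"}}
)
print(response.content)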
RAG demo
import bs4
from langchain_community.document_loaders import (
    PyPDFLoader, Docx2txtLoader,
    CSVLoader, UnstructuredExcelLoader, WebBaseLoader
)
# pip install sentence-transformers chromadb "unstructured[excel]"
# pip install msoffcrypto-tool docx2txt
from pathlib import Path
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from load_model import local_model
llm = local_model()
def load_all_formats(paths):
    """Load PDF/DOC/CSV/Excel files into Documents."""
    docs = []
    for path in paths:
        ext = Path(path).suffix.lower()
        try:
            if ext == '.pdf':
                loader = PyPDFLoader(path)
            elif ext in ['.docx', '.doc']:
                loader = Docx2txtLoader(path)
            elif ext == '.csv':
                loader = CSVLoader(path, encoding="utf-8")
            elif ext in ['.xlsx', '.xls']:
                loader = UnstructuredExcelLoader(path, mode="single")
            else:
                print(f"Unsupported format: {path}")
                continue
            page_docs = loader.load()
            docs.extend(page_docs)
            print(f"{path}: {len(page_docs)} pages/sheets")
        except Exception as e:
            print(f"{path}: {e}")
    return docs
def rerank_bge(query, docs, top_k=3):
    # "Reranking" here is cosine re-scoring with the same bi-encoder embedding
    # model, not a true cross-encoder reranker.
    reranker_embeddings = HuggingFaceEmbeddings(
        model_name="../myQdrant/bge-large-zh",  # local path
        model_kwargs={'device': 'cuda'},
        encode_kwargs={'normalize_embeddings': True}
    )
    query_emb = reranker_embeddings.embed_query(query)
    doc_embs = reranker_embeddings.embed_documents([doc.page_content for doc in docs])
    scores = []
    for doc_emb in doc_embs:
        score = sum(a * b for a, b in zip(query_emb, doc_emb))  # dot product == cosine (normalized)
        scores.append(score)
    # Sort by score, keep the top_k documents
    ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
    return [doc for _, doc in ranked[:top_k]]
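# rerank_bge above only re-sorts by cosine similarity with the same bi-encoder.
# A true reranker scores each (query, document) pair jointly with a
# cross-encoder. A sketch, assuming a model such as BAAI/bge-reranker-large is
# available locally or from the Hub:
from sentence_transformers import CrossEncoder

def rerank_cross_encoder(query, docs, top_k=3):
    reranker = CrossEncoder("BAAI/bge-reranker-large")
    pairs = [(query, doc.page_content) for doc in docs]  # each pair is scored jointly
    scores = reranker.predict(pairs)
    ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
    return [doc for _, doc in ranked[:top_k]]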
def rag_chain(inputs):
    # ========== Web loading =============
    # bs4_strainer = bs4.SoupStrainer(class_=("post-title", "post-header", "post-content"))
    # SoupStrainer's name= filters by tag name, not CSS selector, so only real
    # tag names belong here; class-based filtering uses class_ as in the
    # commented-out line above.
    strainer = bs4.SoupStrainer(name=["article", "main", "section"])
    web_loader = WebBaseLoader(
        web_paths=("https://langchain-doc.cn/v1/python/langchain/philosophy.html",),
        bs_kwargs={"parse_only": strainer}
    )
    # docs = web_loader.load()
    # ========== Local files ============
    pdf_paths = ["C:\\Users\\zwq\\Desktop\\demo.xlsx", "C:\\Users\\zwq\\doc\\test.pdf", "C:\\Users\\zwq\\Desktop\\test.docx"]
    docs = load_all_formats(pdf_paths)
    # Split the text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,       # max characters per chunk
        chunk_overlap=200,     # overlap between chunks, to keep context across boundaries
        add_start_index=True,  # record each chunk's start index in the source
    )
    all_splits = text_splitter.split_documents(docs)
    print(f"Split documents into {len(all_splits)} sub-documents.")
    # Vector store
    embeddings = HuggingFaceEmbeddings(
        model_name="../myQdrant/bge-large-zh"
    )
    vectorstore = Chroma.from_documents(all_splits, embeddings)
    retriever = vectorstore.as_retriever(search_kwargs={"k": 6})  # Step 1: fetch 6 candidates
    # Retrieval
    question = inputs["question"]
    docs = retriever.invoke(question)           # Step 1: bge-large-zh retrieval
    reranked_docs = rerank_bge(question, docs)  # Step 2: bge-large-zh re-scoring
    context = "\n\n".join([doc.page_content for doc in reranked_docs])
    print(context)
    prompt = ChatPromptTemplate.from_template("""
Answer the question using the documents below.
Documents:
{context}
Question: {question}
Answer:
""")
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"context": context, "question": question})

web_question = "LangChain最新版本的前一个版本发布时间是什么时候?"  # when was the release before the latest LangChain version?
pdf_question = "达梦数据库(广州)有限公司的详细地址"  # detailed address of the company
print(rag_chain({"question": pdf_question}))
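Chroma.from_documents rebuilds the index on every call; passing a persist_directory lets the index be built once and reopened later (a sketch with an arbitrary directory name, reusing all_splits and embeddings from rag_chain):
# Build once and persist...
vectorstore = Chroma.from_documents(
    all_splits, embeddings, persist_directory="./chroma_db"
)
# ...then reopen in a later run without re-embedding
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)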
RAG from Qdrant
from langchain_core.documents import Document
from qdrant_client import QdrantClient
# pip install langchain-qdrant
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_huggingface import HuggingFaceEmbeddings
from load_model import local_model
llm = local_model()
client = QdrantClient(
    host="localhost",
    api_key="xxx",
    port=6333,
    https=False,
    timeout=10.0
)
def qdrant_search(collect, query, k=3):
    """Embed the query and fetch the top-k points from a Qdrant collection."""
    embeddings = HuggingFaceEmbeddings(model_name="../myQdrant/bge-large-zh")
    query_emb = embeddings.embed_query(query)
    response = client.query_points(
        collection_name=collect,
        query=query_emb,
        limit=k,
        with_payload=True
    )
    docs = []
    for hit in response.points:
        # Dump the whole payload dict as the document text; with a known
        # payload schema, a single text field would be cleaner.
        full_payload = hit.payload
        content = str(full_payload)
        docs.append(Document(
            page_content=content,
            metadata={"score": hit.score, "id": hit.id}
        ))
    return docs
def rerank_bge(query, docs, top_k: int = 3):
    """Re-score with bge-large-zh embeddings (cosine re-scoring, as above)."""
    embeddings = HuggingFaceEmbeddings(
        model_name="../myQdrant/bge-large-zh",
        encode_kwargs={'normalize_embeddings': True}  # normalize so dot product == cosine
    )
    query_emb = embeddings.embed_query(query)
    doc_embs = embeddings.embed_documents([doc.page_content for doc in docs])
    scores = [sum(a * b for a, b in zip(query_emb, doc_emb)) for doc_emb in doc_embs]
    ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
    return [doc for _, doc in ranked[:top_k]]
def rag_chain(inputs):
    # Retrieve
    question = inputs["question"]
    collection_name = "order_ex"
    docs = qdrant_search(collection_name, question, 6)
    # Rerank
    reranked_docs = rerank_bge(question, docs)
    context = "\n\n".join([doc.page_content for doc in reranked_docs])
    print(context)
    prompt = ChatPromptTemplate.from_template("""
Answer the question using the documents below.
Documents:
{context}
Question: {question}
Answer:
""")
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"context": context, "question": question})

question = "LangChain最新版本的前一个版本发布时间是什么时候?"  # when was the release before the latest LangChain version?
print(rag_chain({"question": question}))
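The pip comment above installs langchain-qdrant, but the code only talks to the raw client. As an alternative, the QdrantVectorStore wrapper exposes the same collection through the standard retriever interface (a sketch; it assumes the collection stores its text under the default page_content payload key, otherwise pass content_payload_key):
from langchain_qdrant import QdrantVectorStore

vectorstore = QdrantVectorStore.from_existing_collection(
    embedding=HuggingFaceEmbeddings(model_name="../myQdrant/bge-large-zh"),
    collection_name="order_ex",
    url="http://localhost:6333",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 6})
docs = retriever.invoke("LangChain最新版本的前一个版本发布时间是什么时候?")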