LangChain


Concepts

  • An LLM application framework
  • Helps with: a unified model interface, prompt management, chains, agents, memory, and RAG (see the sketch below)
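
A minimal sketch of the unified-interface idea (the model name here is an assumption; any chat model exposes the same invoke interface):

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
llm = ChatOpenAI(model="gpt-4o-mini")  # any chat model can sit here; the interface is uniform
chain = prompt | llm  # LCEL: compose runnables with the pipe operator
print(chain.invoke({"text": "LangChain ties prompts, models, tools, and memory together."}).content)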

Creating chains and agents

Loading the model

# Environment
python:3.13
torch:2.10.0+cu126
tokenizers:0.22.2
transformers:5.3.0
langchain:1.2.12
langchain-huggingface:1.2.1
langchain-openai:1.1.11
# load_model.py
# Model-loading helpers; import these wherever an LLM is needed
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# pip install langchain-huggingface langchain-openai
from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace
from langchain_openai import ChatOpenAI


def local_model():
    # Path to the local model weights
    model_name = "./model"
    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # AutoModelForCausalLM detects the model type and loads the matching causal LM
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        dtype=torch.float16,
        device_map="auto"
    )
    if "cuda" in str(model.device):
        print(f"Model is running on GPU: {model.device}")
        print(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024 ** 3:.2f} GB")
    else:
        print("Model is running on CPU")

    # Build the text-generation pipeline
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=512,
        do_sample=True,  # enable sampling so temperature takes effect
        temperature=0.2,
        repetition_penalty=1.1,
        return_full_text=False,
        # no device_map here: the model was already placed by device_map="auto" above
    )
    llm_pipeline = HuggingFacePipeline(pipeline=pipe)
    return ChatHuggingFace(llm=llm_pipeline)


def remote_model():
    base_url = "xxxxx"
    api_key = "xxxxx"
    model = "xxxx"
    llm = ChatOpenAI(
        model=model,
        temperature=0.2,
        top_p=0.9,
        frequency_penalty=0,
        api_key=api_key,
        base_url=base_url,
        max_retries=2,
        timeout=60,
    )
    return llm
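
Both helpers return a chat model with the same interface, so downstream code never needs to know which backend is active. A quick smoke test (a sketch; run it next to load_model.py):

from load_model import local_model

llm = local_model()  # or remote_model(); both expose .invoke() and .stream()
print(llm.invoke("Say hello in one short sentence.").content)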

Usage demo

# pip install langchain
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from load_model import local_model

# Load the model
llm = local_model()


@tool
def get_weather(city: str) -> str:
    """Get the weather for a city."""
    print("get_weather tool")
    return f"It's sunny in {city}! 27°C" if city == "Chongqing" else f"It's raining in {city}! 18°C"


# ============   simple agent   ============
def simple_agent():
    agent = create_agent(
        model=llm,
        tools=[get_weather],
        system_prompt="You are a weather forecaster",
    )
    # Run the agent
    res = agent.invoke(
        {"messages": [{"role": "user", "content": "What's the weather like in Chongqing?"}]}
    )
    print(res["messages"][-1].content)


# ============   simple chain   ============
def simple_chain():
    prompt = PromptTemplate.from_template(
        "Come up with a creative, humorous name for a new {product}, and explain the thinking behind the name"
    )
    chain = prompt | llm
    result = chain.invoke({"product": "smart water bottle"})
    print(result.content)


# ============   multi-step chain   ============
def multi_chain():
    prompt1 = PromptTemplate.from_template(
        "Come up with a creative, humorous name for a new {product}, and explain the thinking behind the name"
    )
    prompt2 = PromptTemplate.from_template(
        "Improve the following content to make it more appealing:\n{initial_output}"
    )
    # The first LLM output is a message, so it must be parsed to a string and
    # mapped into the dict that prompt2 expects before the second hop
    chain = (
        prompt1
        | llm
        | StrOutputParser()
        | (lambda text: {"initial_output": text})
        | prompt2
        | llm
    )
    result = chain.invoke({"product": "smart water bottle"})
    print(result.content)


# ============   streaming output   ============
def stream_print():
    for chunk in llm.stream("Why did Char Aznable join the Zabi family's Zeon army?"):
        for char in chunk.content:
            print(char, end="|", flush=True)


stream_print()

Advanced agents

# Memory, structured output, and one agent coordinating multiple tools
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime
from dataclasses import dataclass
from load_model import local_model

from langgraph.checkpoint.memory import InMemorySaver


SYSTEM_PROMPT = """You are an expert weather forecaster who speaks in puns.
You have two tools available:
- get_weather_for_location: get the weather for a given city.
- get_user_location: look up the user's location.
"""


llm = local_model()


@dataclass
class Context:
    """Custom runtime context schema."""
    user_id: str


@tool
def get_weather_for_location(city: str) -> str:
    """Get the weather for the given city."""
    print("weather tool")
    return f"It's always sunny in {city}! 27°C" if city == "Chongqing" else f"It's pouring in {city}! 16°C"


@tool
def get_user_location(runtime: ToolRuntime[Context]) -> str:
    """Look up the user's location from the user ID."""
    user_id = runtime.context.user_id
    print("location tool")
    return "Chongqing" if user_id == "1" else "Beijing"


# Define the response format
@dataclass
class ResponseFormat:
    """Response schema for the agent."""
    # A pun-filled response (always required)
    punny_response: str
    # Any interesting weather details (if available)
    weather_conditions: str | None = None


# Set up memory
checkpointer = InMemorySaver()
# `thread_id` uniquely identifies a given conversation.
config = {"configurable": {"thread_id": "1"}}


agent = create_agent(
    model=llm,
    tools=[get_user_location, get_weather_for_location],
    context_schema=Context,
    checkpointer=checkpointer,
    system_prompt=SYSTEM_PROMPT,
    response_format=ResponseFormat
)

# Run the agent
result = agent.invoke(
    {"messages": [{"role": "user", "content": "My name is Amuro Ray. What's the weather like today?"}]},
    config=config,
    context=Context(user_id="1")
)
# print(result["messages"][-1].content)
print(result["structured_response"])
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Thanks! What's my name? Did you remember it?"}]},
    config=config,
    context=Context(user_id="1")
)
# print(result["messages"][-1].content)
print(result["structured_response"])
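
Since the checkpointer keys memory by thread_id, switching to a new thread starts a clean conversation. A quick check (thread_id "2" is just an arbitrary fresh identifier):

result = agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    config={"configurable": {"thread_id": "2"}},  # new thread, empty history
    context=Context(user_id="1")
)
print(result["structured_response"])  # the agent should no longer know the name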

Chains with memory

# A chain with conversation memory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
# pip install langchain-community==0.4.1
from langchain_community.chat_message_histories import ChatMessageHistory
from load_model import local_model


llm = local_model()
# Build the runnable sequence
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an assistant. Answer questions based on the conversation history."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])
runnable = prompt | llm
# In-memory store for per-session message histories
store = {}


def get_session_history(session_id: str) -> ChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]


# Wrap the runnable with message history
chain = RunnableWithMessageHistory(
    runnable,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history"
)

# Invoke the chain
session_id = "1"
response = chain.invoke(
    {"input": "My name is Qin Shi Huang"},
    config={"configurable": {"session_id": session_id}}
)
print(response.content)

response = chain.invoke(
    {"input": "What is my name?"},
    config={"configurable": {"session_id": session_id}}
)
print(response.content)
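
Histories are keyed by session_id, so a second session starts from scratch, and the accumulated messages for a session can be inspected directly in the store dict:

# A new session has no memory of the name
response = chain.invoke(
    {"input": "What is my name?"},
    config={"configurable": {"session_id": "2"}}
)
print(response.content)
# Raw message history for session "1"
print(store["1"].messages)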

RAG demo

import bs4
from langchain_community.document_loaders import (
    PyPDFLoader, Docx2txtLoader,
    CSVLoader, UnstructuredExcelLoader, WebBaseLoader
)
# pip install sentence-transformers chromadb "unstructured[excel]"
# pip install msoffcrypto-tool docx2txt
from pathlib import Path
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from load_model import local_model


llm = local_model()


def load_all_formats(paths):
    """支持PDF/DOC/CSV/Excel"""
    docs = []
    for path in paths:
        ext = Path(path).suffix.lower()
        try:
            if ext == '.pdf':
                loader = PyPDFLoader(path)
            elif ext in ['.docx', '.doc']:
                loader = Docx2txtLoader(path)
            elif ext == '.csv':
                loader = CSVLoader(path, encoding="utf-8")
            elif ext in ['.xlsx', '.xls']:
                loader = UnstructuredExcelLoader(path, mode="single")
            else:
                print(f"不支持: {path}")
                continue
            page_docs = loader.load()
            docs.extend(page_docs)
            print(f"{path}: {len(page_docs)}页/表")
        except Exception as e:
            print(f"{path}: {e}")
    return docs


def rerank_bge(query, docs, top_k=3):
    # "Re-ranker": bge-large-zh is an embedding model, not a cross-encoder,
    # so this re-scores candidates by embedding similarity
    reranker_embeddings = HuggingFaceEmbeddings(
        model_name="../myQdrant/bge-large-zh",  # local path
        model_kwargs={'device': 'cuda'},
        encode_kwargs={'normalize_embeddings': True}
    )
    query_emb = reranker_embeddings.embed_query(query)
    doc_embs = reranker_embeddings.embed_documents([doc.page_content for doc in docs])
    scores = []
    for doc_emb in doc_embs:
        score = sum(a * b for a, b in zip(query_emb, doc_emb))  # dot product = cosine (normalized embeddings)
        scores.append(score)
    # Sort by score, highest first
    ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
    return [doc for _, doc in ranked[:top_k]]
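
# Note: because the embeddings are normalized, the dot product above is exactly
# cosine similarity. A vectorized equivalent (a sketch, assuming numpy, which
# sentence-transformers already depends on):
#     import numpy as np
#     scores = (np.asarray(doc_embs) @ np.asarray(query_emb)).tolist()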


def rag_chain(inputs):
    # ========== Load from the web =============
    # bs4_strainer = bs4.SoupStrainer(class_=("post-title", "post-header", "post-content"))
    common_selectors = [
        "article", "main", ".content", ".post-content", ".article-body",
        "#content", "#main", ".entry-content", "div[class*='content']",
        ".post", ".blog-post", "section"
    ]
    # bs4.SoupStrainer matches tag names, not CSS selectors, so only the
    # plain tag names in the list above can actually take effect
    strainer = bs4.SoupStrainer(name=[s for s in common_selectors if s.isalnum()])
    web_loader = WebBaseLoader(
        web_paths=("https://langchain-doc.cn/v1/python/langchain/philosophy.html",),
        bs_kwargs={"parse_only": strainer}
    )
    # docs = web_loader.load()
    # ========== Load from local files ============
    file_paths = ["C:\\Users\\zwq\\Desktop\\demo.xlsx", "C:\\Users\\zwq\\doc\\test.pdf", "C:\\Users\\zwq\\Desktop\\test.docx"]
    docs = load_all_formats(file_paths)
    # Split the text
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,  # max characters per chunk
        chunk_overlap=200,  # characters shared between adjacent chunks, to preserve semantics
        add_start_index=True,  # record each chunk's start index within the source
    )
    all_splits = text_splitter.split_documents(docs)
    print(f"Split the documents into {len(all_splits)} sub-documents.")

    # Vector store
    embeddings = HuggingFaceEmbeddings(
        model_name="../myQdrant/bge-large-zh"
    )
    vectorstore = Chroma.from_documents(all_splits, embeddings)
    retriever = vectorstore.as_retriever(search_kwargs={"k": 6})  # Step 1: retrieve 6 candidates

    # Retrieve
    question = inputs["question"]
    docs = retriever.invoke(question)  # Step 1: retrieve with bge-large-zh
    reranked_docs = rerank_bge(question, docs)  # Step 2: re-rank with bge-large-zh
    context = "\n\n".join([doc.page_content for doc in reranked_docs])
    print(context)
    prompt = ChatPromptTemplate.from_template("""
    Answer the question based on the following documents.

    Documents:
    {context}

    Question: {question}
    Answer:
    """)
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"context": context, "question": question})


web_question = "When was the release before the latest LangChain version published?"
pdf_question = "What is the full address of 达梦数据库(广州)有限公司?"
print(rag_chain({"question": pdf_question}))
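
Rebuilding the Chroma index on every call re-embeds all documents. A minimal persistence sketch (the persist_directory value is an assumption):

# First run: build and persist the index to disk
vectorstore = Chroma.from_documents(all_splits, embeddings, persist_directory="./chroma_db")
# Later runs: reload without re-embedding
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)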

RAG from Qdrant

from langchain_core.documents import Document
from qdrant_client import QdrantClient
# pip install langchain-qdrant
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_huggingface import HuggingFaceEmbeddings
from load_model import local_model


llm = local_model()

client = QdrantClient(
    host="localhost",
    api_key="xxx",
    port=6333,
    https=False,
    timeout=10.0
)


def qdrant_search(collect, query, k=3):
    # Embed the query and run a vector search against the Qdrant collection
    embeddings = HuggingFaceEmbeddings(model_name="../myQdrant/bge-large-zh")
    query_emb = embeddings.embed_query(query)
    response = client.query_points(
        collection_name=collect,
        query=query_emb,
        limit=k,
        with_payload=True
    )
    docs = []
    for hit in response.points:
        full_payload = hit.payload
        content = str(full_payload)
        docs.append(Document(
            page_content=content,
            metadata={"score": hit.score, "id": hit.id}
        ))
    return docs


def rerank_bge(query, docs, top_k: int = 3):
    """bge-large-zh重排"""
    embeddings = HuggingFaceEmbeddings(model_name="../myQdrant/bge-large-zh")
    query_emb = embeddings.embed_query(query)
    doc_embs = embeddings.embed_documents([doc.page_content for doc in docs])
    scores = [sum(a * b for a, b in zip(query_emb, doc_emb)) for doc_emb in doc_embs]
    ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
    return [doc for _, doc in ranked[:top_k]]


def rag_chain(inputs):
    # Retrieve
    question = inputs["question"]
    collection_name = "order_ex"
    docs = qdrant_search(collection_name, question, 6)
    # Re-rank
    reranked_docs = rerank_bge(question, docs)
    context = "\n\n".join([doc.page_content for doc in reranked_docs])
    print(context)
    prompt = ChatPromptTemplate.from_template("""
    Answer the question based on the following documents.

    Documents:
    {context}

    Question: {question}
    Answer:
    """)
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"context": context, "question": question})


question = "When was the release before the latest LangChain version published?"
print(rag_chain({"question": question}))
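
The script assumes an order_ex collection already exists in Qdrant. A minimal sketch of how such a collection could be created and filled (the payload texts are hypothetical; bge-large-zh produces 1024-dimensional vectors):

from qdrant_client.models import Distance, VectorParams, PointStruct

embeddings = HuggingFaceEmbeddings(model_name="../myQdrant/bge-large-zh")
texts = ["order 1001: ...", "order 1002: ..."]  # hypothetical documents
client.create_collection(
    collection_name="order_ex",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
)
client.upsert(
    collection_name="order_ex",
    points=[
        PointStruct(id=i, vector=embeddings.embed_query(t), payload={"text": t})
        for i, t in enumerate(texts)
    ],
)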
