LangChainでストリーミングを有効にする方法:包括的なガイド

LLM用のLangchainストリーミングを有効にする方法は、この記事を読んで解明してみてください!

Anakin AIを無料で利用開始

LangChainでストリーミングを有効にする方法:包括的なガイド

Start for free
目次

以下のHTMLコードを日本語に翻訳してください。 次のようなURL: https://anakin.ai/apps/keyword または https://anakin.ai/blog/keyword を次のように変換してください: https://anakin.ai/ja-jp/apps/keyword https://anakin.ai/ja-jp/blog/keyword 以下はHTMLコードです:

💡
Claude 3.5 Sonnetを試してみたいですか?

オールインワンの価格タグであらゆるAIモデルにアクセスできるAIプラットフォームをお探しですか?

それなら、Anakin AIを見逃すことはできません!

Anakin AIは、ワークフロー自動化のためのオールインワンプラットフォームで、使いやすいノーコードアプリビルダーを使用して強力なAIアプリを作成できます。 Llama 3ClaudeGPT-4Uncensored LLMsStable Diffusion... を使用して、数分で夢のようなAIアプリを作成しましょう!

ストリーミングは、現代の言語モデルアプリケーションにおける重要な機能であり、リアルタイムの出力生成とユーザーエクスペリエンスの向上を可能にします。この記事では、LangChainを使用したストリーミングを有効にするプロセスについて詳しく説明し、プロジェクトでこの強力な機能を実装するための手順とサンプルコードを提供します。

LangChainストリーミングの基本を理解する

LangChainでは、言語モデルによって生成されるコンテンツをリアルタイムで受け取るためのストリーミングのサポートが提供されています。これは、レスポンシブなアプリケーションやチャットボットの作成に特に便利です。

LangChainストリーミングのキーコンセプト

LangChainでのストリーミングは、Runnableインターフェースを介して実装されており、次の2つの主なアプローチがあります:

  1. stream()astream():これらのメソッドは、チェーンから最終出力をストリーミングします。
  2. stream_events()astream_log():これらのメソッドは、中間ステップと最終出力の両方をストリーミングします。

LangChainストリーミングの環境をセットアップする

ストリーミングの実装に入る前に、環境が適切に設定されていることを確認してください。

ストリーミング用のLangChainのインストール

まず、LangChainと必要な依存関係をインストールしてください:

pip install langchain langchain-openai

LangChainストリーミング環境の設定

APIキーとその他の環境変数を設定してください:

import os
from dotenv import load_dotenv

load_dotenv()
os.environ["OPENAI_API_KEY"] = "ここにAPIキーを記入してください"

基本的なLangChainストリーミングの実装

LangChainを使用したストリーミングの基本的な例から始めましょう。

シンプルなLangChainストリーミングチェーンの作成

以下は、出力をストリーミングする基本的なチェーンです:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 言語モデルの初期化
llm = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)

# プロンプトテンプレートの作成
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")

# シンプルなチェーンの作成
chain = prompt | llm | StrOutputParser()

# 出力をストリーミングする
for chunk in chain.stream({"topic": "programming"}):
    print(chunk, end="", flush=True)

高度なLangChainストリーミングテクニック

次に、LangChainを使用したより高度なストリーミングテクニックを探ってみましょう。

LangChainエージェントとのストリーミング

LangChainエージェントは、より複雑なタスクに対してもストリーミングを活用することができます:

from langchain.agents import AgentType, initialize_agent
from langchain.tools import DuckDuckGoSearchRun

# 検索ツールの初期化
search = DuckDuckGoSearchRun()

# ストリーミングを使用したエージェントの作成
agent = initialize_agent(
    [search],
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# エージェントの思考プロセスとアクションをストリーミングする
for chunk in agent.stream("What's the latest news about AI?"):
    print(chunk, end="", flush=True)

カスタムコールバックを使用したLangChainストリーミングの実装

カスタムコールバックを使用すると、ストリーミングプロセスをより制御することができます:

from langchain.callbacks.base import BaseCallbackHandler

class StreamingCallback(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end="", flush=True)

# チェーンでカスタムコールバックを使用する
chain.invoke(
    {"topic": "artificial intelligence"},
    config={"callbacks": [StreamingCallback()]}
)

LangChainストリーミングのパフォーマンスを最適化する

LangChainストリーミングの最高のパフォーマンスを得るために、次の最適化技術を考慮してください。

LangChainストリーミングでのバッチ処理

複数の入力を処理する場合、バッチ処理を行うことでスループットを向上させることができます:

from langchain_core.runnables import RunnablePassthrough

# バッチ処理を行うチェーンの作成
batched_chain = (
    RunnablePassthrough.batch(batch_size=2)
    | prompt
    | llm.batch()
    | StrOutputParser.batch()
)

# バッチ処理された結果をストリーミングする
inputs = [{"topic": "Python"}, {"topic": "JavaScript"}, {"topic": "Rust"}]
for batch in batched_chain.stream(inputs):
    for result in batch:
        print(result, end="", flush=True)
    print("\n---")

非同期LangChainストリーミング

非同期環境でのパフォーマンスを向上させるために:

import asyncio

async def stream_async():
    async for chunk in chain.astream({"topic": "machine learning"}):
        print(chunk, end="", flush=True)

asyncio.run(stream_async())

LangChainストリーミングでのエラー処理

ストリーミングでのエラー処理は、スムーズなユーザーエクスペリエンスを確保するために重要です。

LangChainストリーミングエラー処理の実装

ストリーミング中にエラーを処理する方法の例です:

try:
    for chunk in chain.stream({"topic": "error handling"}):
        print(chunk, end="", flush=True)
except Exception as e:
    print(f"\nエラーが発生しました:{str(e)}")

LangChainストリーミングをWebアプリケーションと統合する

ストリーミングはWebアプリケーションの反応速度を大幅に向上させることができます。LangChainストリーミングをシンプルなFlaskアプリケーションと統合する方法を見てみましょう。

LangChainストリーミングを使用したFlaskアプリケーションの作成

from flask import Flask, Response, stream_with_context

app = Flask(__name__)

@app.route('/stream')
def stream():
    def generate():
        for chunk in chain.stream({"topic": "web development"}):
            yield chunk + " "

    return Response(stream_with_context(generate()), mimetype='text/plain')

if __name__ == '__main__':
    app.run(debug=True)

LangChainストリーミングの高度な使用例

LangChainストリーミングのさらなる高度な使用例を見てみましょう。

ドキュメントQ&A用のLangChainストリーミングの実装

ドキュメントの質問応答システムにストリーミングを使用する例です:

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnablePassthrough

# ドキュメントを含むベクトルストアがあると仮定します
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())

retriever = vectorstore.as_retriever()

template = """以下のコンテキストに基づいて質問に答えてください:
{context}

質問:{question}
"""
prompt = ChatPromptTemplate.from_template(template)

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

for chunk in chain.stream("フランスの首都は何ですか?"):
    print(chunk, end="", flush=True)

複数ステップタスクのLangChainストリーミングの実装

複数のステップを含む複雑なタスクには、中間結果をストリーミングすることができます:

from langchain_core.runnables import RunnableSequence

def format_step(step):
    return f"ステップ{step['step_number']}:{step['description']}"

step_chain = (
    ChatPromptTemplate.from_template("ステップ{step_number}を提供してください")
    | llm
    | StrOutputParser()
)

multi_step_chain = RunnableSequence(
    {"steps": step_chain.map(),
     "task": lambda x: x["task"],
     "step_number": lambda x: list(range(1, x["num_steps"] + 1))}
)

for chunk in multi_step_chain.stream({"task": "ケーキの焼き方", "num_steps": 5}):
    if isinstance(chunk, dict) and "steps" in chunk:
        for step in chunk["steps"]:
            print(format_step(step))
    else:
        print(chunk, end="", flush=True)

結論

LangChainのストリーミングを有効にすると、レスポンシブでダイナミックな言語モデルアプリケーションを作成するための可能性が広がります。単純なテキスト生成から複雑な複数ステップタスクまで、ストリーミングによりリアルタイムでの対話やユーザーエクスペリエンスの向上が可能になります。このガイドで提供された技術と例に従うことで、LangChainストリーミングのパワーを活用し、より魅力的で効率的なAIパワードアプリケーションを開発することができます。

常にエラーをスムーズに処理し、大規模なアプリケーションの場合はパフォーマンスを最適化し、ストリーミングを実装する際には使用ケースの特定の要件を考慮してください。LangChainの柔軟でパワフルなストリーミング機能を活用することで、ユーザーの入力や状況の変化にリアルタイムで反応する高度なAIアプリケーションを構築することができます。

💡
Claude 3.5 Sonnetを試してみたいですか?

オールインワンの価格タグであらゆるAIモデルにアクセスできるAIプラットフォームをお探しですか?

それなら、Anakin AIを見逃すことはできません!

Anakin AIは、ワークフロー自動化のためのオールインワンプラットフォームで、使いやすいノーコードアプリビルダーを使用して強力なAIアプリを作成できます。 Llama 3ClaudeGPT-4Uncensored LLMsStable Diffusion... を使用して、数分で夢のようなAIアプリを作成しましょう!