
How to stream LLM tokens from your graph

Prerequisites

This guide assumes familiarity with the following:

When building LLM applications with LangGraph, you might want to stream individual LLM tokens from the LLM calls inside LangGraph nodes. You can do so via graph.stream(..., stream_mode="messages"):

from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
def call_model(state: State):
    model.invoke(...)
    ...

graph = (
    StateGraph(State)
    .add_node(call_model)
    ...
    .compile()
)

for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    print(msg)

The streamed outputs will be tuples of (message chunk, metadata):

  • message chunk is the token streamed by the LLM
  • metadata is a dictionary with information about the graph node where the LLM was called as well as the LLM invocation metadata
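
For example, the metadata makes it easy to print only the tokens emitted from a particular node. Here is a minimal sketch, assuming the graph above with a node named call_model:

for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    # Only print non-empty tokens produced inside the "call_model" node
    if msg.content and metadata["langgraph_node"] == "call_model":
        print(msg.content, end="", flush=True)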

Using without LangChain

If you need to stream LLM tokens without using LangChain, you can use stream_mode="custom" to stream the outputs from LLM provider clients directly. Check out the example below to learn more.

Async in Python < 3.11

When using Python < 3.11 with async code, make sure to manually pass the RunnableConfig through to the chat model when invoking it, like so: model.ainvoke(..., config). The stream method collects all events from your nested code using a streaming tracer passed as a callback. On Python 3.11 and above this is handled automatically via contextvars; prior to 3.11, asyncio tasks lacked proper contextvar support, so the callbacks only propagate if you manually pass the config through. We do this in the call_model function below.
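
In short, the pattern looks like this (a minimal sketch only; the complete version appears in the example further below, and it assumes model is a chat model like the one defined above):

from langchain_core.runnables import RunnableConfig


async def call_model(state: State, config: RunnableConfig):
    # Accept the config in the node and pass it through to the model so that
    # streaming callbacks propagate on Python < 3.11
    response = await model.ainvoke(..., config)
    ...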

Setup

First, we need to install the required packages.

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai

Next, we need to set API keys for OpenAI (the LLM we will use).

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here.
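
If you want tracing enabled while working through this guide, you can optionally set the LangSmith environment variables as well. This is a minimal sketch; it assumes you already have a LangSmith API key:

# Optional: enable LangSmith tracing (requires a LangSmith API key)
_set_env("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"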

Note

Note that in call_model(state: State, config: RunnableConfig) below, we (a) accept the RunnableConfig in the node function and (b) pass it in as the second argument to model.ainvoke(..., config). This is optional for Python >= 3.11.

Example

Below we demonstrate an example with two LLM calls in a single node.

from typing import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI


# Note: we're adding the tags here to be able to filter the model outputs down the line
joke_model = ChatOpenAI(model="gpt-4o-mini", tags=["joke"])
poem_model = ChatOpenAI(model="gpt-4o-mini", tags=["poem"])


class State(TypedDict):
    topic: str
    joke: str
    poem: str


async def call_model(state: State, config: RunnableConfig):
    topic = state["topic"]
    print("Writing joke...")
    # Note: passing the config through explicitly is required for Python < 3.11,
    # since contextvar support for asyncio tasks wasn't added until then:
    # https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()

API Reference: START | StateGraph | ChatOpenAI

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|

Writing poem...
In| sun|lit| patches|,| sleek| and| sly|,|  
|Wh|isk|ers| twitch| as| shadows| fly|.|  
|With| velvet| paws| and| eyes| so| bright|,|  
|They| dance| through| dreams|,| both| day| and| night|.|  

|A| playful| p|ounce|,| a| gentle| p|urr|,|  
|In| every| leap|,| a| soft| allure|.|  
|Cur|led| in| warmth|,| a| silent| grace|,|  
|Each| furry| friend|,| a| warm| embrace|.|  

|Myst|ery| wrapped| in| fur| and| charm|,|  
|A| soothing| presence|,| a| gentle| balm|.|  
|In| their| gaze|,| the| world| slows| down|,|  
|For| in| their| realm|,| we're| all| ren|own|.|

metadata
{'langgraph_step': 1,
 'langgraph_node': 'call_model',
 'langgraph_triggers': ['start:call_model'],
 'langgraph_path': ('__pregel_pull', 'call_model'),
 'langgraph_checkpoint_ns': 'call_model:6ddc5f0f-1dd0-325d-3014-f949286ce595',
 'checkpoint_ns': 'call_model:6ddc5f0f-1dd0-325d-3014-f949286ce595',
 'ls_provider': 'openai',
 'ls_model_name': 'gpt-4o-mini',
 'ls_model_type': 'chat',
 'ls_temperature': 0.7,
 'tags': ['poem']}

Filter to specific LLM invocation

You can see that we're streaming tokens from all of the LLM invocations. Let's now filter the streamed tokens so that we only get those from a specific LLM invocation. We can do this with the streamed metadata, using the tags we added to the LLMs earlier:

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content and "joke" in metadata.get("tags", []):
        print(msg.content, end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|

Writing poem...

Example without LangChain

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )

    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# `writer` is the stream writer LangGraph provides for emitting custom data
# when streaming with stream_mode="custom"
async def call_model(state, config, writer):
    topic = state["topic"]
    joke = ""
    poem = ""

    print("Writing joke...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a joke about {topic}"}]
    ):
        joke += msg_chunk["content"]
        metadata = {**config["metadata"], "tags": ["joke"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)

    print("\n\nWriting poem...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a short poem about {topic}"}]
    ):
        poem += msg_chunk["content"]
        metadata = {**config["metadata"], "tags": ["poem"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)

    return {"joke": joke, "poem": poem}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()

stream_mode="custom"

When streaming LLM tokens without LangChain, we recommend using stream_mode="custom". This allows you to explicitly control which data from the LLM provider APIs to include in LangGraph streamed outputs, including any additional metadata.

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    print(msg["content"], end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the|

Writing poem...
 mouse|!|In| sun|lit| patches|,| they| stretch| and| y|awn|,|  
|With| whispered| paws| at| the| break| of| dawn|.|  
|Wh|isk|ers| twitch| in| the| morning| light|,|  
|Sil|ken| shadows|,| a| graceful| sight|.|  

|The| gentle| p|urr|s|,| a| soothing| song|,|  
|In| a| world| of| comfort|,| where| they| belong|.|  
|M|yster|ious| hearts| wrapped| in| soft|est| fur|,|  
|F|eline| whispers| in| every| p|urr|.|  

|Ch|asing| dreams| on| a| moon|lit| chase|,|  
|With| a| flick| of| a| tail|,| they| glide| with| grace|.|  
|Oh|,| playful| spirits| of| whisk|ered| cheer|,|  
|In| your| quiet| company|,| the| world| feels| near|.|  |

metadata
{'langgraph_step': 1,
 'langgraph_node': 'call_model',
 'langgraph_triggers': ['start:call_model'],
 'langgraph_path': ('__pregel_pull', 'call_model'),
 'langgraph_checkpoint_ns': 'call_model:3fa3fbe1-39d8-5209-dd77-0da38d4cc1c9',
 'tags': ['poem']}

To filter to a specific LLM invocation, you can again use the streamed metadata:

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    if "poem" in metadata.get("tags", []):
        print(msg["content"], end="|", flush=True)
Writing joke...


Writing poem...
In| shadows| soft|,| they| weave| and| play|,|  
|With| whispered| paws|,| they| greet| the| day|.|  
|Eyes| like| lantern|s|,| bright| and| keen|,|  
|Guard|ians| of| secrets|,| unseen|,| serene|.|  

|They| twist| and| stretch| in| sun|lit| beams|,|  
|Ch|asing| the| echoes| of| half|-|formed| dreams|.|  
|With| p|urring| songs| that| soothe| the| night|,|  
|F|eline| spirits|,| pure| delight|.|  

|On| windows|ills|,| they| perch| and| stare|,|  
|Ad|vent|urers| bold| with| a| graceful| flair|.|  
|In| every| leap| and| playful| bound|,|  
|The| magic| of| cats|—|where| love| is| found|.|
