How to stream LLM tokens from your graph
When building LLM applications with LangGraph, you might want to stream individual LLM tokens from the LLM calls inside LangGraph nodes. You can do so via graph.stream(..., stream_mode="messages"):
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

def call_model(state: State):
    model.invoke(...)
    ...

graph = (
    StateGraph(State)
    .add_node(call_model)
    ...
    .compile()
)

for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    print(msg)
The streamed outputs will be tuples of (message chunk, metadata):
- message chunk is the token streamed by the LLM
- metadata is a dictionary with information about the graph node where the LLM was called as well as the LLM invocation metadata
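As a rough illustration of how these tuples can be consumed, the sketch below joins streamed chunks back into one string per node. It runs on hand-written stand-in data rather than a real graph.stream(...) call: fake_stream and collect_text_by_node are hypothetical names, and only the content attribute and the langgraph_node metadata key are taken from the outputs shown in this guide.

```python
from collections import defaultdict
from types import SimpleNamespace


def collect_text_by_node(stream):
    """Join streamed chunk contents into one string per graph node."""
    text_by_node = defaultdict(str)
    for chunk, metadata in stream:
        # Chunks with empty content carry no token text, so skip them.
        if chunk.content:
            text_by_node[metadata["langgraph_node"]] += chunk.content
    return dict(text_by_node)


# Stand-in for graph.stream(inputs, stream_mode="messages") (assumption:
# real chunks expose .content and real metadata has "langgraph_node").
fake_stream = [
    (SimpleNamespace(content=""), {"langgraph_node": "call_model"}),
    (SimpleNamespace(content="Why"), {"langgraph_node": "call_model"}),
    (SimpleNamespace(content=" was"), {"langgraph_node": "call_model"}),
    (SimpleNamespace(content=" the cat"), {"langgraph_node": "call_model"}),
]

result = collect_text_by_node(fake_stream)
print(result)
```

With a real graph, the same function could be handed the iterator returned by graph.stream(...) directly, since it only relies on the tuple shape.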
Using without LangChain
If you need to stream LLM tokens without using LangChain, you can use stream_mode="custom" to stream the outputs from LLM provider clients directly. Check out the example below to learn more.
Async in Python < 3.11
When using Python < 3.11 with async code, please ensure you manually pass the RunnableConfig through to the chat model when invoking it, like so: model.ainvoke(..., config).
The stream method collects all events from your nested code using a streaming tracer passed as a callback. In Python 3.11 and above, this is handled automatically via contextvars; prior to 3.11, asyncio's tasks lacked proper contextvar support, meaning that the callbacks will only propagate if you manually pass the config through. We do this in the call_model function below.
Setup
First, install the packages used in this guide: langgraph and langchain_openai (the example without LangChain also uses the openai package).
Next, set the API key for OpenAI (the LLM provider we will use).
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
Set up LangSmith for LangGraph development
Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph; see the LangSmith documentation for how to get started.
Note
In call_model(state: State, config: RunnableConfig) below, we a) accept the RunnableConfig in the node function and b) pass it in as the second argument to model.ainvoke(..., config). This is optional for Python >= 3.11.
Example
Below we demonstrate an example with two LLM calls in a single node.
from typing import TypedDict

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, MessagesState

# Note: we're adding the tags here to be able to filter the model outputs down the line
joke_model = ChatOpenAI(model="gpt-4o-mini", tags=["joke"])
poem_model = ChatOpenAI(model="gpt-4o-mini", tags=["poem"])


class State(TypedDict):
    topic: str
    joke: str
    poem: str


async def call_model(state: State, config: RunnableConfig):
    topic = state["topic"]
    print("Writing joke...")
    # Note: Passing the config through explicitly is required for Python < 3.11,
    # since context var support wasn't added before then:
    # https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?
|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|
Writing poem...
In| sun|lit| patches|,| sleek| and| sly|,|
|Wh|isk|ers| twitch| as| shadows| fly|.|
|With| velvet| paws| and| eyes| so| bright|,|
|They| dance| through| dreams|,| both| day| and| night|.|
|A| playful| p|ounce|,| a| gentle| p|urr|,|
|In| every| leap|,| a| soft| allure|.|
|Cur|led| in| warmth|,| a| silent| grace|,|
|Each| furry| friend|,| a| warm| embrace|.|
|Myst|ery| wrapped| in| fur| and| charm|,|
|A| soothing| presence|,| a| gentle| balm|.|
|In| their| gaze|,| the| world| slows| down|,|
|For| in| their| realm|,| we're| all| ren|own|.|
Inspecting metadata after the loop shows the metadata of the last streamed chunk:
{'langgraph_step': 1,
'langgraph_node': 'call_model',
'langgraph_triggers': ['start:call_model'],
'langgraph_path': ('__pregel_pull', 'call_model'),
'langgraph_checkpoint_ns': 'call_model:6ddc5f0f-1dd0-325d-3014-f949286ce595',
'checkpoint_ns': 'call_model:6ddc5f0f-1dd0-325d-3014-f949286ce595',
'ls_provider': 'openai',
'ls_model_name': 'gpt-4o-mini',
'ls_model_type': 'chat',
'ls_temperature': 0.7,
'tags': ['poem']}
Filter to specific LLM invocation
You can see that we're streaming tokens from all of the LLM invocations. Let's now filter the streamed tokens to include only a specific LLM invocation. We can use the streamed metadata and filter events using the tags we've added to the LLMs previously:
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content and "joke" in metadata.get("tags", []):
        print(msg.content, end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?
|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|
Writing poem...
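The tag check above can also be factored into a small reusable helper. The sketch below is a synchronous illustration on stand-in data, not part of LangGraph: tokens_with_tag and fake_stream are hypothetical names, and the chunks are simple objects with only a content attribute.

```python
from types import SimpleNamespace


def tokens_with_tag(stream, tag):
    """Yield the text of every non-empty chunk whose metadata carries `tag`."""
    for chunk, metadata in stream:
        # metadata.get(...) tolerates chunks that were streamed without tags.
        if chunk.content and tag in metadata.get("tags", []):
            yield chunk.content


# Stand-in for the (message chunk, metadata) tuples produced by the graph.
fake_stream = [
    (SimpleNamespace(content="Why"), {"tags": ["joke"]}),
    (SimpleNamespace(content=" was"), {"tags": ["joke"]}),
    (SimpleNamespace(content="In"), {"tags": ["poem"]}),
    (SimpleNamespace(content=" sunlit"), {}),  # no "tags" key at all
]

joke_text = "".join(tokens_with_tag(fake_stream, "joke"))
print(joke_text)
```

To use the same idea with graph.astream(...), the helper would need to be an async generator that iterates with async for.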
Example without LangChain
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta
        # Remember the role once it is reported so it can be attached to each token
        if delta.role is not None:
            role = delta.role
        if delta.content:
            yield {"role": role, "content": delta.content}


async def call_model(state, config, writer):
    topic = state["topic"]
    joke = ""
    poem = ""
    print("Writing joke...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a joke about {topic}"}]
    ):
        joke += msg_chunk["content"]
        # Attach a tag so the consumer can tell the two LLM calls apart
        metadata = {**config["metadata"], "tags": ["joke"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)
    print("\n\nWriting poem...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a short poem about {topic}"}]
    ):
        poem += msg_chunk["content"]
        metadata = {**config["metadata"], "tags": ["poem"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)
    return {"joke": joke, "poem": poem}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
stream_mode="custom"
When streaming LLM tokens without LangChain, we recommend using stream_mode="custom". This allows you to explicitly control which data from the LLM provider APIs to include in LangGraph streamed outputs, including any additional metadata.
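The writer pattern behind this mode can be sketched without LangGraph or a provider client. Everything in the block below is a stand-in: fake_token_stream imitates a provider's streaming response, node imitates a graph node that receives a writer callable, and a plain list's append method plays the role of the writer. The point is only that the node decides exactly what each streamed item contains.

```python
import asyncio


async def fake_token_stream(text):
    """Stand-in for a provider's streaming response: yields one word at a time."""
    for word in text.split():
        await asyncio.sleep(0)  # simulate waiting on the network
        yield word


async def node(topic, writer):
    """Stream tokens through `writer` while also building the final value."""
    joke = ""
    async for token in fake_token_stream(f"a joke about {topic}"):
        joke += token + " "
        # Only what is passed to writer() reaches the stream consumer, so the
        # node chooses the payload shape and any extra metadata.
        writer(({"role": "assistant", "content": token}, {"tags": ["joke"]}))
    return {"joke": joke.strip()}


async def main():
    streamed = []
    # list.append stands in for the writer that the graph would inject.
    state_update = await node("cats", streamed.append)
    return streamed, state_update


streamed, state_update = asyncio.run(main())
print([msg["content"] for msg, _ in streamed])
print(state_update)
```

Because the payload is an ordinary tuple chosen by the node, the consumer can unpack it however the node author documented it, which is what the (msg, metadata) loop in this guide relies on.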
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    print(msg["content"], end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?
|Because| it| wanted| to| keep| an| eye| on| the|
Writing poem...
mouse|!|In| sun|lit| patches|,| they| stretch| and| y|awn|,|
|With| whispered| paws| at| the| break| of| dawn|.|
|Wh|isk|ers| twitch| in| the| morning| light|,|
|Sil|ken| shadows|,| a| graceful| sight|.|
|The| gentle| p|urr|s|,| a| soothing| song|,|
|In| a| world| of| comfort|,| where| they| belong|.|
|M|yster|ious| hearts| wrapped| in| soft|est| fur|,|
|F|eline| whispers| in| every| p|urr|.|
|Ch|asing| dreams| on| a| moon|lit| chase|,|
|With| a| flick| of| a| tail|,| they| glide| with| grace|.|
|Oh|,| playful| spirits| of| whisk|ered| cheer|,|
|In| your| quiet| company|,| the| world| feels| near|.| |
Inspecting metadata after the loop shows the metadata of the last streamed chunk:
{'langgraph_step': 1,
'langgraph_node': 'call_model',
'langgraph_triggers': ['start:call_model'],
'langgraph_path': ('__pregel_pull', 'call_model'),
'langgraph_checkpoint_ns': 'call_model:3fa3fbe1-39d8-5209-dd77-0da38d4cc1c9',
'tags': ['poem']}
To filter to a specific LLM invocation, you can use the streamed metadata:
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    if "poem" in metadata.get("tags", []):
        print(msg["content"], end="|", flush=True)
Writing joke...
Writing poem...
In| shadows| soft|,| they| weave| and| play|,|
|With| whispered| paws|,| they| greet| the| day|.|
|Eyes| like| lantern|s|,| bright| and| keen|,|
|Guard|ians| of| secrets|,| unseen|,| serene|.|
|They| twist| and| stretch| in| sun|lit| beams|,|
|Ch|asing| the| echoes| of| half|-|formed| dreams|.|
|With| p|urring| songs| that| soothe| the| night|,|
|F|eline| spirits|,| pure| delight|.|
|On| windows|ills|,| they| perch| and| stare|,|
|Ad|vent|urers| bold| with| a| graceful| flair|.|
|In| every| leap| and| playful| bound|,|
|The| magic| of| cats|—|where| love| is| found|.|