How to stream data from within a tool¶
If your graph calls tools that use LLMs or other streaming APIs, you might want to surface partial results while the tool executes, especially if the tool takes a long time to run.
- To stream arbitrary data from inside a tool, you can use `stream_mode="custom"` and `get_stream_writer()` (see the sketch after this list).
- To stream LLM tokens generated by a tool calling an LLM, you can use `stream_mode="messages"`:

```python
from langgraph.graph import StateGraph, MessagesState
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

def tool(tool_arg: str):
    # the LLM call inside the tool is what produces the streamed tokens
    model.invoke(tool_arg)
    ...

def call_tools(state: MessagesState):
    tool_call = get_tool_call(state)
    tool_result = tool(**tool_call["args"])
    ...

graph = (
    StateGraph(MessagesState)
    .add_node(call_tools)
    # ... add remaining nodes and edges ...
    .compile()
)

for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages",
):
    print(msg)
```
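For the first case, here is a minimal sketch built on the same skeleton (schematic like the snippet above, with `graph` and `inputs` as placeholders): the tool grabs a stream writer and emits arbitrary payloads, which are then surfaced as `custom` chunks.

```python
from langgraph.config import get_stream_writer

def tool(tool_arg: str):
    writer = get_stream_writer()
    # every payload handed to the writer becomes a chunk in the "custom" stream
    writer({"custom_tool_data": f"working on {tool_arg}"})
    ...

for chunk in graph.stream(
    inputs,
    stream_mode="custom",
):
    print(chunk)
```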
Using without LangChain
If you need to stream data from inside tools without using LangChain, you can use `stream_mode="custom"`. Check out the example below to learn more.
Async in Python < 3.11
When using Python < 3.11 with async code, please ensure you manually pass the `RunnableConfig` through to the chat model when invoking it, like so: `model.ainvoke(..., config)`.
The `stream` method collects all events from your nested code using a streaming tracer passed as a callback. On Python 3.11 and above, this is handled automatically via contextvars; prior to 3.11, asyncio tasks lacked proper contextvars support, meaning that the callbacks will only propagate if you manually pass the config through. We do this in the `get_items` tool below.
Setup¶
First, let's install the required packages and set our API keys:
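The imports used throughout this guide are covered by two packages (`langchain-core` and the `openai` SDK come in as dependencies of `langchain-openai`):

```
pip install -U langgraph langchain-openai
```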
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
Set up LangSmith for LangGraph development
Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here.
Streaming custom data¶
We'll use a prebuilt ReAct agent for this guide:
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer
@tool
async def get_items(place: str) -> str:
"""Use this tool to list items one might find in a place you're asked about."""
writer = get_stream_writer()
# this can be replaced with any actual streaming logic that you might have
items = ["books", "penciles", "pictures"]
for chunk in items:
writer({"custom_tool_data": chunk})
return ", ".join(items)
llm = ChatOpenAI(model="gpt-4o-mini")
tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)
API Reference: tool | ChatOpenAI | create_react_agent
Let's now invoke our agent with an input that requires a tool call:
inputs = {
"messages": [
{"role": "user", "content": "what items are in the office?"}
]
}
async for chunk in agent.astream(
inputs,
stream_mode="custom",
):
print(chunk)
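Assuming the model decides to call `get_items`, the payloads passed to the writer are streamed back one at a time, so the output should look something like:

```
{'custom_tool_data': 'books'}
{'custom_tool_data': 'pencils'}
{'custom_tool_data': 'pictures'}
```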
Streaming LLM tokens¶
To stream the tokens that the LLM generates inside the tool, have the tool accept a `RunnableConfig`, pass it through to the model call, and consume the graph with `stream_mode="messages"`:
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import RunnableConfig
@tool
async def get_items(
place: str,
# Manually accept config (needed for Python <= 3.10)
config: RunnableConfig,
) -> str:
"""Use this tool to list items one might find in a place you're asked about."""
# Attention: when using async, you should be invoking the LLM using ainvoke!
# If you fail to do so, streaming will NOT work.
response = await llm.ainvoke(
[
{
"role": "user",
"content": (
f"Can you tell me what kind of items i might find in the following place: '{place}'. "
"List at least 3 such items separating them by a comma. And include a brief description of each item."
),
}
],
config,
)
return response.content
tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)
API Reference: AIMessageChunk | RunnableConfig
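Let's now invoke our agent with an input that requires a tool call, and print only the AI message chunks generated inside the tools node: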
inputs = {
"messages": [
{"role": "user", "content": "what items are in the bedroom?"}
]
}
async for msg, metadata in agent.astream(
inputs,
stream_mode="messages",
):
if (
isinstance(msg, AIMessageChunk)
and msg.content
        # only surface messages generated inside the tools node
and metadata["langgraph_node"] == "tools"
):
print(msg.content, end="|", flush=True)
Certainly|!| Here| are| three| items| you| might| find| in| a| bedroom|:
|1|.| **|Bed|**|:| The| central| piece| of| furniture| in| a| bedroom|,| typically| consisting| of| a| mattress| supported| by| a| frame|.| It| is| designed| for| sleeping| and| can| vary| in| size| from| twin| to| king|.| Beds| often| have| bedding|,| including| sheets|,| pillows|,| and| comfort|ers|,| to| enhance| comfort|.
|2|.| **|D|resser|**|:| A| piece| of| furniture| with| drawers| used| for| storing| clothing| and| personal| items|.| Dress|ers| often| have| a| flat| surface| on| top|,| which| can| be| used| for| decorative| items|,| a| mirror|,| or| personal| accessories|.| They| help| keep| the| bedroom| organized| and| clutter|-free|.
|3|.| **|Night|stand|**|:| A| small| table| or| cabinet| placed| beside| the| bed|,| used| for| holding| items| such| as| a| lamp|,| alarm| clock|,| books|,| or| personal| items|.| Night|stands| provide| convenience| for| easy| access| to| essentials| during| the| night|,| adding| functionality| and| style| to| the| bedroom| decor|.|
Example without LangChain¶
You can also stream data from within tool invocations without using LangChain. The example below demonstrates how to do this for a graph with a single tool-executing node. We'll leave it as an exercise for the reader to implement a ReAct agent from scratch without using LangChain.
import operator
import json
from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer
from openai import AsyncOpenAI
openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"
# helper that streams tokens from the OpenAI Chat Completions API
async def stream_tokens(model_name: str, messages: list[dict]):
response = await openai_client.chat.completions.create(
messages=messages, model=model_name, stream=True
)
role = None
async for chunk in response:
delta = chunk.choices[0].delta
if delta.role is not None:
role = delta.role
if delta.content:
yield {"role": role, "content": delta.content}
# this is our tool
async def get_items(place: str) -> str:
"""Use this tool to list items one might find in a place you're asked about."""
writer = get_stream_writer()
response = ""
async for msg_chunk in stream_tokens(
model_name,
[
{
"role": "user",
"content": (
"Can you tell me what kind of items "
f"i might find in the following place: '{place}'. "
"List at least 3 such items separating them by a comma. "
"And include a brief description of each item."
),
}
],
):
response += msg_chunk["content"]
writer(msg_chunk)
return response
class State(TypedDict):
messages: Annotated[list[dict], operator.add]
# this is the tool-calling graph node
async def call_tool(state: State):
ai_message = state["messages"][-1]
tool_call = ai_message["tool_calls"][-1]
function_name = tool_call["function"]["name"]
if function_name != "get_items":
raise ValueError(f"Tool {function_name} not supported")
function_arguments = tool_call["function"]["arguments"]
arguments = json.loads(function_arguments)
function_response = await get_items(**arguments)
tool_message = {
"tool_call_id": tool_call["id"],
"role": "tool",
"name": function_name,
"content": function_response,
}
return {"messages": [tool_message]}
graph = (
StateGraph(State)
.add_node(call_tool)
.add_edge(START, "call_tool")
.compile()
)
API Reference: StateGraph | START
Let's now invoke our graph with an AI message that contains a tool call:
inputs = {
"messages": [
{
"content": None,
"role": "assistant",
"tool_calls": [
{
"id": "1",
"function": {
"arguments": '{"place":"bedroom"}',
"name": "get_items",
},
"type": "function",
}
],
}
]
}
async for chunk in graph.astream(
inputs,
stream_mode="custom",
):
print(chunk["content"], end="|", flush=True)
Sure|!| Here| are| three| common| items| you| might| find| in| a| bedroom|:
|1|.| **|Bed|**|:| The| focal| point| of| the| bedroom|,| a| bed| typically| consists| of| a| mattress| resting| on| a| frame|,| and| it| may| include| pillows| and| bedding|.| It| provides| a| comfortable| place| for| sleeping| and| resting|.
|2|.| **|D|resser|**|:| A| piece| of| furniture| with| multiple| drawers|,| a| dresser| is| used| for| storing| clothes|,| accessories|,| and| personal| items|.| It| often| has| a| flat| surface| that| may| be| used| to| display| decorative| items| or| a| mirror|.
|3|.| **|Night|stand|**|:| Also| known| as| a| bedside| table|,| a| night|stand| is| placed| next| to| the| bed| and| typically| holds| items| like| lamps|,| books|,| alarm| clocks|,| and| personal| belongings| for| convenience| during| the| night|.
|These| items| contribute| to| the| functionality| and| comfort| of| the| bedroom| environment|.|