
How to stream data from within a tool

Prerequisites

This guide assumes familiarity with streaming in LangGraph and with how tools are called from a graph.

If your graph calls tools that use LLMs or any other streaming APIs, you might want to surface partial results during the execution of the tool, especially if the tool takes a longer time to run.

  1. To stream arbitrary data from inside a tool, you can use stream_mode="custom" and get_stream_writer():

    from langgraph.config import get_stream_writer
    
    def tool(tool_arg: str):
        writer = get_stream_writer()
        for chunk in custom_data_stream():
            # stream any arbitrary data
            writer(chunk)
        ...
    
    for chunk in graph.stream(
        inputs,
        stream_mode="custom"
    ):
        print(chunk)
    
  2. To stream LLM tokens generated by a tool calling an LLM, you can use stream_mode="messages" (a sketch combining both modes follows this list):

    from langgraph.graph import StateGraph, MessagesState
    from langchain_openai import ChatOpenAI
    
    model = ChatOpenAI()
    
    def tool(tool_arg: str):
        model.invoke(tool_arg)
        ...
    
    def call_tools(state: MessagesState):
        tool_call = get_tool_call(state)
        tool_result = tool(**tool_call["args"])
        ...
    
    graph = (
        StateGraph(MessagesState)
        .add_node(call_tools)
        ...
        .compile()
    )
    
    for msg, metadata in graph.stream(
        inputs,
        stream_mode="messages"
    ):
        print(msg)
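
You can also request both kinds of output in a single pass: stream_mode accepts a list of modes, in which case each streamed item is a (mode, chunk) tuple. A minimal sketch, reusing the graph and inputs from the snippets above:

# stream custom tool data and LLM tokens together
for mode, chunk in graph.stream(
    inputs,
    stream_mode=["custom", "messages"],
):
    if mode == "custom":
        # whatever the tool passed to the stream writer
        print("tool data:", chunk)
    elif mode == "messages":
        msg, metadata = chunk
        print("token:", msg)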
    

Using without LangChain

If you need to stream data from inside tools without using LangChain, you can use stream_mode="custom". Check out the example below to learn more.

Async in Python < 3.11

When using Python < 3.11 with async code, please ensure you manually pass the RunnableConfig through to the chat model when invoking it like so: model.ainvoke(..., config). The stream method collects all events from your nested code using a streaming tracer passed as a callback. In 3.11 and above, this is handled automatically via contextvars; prior to 3.11, asyncio's tasks lacked proper contextvar support, meaning that the callbacks will only propagate if you manually pass the config through. We do this in the get_items tool below.
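
For illustration, a minimal sketch of the pattern (summarize is a hypothetical tool, not part of this guide's example):

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI

model = ChatOpenAI()


async def summarize(text: str, config: RunnableConfig) -> str:
    # on Python < 3.11, explicitly forwarding `config` is what lets
    # the streaming callbacks reach this nested model call
    response = await model.ainvoke(text, config)
    return response.content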

Setup

First, let's install the required packages and set our OpenAI API key:

%%capture --no-stderr
%pip install -U langgraph langchain-openai

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
OPENAI_API_KEY:  ········

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here.

Streaming custom data

We'll use a prebuilt ReAct agent for this guide:

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer


@tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()

    # this can be replaced with any actual streaming logic that you might have
    items = ["books", "penciles", "pictures"]
    for chunk in items:
        writer({"custom_tool_data": chunk})

    return ", ".join(items)


llm = ChatOpenAI(model="gpt-4o-mini")
tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)

API Reference: tool | ChatOpenAI | create_react_agent

Let's now invoke our agent with an input that requires a tool call:

inputs = {
    "messages": [  
        {"role": "user", "content": "what items are in the office?"}
    ]
}
async for chunk in agent.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk)
{'custom_tool_data': 'books'}
{'custom_tool_data': 'pencils'}
{'custom_tool_data': 'pictures'}
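
Since the writer accepts arbitrary chunks, you can just as easily emit structured progress events. A small sketch (the tool and event shape here are illustrative, not part of the guide's example):

@tool
async def export_data(table: str) -> str:
    """Export a table, reporting progress as it goes."""
    writer = get_stream_writer()
    for pct in (25, 50, 75, 100):  # stand-in for real progress tracking
        writer({"progress": pct, "table": table})
    return f"exported {table}"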

Streaming LLM tokens

You can also stream tokens from an LLM called inside a tool with stream_mode="messages". Note that the tool below accepts the config argument and forwards it to the model, per the compatibility note above:

from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import RunnableConfig


@tool
async def get_items(
    place: str,
    # Manually accept config (needed for Python <= 3.10)
    config: RunnableConfig,
) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    # Attention: when using async, you should be invoking the LLM using ainvoke!
    # If you fail to do so, streaming will NOT work.
    response = await llm.ainvoke(
        [
            {
                "role": "user",
                "content": (
                    f"Can you tell me what kind of items i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. And include a brief description of each item."
                ),
            }
        ],
        config,
    )
    return response.content


tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)

API Reference: AIMessageChunk | RunnableConfig

inputs = {
    "messages": [  
        {"role": "user", "content": "what items are in the bedroom?"}
    ]
}
async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # Stream all messages from the tool node
        and metadata["langgraph_node"] == "tools"
    ):
        print(msg.content, end="|", flush=True)
Certainly|!| Here| are| three| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| central| piece| of| furniture| in| a| bedroom|,| typically| consisting| of| a| mattress| supported| by| a| frame|.| It| is| designed| for| sleeping| and| can| vary| in| size| from| twin| to| king|.| Beds| often| have| bedding|,| including| sheets|,| pillows|,| and| comfort|ers|,| to| enhance| comfort|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| drawers| used| for| storing| clothing| and| personal| items|.| Dress|ers| often| have| a| flat| surface| on| top|,| which| can| be| used| for| decorative| items|,| a| mirror|,| or| personal| accessories|.| They| help| keep| the| bedroom| organized| and| clutter|-free|.

|3|.| **|Night|stand|**|:| A| small| table| or| cabinet| placed| beside| the| bed|,| used| for| holding| items| such| as| a| lamp|,| alarm| clock|,| books|,| or| personal| items|.| Night|stands| provide| convenience| for| easy| access| to| essentials| during| the| night|,| adding| functionality| and| style| to| the| bedroom| decor|.|
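
The same metadata lets you target any node. For example, a sketch that surfaces only the tokens of the agent's final answer instead of the tool's (node names "agent" and "tools" per the comment in the setup cell):

async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # only tokens produced by the tool-calling LLM node
        and metadata["langgraph_node"] == "agent"
    ):
        print(msg.content, end="|", flush=True)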

Example without LangChain

You can also stream data from within tool invocations without using LangChain. The example below demonstrates how to do it for a graph with a single tool-executing node. We'll leave implementing a ReAct agent from scratch without LangChain as an exercise for the reader.

import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)  
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)

API Reference: StateGraph | START

Let's now invoke our graph with an AI message that contains a tool call:

inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)
Sure|!| Here| are| three| common| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| focal| point| of| the| bedroom|,| a| bed| typically| consists| of| a| mattress| resting| on| a| frame|,| and| it| may| include| pillows| and| bedding|.| It| provides| a| comfortable| place| for| sleeping| and| resting|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| multiple| drawers|,| a| dresser| is| used| for| storing| clothes|,| accessories|,| and| personal| items|.| It| often| has| a| flat| surface| that| may| be| used| to| display| decorative| items| or| a| mirror|.

|3|.| **|Night|stand|**|:| Also| known| as| a| bedside| table|,| a| night|stand| is| placed| next| to| the| bed| and| typically| holds| items| like| lamps|,| books|,| alarm| clocks|,| and| personal| belongings| for| convenience| during| the| night|.

|These| items| contribute| to| the| functionality| and| comfort| of| the| bedroom| environment|.|
