How to stream LLM tokens from your graph¶
In this guide, we will stream tokens from the language model powering an agent, using a ReAct agent as an example.
Note
If you are using a version of @langchain/core < 0.2.3, when calling chat models or LLMs you need to call await model.stream() within your nodes to get token-by-token streaming events, and aggregate final outputs if needed to update the graph state. In later versions of @langchain/core, this occurs automatically, and you can call await model.invoke() instead. For more on how to upgrade @langchain/core, check out the instructions here.
This how-to guide closely follows the others in this directory, showing how to incorporate the functionality into a prototypical agent in LangGraph.
Streaming Support
Token streaming is supported by many, but not all chat models. Check to see if your LLM integration supports token streaming here (doc). Note that some integrations may support general token streaming but lack support for streaming tool calls.
Note
In this how-to, we will create our agent from scratch to be transparent (but verbose). You can accomplish similar functionality using the createReactAgent({ llm, tools }) (API doc) constructor. This may be more appropriate if you are used to LangChain's AgentExecutor class.
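For reference, a minimal sketch of that prebuilt alternative might look like the following. It uses the searchTool we define later in this guide and is not used anywhere else in this how-to:
import { createReactAgent } from "@langchain/langgraph/prebuilt";
import { ChatOpenAI } from "@langchain/openai";
// Roughly equivalent prebuilt agent; `searchTool` is defined in the tool setup step below.
const prebuiltAgent = createReactAgent({
  llm: new ChatOpenAI({ model: "gpt-4o-mini", temperature: 0 }),
  tools: [searchTool],
});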
Setup¶
This guide will use OpenAI's GPT-4o-mini model. We will optionally set our API key for LangSmith tracing, which will give us best-in-class observability.
// process.env.OPENAI_API_KEY = "sk_...";
// Optional, add tracing in LangSmith
// process.env.LANGCHAIN_API_KEY = "ls__...";
// process.env.LANGCHAIN_CALLBACKS_BACKGROUND = "true";
// process.env.LANGCHAIN_TRACING = "true";
// process.env.LANGCHAIN_PROJECT = "Stream Tokens: LangGraphJS";
Define the state¶
The state is the interface for all of the nodes in our graph.
import { Annotation } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
});
Set up the tools¶
Next, we will define the tools we want to use. For this simple example, we will create a placeholder search engine:
import { tool } from "@langchain/core/tools";
import { z } from "zod";
const searchTool = tool((_) => {
// This is a placeholder for the actual implementation
return "Cold, with a low of 3℃";
}, {
name: "search",
description:
"Use to surf the web, fetch current information, check the weather, and retrieve other information.",
schema: z.object({
query: z.string().describe("The query to use in your search."),
}),
});
await searchTool.invoke({ query: "What's the weather like?" });
const tools = [searchTool];
We can now wrap these tools in a prebuilt ToolNode. This object will actually run the tools (functions) whenever they are invoked by our LLM.
import { ToolNode } from "@langchain/langgraph/prebuilt";
const toolNode = new ToolNode(tools);
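To see what the ToolNode does, you can invoke it directly with a state whose last message requests a tool call. Here is a minimal sketch; the message below is hypothetical and only for illustration:
import { AIMessage } from "@langchain/core/messages";
// An AI message that requests a call to the search tool.
const exampleMessage = new AIMessage({
  content: "",
  tool_calls: [
    { name: "search", args: { query: "weather" }, id: "example_call_id", type: "tool_call" },
  ],
});
// The ToolNode runs the requested tool and returns its output as a ToolMessage.
await toolNode.invoke({ messages: [exampleMessage] });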
Set up the model¶
Now load the chat model.
- It should work with messages. We will represent all agent state in the form of messages, so it needs to be able to work well with them.
- It should work with tool calling, meaning it can return function arguments in its response.
Note
These model requirements are not general requirements for using LangGraph - they are just requirements for this one example.
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "gpt-4o-mini",
temperature: 0,
streaming: true
});
After loading the model, we should make sure it knows that it has these tools available to call. We can do this by calling bindTools.
const boundModel = model.bindTools(tools);
Define the graph¶
We can now put it all together.
import { StateGraph, END } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";
const routeMessage = (state: typeof StateAnnotation.State) => {
const { messages } = state;
const lastMessage = messages[messages.length - 1] as AIMessage;
// If no tools are called, we can finish (respond to the user)
if (!lastMessage?.tool_calls?.length) {
return END;
}
// Otherwise, continue and call the tools
return "tools";
};
const callModel = async (
state: typeof StateAnnotation.State,
) => {
// For versions of @langchain/core < 0.2.3, you must call `.stream()`
// and aggregate the message from chunks instead of calling `.invoke()`.
const { messages } = state;
const responseMessage = await boundModel.invoke(messages);
return { messages: [responseMessage] };
};
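On versions of @langchain/core older than 0.2.3, the node would instead need to stream and aggregate the chunks itself, as mentioned in the comment above. A minimal sketch of that legacy pattern (not needed on current versions) could look like this:
import { AIMessageChunk } from "@langchain/core/messages";
const callModelLegacy = async (
  state: typeof StateAnnotation.State,
) => {
  const stream = await boundModel.stream(state.messages);
  let finalMessage: AIMessageChunk | undefined;
  for await (const chunk of stream) {
    // AIMessageChunk instances can be merged together with .concat()
    finalMessage = finalMessage === undefined ? chunk : finalMessage.concat(chunk);
  }
  return { messages: finalMessage ? [finalMessage] : [] };
};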
const workflow = new StateGraph(StateAnnotation)
.addNode("agent", callModel)
.addNode("tools", toolNode)
.addEdge("__start__", "agent")
.addConditionalEdges("agent", routeMessage)
.addEdge("tools", "agent");
const agent = workflow.compile();
Let's visualize the graph we just compiled:
import * as tslab from "tslab";
const runnableGraph = agent.getGraph();
const image = await runnableGraph.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();
await tslab.display.png(new Uint8Array(arrayBuffer));
Streaming LLM Tokens¶
You can access the LLM tokens as they are produced by each node with two methods:
- The stream method along with streamMode: "messages"
- The streamEvents method
The stream method¶
Compatibility
This section requires @langchain/langgraph>=0.2.20. For help upgrading, see this guide.
For this method, you must use an LLM that supports streaming and enable streaming when constructing it (e.g. new ChatOpenAI({ model: "gpt-4o-mini", streaming: true })), or call .stream on the internal LLM call within your node.
import { isAIMessageChunk } from "@langchain/core/messages";
const stream = await agent.stream(
{ messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
{ streamMode: "messages" },
);
for await (const [message, _metadata] of stream) {
if (isAIMessageChunk(message) && message.tool_call_chunks?.length) {
console.log(`${message.getType()} MESSAGE TOOL CALL CHUNK: ${message.tool_call_chunks[0].args}`);
} else {
console.log(`${message.getType()} MESSAGE CONTENT: ${message.content}`);
}
}
ai MESSAGE TOOL CALL CHUNK: 
ai MESSAGE TOOL CALL CHUNK: {"
ai MESSAGE TOOL CALL CHUNK: query
ai MESSAGE TOOL CALL CHUNK: ":"
ai MESSAGE TOOL CALL CHUNK: current
ai MESSAGE TOOL CALL CHUNK: weather
ai MESSAGE TOOL CALL CHUNK: in
ai MESSAGE TOOL CALL CHUNK: Nepal
ai MESSAGE TOOL CALL CHUNK: "}
ai MESSAGE CONTENT: 
tool MESSAGE CONTENT: Cold, with a low of 3℃
ai MESSAGE CONTENT: 
ai MESSAGE CONTENT: The
ai MESSAGE CONTENT: current
ai MESSAGE CONTENT: weather
ai MESSAGE CONTENT: in
ai MESSAGE CONTENT: Nepal
ai MESSAGE CONTENT: is
ai MESSAGE CONTENT: cold
ai MESSAGE CONTENT: ,
ai MESSAGE CONTENT: with
ai MESSAGE CONTENT: a
ai MESSAGE CONTENT: low
ai MESSAGE CONTENT: temperature
ai MESSAGE CONTENT: of
ai MESSAGE CONTENT: 
ai MESSAGE CONTENT: 3
ai MESSAGE CONTENT: ℃
ai MESSAGE CONTENT: .
ai MESSAGE CONTENT: 
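Each streamed tuple also includes metadata about where the chunk came from, so you can filter the stream to a single node. The sketch below keeps only tokens emitted by the "agent" node, assuming (as in recent LangGraph versions) that the metadata contains a langgraph_node field:
const filteredStream = await agent.stream(
  { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
  { streamMode: "messages" },
);
for await (const [message, metadata] of filteredStream) {
  // Only print tokens produced by the "agent" node, skipping tool output.
  if (metadata.langgraph_node === "agent" && isAIMessageChunk(message)) {
    console.log(message.content);
  }
}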
The streamEvents method¶
You can also use the streamEvents method like this:
const eventStream = await agent.streamEvents(
{ messages: [{ role: "user", content: "What's the weather like today?" }] },
{
version: "v2",
}
);
for await (const { event, data } of eventStream) {
if (event === "on_chat_model_stream" && isAIMessageChunk(data.chunk)) {
if (data.chunk.tool_call_chunks !== undefined && data.chunk.tool_call_chunks.length > 0) {
console.log(data.chunk.tool_call_chunks);
}
}
}
[ { name: 'search', args: '', id: 'call_fNhlT6qSYWdJGPSYaVqLtTKO', index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: '{"', id: undefined, index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: 'query', id: undefined, index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: '":"', id: undefined, index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: 'current', id: undefined, index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: ' weather', id: undefined, index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: ' today', id: undefined, index: 0, type: 'tool_call_chunk' } ]
[ { name: undefined, args: '"}', id: undefined, index: 0, type: 'tool_call_chunk' } ]
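The same event stream also carries the model's regular content tokens. As a sketch, you could print those as well by checking data.chunk.content alongside the tool call chunks:
const contentStream = await agent.streamEvents(
  { messages: [{ role: "user", content: "What's the weather like today?" }] },
  { version: "v2" },
);
for await (const { event, data } of contentStream) {
  if (event === "on_chat_model_stream" && isAIMessageChunk(data.chunk)) {
    // Tool call chunks typically have empty string content, so this prints only text tokens.
    if (typeof data.chunk.content === "string" && data.chunk.content.length > 0) {
      console.log(data.chunk.content);
    }
  }
}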