Graph Definitions¶
Graph
¶
add_conditional_edges(source: str, path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self
¶
Add a conditional edge from the starting node to any number of destination nodes.
Parameters:
-
source
(
) –str The starting node. This conditional edge will run when exiting this node.
-
path
(
) –Union [Callable ,Runnable ]The callable that determines the next node or nodes. If not specifying
path_map
it should return one or more nodes. If it returns END, the graph will stop execution. -
path_map
(
, default:Optional [dict [Hashable ,str ]]None
) –Optional mapping of paths to node names. If omitted the paths returned by
path
should be node names. -
then
(
, default:Optional [str ]None
) –The name of a node to execute after the nodes selected by
path
.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
Without typehints on the path
function's return value (e.g., -> Literal["foo", "__end__"]:
)
or a path_map, the graph visualization assumes the edge could transition to any node in the graph.
set_entry_point(key: str) -> Self
¶
Specifies the first node to be called in the graph.
Equivalent to calling add_edge(START, key)
.
Parameters:
-
key
(
) –str The key of the node to set as the entry point.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
set_conditional_entry_point(path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self
¶
Sets a conditional entry point in the graph.
Parameters:
-
path
(
) –Union [Callable ,Runnable ]The callable that determines the next node or nodes. If not specifying
path_map
it should return one or more nodes. If it returns END, the graph will stop execution. -
path_map
(
, default:Optional [dict [str ,str ]]None
) –Optional mapping of paths to node names. If omitted the paths returned by
path
should be node names. -
then
(
, default:Optional [str ]None
) –The name of a node to execute after the nodes selected by
path
.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
set_finish_point(key: str) -> Self
¶
Marks a node as a finish point of the graph.
If the graph reaches this node, it will cease execution.
Parameters:
-
key
(
) –str The key of the node to set as the finish point.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
CompiledGraph
¶
Bases:
stream_mode: StreamMode = stream_mode
class-attribute
instance-attribute
¶
Mode to stream output, defaults to 'values'.
stream_eager: bool = stream_eager
class-attribute
instance-attribute
¶
Whether to force emitting stream events eagerly, automatically turned on for stream_mode "messages" and "custom".
stream_channels: Optional[Union[str, Sequence[str]]] = stream_channels
class-attribute
instance-attribute
¶
Channels to stream, defaults to all channels not in reserved channels
step_timeout: Optional[float] = step_timeout
class-attribute
instance-attribute
¶
Maximum time to wait for a step to complete, in seconds. Defaults to None.
debug: bool = debug if debug is not None else get_debug()
instance-attribute
¶
Whether to print debug information during execution. Defaults to False.
checkpointer: Checkpointer = checkpointer
class-attribute
instance-attribute
¶
Checkpointer used to save and load graph state. Defaults to None.
store: Optional[BaseStore] = store
class-attribute
instance-attribute
¶
Memory store to use for SharedValues. Defaults to None.
retry_policy: Optional[RetryPolicy] = retry_policy
class-attribute
instance-attribute
¶
Retry policy to use when running tasks. Set to None to disable.
get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
¶
Get the current state of the graph.
aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
async
¶
Get the current state of the graph.
update_state(config: RunnableConfig, values: Optional[Union[dict[str, Any], Any]], as_node: Optional[str] = None) -> RunnableConfig
¶
Update the state of the graph with the given values, as if they came from
node as_node
. If as_node
is not provided, it will be set to the last node
that updated the state, if not ambiguous.
aupdate_state(config: RunnableConfig, values: dict[str, Any] | Any, as_node: Optional[str] = None) -> RunnableConfig
async
¶
Update the state of the graph asynchronously with the given values, as if they came from
node as_node
. If as_node
is not provided, it will be set to the last node
that updated the state, if not ambiguous.
stream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> Iterator[Union[dict[str, Any], Any]]
¶
Stream graph steps for a single input.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input to the graph.
-
config
(
, default:Optional [RunnableConfig ]None
) –The configuration to use for the run.
-
stream_mode
(
, default:Optional [Union [StreamMode ,list [StreamMode ]]]None
) –The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –The keys to stream, defaults to all non-context channels.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt before, defaults to all nodes in the graph.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt after, defaults to all nodes in the graph.
-
debug
(
, default:Optional [bool ]None
) –Whether to print debug information during execution, defaults to False.
-
subgraphs
(
, default:bool False
) –Whether to stream subgraphs, defaults to False.
Yields:
-
–Union [dict [str ,Any ],Any ]The output of each step in the graph. The output shape depends on the stream_mode.
Examples:
Using different stream modes with a graph:
>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
... alist: Annotated[list, operator.add]
... another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
... print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
... print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
... print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
astream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> AsyncIterator[Union[dict[str, Any], Any]]
async
¶
Stream graph steps for a single input.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input to the graph.
-
config
(
, default:Optional [RunnableConfig ]None
) –The configuration to use for the run.
-
stream_mode
(
, default:Optional [Union [StreamMode ,list [StreamMode ]]]None
) –The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –The keys to stream, defaults to all non-context channels.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt before, defaults to all nodes in the graph.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt after, defaults to all nodes in the graph.
-
debug
(
, default:Optional [bool ]None
) –Whether to print debug information during execution, defaults to False.
-
subgraphs
(
, default:bool False
) –Whether to stream subgraphs, defaults to False.
Yields:
-
–AsyncIterator [Union [dict [str ,Any ],Any ]]The output of each step in the graph. The output shape depends on the stream_mode.
Examples:
Using different stream modes with a graph:
>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
... alist: Annotated[list, operator.add]
... another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
... print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
... print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
... print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
invoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]
¶
Run the graph with a single input and config.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input data for the graph. It can be a dictionary or any other type.
-
config
(
, default:Optional [RunnableConfig ]None
) –Optional. The configuration for the graph run.
-
stream_mode
(
, default:StreamMode 'values'
) –Optional[str]. The stream mode for the graph run. Default is "values".
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –Optional. The output keys to retrieve from the graph run.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt the graph run before.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt the graph run after.
-
debug
(
, default:Optional [bool ]None
) –Optional. Enable debug mode for the graph run.
-
**kwargs
(
, default:Any {}
) –Additional keyword arguments to pass to the graph run.
Returns:
-
–Union [dict [str ,Any ],Any ]The output of the graph run. If stream_mode is "values", it returns the latest output.
-
–Union [dict [str ,Any ],Any ]If stream_mode is not "values", it returns a list of output chunks.
ainvoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]
async
¶
Asynchronously invoke the graph on a single input.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input data for the computation. It can be a dictionary or any other type.
-
config
(
, default:Optional [RunnableConfig ]None
) –Optional. The configuration for the computation.
-
stream_mode
(
, default:StreamMode 'values'
) –Optional. The stream mode for the computation. Default is "values".
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –Optional. The output keys to include in the result. Default is None.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt before. Default is None.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt after. Default is None.
-
debug
(
, default:Optional [bool ]None
) –Optional. Whether to enable debug mode. Default is None.
-
**kwargs
(
, default:Any {}
) –Additional keyword arguments.
Returns:
-
–Union [dict [str ,Any ],Any ]The result of the computation. If stream_mode is "values", it returns the latest value.
-
–Union [dict [str ,Any ],Any ]If stream_mode is "chunks", it returns a list of chunks.
get_graph(config: Optional[RunnableConfig] = None, *, xray: Union[int, bool] = False) -> DrawableGraph
¶
Returns a drawable representation of the computation graph.
StateGraph
¶
Bases:
A graph whose nodes communicate by reading and writing to a shared state.
The signature of each node is State -> Partial
Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.
Parameters:
-
state_schema
(
, default:Type [Any ]None
) –The schema class that defines the state.
-
config_schema
(
, default:Optional [Type [Any ]]None
) –The schema class that defines the configuration. Use this to expose configurable parameters in your API.
Examples:
>>> from langchain_core.runnables import RunnableConfig
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.checkpoint.memory import MemorySaver
>>> from langgraph.graph import StateGraph
>>>
>>> def reducer(a: list, b: int | None) -> list:
... if b is not None:
... return a + [b]
... return a
>>>
>>> class State(TypedDict):
... x: Annotated[list, reducer]
>>>
>>> class ConfigSchema(TypedDict):
... r: float
>>>
>>> graph = StateGraph(State, config_schema=ConfigSchema)
>>>
>>> def node(state: State, config: RunnableConfig) -> dict:
... r = config["configurable"].get("r", 1.0)
... x = state["x"][-1]
... next_value = x * r * (1 - x)
... return {"x": next_value}
>>>
>>> graph.add_node("A", node)
>>> graph.set_entry_point("A")
>>> graph.set_finish_point("A")
>>> compiled = graph.compile()
>>>
>>> print(compiled.config_specs)
[ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]
>>>
>>> step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
>>> print(step1)
{'x': [0.5, 0.75]}
add_conditional_edges(source: str, path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self
¶
Add a conditional edge from the starting node to any number of destination nodes.
Parameters:
-
source
(
) –str The starting node. This conditional edge will run when exiting this node.
-
path
(
) –Union [Callable ,Runnable ]The callable that determines the next node or nodes. If not specifying
path_map
it should return one or more nodes. If it returns END, the graph will stop execution. -
path_map
(
, default:Optional [dict [Hashable ,str ]]None
) –Optional mapping of paths to node names. If omitted the paths returned by
path
should be node names. -
then
(
, default:Optional [str ]None
) –The name of a node to execute after the nodes selected by
path
.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
Without typehints on the path
function's return value (e.g., -> Literal["foo", "__end__"]:
)
or a path_map, the graph visualization assumes the edge could transition to any node in the graph.
set_entry_point(key: str) -> Self
¶
Specifies the first node to be called in the graph.
Equivalent to calling add_edge(START, key)
.
Parameters:
-
key
(
) –str The key of the node to set as the entry point.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
set_conditional_entry_point(path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self
¶
Sets a conditional entry point in the graph.
Parameters:
-
path
(
) –Union [Callable ,Runnable ]The callable that determines the next node or nodes. If not specifying
path_map
it should return one or more nodes. If it returns END, the graph will stop execution. -
path_map
(
, default:Optional [dict [str ,str ]]None
) –Optional mapping of paths to node names. If omitted the paths returned by
path
should be node names. -
then
(
, default:Optional [str ]None
) –The name of a node to execute after the nodes selected by
path
.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
set_finish_point(key: str) -> Self
¶
Marks a node as a finish point of the graph.
If the graph reaches this node, it will cease execution.
Parameters:
-
key
(
) –str The key of the node to set as the finish point.
Returns:
-
Self
(
) –Self The instance of the graph, allowing for method chaining.
add_node(node: Union[str, RunnableLike], action: Optional[RunnableLike] = None, *, metadata: Optional[dict[str, Any]] = None, input: Optional[Type[Any]] = None, retry: Optional[RetryPolicy] = None) -> Self
¶
Adds a new node to the state graph.
Will take the name of the function/runnable as the node name.
Parameters:
-
node
(Union[str, RunnableLike)]
) –The function or runnable this node will run.
-
action
(
, default:Optional [RunnableLike ]None
) –The action associated with the node. (default: None)
-
metadata
(
, default:Optional [dict [str ,Any ]]None
) –The metadata associated with the node. (default: None)
-
input
(
, default:Optional [Type [Any ]]None
) –The input schema for the node. (default: the graph's input schema)
-
retry
(
, default:Optional [RetryPolicy ]None
) –The policy for retrying the node. (default: None)
Raises:
-
–ValueError If the key is already being used as a state key.
Examples:
>>> from langgraph.graph import START, StateGraph
...
>>> def my_node(state, config):
... return {"x": state["x"] + 1}
...
>>> builder = StateGraph(dict)
>>> builder.add_node(my_node) # node name will be 'my_node'
>>> builder.add_edge(START, "my_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
>>> builder = StateGraph(dict)
>>> builder.add_node("my_fair_node", my_node)
>>> builder.add_edge(START, "my_fair_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
Returns:
-
Self
(
) –Self The instance of the state graph, allowing for method chaining.
add_edge(start_key: Union[str, list[str]], end_key: str) -> Self
¶
Adds a directed edge from the start node (or list of start nodes) to the end node.
When a single start node is provided, the graph will wait for that node to complete before executing the end node. When multiple start nodes are provided, the graph will wait for ALL of the start nodes to complete before executing the end node.
Parameters:
-
start_key
(
) –Union [str ,list [str ]]The key(s) of the start node(s) of the edge.
-
end_key
(
) –str The key of the end node of the edge.
Raises:
-
–ValueError If the start key is 'END' or if the start key or end key is not present in the graph.
Returns:
-
Self
(
) –Self The instance of the state graph, allowing for method chaining.
add_sequence(nodes: Sequence[Union[RunnableLike, tuple[str, RunnableLike]]]) -> Self
¶
Add a sequence of nodes that will be executed in the provided order.
Parameters:
-
nodes
(
) –Sequence [Union [RunnableLike ,tuple [str ,RunnableLike ]]]A sequence of RunnableLike objects (e.g. a LangChain Runnable or a callable) or (name, RunnableLike) tuples. If no names are provided, the name will be inferred from the node object (e.g. a runnable or a callable name). Each node will be executed in the order provided.
Raises:
-
–ValueError if the sequence is empty.
-
–ValueError if the sequence contains duplicate node names.
Returns:
-
Self
(
) –Self The instance of the state graph, allowing for method chaining.
compile(checkpointer: Checkpointer = None, *, store: Optional[BaseStore] = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, debug: bool = False) -> CompiledStateGraph
¶
Compiles the state graph into a CompiledGraph
object.
The compiled graph implements the Runnable
interface and can be invoked,
streamed, batched, and run asynchronously.
Parameters:
-
checkpointer
(
, default:Optional [Union [Checkpointer ,Literal [False]]]None
) –A checkpoint saver object or flag. If provided, this Checkpointer serves as a fully versioned "short-term memory" for the graph, allowing it to be paused, resumed, and replayed from any point. If None, it may inherit the parent graph's checkpointer when used as a subgraph. If False, it will not use or inherit any checkpointer.
-
interrupt_before
(
, default:Optional [Sequence [str ]]None
) –An optional list of node names to interrupt before.
-
interrupt_after
(
, default:Optional [Sequence [str ]]None
) –An optional list of node names to interrupt after.
-
debug
(
, default:bool False
) –A flag indicating whether to enable debug mode.
Returns:
-
CompiledStateGraph
(
) –CompiledStateGraph The compiled state graph.
CompiledStateGraph
¶
Bases:
stream_mode: StreamMode = stream_mode
class-attribute
instance-attribute
¶
Mode to stream output, defaults to 'values'.
stream_eager: bool = stream_eager
class-attribute
instance-attribute
¶
Whether to force emitting stream events eagerly, automatically turned on for stream_mode "messages" and "custom".
stream_channels: Optional[Union[str, Sequence[str]]] = stream_channels
class-attribute
instance-attribute
¶
Channels to stream, defaults to all channels not in reserved channels
step_timeout: Optional[float] = step_timeout
class-attribute
instance-attribute
¶
Maximum time to wait for a step to complete, in seconds. Defaults to None.
debug: bool = debug if debug is not None else get_debug()
instance-attribute
¶
Whether to print debug information during execution. Defaults to False.
checkpointer: Checkpointer = checkpointer
class-attribute
instance-attribute
¶
Checkpointer used to save and load graph state. Defaults to None.
store: Optional[BaseStore] = store
class-attribute
instance-attribute
¶
Memory store to use for SharedValues. Defaults to None.
retry_policy: Optional[RetryPolicy] = retry_policy
class-attribute
instance-attribute
¶
Retry policy to use when running tasks. Set to None to disable.
get_graph(config: Optional[RunnableConfig] = None, *, xray: Union[int, bool] = False) -> DrawableGraph
¶
Returns a drawable representation of the computation graph.
get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
¶
Get the current state of the graph.
aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
async
¶
Get the current state of the graph.
update_state(config: RunnableConfig, values: Optional[Union[dict[str, Any], Any]], as_node: Optional[str] = None) -> RunnableConfig
¶
Update the state of the graph with the given values, as if they came from
node as_node
. If as_node
is not provided, it will be set to the last node
that updated the state, if not ambiguous.
aupdate_state(config: RunnableConfig, values: dict[str, Any] | Any, as_node: Optional[str] = None) -> RunnableConfig
async
¶
Update the state of the graph asynchronously with the given values, as if they came from
node as_node
. If as_node
is not provided, it will be set to the last node
that updated the state, if not ambiguous.
stream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> Iterator[Union[dict[str, Any], Any]]
¶
Stream graph steps for a single input.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input to the graph.
-
config
(
, default:Optional [RunnableConfig ]None
) –The configuration to use for the run.
-
stream_mode
(
, default:Optional [Union [StreamMode ,list [StreamMode ]]]None
) –The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –The keys to stream, defaults to all non-context channels.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt before, defaults to all nodes in the graph.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt after, defaults to all nodes in the graph.
-
debug
(
, default:Optional [bool ]None
) –Whether to print debug information during execution, defaults to False.
-
subgraphs
(
, default:bool False
) –Whether to stream subgraphs, defaults to False.
Yields:
-
–Union [dict [str ,Any ],Any ]The output of each step in the graph. The output shape depends on the stream_mode.
Examples:
Using different stream modes with a graph:
>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
... alist: Annotated[list, operator.add]
... another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
... print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
... print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
... print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
astream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> AsyncIterator[Union[dict[str, Any], Any]]
async
¶
Stream graph steps for a single input.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input to the graph.
-
config
(
, default:Optional [RunnableConfig ]None
) –The configuration to use for the run.
-
stream_mode
(
, default:Optional [Union [StreamMode ,list [StreamMode ]]]None
) –The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –The keys to stream, defaults to all non-context channels.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt before, defaults to all nodes in the graph.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt after, defaults to all nodes in the graph.
-
debug
(
, default:Optional [bool ]None
) –Whether to print debug information during execution, defaults to False.
-
subgraphs
(
, default:bool False
) –Whether to stream subgraphs, defaults to False.
Yields:
-
–AsyncIterator [Union [dict [str ,Any ],Any ]]The output of each step in the graph. The output shape depends on the stream_mode.
Examples:
Using different stream modes with a graph:
>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
... alist: Annotated[list, operator.add]
... another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
... print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
... print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
... print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
invoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]
¶
Run the graph with a single input and config.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input data for the graph. It can be a dictionary or any other type.
-
config
(
, default:Optional [RunnableConfig ]None
) –Optional. The configuration for the graph run.
-
stream_mode
(
, default:StreamMode 'values'
) –Optional[str]. The stream mode for the graph run. Default is "values".
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –Optional. The output keys to retrieve from the graph run.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt the graph run before.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt the graph run after.
-
debug
(
, default:Optional [bool ]None
) –Optional. Enable debug mode for the graph run.
-
**kwargs
(
, default:Any {}
) –Additional keyword arguments to pass to the graph run.
Returns:
-
–Union [dict [str ,Any ],Any ]The output of the graph run. If stream_mode is "values", it returns the latest output.
-
–Union [dict [str ,Any ],Any ]If stream_mode is not "values", it returns a list of output chunks.
ainvoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]
async
¶
Asynchronously invoke the graph on a single input.
Parameters:
-
input
(
) –Union [dict [str ,Any ],Any ]The input data for the computation. It can be a dictionary or any other type.
-
config
(
, default:Optional [RunnableConfig ]None
) –Optional. The configuration for the computation.
-
stream_mode
(
, default:StreamMode 'values'
) –Optional. The stream mode for the computation. Default is "values".
-
output_keys
(
, default:Optional [Union [str ,Sequence [str ]]]None
) –Optional. The output keys to include in the result. Default is None.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt before. Default is None.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Optional. The nodes to interrupt after. Default is None.
-
debug
(
, default:Optional [bool ]None
) –Optional. Whether to enable debug mode. Default is None.
-
**kwargs
(
, default:Any {}
) –Additional keyword arguments.
Returns:
-
–Union [dict [str ,Any ],Any ]The result of the computation. If stream_mode is "values", it returns the latest value.
-
–Union [dict [str ,Any ],Any ]If stream_mode is "chunks", it returns a list of chunks.
add_messages(left: Messages, right: Messages, *, format: Optional[Literal['langchain-openai']] = None) -> Messages
¶
Merges two lists of messages, updating existing messages by ID.
By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.
Parameters:
-
left
(
) –Messages The base list of messages.
-
right
(
) –Messages The list of messages (or single message) to merge into the base list.
-
format
(
, default:Optional [Literal ['langchain-openai']]None
) –The format to return messages in. If None then messages will be returned as is. If 'langchain-openai' then messages will be returned as BaseMessage objects with their contents formatted to match OpenAI message format, meaning contents can be string, 'text' blocks, or 'image_url' blocks and tool responses are returned as their own ToolMessages.
REQUIREMENT: Must have
langchain-core>=0.3.11
installed to use this feature.
Returns:
-
–Messages A new list of messages with the messages from
right
merged intoleft
. -
–Messages If a message in
right
has the same ID as a message inleft
, the -
–Messages message from
right
will replace the message fromleft
.
Examples:
>>> from langchain_core.messages import AIMessage, HumanMessage
>>> msgs1 = [HumanMessage(content="Hello", id="1")]
>>> msgs2 = [AIMessage(content="Hi there!", id="2")]
>>> add_messages(msgs1, msgs2)
[HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]
>>> msgs1 = [HumanMessage(content="Hello", id="1")]
>>> msgs2 = [HumanMessage(content="Hello again", id="1")]
>>> add_messages(msgs1, msgs2)
[HumanMessage(content='Hello again', id='1')]
>>> from typing import Annotated
>>> from typing_extensions import TypedDict
>>> from langgraph.graph import StateGraph
>>>
>>> class State(TypedDict):
... messages: Annotated[list, add_messages]
...
>>> builder = StateGraph(State)
>>> builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> graph = builder.compile()
>>> graph.invoke({})
{'messages': [AIMessage(content='Hello', id=...)]}
>>> from typing import Annotated
>>> from typing_extensions import TypedDict
>>> from langgraph.graph import StateGraph, add_messages
>>>
>>> class State(TypedDict):
... messages: Annotated[list, add_messages(format='langchain-openai')]
...
>>> def chatbot_node(state: State) -> list:
... return {"messages": [
... {
... "role": "user",
... "content": [
... {
... "type": "text",
... "text": "Here's an image:",
... "cache_control": {"type": "ephemeral"},
... },
... {
... "type": "image",
... "source": {
... "type": "base64",
... "media_type": "image/jpeg",
... "data": "1234",
... },
... },
... ]
... },
... ]}
>>> builder = StateGraph(State)
>>> builder.add_node("chatbot", chatbot_node)
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> graph = builder.compile()
>>> graph.invoke({"messages": []})
{
'messages': [
HumanMessage(
content=[
{"type": "text", "text": "Here's an image:"},
{
"type": "image_url",
"image_url": {"url": "data:image/jpeg;base64,1234"},
},
],
),
]
}
..versionchanged:: 0.2.61
Support for 'format="langchain-openai"' flag added.