Functional API
task(__func_or_none__: Optional[Union[Callable[P, T], Callable[P, Awaitable[T]]]] = None, *, retry: Optional[RetryPolicy] = None) -> Union[Callable[[Callable[P, Awaitable[T]]], Callable[P, asyncio.Future[T]]], Callable[[Callable[P, T]], Callable[P, concurrent.futures.Future[T]]], Callable[P, asyncio.Future[T]], Callable[P, concurrent.futures.Future[T]]]
Define a LangGraph task using the task decorator.
Experimental
This is an experimental API that is subject to change. Do not use for production code.
Requires Python 3.11 or higher for async functions
The task decorator supports both sync and async functions. To use async functions, ensure that you are using Python 3.11 or higher.
Tasks can only be called from within an entrypoint or from within a StateGraph. A task can be called like a regular function with the following differences:
- When a checkpointer is enabled, the function inputs and outputs must be serializable.
- The decorated function can only be called from within an entrypoint or StateGraph.
- Calling the function produces a future. This makes it easy to parallelize tasks.
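The future-producing behavior described above can be sketched with the standard library alone. This is not LangGraph code; it uses concurrent.futures directly to illustrate the execution model that makes parallelizing tasks easy: submitting work returns a future immediately, and results are gathered afterwards.

```python
from concurrent.futures import ThreadPoolExecutor

def add_one(a: int) -> int:
    return a + 1

with ThreadPoolExecutor() as pool:
    # Each submit returns a future right away, so all calls run in parallel.
    futures = [pool.submit(add_one, n) for n in [1, 2, 3]]
    # Collect the results once all the work has been dispatched.
    results = [f.result() for f in futures]

print(results)  # [2, 3, 4]
```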
Parameters:
- retry (Optional[RetryPolicy], default: None) – An optional retry policy to use for the task in case of a failure.
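Conceptually, a retry policy re-runs a failing task until it succeeds or an attempt budget is exhausted. The plain-Python loop below is only a sketch of that behavior (in LangGraph the policy is configured on the decorator itself, e.g. via the retry parameter); the helper and its names are illustrative, not part of the library.

```python
def run_with_retry(fn, max_attempts: int = 3):
    # Re-invoke fn until it succeeds or the attempt budget is spent.
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise

calls = {"count": 0}

def flaky() -> str:
    # Fails twice, then succeeds, simulating a transient error.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retry(flaky)
print(result)  # ok
```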
Returns:
- Union[Callable[[Callable[P, Awaitable[T]]], Callable[P, asyncio.Future[T]]], Callable[[Callable[P, T]], Callable[P, concurrent.futures.Future[T]]], Callable[P, asyncio.Future[T]], Callable[P, concurrent.futures.Future[T]]] – A callable function when used as a decorator.
Sync Task
from langgraph.func import entrypoint, task

@task
def add_one(a: int) -> int:
    return a + 1

@entrypoint()
def add_all(numbers: list[int]) -> list[int]:
    futures = [add_one(n) for n in numbers]
    results = [f.result() for f in futures]
    return results

# Call the entrypoint
add_all.invoke([1, 2, 3])  # Returns [2, 3, 4]
Async Task
import asyncio
from langgraph.func import entrypoint, task

@task
async def add_one(a: int) -> int:
    return a + 1

@entrypoint()
async def add_all(numbers: list[int]) -> list[int]:
    futures = [add_one(n) for n in numbers]
    return await asyncio.gather(*futures)

# Call the entrypoint
await add_all.ainvoke([1, 2, 3])  # Returns [2, 3, 4]
entrypoint(*, checkpointer: Optional[BaseCheckpointSaver] = None, store: Optional[BaseStore] = None, config_schema: Optional[type[Any]] = None) -> Callable[[types.FunctionType], Pregel]
Define a LangGraph workflow using the entrypoint decorator.
Experimental
This is an experimental API that is subject to change. Do not use for production code.
The decorated function must accept a single parameter, which serves as the input to the function. This input parameter can be of any type. Use a dictionary to pass multiple parameters to the function.
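The single-input convention above can be shown with a plain function. This is an illustrative sketch only (the function name and dict keys are made up, and no LangGraph machinery is involved): multiple values are bundled into one dict and unpacked inside the function.

```python
def my_workflow(inputs: dict) -> str:
    # Unpack the fields bundled into the single input parameter.
    topic = inputs["topic"]
    style = inputs["style"]
    return f"A {style} essay about {topic}"

result = my_workflow({"topic": "cats", "style": "funny"})
print(result)  # A funny essay about cats
```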
The decorated function also has access to these optional parameters:
- writer: A StreamWriter instance for writing data to a stream.
- config: A configuration object for accessing workflow settings.
- previous: The previous return value for the given thread (available only when a checkpointer is provided).
The entrypoint decorator can be applied to sync functions, async functions, generator functions, and async generator functions.
For generator functions, the previous parameter will represent a list of the values previously yielded by the generator. During a run, any values yielded by the generator will be written to the custom stream.
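The generator semantics described above can be simulated in plain Python. This is a sketch of the described behavior, not LangGraph itself: a local list plays the role of the checkpointed history, and previous receives the values yielded on earlier runs of the same thread.

```python
def workflow(value: str, previous=None):
    # `previous` stands in for the list of values yielded on earlier
    # runs of the same thread, as described above.
    run_index = len(previous or [])
    yield f"run {run_index}: {value}"

history = []  # what a checkpointer would persist per thread
for item in ["a", "b"]:
    yielded = list(workflow(item, previous=history))
    history = history + yielded

print(history)  # ['run 0: a', 'run 1: b']
```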
Parameters:
- checkpointer (Optional[BaseCheckpointSaver], default: None) – Specify a checkpointer to create a workflow that can persist its state across runs.
- store (Optional[BaseStore], default: None) – A generalized key-value store. Some implementations may support semantic search capabilities through an optional index configuration.
- config_schema (Optional[type[Any]], default: None) – Specifies the schema for the configuration object that will be passed to the workflow.
Returns:
- Callable[[types.FunctionType], Pregel] – A decorator that converts a function into a Pregel graph.
Using entrypoint and tasks
import time

from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

@task
def compose_essay(topic: str) -> str:
    time.sleep(1.0)  # Simulate slow operation
    return f"An essay about {topic}"

@entrypoint(checkpointer=MemorySaver())
def review_workflow(topic: str) -> dict:
    """Manages the workflow for generating and reviewing an essay.

    The workflow includes:
    1. Generating an essay about the given topic.
    2. Interrupting the workflow for human review of the generated essay.

    Upon resuming the workflow, compose_essay task will not be re-executed
    as its result is cached by the checkpointer.

    Args:
        topic (str): The subject of the essay.

    Returns:
        dict: A dictionary containing the generated essay and the human review.
    """
    essay_future = compose_essay(topic)
    essay = essay_future.result()
    human_review = interrupt({
        "question": "Please provide a review",
        "essay": essay
    })
    return {
        "essay": essay,
        "review": human_review,
    }

# Example configuration for the workflow
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Topic for the essay
topic = "cats"

# Stream the workflow to generate the essay and await human review
for result in review_workflow.stream(topic, config):
    print(result)

# Example human review provided after the interrupt
human_review = "This essay is great."

# Resume the workflow with the provided human review
for result in review_workflow.stream(Command(resume=human_review), config):
    print(result)
Accessing the previous return value
When a checkpointer is enabled, the function can access the return value of the previous invocation on the same thread id.
from typing import Optional

from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint

@entrypoint(checkpointer=MemorySaver())
def my_workflow(input_data: str, previous: Optional[str] = None) -> str:
    return "world"

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

my_workflow.invoke("hello", config)
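The mechanics of previous can be simulated without LangGraph. In this sketch a plain dict stands in for the checkpointer, and the invoke helper is illustrative (not a library function): each invocation on the same thread id receives the prior run's return value and persists its own.

```python
checkpoints: dict = {}  # stand-in for a checkpointer, keyed by thread_id

def invoke(workflow, input_data, thread_id):
    # Look up the previous return value for this thread, run the
    # workflow, then persist the new return value for the next run.
    previous = checkpoints.get(thread_id)
    result = workflow(input_data, previous=previous)
    checkpoints[thread_id] = result
    return result

def my_workflow(input_data, previous=None):
    return f"{previous or 'start'} -> {input_data}"

first = invoke(my_workflow, "hello", "some_thread")
second = invoke(my_workflow, "again", "some_thread")
print(first)   # start -> hello
print(second)  # start -> hello -> again
```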