Messages - Python SDK feature guide
This page shows how to develop with Signals, Queries, Updates, and Dynamic Handlers using the Python SDK.
Signals
How to develop with Signals using the Python SDK.
A Signal is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution. It can only deliver data to a Workflow Execution that has not already closed.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
There are two steps for adding support for a Signal to your Workflow code:
- Defining the Signal - You specify the name and data structure used by Temporal Clients when sending the Signal.
- Handling the Signal - You write code that will be invoked when the Signal is received from a Temporal Client.
After defining and handling your Signal, you can send it from a Temporal Client or from another Workflow Execution.
Define Signal
How to define a Signal using the Python SDK.
A Signal has a name and can have arguments.
- The name, also called a Signal type, is a string.
- The arguments must be serializable.
To define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.
Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class, so that fields can be added as needed without changing the method signature.
Return values from Signal methods are ignored.
How to customize names
Pass the name parameter to the decorator to customize the Signal's name; otherwise, it defaults to the name of the Signal method.
from temporalio import workflow
# ...
    @workflow.signal
    async def submit_greeting(self, name: str) -> None:
        await self._pending_greetings.put(name)

    @workflow.signal
    def exit(self) -> None:
        # ...

    @workflow.signal(name="Custom Signal Name")
    async def custom_signal(self, name: str) -> None:
        await self._pending_greetings.put(name)
Handle Signal
How to handle a Signal using the Python SDK.
Workflows listen for Signals by the Signal's name.
Signal handlers are functions defined in the Workflow that listen for incoming Signals of a given type. These handlers define how a Workflow should react when it receives a specific type of Signal.
To send a Signal to the Workflow, use the signal method from the WorkflowHandle class.
from temporalio.client import Client
# ...
# ...
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")
Send a Signal from a Temporal Client
How to send a Signal from a Temporal Client using the Python SDK.
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
To send a Signal from the Client, use the signal() method on the Workflow handle.
To get the Workflow handle, you can use any of the following options.
- Use the get_workflow_handle() method.
- Use the get_workflow_handle_for() method to get a type-safe Workflow handle by its Workflow Id.
- Use the start_workflow() method to start a Workflow and return its handle.
from temporalio.client import Client
# ...
# ...
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        GreetingWorkflow.run,
        id="your-greeting-workflow",
        task_queue="signal-tq",
    )
    await handle.signal(GreetingWorkflow.submit_greeting, "User 1")
Send a Signal from a Workflow
How to send a Signal from a Workflow using the Python SDK.
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier.
Use get_external_workflow_handle when you don't know the type of the other Workflow.
The Workflow Type passed is only for type annotations and not for validation.
# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")
Signal-With-Start
How to send a Signal-With-Start using the Python SDK.
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal.
from temporalio.client import Client
# ...
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )
Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
How to define a Query
A Query has a name and can have arguments.
- The name, also called a Query type, is a string.
- The arguments must be serializable.
To define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.
Customize names
Pass the name parameter to the decorator to customize the Query's name; otherwise, it defaults to the name of the Query method.
You can set either the name or the dynamic parameter in a Query's decorator, but not both.
# ...
    @workflow.query
    def greeting(self) -> str:
        return self._greeting
Handle a Query
How to handle a Query
Queries are handled by your Workflow.
Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.
To send a Query to the Workflow, use the query method from the WorkflowHandle class.
# ...
result = await handle.query(GreetingWorkflow.greeting)
Send a Query
How to send a Query
Queries are sent from a Temporal Client.
To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle.
# ...
result = await handle.query(GreetingWorkflow.greeting)
Develop with Updates
An Update is an operation that can mutate the state of a Workflow Execution and return a response.
How to define an Update
Workflow Update handlers are methods in your Workflow Definition designed to handle Updates. Updates can be triggered at any point during the lifecycle of a Workflow Execution.
Define an Update Handler
To define an update handler, use the @workflow.update decorator on a method within your Workflow. This decorator can be applied to both asynchronous and synchronous methods.
- Decorator Usage: Apply @workflow.update to the method intended to handle updates.
- Overriding: If a method with this decorator is overridden, the overriding method should also be decorated with @workflow.update.
- Validator Method: Optionally, you can define a validator method for the update handler. This validator is specified using @update_handler_method_name.validator and is invoked before the update handler.
- Method Parameters: Update handlers should only use positional parameters. For non-dynamic methods, it's recommended to use a single parameter that is an object or data class, which allows for future expansion of fields.
- Return Values: The update handler can return a serializable value. This value is sent back to the caller of the update.
# ...
    @workflow.update
    async def update_workflow_status(self) -> str:
        self.is_complete = True
        return "Workflow status updated"
Send an Update from a Temporal Client
How to send an Update from a Temporal Client
To send a Workflow Update from a Temporal Client, call the execute_update method on the WorkflowHandle class.
# ...
    update_result = await handle.execute_update(
        HelloWorldWorkflow.update_workflow_status
    )
    print(f"Update Result: {update_result}")
Dynamic Handler
What is a Dynamic Handler?
Temporal supports Dynamic Workflows, Activities, Signals, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.
Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known ahead of time.
Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.
Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.
Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.
Set a Dynamic Signal
How to set a Dynamic Signal
A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same name is registered.
A Signal can be made dynamic by adding dynamic=True to the @workflow.signal decorator.
The Signal Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
    @workflow.signal(dynamic=True)
    async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
        await self._pending_greetings.put(name)
Set a Dynamic Query
How to set a Dynamic Query
A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered.
A Query can be made dynamic by adding dynamic=True to the @workflow.query decorator.
The Query Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
    @workflow.query(dynamic=True)
    def dynamic_query(self, input: str, args: Sequence[RawValue]) -> str:
        return self._greeting
Set a Dynamic Workflow
How to set a Dynamic Workflow
A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered.
A Workflow can be made dynamic by adding dynamic=True to the @workflow.defn decorator.
You must register the Workflow with the Worker before it can be invoked.
The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return await workflow.execute_activity(
            default_greeting,
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Set a Dynamic Activity
How to set a Dynamic Activity
A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered.
An Activity can be made dynamic by adding dynamic=True to the @activity.defn decorator.
You must register the Activity with the Worker before it can be invoked.
The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
    arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
    return (
        f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
    )
# ...
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            "unregistered_activity",
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )