Workflows

With Dispatch and resumable functions, it's easy to build reliable workflows that run many tasks in parallel or one step at a time.

Fan out

Consider an application that needs to send an email notification to every user. Use batch to submit all of the sends in parallel without waiting for them to complete.

import dispatch

@dispatch.function
async def send_email_notification(user):
    # Send email to a user
    pass

@dispatch.function
async def notify_all_users():
    # fetch_users_from_database() is assumed to be defined by the application
    users = await fetch_users_from_database()
    # Queue one send_email_notification call per user, then submit them together
    email_delivery = dispatch.batch()
    for user in users:
        email_delivery.add(send_email_notification(user))
    await email_delivery.dispatch()

def main():
    notify_all_users.dispatch()

dispatch.run(main)

Normally, an exception raised by send_email_notification for even one user would halt the execution of notify_all_users. Worse, retrying notify_all_users would resend the email to every user who had already received it. Sounds like a nightmare.

With Dispatch, however, only the failed send_email_notification call is retried, so all emails are delivered without resending the ones that already succeeded.
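The difference between retrying the whole workflow and retrying a single call can be sketched in plain Python. This is only an illustration of the idea, not how Dispatch is implemented: it uses asyncio from the standard library, and FlakyMailer, retry_whole_workflow, retry_each_call, and simulate are hypothetical names invented for this example.

```python
import asyncio


class FlakyMailer:
    """Hypothetical stand-in for an email service; not part of Dispatch."""

    def __init__(self, fail_once_for):
        self.fail_once_for = set(fail_once_for)
        self.sent = []  # one entry per email actually delivered

    async def send(self, user):
        # Fail the first attempt for selected users, succeed afterwards.
        if user in self.fail_once_for:
            self.fail_once_for.discard(user)
            raise ConnectionError(f"temporary failure for {user}")
        self.sent.append(user)


async def retry_whole_workflow(mailer, users, attempts=3):
    """Naive approach: any failure restarts the loop from the first user."""
    for _ in range(attempts):
        try:
            for user in users:
                await mailer.send(user)
            return
        except ConnectionError:
            continue
    raise RuntimeError("workflow kept failing")


async def retry_each_call(mailer, users, attempts=3):
    """Each send is retried on its own; earlier successes are left alone."""

    async def deliver(user):
        for _ in range(attempts):
            try:
                await mailer.send(user)
                return
            except ConnectionError:
                continue
        raise RuntimeError(f"giving up on {user}")

    await asyncio.gather(*(deliver(user) for user in users))


def simulate(strategy):
    # Run one strategy against a mailer that fails once for "bob".
    mailer = FlakyMailer(fail_once_for={"bob"})
    asyncio.run(strategy(mailer, ["alice", "bob", "carol"]))
    return mailer.sent
```

In this sketch, simulate(retry_whole_workflow) delivers to "alice" twice because the loop restarts after the failure for "bob", while simulate(retry_each_call) delivers to each user once.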

Fan in

Unlike batch, gather waits for all functions to complete and returns their results. Results are returned in the same order as the calls passed to gather.
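The ordering behavior described above can be tried locally with asyncio.gather from the Python standard library, which also returns results in call order. The sketch below does not call Dispatch at all; slow_square, squares_in_call_order, and run_demo are hypothetical names used only for illustration.

```python
import asyncio


async def slow_square(n, delay, finished):
    # Hypothetical task: a longer delay makes this call finish later.
    await asyncio.sleep(delay)
    finished.append(n)  # record the order in which calls complete
    return n * n


async def squares_in_call_order():
    finished = []
    # The first call finishes last, yet its result still comes first.
    results = await asyncio.gather(
        slow_square(1, 0.06, finished),
        slow_square(2, 0.04, finished),
        slow_square(3, 0.02, finished),
    )
    return results, finished


def run_demo():
    return asyncio.run(squares_in_call_order())
```

Calling run_demo() returns the results in call order alongside the completion order, which is reversed here because of the delays.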

Take a look at this hypothetical application that generates invoices. For each client in the database, it generates a PDF invoice and then uploads it to S3.

import dispatch

@dispatch.function
async def generate_invoice(client):
    # Generate a PDF and return its local path
    # create_invoice() is assumed to be defined by the application
    path = create_invoice(client)
    return path

@dispatch.function
async def upload_to_s3(path):
    # Upload file at `path` to S3
    pass

@dispatch.function
async def generate_invoices():
    # fetch_clients_from_database() is assumed to be defined by the application
    clients = await fetch_clients_from_database()
    # When all invoices are generated, `gather` returns a list of file paths
    invoice_paths = await dispatch.gather(
        *[generate_invoice(client) for client in clients]
    )
    # Upload each invoice to S3
    await dispatch.gather(
        *[upload_to_s3(invoice_path) for invoice_path in invoice_paths]
    )

def main():
    generate_invoices.dispatch()

dispatch.run(main)

gather makes building fault-tolerant multi-step workflows a breeze.