Workflow as code

Create powerful workflows without using a complex framework.

When composing systems, you might need to execute functions in a specific order, build trees, or assemble your functions into DAGs. Dispatch lets you compose complex workflows in idiomatic Python through a small set of simple primitives built on asynchronous execution.
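As a taste of the idea, here is a diamond-shaped DAG (one fan-out, one fan-in) written as plain async Python. Everything in this sketch is illustrative; it uses only asyncio, not Dispatch:

```python
import asyncio

async def load() -> int:
    return 10

async def double(x: int) -> int:
    return x * 2

async def triple(x: int) -> int:
    return x * 3

async def combine(a: int, b: int) -> int:
    return a + b

async def diamond() -> int:
    # Sequencing: `load` runs first. Fan-out: `double` and `triple`
    # run concurrently on its result. Fan-in: `combine` joins them.
    x = await load()
    a, b = await asyncio.gather(double(x), triple(x))
    return await combine(a, b)

print(asyncio.run(diamond()))  # 50
```

Dispatch expresses the same shapes with the same async syntax, while handling scheduling, retries, and distribution for you.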



Entrypoint

Any function decorated with @dispatch.function can be used as the entrypoint of a workflow by calling .dispatch(args):

@dispatch.function
def entrypoint(args): ...

entrypoint.dispatch(args)


Batch

The batch primitive allows you to compose and trigger a batch of calls to the same or different functions. Dispatching a batch behaves the same as dispatching a single function.

batch is useful when you need to trigger multiple workflows in parallel, for example when processing a batch of requests in a data pipeline.

@dispatch.function
def funcA(): ...

@dispatch.function
def funcB(someargs): ...

batch = dispatch.batch()
batch.add(funcA)
batch.add(funcB, someargs)
batch.dispatch()
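Outside of Dispatch, the same "queue several calls, then fire them together" shape can be sketched with a tiny helper class. All names here are illustrative stand-ins, not the Dispatch API:

```python
import asyncio

class Batch:
    # Minimal stand-in for a batch: collect calls, then run them together.
    def __init__(self):
        self.calls = []

    def add(self, fn, *args):
        # Queue a call without executing it yet.
        self.calls.append((fn, args))

    async def dispatch(self):
        # Fire every queued call concurrently and collect the results.
        return await asyncio.gather(*(fn(*args) for fn, args in self.calls))

async def func_a() -> str:
    return "a"

async def func_b(suffix: str) -> str:
    return "b" + suffix

batch = Batch()
batch.add(func_a)
batch.add(func_b, "!")
print(asyncio.run(batch.dispatch()))  # ['a', 'b!']
```

The real primitive additionally makes each queued call a durable, independently retried workflow execution.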



Gather

gather allows you to fan-out the execution of multiple functions and collect their results. gather is used within a workflow function.

@dispatch.function
async def funcA(): ...

@dispatch.function
async def funcB(someargs): ...

@dispatch.function
async def main():
    # ...
    results = await gather(funcA(), funcB(someargs))
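gather mirrors the shape of asyncio.gather, so the fan-out/fan-in pattern can be tried locally with plain asyncio. The function names below are made up for illustration:

```python
import asyncio

async def fetch_square(n: int) -> int:
    # Stand-in for a remote call; sleeping simulates I/O latency.
    await asyncio.sleep(0.01)
    return n * n

async def main() -> list[int]:
    # Fan-out three concurrent calls, fan-in their results in order.
    return await asyncio.gather(fetch_square(2), fetch_square(3), fetch_square(4))

results = asyncio.run(main())
print(results)  # [4, 9, 16]
```

Note that results come back in the order the calls were passed, not the order they completed.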



Pipelines

Pipelines are sequential executions of functions in which the result of one function is used to execute the next.

In the following example, the functions get_repo_info and get_contributors form a pipeline managed by Dispatch. In case of failure, they are automatically retried using the Dispatch adaptive concurrency control algorithm.

import httpx
from fastapi import FastAPI

from dispatch.fastapi import Dispatch

app = FastAPI()
dispatch = Dispatch(app)

@dispatch.function
async def get_repo_info(repo_owner: str, repo_name: str):
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    return api_response.json()

@dispatch.function
async def get_contributors(repo_info: dict):
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    return response.json()

@dispatch.function
async def main():
    repo_info = await get_repo_info("stealthrocket", "dispatch-py")
    print(f"Repository: {repo_info['full_name']}")

    contributors = await get_contributors(repo_info)
    print(f"Contributors: {len(contributors)}")

@app.get("/")
def root():
    main.dispatch()
    return "OK"
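Dispatch performs the retries for you, with pacing chosen by its adaptive concurrency control. To illustrate what "automatically retried" replaces, here is a minimal hand-rolled retry helper with exponential backoff; the helper and the flaky function are hypothetical, not part of the Dispatch API:

```python
import asyncio

async def retry(fn, *args, attempts: int = 3, base_delay: float = 0.01):
    # Minimal exponential-backoff retry loop: the kind of code Dispatch
    # makes unnecessary by retrying failed functions on its own.
    for attempt in range(attempts):
        try:
            return await fn(*args)
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)

calls = {"count": 0}

async def flaky() -> str:
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient error")
    return "ok"

result = asyncio.run(retry(flaky))
print(result, calls["count"])  # ok 3
```

Unlike this fixed schedule, adaptive concurrency control also throttles how many retries run at once based on observed failures.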

Fan-out and fan-in

Fan-out is the ability to trigger multiple operations from a single point in your workflow. Fan-in is the equivalent on the other end: it gathers the results back into a single point.

In the following example, we use the gather feature to dispatch three executions of get_repo_info and pass the results of those three executions into reduce_stargazers.

@dispatch.function
async def reduce_stargazers(repos):
    result = await gather(*[get_stargazers(repo) for repo in repos])
    reduced_stars = set()
    for repo in result:
        for stars in repo:
            reduced_stars.add(stars["login"])
    return reduced_stars

@dispatch.function
async def main():
    # Using gather, we fan-out the three following requests.
    repos = await gather(
        get_repo_info("stealthrocket", "coroutine"),
        get_repo_info("stealthrocket", "dispatch-py"),
        get_repo_info("stealthrocket", "wzprof"),
    )

    stars = await reduce_stargazers(repos)
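The same fan-out/fan-in shape, minus the Dispatch runtime, can be exercised with asyncio.gather. The in-memory data below is made up to stand in for the GitHub API:

```python
import asyncio

# Hypothetical in-memory stand-in for the GitHub API calls above.
STARGAZERS = {
    "coroutine": ["alice", "bob"],
    "dispatch-py": ["bob", "carol"],
    "wzprof": ["alice", "dave"],
}

async def get_stargazers(repo: str) -> list[str]:
    await asyncio.sleep(0)  # simulate an asynchronous API call
    return STARGAZERS[repo]

async def reduce_stargazers(repos: list[str]) -> set[str]:
    # Fan-out one request per repository, then fan-in the results
    # and deduplicate stargazers across repositories with a set.
    results = await asyncio.gather(*[get_stargazers(r) for r in repos])
    reduced = set()
    for stargazers in results:
        reduced.update(stargazers)
    return reduced

stars = asyncio.run(reduce_stargazers(["coroutine", "dispatch-py", "wzprof"]))
print(sorted(stars))  # ['alice', 'bob', 'carol', 'dave']
```

With Dispatch, each fanned-out call is additionally a durable execution that survives failures and retries independently.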


©️ Stealth Rocket, Inc. All rights reserved.