Workflow as code

Create powerful workflows without using a complex framework.

When composing systems, you may need to execute functions in a specific order, or arrange them into trees or DAGs. Dispatch lets you compose such workflows in idiomatic Python, using a small set of simple features built on asynchronous execution.

Features

dispatch

Any function decorated with @dispatch.function can be used as the entrypoint of a workflow by calling its .dispatch(args) method:

@dispatch.function
def entrypoint(args): ...

entrypoint.dispatch(args)

batch

The batch primitive allows you to compose and trigger a batch of calls to the same or different functions. Dispatching a batch behaves the same as dispatching a single function.

batch is useful when you need to trigger multiple workflows in parallel. For example, when processing a batch of requests in a data pipeline.

@dispatch.function
def funcA(): ...

@dispatch.function
def funcB(someargs): ...

batch = dispatch.batch()
batch.add(funcA)
batch.add(funcB, someargs)

batch.dispatch()

gather

gather allows you to fan out the execution of multiple functions and collect their results. gather is used inside a dispatched function, as part of a workflow graph.

@dispatch.function
async def funcA(): ...


@dispatch.function
async def funcB(someargs): ...


@dispatch.function
async def main():
    # ...
    results = await gather(
        funcA(),
        funcB(someargs),
    )
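Dispatch's gather mirrors the shape of asyncio.gather: awaiting it suspends main until every fanned-out call completes, and results come back in call order. A plain-asyncio sketch of the same fan-out/collect pattern (funcA and funcB here are ordinary coroutines, not Dispatch functions):

```python
import asyncio

async def funcA():
    # Simulate independent work.
    await asyncio.sleep(0.01)
    return "A"

async def funcB(someargs):
    await asyncio.sleep(0.01)
    return f"B:{someargs}"

async def main():
    # Fan out both coroutines concurrently, then collect results in call order.
    results = await asyncio.gather(funcA(), funcB("x"))
    return results

print(asyncio.run(main()))  # ['A', 'B:x']
```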

Patterns

Pipelines

Pipelines are sequential executions of functions where the result of a function is used to execute the next one.

In the following example, the functions get_repo_info and get_contributors form a pipeline managed by Dispatch. If a call fails, Dispatch automatically retries it, pacing retries with its adaptive concurrency control algorithm.
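Dispatch's actual retry scheduling is driven by its adaptive concurrency control; as a simplified illustration of the retry idea only, a bare exponential-backoff loop looks like:

```python
import time

def retry(fn, attempts=3, base_delay=0.01):
    # Minimal retry loop with exponential backoff -- an illustration only,
    # not Dispatch's adaptive concurrency control algorithm.
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds -- a stand-in for a transient network error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"

print(retry(flaky))  # ok
```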

import httpx
from fastapi import FastAPI

from dispatch.fastapi import Dispatch

app = FastAPI()
dispatch = Dispatch(app)


@dispatch.function
async def get_repo_info(repo_owner: str, repo_name: str):
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    return api_response.json()


@dispatch.function
async def get_contributors(repo_info: dict):
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    return response.json()


@dispatch.function
async def main():
    repo_info = await get_repo_info("stealthrocket", "dispatch-py")
    print(f"Repository: {repo_info['full_name']}")

    contributors = await get_contributors(repo_info)
    print(f"Contributors: {len(contributors)}")


@app.get("/")
def root():
    main.dispatch()
    return "OK"

Fan-out and fan-in

Fan-out is the ability to trigger multiple operations from a single point in your workflow. Fan-in is the equivalent on the other end: it gathers the results back into a single point.

In the following example, we use the gather feature to dispatch three executions of get_repo_info and pass their results into reduce_stargazers.

@dispatch.function
async def reduce_stargazers(repos):
    result = await gather(*[get_stargazers(repo) for repo in repos])
    reduced_stars = set()
    for repo in result:
        for stars in repo:
            reduced_stars.add(stars["login"])
    return reduced_stars
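Stripped of Dispatch, the fan-in step above is just a set comprehension over nested lists: it merges per-repo stargazer lists into one deduplicated set of logins. The sample data below is made up for illustration:

```python
# Hypothetical per-repo stargazer lists, shaped like GitHub API responses.
repos = [
    [{"login": "alice"}, {"login": "bob"}],
    [{"login": "bob"}, {"login": "carol"}],
]

# Fan-in: merge every stargazer's login into one deduplicated set.
reduced_stars = {stars["login"] for repo in repos for stars in repo}
print(sorted(reduced_stars))  # ['alice', 'bob', 'carol']
```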


@dispatch.function
async def main():
    # Using gather, we fan out the three following requests.
    repos = await gather(
        get_repo_info("stealthrocket", "coroutine"),
        get_repo_info("stealthrocket", "dispatch-py"),
        get_repo_info("stealthrocket", "wzprof"),
    )

    stars = await reduce_stargazers(repos)


©️ Stealth Rocket, Inc. All rights reserved.