Skip to content
Dispatch Dispatch Dispatch
Python - Data ingest with fan-out

Data ingest with fan-out

Data ingest is a critical component of streaming pipelines and other systems that receive real-time events from web pages, mobile applications, or other backend services.

Creating a reliable data ingest API gets complex quickly when each event needs to be fanned-out to multiple backend services. A common pattern consist in adding a log like Kafka, then a new consumer for each destination. This causes an explosion of complexity as each consumer needs to be operated and monitored independently. It also makes it challenging to deploy those type of systems on serverless platforms.

In this example, we look at how we can create a reliable and high-performance data ingest API in Python with Dispatch and FastAPI, where each event received can be fanned-out to a list of URLs.

Setup

If you haven’t already, make sure you have a Dispatch account and CLI installed.

Install FastAPI to set up an HTTP server. We will also install Pydantic to represent data models in our Python code.

pip install "dispatch-py[fastapi] pydantic"

Ingesting single events

The simplest model consist in receiving a single event at a time. FastAPI makes it simple to setup the frontend POST API, while Dispatch handles all the background processing of each event.

Here is an example applications that would achieve this:

from fastapi import FastAPI
from dispatch import gather
from dispatch.fastapi import Dispatch
from pydantic import BaseModel
import requests
app = FastAPI()
dispatch = Dispatch(app)
# An example data model for the events that we ingest in the example API,
# it should be adapted for real-world applications.
class Event(BaseModel):
user_id: str
...
@app.post("/v1/event")
def post_event(event: Event):
# Start a background operation to process the event, handled by Dispatch,
# calling the `handle_event` function.
#
# This operation returns immediately after the call was scheduled, it does
# not wait for its completion.
handle_event.dispatch(event)
return "OK"
@dispatch.function
async def handle_event(event: Event):
destination_urls = load_event_destinations(event)
# Spawn a sub-call to publish_event for each URL that the event needs to
# be published to.
publish_calls = [publish_event(url, event) for url in destination_urls]
# Wait for all the concurrent operations to complete, which includes
# retrying calls that failed due to transient errors.
await gather(*publish_calls)
@dispatch.function
async def publish_event(url: str, event: Event):
# Function to post each event to a remote HTTPS destination: this could also
# include loading credentials to authenticate with the remove API, injecting
# metadata, etc...
#
# Each call to publish_event get handled independently, with the Dispatch
# scheduler orchestrating the execution to maximize resource utilization
# while applying back-pressure on transient errors.
#
# For a detailed explanation of the retry strategy:
# https://docs.dispatch.run/architecture/adaptive-concurrency-control/
response = requests.post(url, data=event)
response.raise_for_status()
def load_event_destinations(event: Event) -> str:
# Hypothetical function used to lookup the list of URLs that the event needs
# to be published to. In a real-world application, this would be specific to
# the use-case, likely reading records from a database, probably with a
# caching layer.
#
destination_urls = []
# ⚠️ you must implement this function to test the code example!
#
# destination_urls = [
# "https://destination-1/,
# "https://destination-2/,
# "https://destination-3/,
# ]
return destination_urls

An important aspect to take into account here is that each concurrent operation is handled by the Dispatch scheduler, regardless of the transient errors that they observe, they get retried independently.

Note that in this example we left details like API authentication out of the code snippet since they aren’t specific to using Dispatch.

This topic is more specific to FastAPI and well-documented at https://fastapi.tiangolo.com/tutorial/security/

Ingesting batches of events

High-performance systems often resort to trading latency for better throughput by grouping operations into batches. This pattern is well-supported by Dispatch as well by creating a batch object, adding function calls to it, and submitting it to the scheduler in a single, atomic step.

Building on the previous code snippet, the application could add a new POST endpoint on /v1/batch that expects payloads encoded as JSON arrays, for example:

from typing import List
@app.post("/v1/batch")
def post_batch(events: List[Event]):
batch = dispatch.batch()
for event in events:
batch.add(handle_event, event)
batch.dispatch()
return "OK"

Note that while the submission to Dispatch is atomic in this case, each function call is still run concurrently, submitting operations as a batch does not result in dependencies nor a blocking behavior between calls.

Running the program

After saving the code to a file called main.py, run it with the Dispatch CLI:

dispatch run -- uvicorn main:app

Then to ingest data into the system, submit a POST request to /v1/event, for example with a curl command:

curl -X POST -d '{"user_id": "42"}' https://localhost:8000/v1/event

Or to use the batch endpoint:

curl -X POST -d '[{"user_id": "42"}]' https://localhost:8000/v1/batch

This short program shows how easy it is to create reliable applications with just a few lines of code with Dispatch. We haven’t had to add extra infrastructure like queues, databases, or worker pools. Integration with Dispatch is entirely driven from the application code.