Data ingest with fan-out

Data ingest is a critical component of streaming pipelines and other systems that receive real-time events from web pages, mobile applications, or other backend services.

Creating a reliable data ingest API gets complex quickly when each event needs to be fanned out to multiple backend services. A common pattern consists of adding a log like Kafka, then a new consumer for each destination. This causes an explosion of complexity, as each consumer needs to be operated and monitored independently. It also makes it challenging to deploy these kinds of systems on serverless platforms.

In this example, we look at how to create a reliable, high-performance data ingest API in Go with Dispatch, where each received event can be fanned out to a list of URLs.

Setup

If you haven’t already, make sure you have a Dispatch account and CLI installed.
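
The code below also assumes a Go module with the dispatch-go SDK available. A minimal setup could look like the following (the module path example.com/ingest is only a placeholder):

mkdir ingest && cd ingest
go mod init example.com/ingest
go get github.com/dispatchrun/dispatch-go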

Ingesting single events

The simplest model consists of receiving a single event at a time. Go’s built-in net/http package makes it simple to set up the frontend POST API, while Dispatch handles all the background processing of each event.

Here is an example application that achieves this:

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "io"
    "log"
    "net/http"

    "github.com/dispatchrun/dispatch-go"
)

// Event is an example data model for the events ingested by the API.
// It should be adapted for real-world applications.
type Event struct {
    UserId string `json:"user_id"`
}

func (event *Event) MarshalJSON() ([]byte, error) {
    return json.Marshal(&struct {
        UserId string `json:"user_id"`
    }{
        UserId: event.UserId,
    })
}

func (event *Event) UnmarshalJSON(data []byte) error {
    eventData := &struct {
        UserId string `json:"user_id"`
    }{}
    if err := json.Unmarshal(data, eventData); err != nil {
        return err
    }
    event.UserId = eventData.UserId
    return nil
}

// Message pairs an event with the destination URL it must be published to.
type Message struct {
    Url   string
    Event Event
}

func (message *Message) MarshalJSON() ([]byte, error) {
    return json.Marshal(&struct {
        Url   string `json:"url"`
        Event Event  `json:"event"`
    }{
        Url:   message.Url,
        Event: message.Event,
    })
}

func (message *Message) UnmarshalJSON(data []byte) error {
    messageData := &struct {
        Url   string `json:"url"`
        Event Event  `json:"event"`
    }{}
    if err := json.Unmarshal(data, messageData); err != nil {
        return err
    }
    message.Url = messageData.Url
    message.Event = messageData.Event
    return nil
}

func main() {
    // publishEvent delivers one event to one destination URL.
    publishEvent := dispatch.Func("publishEvent", func(ctx context.Context, message *Message) (any, error) {
        body, err := json.Marshal(message.Event)
        if err != nil {
            return nil, err
        }
        response, err := http.Post(message.Url, "application/json", bytes.NewReader(body))
        if err != nil {
            return nil, err
        }
        defer response.Body.Close()
        if response.StatusCode != http.StatusOK {
            return nil, errors.New("endpoint has failed")
        }
        return nil, nil
    })

    // handleEvent fans an event out to every destination URL.
    handleEvent := dispatch.Func("handleEvent", func(ctx context.Context, event *Event) (any, error) {
        // Hypothetical list of URLs that the event needs to be published to.
        // In a real-world application, this would be specific to the use case,
        // likely reading records from a database. To test the code example,
        // replace this with your own destinations, for example URLs generated
        // with the webhook.site service:
        // destinationUrls := []string{"https://destination-1", "https://destination-2"}
        destinationUrls := []string{"https://webhook.site/7fe0b4a4-08e3-4350-bc3a-ef6e8ba9303d"}

        // Spawn a sub-call to publishEvent for each URL that the event needs
        // to be published to.
        messages := []*Message{}
        for _, destinationUrl := range destinationUrls {
            message := Message{
                Url:   destinationUrl,
                Event: *event,
            }
            messages = append(messages, &message)
        }
        if _, err := publishEvent.Gather(messages); err != nil {
            return nil, err
        }

        // All events have been sent.
        return nil, nil
    })

    endpoint, err := dispatch.New(publishEvent, handleEvent)
    if err != nil {
        log.Fatal(err)
    }
    http.Handle(endpoint.Handler())

    http.HandleFunc("/v1/event", func(w http.ResponseWriter, r *http.Request) {
        body, err := io.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        event := Event{}
        if err := json.Unmarshal(body, &event); err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        if _, err := handleEvent.Dispatch(r.Context(), &event); err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        w.WriteHeader(http.StatusOK)
    })

    if err := http.ListenAndServe("localhost:8000", nil); err != nil {
        log.Fatal(err)
    }
}

An important aspect to take into account here is that each concurrent operation is handled by Dispatch: when a call observes a transient error, it is retried independently of the others.
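
As an illustration of that behavior, here is a minimal standalone sketch of the publish step; the publish helper and its fmt-based error are hypothetical additions, not part of the example above. Returning a non-nil error for a failed delivery is what causes Dispatch to retry that one call, while deliveries to other URLs proceed on their own:

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

// publish is a hypothetical variant of the publish step used above.
// Returning a non-nil error tells Dispatch to retry this particular
// delivery; deliveries to other destinations are unaffected.
func publish(url string, payload []byte) error {
    response, err := http.Post(url, "application/json", bytes.NewReader(payload))
    if err != nil {
        return err // network error: Dispatch retries this call
    }
    defer response.Body.Close()
    if response.StatusCode < 200 || response.StatusCode >= 300 {
        // Include the status code so failed destinations are easy to diagnose.
        return fmt.Errorf("endpoint %s failed with status %d", url, response.StatusCode)
    }
    return nil // success: no retry for this destination
}

func main() {
    // Example usage with a placeholder destination URL.
    if err := publish("https://destination-1", []byte(`{"user_id":"42"}`)); err != nil {
        fmt.Println("delivery failed, Dispatch would retry it:", err)
    }
}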

Ingesting batches of events

High-performance systems often trade latency for better throughput by grouping operations into batches. Dispatch supports this pattern as well: create a batch, add function calls to it, and submit them to Dispatch in a single, atomic step.

Building on the previous code snippet, the application could add a new POST endpoint on /v1/batch that expects payloads encoded as JSON arrays, for example:

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "io"
    "log"
    "net/http"

    "github.com/dispatchrun/dispatch-go"
    "github.com/dispatchrun/dispatch-go/dispatchproto"
)

// Event is an example data model for the events ingested by the API.
// It should be adapted for real-world applications.
type Event struct {
    UserId string `json:"user_id"`
}

func (event *Event) MarshalJSON() ([]byte, error) {
    return json.Marshal(&struct {
        UserId string `json:"user_id"`
    }{
        UserId: event.UserId,
    })
}

func (event *Event) UnmarshalJSON(data []byte) error {
    eventData := &struct {
        UserId string `json:"user_id"`
    }{}
    if err := json.Unmarshal(data, eventData); err != nil {
        return err
    }
    event.UserId = eventData.UserId
    return nil
}

// Message pairs an event with the destination URL it must be published to.
type Message struct {
    Url   string
    Event Event
}

func (message *Message) MarshalJSON() ([]byte, error) {
    return json.Marshal(&struct {
        Url   string `json:"url"`
        Event Event  `json:"event"`
    }{
        Url:   message.Url,
        Event: message.Event,
    })
}

func (message *Message) UnmarshalJSON(data []byte) error {
    messageData := &struct {
        Url   string `json:"url"`
        Event Event  `json:"event"`
    }{}
    if err := json.Unmarshal(data, messageData); err != nil {
        return err
    }
    message.Url = messageData.Url
    message.Event = messageData.Event
    return nil
}

func main() {
    // publishEvent delivers one event to one destination URL.
    publishEvent := dispatch.Func("publishEvent", func(ctx context.Context, message *Message) (any, error) {
        body, err := json.Marshal(message.Event)
        if err != nil {
            return nil, err
        }
        response, err := http.Post(message.Url, "application/json", bytes.NewReader(body))
        if err != nil {
            return nil, err
        }
        defer response.Body.Close()
        if response.StatusCode != http.StatusOK {
            return nil, errors.New("endpoint has failed")
        }
        return nil, nil
    })

    // handleEvent fans an event out to every destination URL.
    handleEvent := dispatch.Func("handleEvent", func(ctx context.Context, event *Event) (any, error) {
        // Hypothetical list of URLs that the event needs to be published to.
        // In a real-world application, this would be specific to the use case,
        // likely reading records from a database. To test the code example,
        // replace this with your own destinations, for example URLs generated
        // with the webhook.site service:
        // destinationUrls := []string{"https://destination-1", "https://destination-2"}
        destinationUrls := []string{"https://webhook.site/7fe0b4a4-08e3-4350-bc3a-ef6e8ba9303d"}

        // Spawn a sub-call to publishEvent for each URL that the event needs
        // to be published to.
        messages := []*Message{}
        for _, destinationUrl := range destinationUrls {
            message := Message{
                Url:   destinationUrl,
                Event: *event,
            }
            messages = append(messages, &message)
        }
        if _, err := publishEvent.Gather(messages); err != nil {
            return nil, err
        }

        // All events have been sent.
        return nil, nil
    })

    endpoint, err := dispatch.New(publishEvent, handleEvent)
    if err != nil {
        log.Fatal(err)
    }
    http.Handle(endpoint.Handler())

    http.HandleFunc("/v1/event", func(w http.ResponseWriter, r *http.Request) {
        body, err := io.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        event := Event{}
        if err := json.Unmarshal(body, &event); err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        if _, err := handleEvent.Dispatch(r.Context(), &event); err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        w.WriteHeader(http.StatusOK)
    })

    http.HandleFunc("/v1/batch", func(w http.ResponseWriter, r *http.Request) {
        body, err := io.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        client, err := endpoint.Client()
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        events := []Event{}
        if err := json.Unmarshal(body, &events); err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        // Group one handleEvent call per event into a single batch, then
        // submit the batch to Dispatch in one atomic step.
        batch := client.Batch()
        for _, event := range events {
            input, err := dispatchproto.Marshal(&event)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                return
            }
            call := dispatchproto.NewCall(endpoint.URL(), "handleEvent", input)
            batch.Add(call)
        }
        if _, err := batch.Dispatch(r.Context()); err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        w.WriteHeader(http.StatusOK)
    })

    if err := http.ListenAndServe("localhost:8000", nil); err != nil {
        log.Fatal(err)
    }
}

Note that while the submission to Dispatch is atomic in this case, each function call still runs concurrently: submitting operations as a batch does not create dependencies or blocking behavior between calls.

Running the program

After saving the code to a file called main.go, run it with the Dispatch CLI:

dispatch run -- go run main.go

Then to ingest data into the system, submit a POST request to /v1/event, for example with a curl command:

curl -X POST -d '{"user_id": "42"}' http://localhost:8000/v1/event

Or to use the batch endpoint:

curl -X POST -d '[{"user_id": "42"}]' http://localhost:8000/v1/batch

This short program shows how easy it is to create reliable applications with Dispatch in just a few lines of code. We didn’t have to add extra infrastructure like queues, databases, or worker pools; the integration with Dispatch is driven entirely from the application code.