Skip to content
Dispatch Dispatch Dispatch
Go - Workflows

Workflows

With Dispatch and resumable functions, it’s easy to build reliable workflows that perform a lot of work simultaneously or step-by-step.

Fan out

Consider an example scenario, where an application needs to send email notifications to all users. Use Batch to send emails to all users in parallel without waiting for completion.

package main
import (
"context"
"log"
"github.com/dispatchrun/dispatch-go"
"github.com/dispatchrun/dispatch-go/dispatchproto"
)
func main() {
notifyUser := dispatch.Func("notifyUser", func(ctx context.Context, userId string) (any, error) {
// Send an email notification to the user
return nil, nil
})
endpoint, err := dispatch.New(notifyUser)
if err != nil {
log.Fatal(err)
}
client, err := endpoint.Client()
if err != nil {
log.Fatal(err)
}
notifyAllUsers := func() {
// In a real-world application, users would be fetched from a database
users := []string{"user_1", "user_2"}
batch := client.Batch()
for _, userId := range users {
input, err := dispatchproto.Marshal(userId)
if err != nil {
log.Fatal(err)
}
call := dispatchproto.NewCall(endpoint.URL(), "notifyUser", input)
batch.Add(call)
}
if _, err = batch.Dispatch(context.TODO()); err != nil {
log.Fatal(err)
}
}
go notifyAllUsers()
if err := endpoint.ListenAndServe(); err != nil {
log.Fatal(err)
}
}

Normally, when notifyUser returns an error even for one user, it would halt the execution of the notifyAllUsers function. Moreover, retrying notifyAllUsers would resend the same email to all users who already got that email. Sounds like a nightmare.

However, with Dispatch, only that one failed notifyUser call would retry, ensuring a consistent delivery of all emails.

Fan in

Gather waits for the completion of all function calls and returns their results. The order of results will be in the same order as the functions that returned them.

Take a look at this theoretical application that generates invoices. For each client in the database, it generates a PDF invoice and then uploads it to S3.

package main
import (
"context"
"log"
"github.com/dispatchrun/dispatch-go"
)
func main() {
generateInvoice := dispatch.Func("generateInvoice", func(ctx context.Context, clientId string) (string, error) {
// Generate a PDF and return its local path
return "/path/to/invoice.pdf", nil
})
uploadToS3 := dispatch.Func("uploadToS3", func(ctx context.Context, path string) (any, error) {
// Upload file at `path` to S3
return nil, nil
})
generateInvoices := dispatch.Func("generateInvoices", func(ctx context.Context, input string) (any, error) {
// Fetch clients from the database in the real world
clients := []string{"user_1", "user_2", "user_3"}
invoicePaths, err := generateInvoice.Gather(clients)
if err != nil {
return nil, err
}
return uploadToS3.Gather(invoicePaths)
})
endpoint, err := dispatch.New(generateInvoice, uploadToS3, generateInvoices)
if err != nil {
log.Fatal(err)
}
go func() {
if _, err := generateInvoices.Dispatch(context.Background(), ""); err != nil {
log.Fatal(err)
}
}()
if err := endpoint.ListenAndServe(); err != nil {
log.Fatal(err)
}
}

Gather makes building fault-tolerant multi-step workflows with fan-out/fan-in a breeze.