Workflows
With Dispatch and resumable functions, it’s easy to build reliable workflows that perform a lot of work simultaneously or step-by-step.
Fan out
Consider an example scenario, where an application needs to send email notifications to all users. Use Batch
to send emails to all users in parallel without waiting for completion.
package main
import ( "context" "log"
"github.com/dispatchrun/dispatch-go" "github.com/dispatchrun/dispatch-go/dispatchproto")
func main() { notifyUser := dispatch.Func("notifyUser", func(ctx context.Context, userId string) (any, error) { // Send an email notification to the user return nil, nil })
endpoint, err := dispatch.New(notifyUser) if err != nil { log.Fatal(err) }
client, err := endpoint.Client() if err != nil { log.Fatal(err) }
notifyAllUsers := func() { // In a real-world application, users would be fetched from a database users := []string{"user_1", "user_2"}
batch := client.Batch()
for _, userId := range users { input, err := dispatchproto.Marshal(userId) if err != nil { log.Fatal(err) }
call := dispatchproto.NewCall(endpoint.URL(), "notifyUser", input) batch.Add(call) }
if _, err = batch.Dispatch(context.TODO()); err != nil { log.Fatal(err) } }
go notifyAllUsers()
if err := endpoint.ListenAndServe(); err != nil { log.Fatal(err) }}
Normally, when notifyUser
returns an error even for one user, it would halt the execution of the notifyAllUsers
function. Moreover, retrying notifyAllUsers
would resend the same email to all users who already got that email. Sounds like a nightmare.
However, with Dispatch, only that one failed notifyUser
call would retry, ensuring a consistent delivery of all emails.
Fan in
Gather
waits for the completion of all function calls and returns their results. The order of results will be in the same order as the functions that returned them.
Take a look at this theoretical application that generates invoices. For each client in the database, it generates a PDF invoice and then uploads it to S3.
package main
import ( "context" "log"
"github.com/dispatchrun/dispatch-go")
func main() { generateInvoice := dispatch.Func("generateInvoice", func(ctx context.Context, clientId string) (string, error) { // Generate a PDF and return its local path return "/path/to/invoice.pdf", nil })
uploadToS3 := dispatch.Func("uploadToS3", func(ctx context.Context, path string) (any, error) { // Upload file at `path` to S3 return nil, nil })
generateInvoices := dispatch.Func("generateInvoices", func(ctx context.Context, input string) (any, error) { // Fetch clients from the database in the real world clients := []string{"user_1", "user_2", "user_3"}
invoicePaths, err := generateInvoice.Gather(clients) if err != nil { return nil, err }
return uploadToS3.Gather(invoicePaths) })
endpoint, err := dispatch.New(generateInvoice, uploadToS3, generateInvoices) if err != nil { log.Fatal(err) }
go func() { if _, err := generateInvoices.Dispatch(context.Background(), ""); err != nil { log.Fatal(err) } }()
if err := endpoint.ListenAndServe(); err != nil { log.Fatal(err) }}
Gather
makes building fault-tolerant multi-step workflows with fan-out/fan-in a breeze.