Building a Background Processing Pipeline with Dragonfly

In this blog post, you will learn how to use Redis Lists to build a background processing pipeline with Dragonfly.

April 5, 2023

Introduction

Dragonfly is an in-memory data store built for modern application workloads. It is fully compatible with the Redis and Memcached APIs and delivers 25X more throughput, higher cache hit rates, lower tail latency, and effortless vertical scalability.

The particular use case we will be exploring is that of a marketing automation system that ingests user events and sends notifications based on some of them - for example, a welcome email when a user registers on a website.

High-level solution architecture

The application includes the following components:

  1. Dragonfly - a Redis-compatible in-memory data store that is used as the backend for the producer and worker applications.
  2. A producer application that simulates a website user registration process.
  3. A worker application that processes these user registration requests and sends welcome emails.

Both the producer and worker applications are written in Go and make use of the gocelery library, which lets you implement Celery workers and submit Celery tasks in Go.

Behind the scenes, gocelery uses a Redis List as the task queue, so we don't have to work with List commands directly in our applications. Instead, gocelery provides a simplified API to submit and process tasks that need to be handled asynchronously.

Alternatively, you can use go-worker or any Go Redis client to implement this using vanilla Redis List commands (a combination of LPUSH and BRPOP).
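
As a rough sketch of that alternative, the snippet below wraps the two List commands behind a small interface. The names commander, enqueue, dequeue, and fakeList are hypothetical and introduced only for illustration. Do is shaped like the Do method on redigo's redis.Conn, so a real connection could be passed in place of the in-memory fake. The sketch assumes replies arrive the way redigo returns them: a two-element array of byte slices for BRPOP, or nil on timeout.

```go
package main

import (
	"errors"
	"fmt"
)

// commander is a hypothetical one-method interface. Do has the same shape as
// Do on redigo's redis.Conn, so a real connection satisfies it.
type commander interface {
	Do(cmd string, args ...interface{}) (interface{}, error)
}

// enqueue adds a payload at the head of the List with LPUSH.
func enqueue(c commander, queue, payload string) error {
	_, err := c.Do("LPUSH", queue, payload)
	return err
}

// dequeue removes a payload from the tail of the List with BRPOP.
// The bool is false when the timeout expired and the List was still empty.
func dequeue(c commander, queue string, timeoutSec int) (string, bool, error) {
	reply, err := c.Do("BRPOP", queue, timeoutSec)
	if err != nil || reply == nil {
		return "", false, err
	}
	// Assumption: the reply is [key, value], each a []byte, as redigo returns it.
	parts, isArray := reply.([]interface{})
	if !isArray || len(parts) != 2 {
		return "", false, errors.New("unexpected BRPOP reply")
	}
	value, isBytes := parts[1].([]byte)
	if !isBytes {
		return "", false, errors.New("unexpected BRPOP value type")
	}
	return string(value), true, nil
}

// fakeList is an in-memory stand-in for the server, used only so the sketch
// runs without Dragonfly. It never blocks: an empty List yields a nil reply.
type fakeList struct{ lists map[string][]string }

func (f *fakeList) Do(cmd string, args ...interface{}) (interface{}, error) {
	if len(args) == 0 {
		return nil, errors.New("missing key")
	}
	key, _ := args[0].(string)
	switch cmd {
	case "LPUSH":
		value, _ := args[1].(string)
		f.lists[key] = append([]string{value}, f.lists[key]...)
		return int64(len(f.lists[key])), nil
	case "BRPOP":
		items := f.lists[key]
		if len(items) == 0 {
			return nil, nil
		}
		last := items[len(items)-1]
		f.lists[key] = items[:len(items)-1]
		return []interface{}{[]byte(key), []byte(last)}, nil
	}
	return nil, fmt.Errorf("unsupported command %q", cmd)
}

func main() {
	q := &fakeList{lists: map[string][]string{}}
	enqueue(q, "registrations", "Ada Lovelace,ada@example.com")
	enqueue(q, "registrations", "Alan Turing,alan@example.com")

	// Pushing at the head and popping from the tail gives first-in, first-out order.
	for {
		payload, ok, err := dequeue(q, "registrations", 1)
		if err != nil || !ok {
			break
		}
		fmt.Println("processing:", payload)
	}
}
```

Because LPUSH writes to one end of the List and BRPOP reads from the other, tasks come out in the order they went in. Against a real server, BRPOP also blocks until a task arrives or the timeout expires, which spares the worker from polling.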

Producer application

We start with the producer application, which generates fake user registrations and adds them to a task queue.

In the code block below, the following things are happening:

  1. First, we create a Redis connection pool and use it to create an instance of gocelery.CeleryClient.
  2. Then we create a Go channel that listens for SIGINT and SIGTERM signals.
  3. A goroutine sends a mock user registration to the worker application every second. It uses the Delay method to submit each task: the first argument is the name of the task and the second argument is the data that needs to be processed by the worker application. The mock data is generated using the go-randomdata library.
  4. Finally, we wait for the exit channel to receive a signal and then terminate the producer loop and the application.
package main

import (
   "fmt"
   "log"
   "os"
   "os/signal"
   "syscall"
   "time"

   rdata "github.com/Pallinder/go-randomdata"
   "github.com/gocelery/gocelery"
   "github.com/gomodule/redigo/redis"
)

const (
   redisHostEnvVar     = "REDIS_HOST"
   redisPasswordEnvVar = "REDIS_PASSWORD"
   taskName            = "users.registration.email"
)

var (
   redisHost string
)

func init() {
   redisHost = os.Getenv(redisHostEnvVar)
   if redisHost == "" {
       redisHost = "localhost:6379"
   }

}

func main() {
   redisPool := &redis.Pool{
       Dial: func() (redis.Conn, error) {
           c, err := redis.Dial("tcp", redisHost)

           if err != nil {
               return nil, err
           }
           return c, err
       },
   }

   celeryClient, err := gocelery.NewCeleryClient(
       gocelery.NewRedisBroker(redisPool),
       &gocelery.RedisCeleryBackend{Pool: redisPool},
       1,
   )

   if err != nil {
       log.Fatal(err)
   }

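   exit := make(chan os.Signal, 1)
   signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
   // done is closed on shutdown. A channel avoids the data race that a plain
   // bool shared between main and the producer goroutine would cause.
   done := make(chan struct{})

   go func() {
       fmt.Println("celery producer started...")

       for {
           // Submit one mock registration as a "name,email" string.
           res, err := celeryClient.Delay(taskName, rdata.FullName(rdata.RandomGender)+","+rdata.Email())
           if err != nil {
               panic(err)
           }
           fmt.Println("sent data and generated task", res.TaskID, "for worker")

           // Wait one second, or stop as soon as shutdown is signalled.
           select {
           case <-done:
               return
           case <-time.After(1 * time.Second):
           }
       }
   }()

   <-exit
   log.Println("exit signalled")

   close(done)
   log.Println("celery producer stopped")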
}

Worker application

Now we turn to the worker application, which picks tasks off the queue and sends the corresponding emails. In the code block below, the following things are happening:

  1. First, we create a Redis connection pool and use it to create an instance of gocelery.CeleryClient. The Celery client is used to receive tasks from the Dragonfly List.
  2. The Register method registers the function to call when a task is received. Its first argument is the name of the task and its second argument is that function.
  3. In a goroutine, we start the worker instance, which begins listening for tasks from Dragonfly.
  4. Finally, we wait for the exit channel to receive a signal, stop the Celery worker instance, and then gracefully shut down the application.
package main

import (
   "fmt"
   "log"
   "math/rand"
   "net/smtp"
   "os"
   "os/signal"
   "strings"
   "syscall"
   "time"

   "github.com/gocelery/gocelery"
   "github.com/gomodule/redigo/redis"
   "github.com/hashicorp/go-uuid"
)

const (
   redisHostEnvVar = "REDIS_HOST"
   taskNameEnvVar  = "TASK_NAME"
   smtpServer      = "localhost:1025"
   taskName        = "users.registration.email"
)

const fromEmail = "admin@foo.com"

const emailBodyTemplate = "Hi %s!!\n\nHere is your auto-generated password %s. Visit https://foobar.com/login to log in and update your password.\n\nCheers,\nTeam FooBar.\n\n[processed by %s]"

const autogenPassword = "foobarbaz_foobarbaz"

const emailHeaderTemplate = "From: %s" + "\n" +
   "To: %s" + "\n" +
   "Subject: Welcome to FooBar! Here are your login instructions\n\n" +
   "%s"

var (
   redisHost string
   workerID  string
)

func init() {
   redisHost = os.Getenv(redisHostEnvVar)
   if redisHost == "" {
       redisHost = "localhost:6379"
   }

   rnd, _ := uuid.GenerateUUID()
   workerID = "worker-" + rnd
}

func main() {
   redisPool := &redis.Pool{
       Dial: func() (redis.Conn, error) {
           c, err := redis.Dial("tcp", redisHost)
           if err != nil {
               return nil, err
           }
           return c, err
       },
   }

   celeryClient, err := gocelery.NewCeleryClient(
       gocelery.NewRedisBroker(redisPool),
       &gocelery.RedisCeleryBackend{Pool: redisPool},
       1,
   )

   if err != nil {
       log.Fatal("failed to create celery client ", err)
   }

   sendEmail := func(registrationEvent string) {
       name := strings.Split(registrationEvent, ",")[0]
       userEmail := strings.Split(registrationEvent, ",")[1]

       fmt.Println("user registration info:", name, userEmail)

       sleepFor := rand.Intn(9) + 1
       time.Sleep(time.Duration(sleepFor) * time.Second)

       body := fmt.Sprintf(emailBodyTemplate, name, autogenPassword, workerID)
       msg := fmt.Sprintf(emailHeaderTemplate, fromEmail, userEmail, body)


       // Use the same sender and recipient in the SMTP envelope as in the message headers.
       err := smtp.SendMail(smtpServer, nil, fromEmail, []string{userEmail}, []byte(msg))
       if err != nil {
           log.Fatal("failed to send email - ", err)
       }

       fmt.Println("sent email to", userEmail)

   }

   celeryClient.Register(taskName, sendEmail)

   go func() {
       celeryClient.StartWorker()
       fmt.Println("celery worker started. worker ID", workerID)
   }()

   exit := make(chan os.Signal, 1)
   signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)

   <-exit
   fmt.Println("exit signalled")

   celeryClient.StopWorker()
   fmt.Println("celery worker stopped")
}
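
The handler above indexes the result of strings.Split directly, so a payload without a comma would panic inside the worker. As a hedged sketch of one way to guard against that, the hypothetical helper parseRegistration below validates the "name,email" payload before it is used. The helper name and the very loose email check are assumptions made for illustration and are not part of the original application.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseRegistration is a hypothetical helper that splits a "name,email"
// payload and reports malformed input instead of panicking.
func parseRegistration(event string) (name, email string, err error) {
	// SplitN with a limit of 2 keeps any further commas in the second part.
	parts := strings.SplitN(event, ",", 2)
	if len(parts) != 2 {
		return "", "", errors.New("registration event must look like \"name,email\"")
	}
	name = strings.TrimSpace(parts[0])
	email = strings.TrimSpace(parts[1])
	// Deliberately loose check: a non-empty name and an "@" in the address.
	if name == "" || !strings.Contains(email, "@") {
		return "", "", fmt.Errorf("invalid registration event %q", event)
	}
	return name, email, nil
}

func main() {
	events := []string{"Ada Lovelace,ada@example.com", "no-comma-here"}
	for _, event := range events {
		name, email, err := parseRegistration(event)
		if err != nil {
			fmt.Println("skipping:", err)
			continue
		}
		fmt.Println("would email", name, "at", email)
	}
}
```

A handler built this way can log and skip a bad task rather than bringing down the whole worker process.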

Prerequisites

To run the whole application using the instructions in this blog post, you will need the following installed on your local machine:

  1. Go
  2. Docker

Run the application

Our worker application will send emails. Rather than using an external service, we will make use of a mock SMTP server to keep things simple, yet realistic.

  1. Start the SMTP server in Docker

    docker run --rm -p 1080:1080 -p 1025:1025 soulteary/maildev
    

    Once the container starts, you should see these logs:

    MailDev using directory /tmp/maildev-1
    MailDev webapp running at http://0.0.0.0:1080
    MailDev SMTP Server running at 0.0.0.0:1025
    
  2. Start Dragonfly in Docker (refer to the documentation for Linux- or macOS-specific instructions)

    docker run --rm -p 6379:6379 --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly
    
  3. Start the worker application

    go run worker/worker.go
    

    You should see this log (worker ID might be different in your case):

    celery worker started. worker ID worker-0d8d8b51-d351-ba87-c012-2b404439e1f2
    
  4. Start the producer application

    go run producer/producer.go
    

    As the producer application starts and continues to send mock data, you should see these logs (task ID might be different in your case):

    celery producer started...
    sent data and generated task 3687275c-748b-4eaf-b540-dbe772123fa8 for worker
    sent data and generated task 7da24e60-f33b-4690-9262-6408ac85a9ef for worker
    ...
    

Verify the results

As the producer application sends mock user data, the worker application will process it and send emails. To check the emails, navigate to the user interface of the SMTP server using a browser - http://localhost:1080/

Figure: Email sent by the worker application

This is a good time to peek into Dragonfly. Since it runs locally in a Docker container, we can connect to it using redis-cli. After connecting, check the length of a key called celery. This is the List that acts as the queue holding the user registration events that are ultimately processed by the worker application.

    127.0.0.1:6379> llen celery
    (integer) 106

At the end of the email, you will notice [processed by worker-<random uuid>]. More on this in the next section.

Scaling the background processing pipeline

Using a List allows us to scale the processing pipeline horizontally by simply adding more worker applications. To start another instance of the worker application:

    go run worker/worker.go

After a few seconds, you will notice from the worker application logs that both instances are processing data from the List (celery). If you look closely at the email contents, the last line shows which instance of the worker application processed and sent that email.

The worker ID in the email body is there only for learning and testing purposes.

You can continue to start additional worker instances.
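
To illustrate why adding workers scales the pipeline, here is a minimal in-memory sketch of the competing-consumers pattern. It does not use gocelery or Dragonfly: a Go channel stands in for the List, and runWorkers is a hypothetical function written for this example. The point it shows is that each task is handed to exactly one worker, so more workers share the same backlog.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n workers that all receive from the same queue and
// returns how many tasks each worker handled. The channel is an in-memory
// stand-in for the Dragonfly List: every task goes to exactly one worker.
func runWorkers(n int, tasks []string) map[string]int {
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 1; i <= n; i++ {
		workerID := fmt.Sprintf("worker-%d", i)
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each receive removes one task from the shared queue.
			for range queue {
				mu.Lock()
				handled[workerID]++
				mu.Unlock()
			}
		}()
	}

	for _, task := range tasks {
		queue <- task
	}
	close(queue) // no more tasks: lets the workers exit their loops
	wg.Wait()
	return handled
}

func main() {
	tasks := make([]string, 20)
	for i := range tasks {
		tasks[i] = fmt.Sprintf("user-%d,user-%d@example.com", i, i)
	}

	total := 0
	for workerID, count := range runWorkers(3, tasks) {
		fmt.Println(workerID, "handled", count, "tasks")
		total += count
	}
	fmt.Println("total handled:", total)
}
```

How the tasks are split between workers varies from run to run, but the total always equals the number of tasks submitted. The worker applications in this post behave the same way when they pull from the shared celery List.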

Solution code walkthrough

The producer and worker sections above highlight the key parts of the code that are relevant to this blog post.

The complete code is available on GitHub

Conclusion

In this blog post, we saw how to use Dragonfly and Go to build a background processing pipeline. The use case was a marketing automation system where we were sending emails to users who registered on a website. However, the same approach can be used to build any kind of processing pipeline using Dragonfly as the message backend. We also saw how to scale the background processing pipeline horizontally by simply adding more worker applications.