A zero-dependency Pub/Sub implementation on top of PostgreSQL

While building the foundation for a project that I am currently working on, I decided to use the Pub/Sub pattern as the main abstraction for a few different commonly used tasks (e.g. cron jobs). There are many popular Pub/Sub solutions available, such as RabbitMQ, GCP Cloud Pub/Sub, and AWS SNS. However, these off-the-shelf solutions seemed to have two main issues that I was slightly worried about:

  • They add an additional point of failure and extra complexity.
  • Publishing Pub/Sub messages cannot utilize database transactions.

As an example, Cloud Pub/Sub requires you to run an emulator locally if you want to have it available in your development environment. While using dependencies to solve some of your problems is, of course, not a problem per se, the added complexity can start to slow you down.

More importantly, third-party solutions cannot take part in your database transactions, which can cause all sorts of issues later on. Because of that, you would most likely end up inserting the Pub/Sub messages into your database first before sending them to the third-party service.

That got me thinking whether it would be feasible to build the Pub/Sub solution purely on top of PostgreSQL. I decided to try, and started working on go-opinionatedevents.

Table of contents

  1. Context and problem
    1. Dependencies
    2. Transactions
    3. Complexity
  2. PostgreSQL as the Pub/Sub backend
    1. Near real-time processing
    2. Horizontal scalability
    3. Routing messages
    4. Retrying failures
    5. Issues and considerations
  3. Pub/Sub use cases
    1. Cron jobs
    2. Scheduled work
    3. Distributed transactions
  4. Migrating to other solutions

Context and problem

Before describing the steps I took to use PostgreSQL as the Pub/Sub backend, I want to take a moment to discuss the issues that an off-the-shelf solution introduces and why I decided not to use one.

Dependencies

Dependencies are not an issue per se. However, deciding to use an off-the-shelf Pub/Sub service has at least the following downsides:

  • You need to learn the concepts and quirks of the chosen service.
  • You most likely end up writing quite a bit of implementation-specific code.
  • The chosen service may impose some limitations or be too generic.
  • You have to set up and maintain the dependency in production.
  • Local development setup becomes more complex.

For my project, I decided to prioritize simplicity. Introducing a new dependency that I have to run locally, have to integrate against, and have to maintain in production felt like a decision that should not be made without considering alternatives.

Transactions

Database transactions are essential whenever more than one modifying operation has to succeed or fail as a unit. Off-the-shelf Pub/Sub services do not take part in your database transactions, and even if a service offered transactions of its own, they would be exposed through an API that cannot be combined with a database transaction. Without that support, you need to orchestrate multiple transactions (or operations) at the code level. Doing so is error-prone, adds unnecessary complexity, and most likely requires you to route message publishing through the database anyway.

To demonstrate, a naive implementation of a function creating new users could look like the following:

func createUser(tx *sql.Tx, email string) error {
  if users.ExistsByEmail(tx, email) {
    return errors.New("user already exists")
  }

  orgId, err := organisations.Create(tx)
  if err != nil {
    return err
  }

  if err := pubsub.Publish("organisations.created", "orgId", orgId); err != nil {
    return err
  }

  userId, err := users.Create(tx, orgId, email)
  if err != nil {
    return err
  }

  if err := pubsub.Publish("users.created", "userId", userId); err != nil {
    return err
  }

  return nil
}

The code has the following bugs:

  • What if the users.Create call returns an error? The organisations.created message would have already been published and there is no way to cancel it.
  • What if the users.Create call takes a really long time? Some other service may have already started processing the organisations.created message before the transaction has finished. A call to retrieve the organisation would fail.

To work around these issues, you could move the pubsub.Publish calls outside the database transaction. It would solve both of the mentioned issues, but it would introduce a new one:

  • What if the pubsub.Publish call returns an error? The database transaction has already been committed and there is no easy way to revert it. The result would be one or more dropped Pub/Sub messages.

The only real option that solves all of these issues is to transactionally stage Pub/Sub messages in the database within the already-existing database transaction. Concretely, it requires you to modify the pubsub.Publish method to accept the database transaction (i.e. *sql.Tx) as a parameter and perform a database INSERT instead of sending the message to the third-party service.
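
A minimal sketch of what such a transactional publish could look like follows. It is not the go-opinionatedevents implementation: the publish and encodePayload functions, the execer interface, and the pubsub_message_payload column are assumptions made for illustration, and the remaining columns of the table are assumed to have defaults.

```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
)

// execer is the part of *sql.Tx that staging needs, so a *sql.Tx can be
// passed in directly.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// insertMessageSQL stages one row per message and queue.
// The pubsub_message_payload column is an assumption.
const insertMessageSQL = `
INSERT INTO pubsub_messages
  (pubsub_message_topic, pubsub_message_queue, pubsub_message_payload)
VALUES ($1, $2, $3)`

// encodePayload turns alternating key/value arguments, as used in the
// pubsub.Publish("users.created", "userId", userId) call, into a JSON object.
func encodePayload(kv ...any) ([]byte, error) {
	if len(kv)%2 != 0 {
		return nil, fmt.Errorf("odd number of key/value arguments: %d", len(kv))
	}
	fields := make(map[string]any, len(kv)/2)
	for i := 0; i < len(kv); i += 2 {
		key, ok := kv[i].(string)
		if !ok {
			return nil, fmt.Errorf("key at position %d is not a string", i)
		}
		fields[key] = kv[i+1]
	}
	return json.Marshal(fields)
}

// publish stages the message inside the caller's transaction. The row becomes
// visible to consumers only if that transaction commits.
func publish(ctx context.Context, tx execer, topic, queue string, kv ...any) error {
	payload, err := encodePayload(kv...)
	if err != nil {
		return err
	}
	// The payload is passed as a string so that it can be stored in a JSON column.
	if _, err := tx.ExecContext(ctx, insertMessageSQL, topic, queue, string(payload)); err != nil {
		return fmt.Errorf("staging message for topic %s and queue %s: %w", topic, queue, err)
	}
	return nil
}
```

With something like this in place, a failing users.Create call rolls back the staged organisations.created message together with the organisation row.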

Now that you are inserting messages to the database anyway, why not go all in?

Complexity

The popular off-the-shelf services are built for a vastly larger problem space than what your application most likely needs. For example, they support many types of exchanges for broadcasting messages to queues, you can define complex criteria to route messages to only specific queues, or you can deploy the entire service in a cluster mode to increase the availability and throughput. Is this really required for your application? Are you really limited by your database write IOPS?

If not, implementing a custom Pub/Sub abstraction on top of your database might let you cut off a lot of the unnecessary complexity and add features specific to your application. For example, I wanted to be able to schedule Pub/Sub messages into the future rather than sending them to queues right away. Features like this are often easy to implement when you have full control over your implementation. With RabbitMQ, I would have had to do the following:

  1. Configure an abandoned queue with no clients.
  2. Publish scheduled messages to this queue with an expiration time.
  3. Configure the queue to "deadletter" messages to a dead letter exchange.
  4. Route the messages from the dead letter exchange to the actual exchange.

Even for such a simple feature, it still requires a rather hacky workaround. It would have been a single timestamp column with PostgreSQL.

PostgreSQL as the Pub/Sub backend

In this section, I will describe how I approached implementing some of the key qualities of any Pub/Sub solution while working on go-opinionatedevents.

Near real-time processing

For any Pub/Sub solution, it is important to be able to process the published messages as quickly as possible. In the context of PostgreSQL, it means that the published messages should be processed right after they have been inserted into the table. Luckily, a PostgreSQL-specific feature, notification channels, makes implementing real-time support easier.

A client can listen to a specific channel by doing the following:

LISTEN pubsub_messages;

Similarly, an INSERT can trigger a procedure, which in turn can notify the pubsub_messages channel by doing the following:

CREATE FUNCTION notify_of_new_pubsub_messages() RETURNS TRIGGER AS $$
DECLARE
    notification JSON;
BEGIN
    notification = json_build_object(
        'topic', NEW.pubsub_message_topic,
        'queue', NEW.pubsub_message_queue,
        'uuid',  NEW.pubsub_message_uuid
    );

    PERFORM pg_notify('pubsub_messages', notification::TEXT);

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER notify_of_new_pubsub_messages_trigger AFTER INSERT ON pubsub_messages
FOR EACH ROW EXECUTE PROCEDURE notify_of_new_pubsub_messages();

Given this database setup, every INSERT to the pubsub_messages table will trigger a notification to the pubsub_messages channel. A client can listen to the notification channel by doing the following:

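import (
  "context"
  "fmt"
  "time"

  "github.com/lib/pq"
)

// onEvent is called by the listener when the connection state changes.
func onEvent(_ pq.ListenerEventType, err error) {
  if err != nil {
    fmt.Printf("listener event error: %s\n", err.Error())
  }
}

// listen returns a channel that receives a value for every notification
// sent to the given PostgreSQL notification channel.
func listen(ctx context.Context, url string, channel string) chan struct{} {
  trigger := make(chan struct{})

  go func() {
    listener := pq.NewListener(url, 1*time.Second, 30*time.Second, onEvent)
    // close the listener on every exit path, including a failed Listen
    defer listener.Close()

    if err := listener.Listen(channel); err != nil {
      fmt.Printf("error listening to %s: %s\n", channel, err.Error())
      return
    }

    for {
      select {
      case <-ctx.Done():
        return
      case <-time.After(15 * time.Second):
        // no notifications for a while: check that the connection is alive
        if err := listener.Ping(); err != nil {
          fmt.Printf("error pinging the database: %s\n", err.Error())
        }
      case <-listener.Notify:
        // do not block forever if the consumer has already stopped
        select {
        case trigger <- struct{}{}:
        case <-ctx.Done():
          return
        }
      }
    }
  }()

  return trigger
}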

This only works when the message is inserted while the client is listening to the channel, though. What if there are pending messages when the client connects? What if a message should be retried after a failed delivery? There has to be a mechanism that is not dependent on the notifications.

For go-opinionatedevents, I decided to go with an "aggregated trigger" strategy so that there are the following two overlapping triggers:

  1. An interval trigger, which triggers a pending message check every n seconds
  2. PostgreSQL LISTEN trigger, which triggers a pending message check on NOTIFY

Whichever of the two triggers fires, the client checks for pending messages and processes all of them one by one. Therefore, this setup has the real-time features of the NOTIFY approach while maintaining the robustness of the periodic check approach. All messages will be processed in a reasonable amount of time regardless of when they were inserted.
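
The two overlapping triggers could be merged roughly as follows. This is a sketch and not the library's code: the aggregate function and its parameters are hypothetical names, done would typically be ctx.Done(), and notify would be a channel such as the one returned by listen.

```go
package main

import "time"

// aggregate merges an interval trigger and a notification trigger into one
// stream of "check for pending messages" signals. It stops and closes the
// returned channel when done is closed. The interval must be positive.
func aggregate(done <-chan struct{}, interval time.Duration, notify <-chan struct{}) <-chan struct{} {
	out := make(chan struct{}, 1)

	go func() {
		defer close(out)

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				// interval trigger
			case _, ok := <-notify:
				if !ok {
					// A closed channel would fire forever, so disable it
					// and keep relying on the interval trigger.
					notify = nil
					continue
				}
			}

			// Non-blocking send: if a check is already queued, another
			// signal adds nothing, because one check handles every
			// pending message.
			select {
			case out <- struct{}{}:
			default:
			}
		}
	}()

	return out
}
```

Coalescing signals through a buffer of one keeps a burst of notifications from queuing up redundant checks.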

Horizontal scalability

You will most likely have services with more than one instance running, and therefore queues with more than one client processing the messages. It is important to ensure that each published message is processed by only one of the running instances. Otherwise, the same work is performed more than once.

PostgreSQL supports row-level locks by specifying FOR UPDATE at the end of a SELECT statement. In addition, specifying SKIP LOCKED will skip any locked rows and move on to the next one. By combining the two, you can ensure that there is never more than one client processing a message at the same time. The lock is held until the end of the transaction, so the message has to be processed within the same transaction as the SELECT.

Assume that there is a service called worker that has three instances running. There is also a worker queue, which receives some relevant messages that need to be processed by one of the worker instances. Now, to select the next pending message from the worker queue, you can do the following:

SELECT pubsub_message_id, pubsub_message_uuid
FROM pubsub_messages
WHERE
  pubsub_message_status = 'pending' AND
  pubsub_message_queue = 'worker'
ORDER BY pubsub_message_published_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
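
Because the row lock only lives as long as the transaction, claiming a message and recording its outcome belong in one transaction. A sketch under assumptions: processNext and handle are hypothetical names, the 'processed' status value is invented for illustration, and pubsub_message_id is assumed to be an integer.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

const claimSQL = `
SELECT pubsub_message_id
FROM pubsub_messages
WHERE pubsub_message_status = 'pending' AND pubsub_message_queue = $1
ORDER BY pubsub_message_published_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED`

// The 'processed' status value is an assumption.
const markProcessedSQL = `
UPDATE pubsub_messages
SET pubsub_message_status = 'processed'
WHERE pubsub_message_id = $1`

// processNext claims one pending message, runs handle, and marks the message
// as processed, all within a single transaction. It reports false when there
// was nothing to claim.
func processNext(ctx context.Context, db *sql.DB, queue string, handle func(ctx context.Context, id int64) error) (bool, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return false, err
	}
	// After a successful Commit this Rollback does nothing.
	defer tx.Rollback()

	var id int64
	err = tx.QueryRowContext(ctx, claimSQL, queue).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil // no pending message, or all of them are locked
	}
	if err != nil {
		return false, fmt.Errorf("claiming a message: %w", err)
	}

	if err := handle(ctx, id); err != nil {
		// The deferred rollback releases the lock and the row stays pending.
		return false, err
	}
	if _, err := tx.ExecContext(ctx, markProcessedSQL, id); err != nil {
		return false, fmt.Errorf("marking message %d as processed: %w", id, err)
	}
	if err := tx.Commit(); err != nil {
		return false, err
	}
	return true, nil
}
```

A worker would call processNext in a loop until it reports false, and then wait for the next trigger.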

Routing messages

Off-the-shelf solutions usually provide quite complex ways to route the published messages to the relevant recipients. Having a lot of control over routing is in no way bad in itself, but it makes the routing process a lot more complicated. In an environment where performance really matters, being able to filter only the relevant messages to be processed may be necessary to keep the load in control. However, what if we do not have strict performance-related concerns? Could the message routing mechanism be significantly simpler?

For go-opinionatedevents, I decided to introduce two concepts: topics and queues. A topic is a unique destination to which a message is published. For example, a customers.created message would be published to the customers topic. Queues are essentially lists from which the published messages are eventually read. A queue may receive either none or all of the messages from one or more topics.

With such a simple mechanism for routing the messages from topics to queues, each service can decide whether they want to act upon receiving a message. It means that each service needs to process more messages, but this most likely will not be an issue in a regular application backend. If it becomes a performance bottleneck in the future, one can achieve better efficiency by cleverly naming the topics in a more granular manner. For example, instead of having a single customers topic, one might split it into new_customers and customers. This allows subscribing to only messages related to new customers.
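
The all-or-nothing routing from topics to queues can be sketched with a plain map. The routes type and the topicOf helper are hypothetical, and deriving the topic from the part of the message name before the first dot is an assumption based on the customers.created example.

```go
package main

import "strings"

// routes maps a topic to the queues that receive all of its messages.
type routes map[string][]string

// topicOf derives the topic from a message name,
// e.g. "customers.created" belongs to the "customers" topic.
func topicOf(name string) string {
	topic, _, _ := strings.Cut(name, ".")
	return topic
}

// queuesFor returns every queue that should receive the named message.
// A topic without subscribers yields no queues.
func (r routes) queuesFor(name string) []string {
	return r[topicOf(name)]
}
```

A publisher would then stage one row per returned queue, and each service filters by message name in its handler.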

Retrying failures

Unexpected errors while processing messages are inevitable. Therefore, there must be a mechanism for retrying failed messages in some controlled way. Once again, this is relatively easy to do with the current setup. You can store the earliest time a message can be processed alongside the message in the database. When delivering a message fails, just update the earliest processing time to be some time in the future.

Using the same worker setup described earlier, the query would look like the following:

SELECT pubsub_message_id, pubsub_message_uuid
FROM pubsub_messages
WHERE
  pubsub_message_status = 'pending' AND
  pubsub_message_queue = 'worker' AND
  pubsub_message_deliver_at <= now()
ORDER BY pubsub_message_published_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

Sometimes it would be good to be able to apply an increasing delay for the retries. The number of failed delivery attempts (pubsub_message_delivery_attempts) can be stored in the database and used to compute the backoff time after a failed delivery.

ALTER TABLE pubsub_messages ADD COLUMN pubsub_message_deliver_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now();
ALTER TABLE pubsub_messages ADD COLUMN pubsub_message_delivery_attempts INTEGER NOT NULL DEFAULT 0;

type Backoff interface {
  DeliverAfter(attempt int) time.Duration
}
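
One possible implementation of the interface is an exponential backoff with an upper bound. The exponentialBackoff type and its fields are assumptions for illustration and not part of the library's documented API.

```go
package main

import "time"

type Backoff interface {
	DeliverAfter(attempt int) time.Duration
}

// exponentialBackoff doubles the delay after every failed attempt,
// starting from base and never exceeding max.
type exponentialBackoff struct {
	base time.Duration
	max  time.Duration
}

var _ Backoff = exponentialBackoff{}

// DeliverAfter returns base * 2^(attempt-1), capped at max.
// The attempt counter starts from 1 for the first failed delivery.
func (b exponentialBackoff) DeliverAfter(attempt int) time.Duration {
	delay := b.base
	for i := 1; i < attempt; i++ {
		delay *= 2
		if delay >= b.max {
			return b.max
		}
	}
	if delay > b.max {
		return b.max
	}
	return delay
}
```

After a failed delivery, the new pubsub_message_deliver_at value would be the current time plus DeliverAfter(attempts).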

Issues and considerations

Probably the most important detail to note is that this approach requires all of the publishers and receivers to have access to a single database (cluster). It could be the same as for all the other data or it could be a separate cluster dedicated only for Pub/Sub messages. Regardless, all participants must have access to a single shared database. A single shared database also means that the performance will be capped by the cluster's ability to perform operations on the messages table.

For go-opinionatedevents, I did not implement any way to store the topic-to-queue mappings at the database level. Everything is configured in the code when the Pub/Sub clients are initialised. Hence, every publisher must know which queues need to receive the messages from every topic. This requirement defeats the purpose of Pub/Sub, which is that a publisher does not need to know anything about any other system. However, it would not be a big change to move the topic-to-queue mappings to the database and have them be populated by the recipient and not the publisher.

Pub/Sub use cases

Pub/Sub is one of my favorite architectural design patterns because of its versatility and minimal complexity overhead. It is a robust backbone for implementing many types of common application-level tasks. Retrying with a backoff, automatic transactions, and out-of-the-box support for horizontal scalability remove entire categories of potential bugs.

Cron jobs

A cron job is a piece of work performed according to a pre-defined schedule, for example a cleanup of expired data that runs every night at 3 AM. Pub/Sub is not often associated with cron jobs, but I think it can provide some benefits over implementing a new system for running cron jobs.

To implement cron jobs, first introduce a new service called cronjobs. All it does is publish messages to a topic to trigger cron jobs. A naive solution of the service could be something like the following:

func launchJob(ctx context.Context, job string, interval time.Duration) {
  for {
    select {
    case <-ctx.Done():
      return
    case <-time.After(interval):
      if err := pubsub.Publish(fmt.Sprintf("cronjobs.%s", job)); err != nil {
        fmt.Printf("error triggering a cron job: %s\n", err.Error())
      }
    }
  }
}

func main() {
  ctx, cancel := context.WithCancel(context.Background())

  go launchJob(ctx, "job_1", 15*time.Minute)
  go launchJob(ctx, "job_2", 30*time.Minute)

  sig := make(chan os.Signal, 1)
  signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

  <-sig
  cancel()
}

This is, of course, a naive and overly simple solution, but it explains the concept. Some of the improvements that could be made are:

  • Make the service resilient to having multiple instances running at the same time.
  • Use a library which supports the full cron scheduling syntax.

Now that the cron job messages are published according to a pre-defined schedule, implement handlers for the relevant services to process the messages.

func onCronJobMessage(ctx context.Context, msg *Message) Result {
  if msg.Name != "cronjobs.job_1" {
    // return success to not retry the message
    return Success()
  }

  if err := processJob1(ctx, msg); err != nil {
    return Error(err)
  }

  return Success()
}

If the use case requires, it is easy to wrap the handler with a middleware, such as WithLimit or WithBackoff. By implementing cron jobs on top of a robust Pub/Sub implementation, you get:

  • support for retrying failed jobs
  • no need to worry about having multiple instances running
  • support for horizontal scalability

Scheduled work

Cron jobs were a strategy for performing work according to a pre-defined schedule. What if you would like to schedule only one "unit of work" to be performed in the future? For example, by combining cron jobs with scheduled work, it is possible to perform heavy operations over the next 24 hours with automatic support for horizontal scaling.

Consider a scenario where there are 100k customers in the database and there is a need to perform a heavy operation (~10 seconds) per customer roughly every 24 hours. Processing all customers would take roughly 278 hours (100,000 × 10 seconds) if you used a single worker. By defining a single cron job, you would have to:

  • process all customers at once
  • process all customers with a single worker

A better approach would be to scale the customer processing into two directions: horizontally and over time. To do this, you can do the following:

  1. Define a cron job to be triggered once every 24 hours.
  2. Process the cron job message
    1. Sample a random time in the next 24 hours for every customer
    2. Publish a scheduled message with the sampled time for every customer
  3. Process the scheduled and customer-specific messages one by one

By following the described process, you get to freely scale horizontally, retry customer-specific messages, and distribute the load over time.

The described process would look something like the following in code:

// called for every `cronjobs.job_3` message (step 2)
func onJob3(ctx context.Context, msg *Message) Result {
  err := withTx(func (tx *sql.Tx) error {
    ids := customers.FindAllIDs(tx)

    for _, id := range ids {
      // this will set the message's `pubsub_message_deliver_at` to the sampled time
      msg := NewCustomerJobMessage(id, sampleTimeWithin(24*time.Hour))

      if err := pubsub.Publish(tx, msg); err != nil {
        return err
      }
    }

    return nil
  })
  if err != nil {
    return Error(err)
  }

  return Success()
}

// called for every customer-specific message published by `onJob3`
func onCustomerJob(ctx context.Context, msg *Message) Result {
  customer := customers.FindByID(msg.CustomerID)

  if err := processCustomer(ctx, customer); err != nil {
    return Error(err)
  }

  return Success()
}
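
The sampleTimeWithin helper used in onJob3 is not shown in the post. One way it could be written, as an assumption, is to add a uniformly distributed random offset to the current time. The sampleOffset helper is a hypothetical name introduced here.

```go
package main

import (
	"math/rand"
	"time"
)

// sampleOffset returns a uniformly distributed duration in [0, window).
// A non-positive window yields zero.
func sampleOffset(window time.Duration) time.Duration {
	if window <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(window)))
}

// sampleTimeWithin returns a random point in time between now and
// now + window.
func sampleTimeWithin(window time.Duration) time.Time {
	return time.Now().Add(sampleOffset(window))
}
```

Uniform sampling spreads the 100k customer messages evenly over the window, so the average load stays close to 100,000 / 24 hours, which is a little over one message per second.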

Distributed transactions

Quite often there is a need to perform a series of operations in an orchestrated manner. Any of the steps may fail and the system needs to be able to revert its state regardless of when the error occurred. This can be considered to be a distributed transaction. Once the transaction is completed, either all or none of the operations will persist.

In a distributed environment, there are many potential points of failure. Being able to retry and control the execution of the transaction's steps independently of each other is necessary to achieve any kind of reliability. Pub/Sub can help with this as the foundational pattern for executing a single step with retries, backoff, and so on. One pattern that is often implemented on top of Pub/Sub is the Saga design pattern. Sagas have the following qualities:

  • A saga is a sequence of independent transactions, with every transaction having a compensating transaction associated with it.
  • If a transaction fails, the saga executes the compensating transactions for all the already-performed transactions.
  • Compensating transactions are guaranteed to eventually succeed.
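
These qualities can be illustrated with a small in-memory sketch. It is deliberately simplified and does not use Pub/Sub: the step, stepError, and runSaga names are hypothetical, and the bounded retry loop stands in for the redelivery with backoff that a Pub/Sub-based saga would rely on.

```go
package main

import "fmt"

// step pairs an operation with the compensating operation that undoes it.
type step struct {
	name       string
	do         func() error
	compensate func() error
}

// stepError reports which step made the saga stop.
type stepError struct {
	step string
	err  error
}

func (e *stepError) Error() string { return fmt.Sprintf("saga step %q: %v", e.step, e.err) }
func (e *stepError) Unwrap() error { return e.err }

// maxCompensationAttempts bounds the retries of this sketch.
const maxCompensationAttempts = 3

// retry calls fn until it succeeds or the attempts run out.
func retry(fn func() error, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

// runSaga executes the steps in order. When a step fails, it compensates the
// already-completed steps in reverse order and returns the original failure.
func runSaga(steps []step) error {
	for i, s := range steps {
		err := s.do()
		if err == nil {
			continue
		}
		for j := i - 1; j >= 0; j-- {
			if cerr := retry(steps[j].compensate, maxCompensationAttempts); cerr != nil {
				return &stepError{step: steps[j].name, err: cerr}
			}
		}
		return &stepError{step: s.name, err: err}
	}
	return nil
}
```

The failing step itself is not compensated, because it is assumed to have left no state behind.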

As a counterexample to distributed transactions, consider the following function:

func performOperations(ctx context.Context) error {
  if err := serviceOne.Do(ctx); err != nil {
    return err
  }
  return serviceTwo.Do(ctx)
}

In the extremely naive implementation above, the function performs two API calls to two separate services. What is the end state if the second service call fails? There is no rollback for the first operation.

func performOperations(ctx context.Context) error {
  if err := serviceOne.Do(ctx); err != nil {
    return err
  }
  if err := serviceTwo.Do(ctx); err != nil {
    _ = serviceOne.Rollback(ctx)
    return err
  }
  return nil
}

Slightly better, but not much. The rollback may still fail and there is no retrying logic. It may also be, depending on the service, that rollback can only be performed after some wall-clock time. This approach would become unmaintainable with any additional logic.

The Saga pattern and distributed transactions in general are such large topics that I will not go into detail in this post. However, I might write another post in the future specifically about implementing Sagas on top of Pub/Sub.

Migrating to other solutions

Having your hands tied to any one specific solution is not ideal regardless of which solution you pick at the start. There will come a time when something starts to become a bottleneck and/or a decision must be made to switch to something else. In case you decided to use Pub/Sub as the core communication pattern between different components, it is important to think about the implementation in terms of how easy it is to switch to a different Pub/Sub backend. For example, if you use the Cloud Pub/Sub client library directly every time you want to publish a message, it will be a lot of work to switch to AWS SNS. Building an abstraction between the call site and the backend makes sense.
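
Such an abstraction can be as small as a single interface. The sketch below is an assumption about its shape and not the go-opinionatedevents API: Publisher, memoryPublisher, and notifyCustomerCreated are hypothetical names, and a transaction-aware backend would additionally accept the *sql.Tx.

```go
package main

import "sync"

// Publisher is the only Pub/Sub type that call sites depend on. Switching
// the backend means providing another implementation of it.
type Publisher interface {
	Publish(name string, payload []byte) error
}

// published is one recorded message.
type published struct {
	name    string
	payload []byte
}

// memoryPublisher is an in-memory backend, useful in tests and in local
// development where no real backend is available.
type memoryPublisher struct {
	mu       sync.Mutex
	messages []published
}

// Publish records the message instead of sending it anywhere.
func (p *memoryPublisher) Publish(name string, payload []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.messages = append(p.messages, published{name: name, payload: payload})
	return nil
}

// notifyCustomerCreated is an example call site. It knows nothing about
// the backend behind the interface.
func notifyCustomerCreated(pub Publisher, customerID string) error {
	return pub.Publish("customers.created", []byte(customerID))
}
```

Call sites written against the interface stay untouched when the PostgreSQL backend is later replaced or wrapped.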

Another aspect to consider is that almost none of the off-the-shelf solutions offer transactions the same way as the database does. If you first build with the assumption of being able to use database transactions, it will be a problem if you want to switch to some other solution later on. However, being able to use database transactions is probably an attribute you want to have regardless of the Pub/Sub backend.

Keeping database transactions while migrating to another backend is relatively easy to implement. Consider a service called worker. First, route every message that is published from worker to a single queue called worker. Then, implement a function for re-sending all of the messages to whatever Pub/Sub backend you are migrating to. For every message published by the worker service, do the following:

func onWorkerMessage(ctx context.Context, msg *Message) Result {
  pubsub := getNewPubSubClient()

  if err := pubsub.Publish(msg); err != nil {
    return Error(err)
  }

  return Success()
}

Now, you still have the advantage of using database transactions and you can still be 100% certain that every message is eventually published to your new Pub/Sub backend.