Change Data Capture at NerdWallet

Oct. 6, 2021

NerdWallet’s mission is to provide clarity for all of life’s financial decisions. That’s a tall order that requires delivering quality information and recommendations to consumers in a timely fashion. If you use the NerdWallet app or website, you may have received one of our emails or push notifications, such as alerts about recent changes to your credit score. How do we decide what info to send to whom, and when to send it? The backbone of our communications platform is built using a concept called change data capture, or “CDC” for short.

In this post, I’m going to show you how NerdWallet uses CDC to power our communications platform. I’ve worked on this system since early 2020 and I’m excited to share how we’ve improved it so far and some of our future plans.

What is Change Data Capture?

Let’s take a look at the textbook definition of change data capture:

“CDC is an approach to data integration that is based on the identification, capture and delivery of the changes made to enterprise data sources.” – Wikipedia

Ok… that’s sort of vague. In the world of databases, CDC usually means identifying changes to individual records (in SQL, think INSERT, UPDATE, DELETE) and publishing those changes for consumption by other services.

Why is CDC useful?

CDC captures changes to a system’s data and makes them available as consumable events. These events can be streamed between multiple services asynchronously. This has a few important benefits:

  • Reducing load on databases as the number of consumers scales
  • Decoupling data producers and consumers
  • Abstracting data consumers from the producer’s data storage format

CDC allows data consumers to react to changes in data sources without constantly bombarding them with queries. Rather than continuously polling databases for data that has changed, consumers subscribe to CDC data streams and wait for relevant changes to be delivered.

This operational model reduces the number of open connections and queries issued to the source databases. This is preferable because it lets the databases focus on responding to synchronous requests that end-users are waiting for in our app or on our website, such as logging in or fetching their credit score.

When building a data-driven system, it makes sense to store different types of data in different storage formats. A backend service that cares about changes to a user’s cash flow probably isn’t interested in the nuances of integrating with both MySQL and DynamoDB. It cares about domain-specific information. CDC gives us a great interface to abstract the underlying data storage and expose actionable data (in our case, JSON documents) to subscribed services.

A Real Use-case at NerdWallet

That’s enough generics! Let’s talk about a concrete use-case for CDC at NerdWallet.

Our content marketing team sends push notification and email messaging campaigns through a third-party CRM platform. In order to do intelligent user segmentation and personalization for these campaigns, the CRM platform needs an up-to-date view of each user’s data. User data is updated by backend systems in response to user interactions on the NerdWallet site, the mobile app, or through third-party data ingestion. For example, we have regular ETL jobs to ingest third-party data from TransUnion and update users’ credit score data in our database. We want to have the latest TransUnion reports reflected in the CRM platform for use in marketing campaigns.

We need to make sure that these updates reach the CRM in a timely and reliable manner. Timely, because we don’t want to send content marketing messages based on stale data. Reliable, because content marketing has strict regulatory requirements around unsubscription and privacy.

CDC is a great fit for this model. Dumping database snapshots has a large data egress cost, is overkill when only a few records change, and is a security risk when some tables contain highly sensitive information. Instead, we use CDC to stream changes in specific tables to the CRM as they are written to the database. This also gives us a chance to filter and redact data that isn’t appropriate to share with a third-party system.
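To make the filter-and-redact step concrete, here is a minimal Go sketch of an allowlist applied to a changed row before it is forwarded to a third party. The table names, column names, and the Redact function are hypothetical and chosen only for illustration; they are not taken from our actual pipeline.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// allowedColumns is a hypothetical per-table allowlist. Tables and columns
// that are not listed here are never forwarded.
var allowedColumns = map[string]map[string]bool{
	"users": {"id": true, "last_login": true},
}

// Redact returns a copy of row that contains only the allowlisted columns
// for table. The second return value is false when the whole table is
// off limits and the change should be dropped.
func Redact(table string, row map[string]any) (map[string]any, bool) {
	allowed, ok := allowedColumns[table]
	if !ok {
		return nil, false
	}
	out := make(map[string]any, len(allowed))
	for col, val := range row {
		if allowed[col] {
			out[col] = val
		}
	}
	return out, true
}

func main() {
	row := map[string]any{
		"id":         1,
		"name":       "Eoin",
		"last_login": "2021-10-06 01:50:01",
	}
	if safe, ok := Redact("users", row); ok {
		b, _ := json.Marshal(safe)
		fmt.Println(string(b)) // the "name" column has been removed
	}
}
```

An allowlist is used here rather than a blocklist so that a newly added column stays private until someone explicitly decides to share it.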

Now that we’ve seen why CDC is useful for NerdWallet, we can explore our implementation. We store data about our registered users in an AWS Aurora Postgres cluster. Let’s look at how we identify changes to rows in our Postgres tables and stream those changes to our backend services.

The Nitty Gritty: Logical Decoding

To understand our CDC implementation in Postgres I need to outline some basics about how writes happen in Postgres. Postgres uses a common database component called a Write-Ahead Log (WAL) to achieve durability and provide data replication across multiple instances. When a client commits a transaction that writes data, modifications are first written quickly to the WAL and later updated in the underlying storage volumes (which takes longer, due to secondary concerns like updating table statistics and indices). The writer instance in a Postgres cluster sends chunks of its WAL to the reader instances to keep their view of the data up-to-date.

By default, Postgres appends physical changes to the WAL in a binary format for maximum efficiency. A physical record would look like “set this block of memory to this new binary value,” which has no semantic meaning for a human or non-Postgres application reading the WAL. But Postgres also has the option to write logical changes, appending individual database records into the WAL instead of memory chunks. This is less efficient, but encodes semantic meaning into the WAL that other applications can utilize. This is called logical replication.

Any program can open up a “replication connection” to Postgres and consume WAL changes. Clients iterate over the WAL, where each committed transaction is a single record describing all of that transaction’s changes. This is where logical decoding comes in. In logical decoding, Postgres will serialize the WAL records into string-encoded data that clients can read. But the default output format is not super useful for us. Let’s say a client writes to the database to create a user record that includes a last login time. Here’s what we’d see emitted by logical decoding:

BEGIN 142032
table public.users: 
    INSERT: name[character varying]:'Eoin' 
            last_login[timestamp without time zone]:'2021-10-06 01:50:01.270649'
COMMIT 142032

We want JSON! Luckily, there’s a great open-source plugin for Postgres logical decoding called wal2json that turns the WAL records into JSON. For an insert into the users table, its output looks like this:

{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "users",
      "columnnames": ["id", "name", "last_login"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [1, "Eoin", "2018-03-27 11:58:28.988414"]
    }
  ] 
}
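As a rough illustration of how a consumer might read this document, the Go sketch below decodes the JSON shown above and pairs each column name with its value. The type and function names (Change, Transaction, Row, ParseTransaction) are made up for this example and are not wal-extractor’s actual code; the field names come directly from the sample output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Change mirrors one entry of the "change" array in the sample above.
type Change struct {
	Kind         string   `json:"kind"`
	Schema       string   `json:"schema"`
	Table        string   `json:"table"`
	ColumnNames  []string `json:"columnnames"`
	ColumnTypes  []string `json:"columntypes"`
	ColumnValues []any    `json:"columnvalues"`
}

// Transaction is the top-level document: every change from one transaction.
type Transaction struct {
	Change []Change `json:"change"`
}

// ParseTransaction decodes one JSON document into a Transaction.
func ParseTransaction(data []byte) (Transaction, error) {
	var tx Transaction
	err := json.Unmarshal(data, &tx)
	return tx, err
}

// Row pairs column names with values so fields can be looked up by name.
func (c Change) Row() (map[string]any, error) {
	if len(c.ColumnNames) != len(c.ColumnValues) {
		return nil, fmt.Errorf("%s.%s: %d names but %d values",
			c.Schema, c.Table, len(c.ColumnNames), len(c.ColumnValues))
	}
	row := make(map[string]any, len(c.ColumnNames))
	for i, name := range c.ColumnNames {
		row[name] = c.ColumnValues[i]
	}
	return row, nil
}

// sample is the document from the post, reproduced as a Go string.
const sample = `{"change":[{"kind":"insert","schema":"public","table":"users",
"columnnames":["id","name","last_login"],
"columntypes":["integer","character varying(30)","timestamp without time zone"],
"columnvalues":[1,"Eoin","2018-03-27 11:58:28.988414"]}]}`

func main() {
	tx, err := ParseTransaction([]byte(sample))
	if err != nil {
		panic(err)
	}
	for _, c := range tx.Change {
		row, err := c.Row()
		if err != nil {
			panic(err)
		}
		fmt.Println(c.Kind, c.Table, row["name"])
	}
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is an untyped value, so the id column arrives as 1.0 rather than an int. A real consumer would likely use the columntypes array to convert values properly.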

We wrote a new service called wal-extractor that connects to our Postgres cluster and consumes JSON-encoded WAL records. Logical replication and decoding in Postgres have an interesting constraint: only one client can consume from a replication connection at a time. That clued us in to the fact that this service could not be horizontally scaled and would likely be the throughput bottleneck for our whole CDC system. For that reason, we chose to write it in Go to maximize the amount of concurrent performance we could squeeze out of it (versus our other option of Python, which is less suitable for this type of concurrent task).

Great – we can now consume data changes from Postgres in a convenient JSON format. How do we get those changes to our other backend systems? We could ask wal-extractor to publish these changes with HTTP requests to every downstream consumer. But this approach is complicated and brittle. How will wal-extractor discover consumers? How will it handle unavailable consumers? How can it avoid waiting on slow HTTP requests? What if consumers want to replay messages from the past? We could solve these problems in wal-extractor, but we’d be reinventing the wheel.

Instead, wal-extractor should publish to a highly-available message queue designed to buffer data change events between the producer and multiple consumers. This will give us reliability and scalability when delivering CDC messages across all of our services.

Message Queueing

We use Amazon SNS+SQS as a message queue (MQ) implementation to decouple WAL record consumption and data syncing to our CRM. There are a few key benefits to this architecture:

  • Using an MQ with at-least-once delivery semantics helps solve our reliability concerns
  • Consumers aren’t overwhelmed by high-throughput bursts of WAL records
  • Consumers can manage their own subscription filters for which types of CDC records they want to receive
  • SNS allows us to “fan out” events to individual SQS queues for multiple consumers, allowing them to consume changes independently at different rates
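
One consequence of at-least-once delivery is that a consumer may see the same record more than once, so consumers generally need to handle duplicates safely. The Go sketch below shows one simple way to do that, assuming every CDC event carries a unique ID. The Deduper type and the event IDs are hypothetical, and the in-memory, unbounded map is only suitable for illustration.

```go
package main

import "fmt"

// Deduper remembers which event IDs were already handled, so that a
// redelivered message becomes a no-op. It keeps everything in memory and
// never evicts, which is only acceptable in a sketch.
type Deduper struct {
	seen map[string]struct{}
}

// NewDeduper returns an empty Deduper.
func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime records id and reports whether it had not been seen before.
func (d *Deduper) FirstTime(id string) bool {
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	// "evt-1" arrives twice, which at-least-once delivery permits.
	for _, id := range []string{"evt-1", "evt-2", "evt-1"} {
		if d.FirstTime(id) {
			fmt.Println("applying", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```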

In our architecture, wal-extractor publishes to an SNS topic that fans out to each consumer’s SQS queue.
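
The fan-out idea can be modeled in a few lines of Go. This is a toy in-memory model, not the AWS SDK: the Topic, Subscriber, and Event types are invented for illustration. A topic copies each event into every subscriber’s own queue, and each subscriber supplies a filter for the records it wants.

```go
package main

import "fmt"

// Event is a simplified CDC record; the fields are illustrative.
type Event struct {
	Table string
	Kind  string
}

// Subscriber models one consumer's private queue plus its filter.
type Subscriber struct {
	Name   string
	Filter func(Event) bool // nil means "receive everything"
	Queue  []Event
}

// Topic models the fan-out point that producers publish to.
type Topic struct {
	subs []*Subscriber
}

// Subscribe registers a new consumer queue with an optional filter.
func (t *Topic) Subscribe(name string, filter func(Event) bool) *Subscriber {
	s := &Subscriber{Name: name, Filter: filter}
	t.subs = append(t.subs, s)
	return s
}

// Publish copies the event into every queue whose filter accepts it.
func (t *Topic) Publish(e Event) {
	for _, s := range t.subs {
		if s.Filter == nil || s.Filter(e) {
			s.Queue = append(s.Queue, e)
		}
	}
}

func main() {
	topic := &Topic{}
	crm := topic.Subscribe("crm-sync", func(e Event) bool { return e.Table == "users" })
	audit := topic.Subscribe("audit", nil)

	topic.Publish(Event{Table: "users", Kind: "insert"})
	topic.Publish(Event{Table: "sessions", Kind: "delete"})

	fmt.Println(len(crm.Queue), len(audit.Queue)) // prints "1 2"
}
```

Because each subscriber owns its queue, a slow consumer only falls behind on its own backlog and does not hold up the others.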

Wal-extractor’s publishing mechanism is interesting. We use a work queue/worker pool architecture to avoid slowing down the critical WAL consumption loop with HTTP publish requests. The Go runtime uses the I/O downtime during WAL consumption to context switch and work through publishing the already-consumed records to SNS. Without this optimization, our total throughput would be severely limited, as WAL consumption would be blocked waiting for publish requests to SNS (which is optimized for horizontal scalability, not low latency). Implementing this in Go was straightforward using the language’s native concurrency primitives, so it ended up being a great choice for us.
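
For readers unfamiliar with the pattern, here is a minimal sketch of a work queue and worker pool built from goroutines and a buffered channel. It is not wal-extractor’s actual code: the Pool and Publisher names, the worker count, and the queue size are assumptions, and the publish function is a stand-in for a real network call.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Publisher stands in for the real publish call (a network request in
// production). It is a hypothetical hook for this sketch.
type Publisher func(record string) error

// Pool decouples the read loop from slow publish calls.
type Pool struct {
	work chan string
	wg   sync.WaitGroup
}

// NewPool starts the given number of workers draining a buffered queue.
func NewPool(workers, queueSize int, publish Publisher) *Pool {
	p := &Pool{work: make(chan string, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for rec := range p.work {
				if err := publish(rec); err != nil {
					// A real implementation would retry or surface this.
					fmt.Println("publish failed:", err)
				}
			}
		}()
	}
	return p
}

// Submit enqueues a record. It blocks only when the queue is full, which
// applies backpressure to the read loop.
func (p *Pool) Submit(rec string) { p.work <- rec }

// Close stops accepting work and waits for in-flight publishes to finish.
func (p *Pool) Close() {
	close(p.work)
	p.wg.Wait()
}

func main() {
	var published int64
	pool := NewPool(4, 16, func(rec string) error {
		atomic.AddInt64(&published, 1)
		return nil
	})
	for i := 0; i < 100; i++ {
		pool.Submit(fmt.Sprintf("record-%d", i))
	}
	pool.Close()
	fmt.Println("published:", atomic.LoadInt64(&published)) // published: 100
}
```

The buffered channel is the work queue: the read loop hands records off and keeps going, while the workers absorb publish latency in parallel. This sketch does not preserve ordering across workers or track which records were published successfully, both of which a production pipeline would need to consider.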

Summary

We’ve examined all the main pieces of CDC at NerdWallet. Changes originate in Postgres, get consumed from the WAL by wal-extractor, are published to SNS+SQS, and are finally consumed by other backend services.

This CDC system allows us to take consumer-focused action with the most up-to-date data available. For example, we stream up-to-date views of users’ data to marketing systems, letting our content marketing team deliver highly actionable insights to our users.

Next Steps

We’ve come a long way in terms of CDC maturity, but we are definitely not “feature complete”. We are still working on improving this system. Our aspirations include:

  • Building out capabilities to replay CDC records in response to outages
  • Including both the new state and the old state of the data in each record
  • Improving tooling for engineers to run CDC pipelines for local development

Additionally, I’ve had the opportunity to work on a small team that has just delivered our first CDC pipelines for AWS DynamoDB tables! That could be a whole blog post in itself, but it uses many of the same concepts described here.

If you’re interested in discussing CDC further or want to pick my brain about Postgres logical decoding, you can reach me at [email protected].

If this work excites you, I encourage you to check out our careers page for backend engineering positions and join our platform team!