Using Change Data Capture for Warehouse Analytics
Introduction
We continue our story on the Analytics Platform setup in Picnic. Last time, we looked at the internal services data pipeline. Now we are going to look at the setup for the FCA (automated fulfillment center) pipeline.
FCA is Picnic’s own automated fulfillment center. Thousands of customer orders are prepared here every day in a highly automated manner. The ramp-up and success of FCA is critical for Picnic’s long-term success. Therefore the Analytics Platform team was looking into how to help and drive this project to success as well.
FCA’s backbone is a number of interconnected conveyor belts, which move products and groceries around the facility. This system is called the transport system (TS). As the name suggests, it transports the customers’ groceries around the facility until they are ready for shipping. The TS is not built by Picnic but is provided by a third-party supplier. As such, the inner workings of the TS are somewhat of a black box. Still, we want to analyze the journey of each item on the TS to gather analytics and identify bottlenecks. That’s why the Analytics Platform team has developed a new data pipeline: The FCA pipeline.
Pipeline overview
Since the TS is an externally developed hardware and software system, Picnic cannot change it to expose analytics events directly. However, our analysts would still like to have fine-grained insights into the flow of goods through the system.
So how can we get our hands on events for analytical purposes if we cannot change the TS to expose them?
Enter change data capture (CDC)! CDC is a software engineering practice of capturing changes (“deltas”) made to a system. In our case, we are tracking the changes of location of grocery bags as they move around the transport system. Whenever something happens on the TS, a record is written to the Postgres database. Our approach is to capture these changes and stream them via our Apache Kafka based analytics platform to our Snowflake data warehouse.
To do this, we use Debezium, a popular change data capture tool. It detects any changes made to a database and exposes them as events. In our case, we enable Debezium within the Postgres database of the TS and stream change events to the data warehouse. In the diagram below you can see the complete pipeline setup:
We are running Debezium as a Kafka Connect instance (“Debezium” in the diagram above). The connector publishes each change of a set of tables in Postgres to topics in Kafka. Each operation in Postgres results in a Kafka event. We have configured two sinks for the Debezium events:
- The Snowflake sink puts the data into our data warehouse for further processing and analysis.
- The S3 sink archives the data in S3 buckets.
This pipeline processes around 10–15 million events per day.
For the remainder of this blog post we’ll focus on Debezium connector, because the Kafka part has been described in detail in earlier blog posts. Now that we have a high-level overview of the setup, let’s take a look at the components in more detail.
Postgres — WAL and replication slots
Before we can capture database modifications as events, we need to enable the logical replication plugin in Postgres. Logical replication is the process of transforming the Postgres WAL (write-ahead-log) into a series of consumable changes. For each logical replication consumer, a ‘logical replication slot’ is used. In our case, the consumer will be the Debezium Kafka Connect source connector, which consumes events from said replication slot. The replication slot needs a decoding plugin that will be used to transform the WAL records to the format required by the consumer. More on this below.
Running Debezium as a Kafka Connect instance
Choosing the serialization format
By default, Debezium is publishing data in JSON format, so in an earlier iteration of the pipeline we were using JSON as serialization format. However, the JSON data the connector produces is a self-describing JSON, meaning that each event has its schema definition attached to it. This blows up the messages massively in size, which increases storage costs.
After the initial phase of running Debezium with JSON, we migrated the data to Avro as serialization format, because — being a binary format — it is much more compact, efficient and supports the use of a schema registry. Avro enables you to separate the schema from the events, leading to more concise messages. The Debezium Kafka Connector will auto-create the schema from the CDC events and handle the serialization. With Avro we were able to reduce the size of each event by 97%!
One hurdle in this migration was that, unfortunately, since Debezium 2.0.0, the Avro support is not shipped anymore with the base Debezium image. To overcome this, we execute a number of docker-maven-download commands to add those JAR files back into the Docker image (see below).
Building the Docker container
In order to expose the CDC events to Kafka, we run the Debezium Kafka Connect source connector in a Docker container. We build our own Debezium image from the Debezium base image by pulling the debezium/connect
image. Then we add JVM observability to the container by adding JMX configuration to it. In addition we add the aforementioned Avro support to the container. See the full Dockerfile here.
Configuring the connector
Running this container provides you with the standard Kafka Connect REST API interface. We configure the Debezium connector with the following configuration (shortened):
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"name": "debezium_ts",
"slot.name": "debezium_ts",
"table.include.list": "public.ts_tables.*",
"plugin.name": "decoderbufs",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.replacement": "debezium_ts_topic",
"transforms.Reroute.topic.regex": ".*tr_commandercommunication.*",
"database.user": "debezium-user",
"database.dbname": "ts_db",
"topic.prefix": "ts_tracking",
"database.hostname": "…",
"database.password": "…",
"snapshot.mode": "initial_only",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": ": ",
"value.converter.schema.registry.url": "…"
}
The first two entries are basic configuration about the type of the connector and its name. slot.name
is the aforementioned replication slot from which the Debezium connector subscribes to CDC events. This is a name that you have to configure yourself. The Debezium connector creates the replication slot the first time it is started. You should see something in your logs like:
12:58:25,223 INFO Postgres|debezium-nl-fca-dev__ts6|postgres-connector-task Creating replication slot with command CREATE_REPLICATION_SLOT "debezium_ts" LOGICAL decoderbufs [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
table.include.list
is a regex defining which tables in Postgres should be captured. "plugin.name": "decoderbufs"
is the one of two currently available plugin variations, decoding the records into protobuf format. Note that this is just the data format in which the output plug-in transforms the data from Postgres’ write-ahead log’s internal representation into a generally understandable format. Our Debezium connector will not use this as an output format.
The transforms section describes a set of single message transformations applied to each record for the purpose of topic routing. The ByLogicalTableRouter
router changes events to topics specified by transforms.Reroute.topic.replacement
and transforms.Reroute.topic.regex
. The former indicates that all records are written to a topic named debezium_ts_topic
. The latter describes a regex to include matching tables names in Postgres. So we match all tables that include the name tr_commandercommunication
. We use the logical table router here because we want to route multiple physical tables (in different Postgres shards) into the same topic.
A quite important property is snapshot.mode
. When Debezium is connecting to Postgres for the first time, it tries to stream a consistent snapshot of the configured tables. After the snapshot is completed, it starts streaming changes from that point onwards. With the setting initial_only
it does this only for the first time it connects. Other options for this setting are always
, never
, and custom
. always
is not desirable in our setting because in case of a restart of the connector, it would re-stream the full state of Postgres, which in our case can be quite substantial. An important learning here is that the snapshot is triggered based on the connector.name
property. Hence, if you want to re-trigger a snapshot with "snapshot.mode": "initial_only"
(for example, when you need to consume events from a new slot), then you need to rename the connector.
Lastly, we configure the connectors to use Avro as a data serialization format. Since we are already using Confluent Cloud, we can use the Confluent Schema registry here. The configuration properties of value.converter
configure that the messages should be encoded in Avro whilst using the schema registry indicated at value.converter.schema.registry.url
. This worked quite well for us, as the schema is automatically communicated to the registry and we did not have to configure anything else here.
Kafka & Confluent Cloud
After having read the CDC events from our Postgres database, the Debezium connector is pushing them into their respective topics in Kafka. From here, the journey to our data warehouse is pretty much the same as in our internal services pipeline. We use the Snowflake and S3 connectors to further process and archive the data. Once the Debezium events are in the data warehouse, the data team and analysts can process the events.
Operations
Monitoring
We use Prometheus for monitoring the connectors. Next to monitoring JMX and generic Kafka connect metrics, as described in earlier blog posts, there are also a number of Debezium specific metrics which are interesting. Debezium differentiates between two types of metrics, the snapshotting and the streaming ones. Most metrics are available for either situation. For our setup, we only monitor streaming ones, because we are not expecting frequent snapshotting.
One particular metric that we found quite useful in practice is MilliSecondsSinceLastEvent. When Debezium does not receive any events for a long period, this metric’s value will be -1, so we monitor for this value with the following PromQL expression:
avg_over_time(debezium_metrics_millisecondssincelastevent{context="streaming", plugin="postgres"}[60m])) < 0.
Learnings and pitfalls
During the development of this pipeline we noticed that a good collaboration with the database team and the business is important. It took a number of iterations until we found the right combination of settings that fit our use case. For example, the snapshot.mode
setting is key to make sure you achieve the delivery guarantees needed by the business in case of a database failure.
The development process can be a bit tedious, because you are merely writing JSON configuration files, so you do not have the feedback that you would have with a programming language. In addition, the error messages Debezium produces can also be misleading and tend to be not very informative.
Another point of attention is that the Debezium connector creates a complex, nested schema for the events, which might not be very well suited for downstream consumption. Many sink systems expect a flat structure in which each entry maps to e.g. a database column. If you find yourself in this situation, you can tackle this with a number of single message transformations offered by Kafka Connect. In particular, the ExtractNewRecordState transformation can be useful to flatten the events into a more digestible format.
Wrapping up
In this blog post, we looked at the FCA data pipeline by the Analytics Platform team. We applied change data capture, Kafka and Snowflake to capture analytics events. Debezium is an exciting CDC implementation which enables you to stream database changes to Kafka. At Picnic we use it to work around the inaccessibility of analytical events of third-party components. See you next time!
Recent blog posts
We're strong believers in learning from each other, so
our employees write about what interests them.