September 5, 2016

Real-time Data Streaming to Kafka with MaxScale CDC

The new Change Data Capture (CDC) protocol modules in MaxScale 2.0.0 can be used to convert binlog events into easy-to-stream data. These streams can be directed to other systems for further processing and in-depth analysis.

In this article, we set up a simple Kafka broker on CentOS 7 and publish changes in the database as JSON with the help of the new CDC protocol in MaxScale.

The tools we'll be using require Python 3, the pip Python package manager and the kafka-python package. You can install them with the following commands on CentOS 7.

sudo yum install epel-release
sudo yum install python34
curl https://bootstrap.pypa.io/get-pip.py|sudo python3.4
sudo pip3 install kafka-python

Note: The CentOS 7 Python 3 package is broken and requires manual installation of pip3 (the third command above).

The environment consists of one MariaDB 10.0 master, MaxScale and one Kafka broker inside a Docker container. We'll be running all the components (mysqld, maxscale, docker) on the same machine to make setup and testing easier.

Configuring the Database

We start off by configuring the master server for row-based replication by adding the following lines to its configuration file.

log-bin=binlog
binlog_format=row
binlog_row_image=full

With row-based replication, all replication events contain the modified data. This information can be used to reconstruct each change in data as it happens. The MaxScale CDC protocol router, avrorouter, reads this information from the binary logs and converts it into the compact, easy-to-process Avro format.

Configuring MaxScale

The next step is to configure MaxScale. We add two services to the configuration, one for reading the binary logs from the master and another for converting them into JSON and streaming them to the clients.

The first service, named Binlog Service, registers as a slave to the MariaDB master and starts to read replication events. These events are stored in a local cache, from which the second service, named CDC Service, converts them into Avro format files (we'll configure that later).

Here's the configuration entry for the Binlog Service.

[Binlog Service]
type=service
router=binlogrouter
router_options=server-id=4000,binlogdir=/var/lib/maxscale,mariadb10-compatibility=1
user=maxuser
passwd=maxpwd

[Binlog Listener]
type=listener
service=Binlog Service
protocol=MySQLClient
port=3306

The maxuser:maxpwd credentials are used to connect to the master server and query for details about database users. Read the MaxScale Tutorial on how to create them.

The router_options parameter is the main way the binlogrouter module is configured. The server-id option is the server ID that MaxScale presents to the master when it registers as a slave, binlogdir is the directory where the binlog files are stored and mariadb10-compatibility enables MariaDB 10.0 support in the binlogrouter. For more details on the binlogrouter options, read the Binlogrouter Documentation.
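To make the shape of the router_options value concrete, here is a minimal Python sketch that splits the string used above into individual options. It only illustrates the comma-separated name=value format. The function name parse_router_options is hypothetical and this is not how MaxScale itself parses the parameter.

```python
def parse_router_options(value):
    """Split a router_options string into a dict of option name -> value."""
    options = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate a trailing comma
        # Split on the first '=' only, so a value may itself contain '='.
        name, _, setting = item.partition("=")
        options[name.strip()] = setting.strip()
    return options


# The value from the Binlog Service entry in this article.
opts = parse_router_options(
    "server-id=4000,binlogdir=/var/lib/maxscale,mariadb10-compatibility=1"
)
for name in sorted(opts):
    print("{}: {}".format(name, opts[name]))
```

All values come back as strings. Converting server-id to an integer or mariadb10-compatibility to a boolean is left to the caller.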

We also configured a MySQL protocol listener so that we can start the replication with the mysql command-line client.

After configuring the Binlog Service, we'll set up the CDC router and listeners.

[CDC Service]
type=service
router=avrorouter
source=Binlog Service
router_options=filestem=binlog
user=maxuser
passwd=maxpwd

[CDC Listener]
type=listener
service=CDC Service
protocol=CDC
port=4001

The configuration for the CDC Service is very simple. The only parameters we need to configure are source, which names the service we are going to use as the source for binary logs, and the filestem router option, which gives the prefix of the binary log files. The CDC Service will read the Binlog Service configuration, gather all the required information from there and start the conversion process. For more details on how to fine-tune the avrorouter for optimal conversion speed, read the Avrorouter Documentation.
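Because the CDC Service depends on the Binlog Service by name, a typo in source is an easy mistake to make. The following Python sketch checks that relationship with the standard configparser module. The helper check_cdc_source is hypothetical, and the rule that the source must be a binlogrouter service is an assumption based on the setup in this article, not a statement about what MaxScale enforces.

```python
import configparser

# A trimmed copy of the two service sections from this article.
SAMPLE = """
[Binlog Service]
type=service
router=binlogrouter
router_options=server-id=4000,binlogdir=/var/lib/maxscale,mariadb10-compatibility=1

[CDC Service]
type=service
router=avrorouter
source=Binlog Service
router_options=filestem=binlog
"""


def check_cdc_source(config_text):
    """Return a list of problems found in avrorouter 'source' settings."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    problems = []
    for name in parser.sections():
        section = parser[name]
        if section.get("router") != "avrorouter":
            continue  # only avrorouter services are checked
        source = section.get("source")
        if source is None:
            problems.append("{}: no source parameter".format(name))
        elif source not in parser:
            problems.append(
                "{}: source '{}' is not defined".format(name, source))
        elif parser[source].get("router") != "binlogrouter":
            problems.append(
                "{}: source '{}' is not a binlogrouter service".format(
                    name, source))
    return problems


print(check_cdc_source(SAMPLE) or "configuration looks consistent")
```

An empty list means every avrorouter service points at an existing binlogrouter service.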

Setting up Kafka in Docker

After we have MaxScale configured, we'll start the Kafka broker inside Docker. We'll use the spotify/kafka image to set up a quick single-node Kafka broker on the same machine MaxScale is running on.

sudo docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.0.100 --env ADVERTISED_PORT=9092 spotify/kafka

This command will start the Kafka broker inside a Docker container. The container also contains command line utilities that we can use to read the queued messages from the broker to confirm our setup is working.

Next we'll have to create a topic where we can publish messages. We'll use the packaged console utilities in the Docker container to create it and we'll call it CDC_DataStream.

sudo docker exec -ti kafka /opt/kafka_2.11-0.8.2.1/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic CDC_DataStream --replication-factor 1 --partitions 1

We are using the Docker container version of Kafka to simplify the setup and make this easy to reproduce. Read the Kafka Quickstart guide for information on how to set up your own Kafka cluster and for more details on the tools used inside the container.

Starting Up MaxScale

The final step is to start the replication in MaxScale and stream events into the Kafka broker using the cdc and cdc_kafka_producer tools included in the MaxScale installation.

After starting MaxScale, we connect to the Binlog Service on port 3306 and start replication.

CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_PORT=3306, MASTER_USER='maxuser', MASTER_PASSWORD='maxpwd', MASTER_LOG_POS=4, MASTER_LOG_FILE='binlog.000001';
START SLAVE;

The CDC service allows us to query it for changes in a specific table. For the purpose of this article, we've created an extremely simple test table using the following statement and populated it with some data.

CREATE TABLE test.t1 (id INT);
INSERT INTO test.t1 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);

After we have created and populated the table on the master, the table and the changes in it will be replicated to MaxScale. MaxScale will cache the binlogs locally and convert them into the Avro format for quick streaming. After a short while, MaxScale should've read and converted enough data so that we can start querying it using the cdc tool.

Next, we'll query the CDC Service for change records on the test.t1 table. Since we didn't configure any extra users, we'll use the service user we configured for the service.

cdc -u maxuser -pmaxpwd -h 127.0.0.1 -P 4001 test.t1

We'll get a continuous stream of JSON objects printed to standard output, which we can use as the source for our Kafka streamer, the cdc_kafka_producer utility. All we have to do is pipe the output of the cdc program into cdc_kafka_producer to push it to the broker.

cdc -u maxuser -pmaxpwd -h 127.0.0.1 -P 4001 test.t1 | cdc_kafka_producer --kafka-broker=127.0.0.1:9092 --kafka-topic=CDC_DataStream
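For readers who want to see what the producer side of such a pipeline involves, here is a minimal Python sketch built on the kafka-python package installed earlier. It is not the source of cdc_kafka_producer. The names publish_lines and main are hypothetical, and the broker address and topic are simply the values used in this article.

```python
import json
import sys


def publish_lines(lines, send):
    """Validate each line as JSON and pass it to send() as UTF-8 bytes.

    Returns the number of messages handed to send().
    """
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        json.loads(line)  # raises ValueError on malformed input
        send(line.encode("utf-8"))
        count += 1
    return count


def main(broker="127.0.0.1:9092", topic="CDC_DataStream"):
    # Imported here so publish_lines() can be used without kafka-python.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=broker)
    sent = publish_lines(
        sys.stdin, lambda payload: producer.send(topic, payload))
    producer.flush()  # block until buffered messages are delivered
    print("published {} messages".format(sent), file=sys.stderr)
```

To use the sketch in place of the packaged tool, call main() at the end of the script and pipe the output of cdc into it. Keeping the Kafka client behind the send callback makes the line handling easy to try out without a running broker.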

We send the changes to the broker listening on port 9092 on localhost and publish them on the CDC_DataStream topic we created earlier. When we start the console consumer in another terminal, we'll see the events arriving as they are published on the broker.

[vagrant@maxscale ~]$ sudo docker exec -ti kafka /opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic CDC_DataStream
{"namespace": "MaxScaleChangeDataSchema.avro", "type": "record", "fields": [{"type": "int", "name": "domain"}, {"type": "int", "name": "server_id"}, {"type": "int", "name": "sequence"}, {"type": "int", "name": "event_number"}, {"type": "int", "name": "timestamp"}, {"type": {"symbols": ["insert", "update_before", "update_after", "delete"], "type": "enum", "name": "EVENT_TYPES"}, "name": "event_type"}, {"type": "int", "name": "id"}], "name": "ChangeRecord"}
{"domain": 0, "event_number": 1, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 1}
{"domain": 0, "event_number": 2, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 2}
{"domain": 0, "event_number": 3, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 3}
{"domain": 0, "event_number": 4, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 4}
{"domain": 0, "event_number": 5, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 5}
{"domain": 0, "event_number": 6, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 6}
{"domain": 0, "event_number": 7, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 7}
{"domain": 0, "event_number": 8, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 8}
{"domain": 0, "event_number": 9, "event_type": "insert", "server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 9}

The first JSON object is the schema of the table and the objects following it are the actual changes. If we insert, delete or modify data on the database, we'll see the changes in JSON only seconds after they happen.
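A downstream consumer has to tell the schema object apart from the change records. The Python sketch below does this with a simple heuristic: any object that has a "fields" key is treated as a schema. That heuristic, the helper name split_stream and the shortened schema in the sample data are assumptions made for illustration, and a table with a column named fields would need a stricter check.

```python
import json
from collections import Counter


def split_stream(lines):
    """Return (schema, records) from an iterable of JSON lines.

    An object with a "fields" key is treated as a schema; everything
    else is treated as a change record.
    """
    schema = None
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        obj = json.loads(line)
        if "fields" in obj:
            schema = obj
        else:
            records.append(obj)
    return schema, records


# Schema shortened for the example; the records are copied from the output above.
SAMPLE = [
    '{"namespace": "MaxScaleChangeDataSchema.avro", "type": "record", '
    '"fields": [{"type": "int", "name": "domain"}, '
    '{"type": "int", "name": "id"}], "name": "ChangeRecord"}',
    '{"domain": 0, "event_number": 1, "event_type": "insert", '
    '"server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 1}',
    '{"domain": 0, "event_number": 2, "event_type": "insert", '
    '"server_id": 1, "sequence": 58, "timestamp": 1470670824, "id": 2}',
]

schema, records = split_stream(SAMPLE)
columns = [field["name"] for field in schema["fields"]]
counts = Counter(record["event_type"] for record in records)
print("columns:", columns)
print("events by type:", dict(counts))
```

The same function can be fed lines read from the console consumer or from the cdc tool directly, since both print one JSON object per line.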

And that's it. We've successfully created a real-time stream representation of the database changes with the help of the MaxScale CDC protocol and Kafka.

About Markus Mäkelä

Markus Mäkelä is a Software Engineer working on MariaDB MaxScale. He graduated from Metropolia University of Applied Sciences in Helsinki, Finland.
