How to Stream Change Data through MariaDB MaxScale using CDC API

In the previous two blog posts, I introduced Data Streaming with MariaDB MaxScale and demonstrated how to configure MariaDB Master and MariaDB MaxScale for Data Streaming. In this post, I will review how to use the Change Data Capture (CDC) protocol to request streaming data from MariaDB MaxScale.

CDC Protocol

CDC is a new protocol introduced in MariaDB MaxScale 2.0 that allows clients to authenticate and register for CDC events. The new protocol is used in conjunction with the AVRO router, which converts binlog events into AVRO records. Clients use the CDC protocol to request and receive change data events through MariaDB MaxScale in either AVRO or JSON format.

Protocol Phases

There are three phases of the protocol.

1. Connection and Authentication

  • The client connects to the MariaDB MaxScale CDC protocol listener.
  • Authentication starts when the client sends the hexadecimal representation of the username, concatenated with a colon (:) and the SHA1 of the password.
  • Example: For the user foobar with a password of foopasswd, the client should send the following hexadecimal string:

foobar:SHA1(foopasswd) ->  666f6f6261723a3137336363643535253331

Then MaxScale returns OK on success and ERR on failure.
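
A minimal Python sketch of this hashing step (it mirrors the example client shown at the end of this post):

import binascii
import hashlib

def cdc_auth_string(user, password):
    # hex(username + ":") followed by the hex-encoded SHA1 digest of the password
    return (binascii.b2a_hex((user + ":").encode())
            + hashlib.sha1(password.encode("utf-8")).hexdigest().encode())

print(cdc_auth_string("foobar", "foopasswd"))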

2. Registration

  • The client sends a UUID and specifies the output format (AVRO or JSON) for data retrieval.
  • Example: The following message registers the client to receive change data events in AVRO format:

REGISTER UUID=11ec2300-2e23-11e6-8308-0002a5d5c51b, TYPE=AVRO

Then MaxScale returns OK on success and ERR on failure.
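
Putting the first two phases together, a short Python sketch (host, port, credentials, and UUID are placeholder values):

import binascii
import hashlib
import socket

# Connect to the MaxScale CDC listener (placeholder address and port)
sock = socket.create_connection(("127.0.0.1", 4001))

# Phase 1: authenticate with hex(user + ":") + hex SHA1 of the password
auth = binascii.b2a_hex(b"foobar:") + hashlib.sha1(b"foopasswd").hexdigest().encode()
sock.send(auth)
print(sock.recv(1024))  # b'OK' on success, an ERR message on failure

# Phase 2: register with a client UUID and the desired output format
sock.send(b"REGISTER UUID=11ec2300-2e23-11e6-8308-0002a5d5c51b, TYPE=JSON")
print(sock.recv(1024))  # b'OK' on success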

3. Change Data Capture Commands

  • Request Change Data Events

REQUEST-DATA DATABASE.TABLE[.VERSION] [GTID]

This command fetches data from a specified table in a database and returns the output in the requested format (AVRO or JSON). Data records are sent to the client and, if new AVRO versions are found (e.g. mydb.mytable.0000002.avro), the new schema and data are sent as well.

The data will be streamed until the client closes the connection.

Clients should keep reading from the network in order to automatically receive new events.

  • Examples:

REQUEST-DATA db1.table1
REQUEST-DATA db1.table1.000003
REQUEST-DATA db2.table4 0-11-345

Example output in JSON; note the AVRO schema and the database records ("id" is the only column in table1) at the end:

{"fields": [{"type": "int", "name": "domain"}, {"type": "int", "name": "server_id"}, {"type": "int", "name": "sequence"}, {"type": "int", "name": "event_number"}, {"type": "int", "name": "timestamp"}, {"type": {"type": "enum", "symbols": ["insert", "update_before", "update_after", "delete"], "name": "EVENT_TYPES"}, "name": "event_type"}, {"type": "int", "name": "id"}], "type": "record", "namespace": "MariaDB MaxScaleChangeDataSchema.avro", "name": "ChangeRecord"}                    
{"event_number": 1, "event_type": "insert", "domain": 0, "sequence": 225, "server_id": 1, "timestamp": 1463037556, "id": 1111}
{"event_number": 4, "event_type": "insert", "domain": 0, "sequence": 225, "server_id": 1, "timestamp": 1463037563,  "id": 4444}

Note: When a table has been altered, a new schema is generated by the AVRO converter and a new AVRO file is created with that schema.
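
Because change records arrive as a continuous stream of concatenated JSON objects rather than a single document, a client has to parse the buffer incrementally. A sketch of that step, using the same json.JSONDecoder.raw_decode approach as the full example client at the end of this post:

import json

decoder = json.JSONDecoder()

def consume_json(buf):
    # Peel complete JSON objects off the front of the buffer;
    # raw_decode raises ValueError when only a partial object remains.
    while True:
        buf = buf.lstrip()
        try:
            obj, end = decoder.raw_decode(buf)
        except ValueError:
            return buf  # keep the partial tail for the next recv()
        print(json.dumps(obj))
        buf = buf[end:]

# usage inside a read loop:
#     leftover = consume_json(leftover + sock.recv(4096).decode('ascii'))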

  • Request Change Data Event Statistics

QUERY-LAST-TRANSACTION

Returns a JSON object with the last GTID, the commit timestamp, and the affected tables.

Example output:

{"GTID": "0-1-178", "events": 2, "timestamp": 1462290084, "tables": ["db1.tb1", “db2.tb2”]}

The last GTID can then be used in a later REQUEST-DATA query.

QUERY-TRANSACTION 0-14-1245

Returns a JSON object with the number of events in the transaction at the specified GTID, the commit timestamp, and the affected tables.

Example output:

{"GTID": "0-14-1245", "events": 23, "timestamp": 1462291124, "tables": ["db1.tb3"]}


How to Quickly Connect to the MaxScale CDC Protocol

The avrorouter comes with an example client program, cdc, written in Python 3. This client can connect to MaxScale configured with the CDC protocol and the avrorouter.

Before using this client, you will need to install the Python 3 interpreter and add users to the service with the cdc_users script. For more details about user creation, please refer to the CDC Protocol and CDC Users documentation.

The output of cdc --help provides a full list of supported options and a short usage description of the client program.

-bash-4.1$ python3 cdc --help
usage: cdc [--help] [-h HOST] [-P PORT] [-u USER] [-p PASSWORD]
           [-f {AVRO,JSON}] [-t READ_TIMEOUT]
           FILE [GTID]

CDC Binary consumer

positional arguments:
  FILE                  Requested table name in the following format:
                        DATABASE.TABLE[.VERSION]
  GTID                  Requested GTID position

optional arguments:
  --help                show this help message and exit
  -h HOST, --host HOST  Network address where the connection is made
  -P PORT, --port PORT  Port where the connection is made
  -u USER, --user USER  Username used when connecting
  -p PASSWORD, --password PASSWORD
                        Password used when connecting
  -f {AVRO,JSON}, --format {AVRO,JSON}
                        Data transmission format
  -t READ_TIMEOUT, --timeout READ_TIMEOUT
                        Read timeout

Here is an example of how to query data in JSON using the cdc Python script. It queries the table test.mytable for all change records.

# cdc --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 test.mytable

If AVRO format is requested instead (with -f AVRO), the binary output will look like this:

Objavro.codenullavro.schema{"type":"record","name":"ChangeRecord","namespace":"MariaDB MaxScaleChangeDataSchema.avro","fields":[{"name":"domain","type":{"type":"int"}},{"name":"server_id","type":{"type":"int"}},{"name":"sequence","type":{"type":"int"}},{"name":"event_number","type":{"type":"int"}},{"name":"timestamp","type":{"type":"int"}},{"name":"event_type","type":{"type":"enum","name":"EVENT_TYPES","symbols":["insert","update_before","update_after","delete"]}},{"name":"id","type":{"type":"int"}}]}
... (binary-encoded data records follow) ...

Note: Each client can request data from only one AVRO file per connection (each file contains a single schema).

In order to get all the events in a transaction that affects N tables, N connections (one per table) are needed.

The application stays in its read loop and will receive additional events as they arrive. All connected clients are notified of new events.

Assume a transaction affects two tables, tbl1 and tbl2 in db1:

BEGIN;
INSERT INTO db1.tbl1 (id) VALUES (111);
INSERT INTO db1.tbl2 (id) VALUES (222);
COMMIT;

The user should start two clients:

# cdc --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 db1.tbl1
# cdc --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 db1.tbl2
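
The two consumers can also be launched programmatically, for example with one subprocess per table (a sketch assuming the cdc script is on the PATH):

import subprocess

# One consumer per table affected by the transaction
tables = ["db1.tbl1", "db1.tbl2"]
procs = [
    subprocess.Popen(["cdc", "--user=myuser", "--password=mypasswd",
                      "--host=127.0.0.1", "--port=4001", table])
    for table in tables
]

# Each process streams change records for its table until interrupted
for proc in procs:
    proc.wait()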


Example Client (Python code):

#!/usr/bin/env python3

import json
import sys
import socket
import hashlib
import argparse
import selectors
import binascii
import os

# Read data as JSON: records arrive as a stream of concatenated
# JSON objects, so complete objects are peeled off the buffer as they arrive
def read_json():
    decoder = json.JSONDecoder()
    rbuf = bytes()
    ep = selectors.EpollSelector()
    ep.register(sock, selectors.EVENT_READ)

    while True:
        ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
        try:
            # recv raises BlockingIOError if the poll timed out with no data
            buf = sock.recv(4096, socket.MSG_DONTWAIT)
            rbuf += buf
            while True:
                rbuf = rbuf.lstrip()
                # raw_decode returns (object, end_index) and raises
                # ValueError when only a partial object is buffered
                data = decoder.raw_decode(rbuf.decode('ascii'))
                rbuf = rbuf[data[1]:]
                print(json.dumps(data[0]))
        except ValueError:
            # Incomplete JSON object; wait for more data
            sys.stdout.flush()
        except Exception:
            break

# Read data as Avro: pass the raw binary stream straight through to stdout
def read_avro():
    ep = selectors.EpollSelector()
    ep.register(sock, selectors.EVENT_READ)

    while True:
        ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
        try:
            buf = sock.recv(4096, socket.MSG_DONTWAIT)
            os.write(sys.stdout.fileno(), buf)
            sys.stdout.flush()
        except Exception:
            break

parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="4001")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("-f", "--format", dest="format", help="Data transmission format", default="JSON", choices=["AVRO", "JSON"])
parser.add_argument("-t", "--timeout", dest="read_timeout", help="Read timeout", default=0)
parser.add_argument("FILE", help="Requested table name in the following format: DATABASE.TABLE[.VERSION]")
parser.add_argument("GTID", help="Requested GTID position", default=None, nargs='?')

opts = parser.parse_args(sys.argv[1:])

sock = socket.create_connection([opts.host, opts.port])

# Authentication: hex(user + ":") followed by the hex SHA1 digest of the password
auth_string = binascii.b2a_hex((opts.user + ":").encode())
auth_string += hashlib.sha1(opts.password.encode("utf_8")).hexdigest().encode()
sock.send(auth_string)

# Discard the authentication response
response = sock.recv(1024)

# Register as a client and request data in the chosen format
sock.send(("REGISTER UUID=XXX-YYY_YYY, TYPE=" + opts.format).encode())

# Discard the registration response
response = sock.recv(1024)

# Request a data stream
sock.send(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())

if opts.format == "JSON":
    read_json()
elif opts.format == "AVRO":
    read_avro()
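
Saved as cdc.py, the script can be invoked just like the packaged client, for example:

python3 cdc.py --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 -f JSON test.mytable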

Additional example applications can be found here:

https://github.com/mariadb-corporation/MaxScale/tree/2.0/server/modules/routing/avro

Up next in this blog series, Markus Makela will show you how to use the CDC API in a Kafka Producer.

Relevant Links