How to Stream Change Data through MariaDB MaxScale using CDC API
In the previous two blog posts, I introduced Data Streaming with MariaDB MaxScale and demonstrated how to configure MariaDB Master and MariaDB MaxScale for Data Streaming. In this post, I will review how to use the Change Data Capture (CDC) protocol to request streaming data from MariaDB MaxScale.
CDC Protocol
CDC is a new protocol introduced in MariaDB MaxScale 2.0 that allows clients to authenticate and register for CDC events. The new protocol is used in conjunction with the avrorouter, which currently converts binlog events into AVRO records. Clients use the CDC protocol to request and receive change data events through MariaDB MaxScale in either AVRO or JSON format.
Protocol Phases
There are three phases of the protocol.
1. Connection and Authentication
- The client connects to the MariaDB MaxScale CDC protocol listener.
- Authentication starts when the client sends the hexadecimal representation of the username concatenated with a colon (:) and the SHA1 of the password.
- Example: For the user foobar with the password foopasswd, the client should send the following hexadecimal string:
foobar:SHA1(foopasswd) -> 666f6f6261723a3137336363643535253331
Then MaxScale returns OK on success and ERR on failure.
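The authentication message can be built as in the minimal Python sketch below. It follows the example client at the end of this post: the hexadecimal form of `username:` followed by the SHA1 hex digest of the password (a hex digest is simply the hexadecimal form of the raw SHA1, so this equals hex-encoding `username:` plus the raw SHA1). The function name `build_auth_message` is hypothetical, and the sketch only builds the bytes; it does not talk to MaxScale.

```python
import binascii
import hashlib


def build_auth_message(user: str, password: str) -> bytes:
    """Build the CDC authentication message (hypothetical helper).

    Mirrors the example client in this post: hex("user:") followed by
    the SHA1 hex digest of the password.
    """
    prefix = binascii.b2a_hex((user + ":").encode("utf-8"))
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().encode("ascii")
    return prefix + digest


msg = build_auth_message("foobar", "foopasswd")
print(msg[:14])  # b'666f6f6261723a' is the hex form of "foobar:"
```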
2. Registration
- The client sends a UUID and specifies the output format (AVRO or JSON) for data retrieval.
- Example: The following message from the client registers it to receive AVRO change data events:
REGISTER UUID=11ec2300-2e23-11e6-8308-0002a5d5c51b, TYPE=AVRO
Then MaxScale returns OK on success and ERR on failure.
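A registration message can be sketched the same way. The helper `build_register_message` is hypothetical; generating the UUID with `uuid.uuid1()` is an assumption (the example UUID above has the version-1 layout), and any unique UUID string is assumed to be acceptable to MaxScale.

```python
import uuid
from typing import Optional


def build_register_message(fmt: str = "AVRO", client_uuid: Optional[str] = None) -> bytes:
    """Build a REGISTER message for the CDC protocol (hypothetical helper)."""
    fmt = fmt.upper()
    if fmt not in ("AVRO", "JSON"):
        raise ValueError("format must be AVRO or JSON")
    if client_uuid is None:
        # Assumption: any unique UUID string identifies the client.
        client_uuid = str(uuid.uuid1())
    return "REGISTER UUID={}, TYPE={}".format(client_uuid, fmt).encode("ascii")


print(build_register_message("AVRO", "11ec2300-2e23-11e6-8308-0002a5d5c51b").decode())
# REGISTER UUID=11ec2300-2e23-11e6-8308-0002a5d5c51b, TYPE=AVRO
```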
3. Change Data Capture Commands
- Request Change Data Events
REQUEST-DATA DATABASE.TABLE[.VERSION] [GTID]
This command fetches data from the specified table and returns it in the requested format (AVRO or JSON). Data records are sent to the client, and if a new AVRO file version is found (e.g. mydb.mytable.0000002.avro), the new schema and its data are sent as well.
The data is streamed until the client closes the connection.
Clients should keep reading from the network in order to receive new events automatically.
- Examples:
REQUEST-DATA db1.table1
REQUEST-DATA db1.table1.000003
REQUEST-DATA db2.table4 0-11-345
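The REQUEST-DATA syntax above can be assembled by a small function. `build_request` is a hypothetical helper that only formats the command; the optional version and GTID parts are appended exactly as the syntax line shows.

```python
from typing import Optional


def build_request(database: str, table: str, version: Optional[str] = None,
                  gtid: Optional[str] = None) -> bytes:
    """Build a REQUEST-DATA command (hypothetical helper).

    Format: REQUEST-DATA DATABASE.TABLE[.VERSION] [GTID]
    """
    target = "{}.{}".format(database, table)
    if version:
        target += "." + version
    command = "REQUEST-DATA " + target
    if gtid:
        command += " " + gtid
    return command.encode("ascii")


print(build_request("db2", "table4", gtid="0-11-345").decode())
# REQUEST-DATA db2.table4 0-11-345
```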
Example JSON output. Note the AVRO schema on the first line, followed by the database records ("id" is the only column in table1):
{"fields": [{"type": "int", "name": "domain"}, {"type": "int", "name": "server_id"}, {"type": "int", "name": "sequence"}, {"type": "int", "name": "event_number"}, {"type": "int", "name": "timestamp"}, {"type": {"type": "enum", "symbols": ["insert", "update_before", "update_after", "delete"], "name": "EVENT_TYPES"}, "name": "event_type"}, {"type": "int", "name": "id"}], "type": "record", "namespace": "MaxScaleChangeDataSchema.avro", "name": "ChangeRecord"}
{"event_number": 1, "event_type": "insert", "domain": 0, "sequence": 225, "server_id": 1, "timestamp": 1463037556, "id": 1111}
{"event_number": 4, "event_type": "insert", "domain": 0, "sequence": 225, "server_id": 1, "timestamp": 1463037563, "id": 4444}
Note: When a table is altered, the AVRO converter generates a new schema, and a new AVRO file is created with that schema.
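Because the stream is a sequence of JSON objects that can arrive split across network reads, and because a new schema may appear mid-stream after an ALTER TABLE, a client has to buffer its input and track the current schema. The Python sketch below does this with `json.JSONDecoder.raw_decode`, the same call the example client at the end of this post uses. The class `ChangeStreamDecoder` is hypothetical, and treating any object with a `fields` key as a schema is an assumption drawn from the sample output.

```python
import codecs
import json
from typing import List, Optional, Tuple


class ChangeStreamDecoder:
    """Incrementally decode the CDC JSON stream (hypothetical helper).

    Network reads can split a JSON object across chunks, so text is buffered
    until a complete object can be decoded. Assumption based on the sample
    output: an object with a "fields" key is an AVRO schema, and every other
    object is a change record for the most recent schema.
    """

    def __init__(self) -> None:
        # Incremental UTF-8 decoding survives multi-byte characters split across reads.
        self._utf8 = codecs.getincrementaldecoder("utf-8")()
        self._json = json.JSONDecoder()
        self._buf = ""
        self.columns: Optional[List[str]] = None

    def feed(self, chunk: bytes) -> List[Tuple[List[str], dict]]:
        """Add raw bytes; return the (column names, record) pairs completed so far."""
        self._buf += self._utf8.decode(chunk)
        records = []
        while True:
            self._buf = self._buf.lstrip()
            if not self._buf:
                break
            try:
                obj, end = self._json.raw_decode(self._buf)
            except json.JSONDecodeError:
                break  # incomplete object: wait for the next chunk
            self._buf = self._buf[end:]
            if isinstance(obj, dict) and "fields" in obj:
                # New schema (e.g. after ALTER TABLE): switch column names.
                self.columns = [field["name"] for field in obj["fields"]]
            else:
                records.append((list(self.columns or []), obj))
        return records


# Abbreviated version of the sample output above, fed in 7-byte chunks.
sample = (
    b'{"fields": [{"type": "int", "name": "domain"}, {"type": "int", "name": "id"}], '
    b'"type": "record", "name": "ChangeRecord"}\n'
    b'{"event_number": 1, "event_type": "insert", "domain": 0, "id": 1111}\n'
)
decoder = ChangeStreamDecoder()
for start in range(0, len(sample), 7):
    for columns, record in decoder.feed(sample[start:start + 7]):
        print(columns, record["event_type"], record["id"])
# ['domain', 'id'] insert 1111
```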
- Request Change Data Event Statistics
QUERY-LAST-TRANSACTION
Returns JSON only, containing the last GTID, the event count, the timestamp and the affected tables.
Example output:
{"GTID": "0-1-178", "events": 2, "timestamp": 1462290084, "tables": ["db1.tb1", "db2.tb2"]}
The last GTID can later be used in a REQUEST-DATA query.
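For example, the QUERY-LAST-TRANSACTION reply can be turned into one REQUEST-DATA command per affected table, each passing the reported GTID. The helper `requests_from_last_transaction` is hypothetical; it only builds the commands, and each one would be sent on its own connection, since a client reads a single table.

```python
import json
from typing import List


def requests_from_last_transaction(response: str) -> List[bytes]:
    """Turn a QUERY-LAST-TRANSACTION reply into REQUEST-DATA commands (hypothetical helper)."""
    info = json.loads(response)
    gtid = info["GTID"]
    # One command per table: each would go over a separate CDC connection.
    return ["REQUEST-DATA {} {}".format(table, gtid).encode("ascii")
            for table in info["tables"]]


reply = '{"GTID": "0-1-178", "events": 2, "timestamp": 1462290084, "tables": ["db1.tb1", "db2.tb2"]}'
for command in requests_from_last_transaction(reply):
    print(command.decode())
# REQUEST-DATA db1.tb1 0-1-178
# REQUEST-DATA db2.tb2 0-1-178
```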
QUERY-TRANSACTION 0-14-1245
Returns JSON for the specified GTID: the event count, the commit timestamp and the affected tables.
Example output:
{"GTID": "0-14-1245", "events": 23, "timestamp": 1462291124, "tables": ["db1.tb3"]}
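A MariaDB GTID such as 0-14-1245 consists of a domain ID, a server ID and a sequence number, the same three components that appear as the domain, server_id and sequence fields of the change records shown earlier. The sketch below parses a GTID string and, assuming those three record fields identify the record's transaction, derives the GTID of a record. The `Gtid` class is hypothetical.

```python
from typing import NamedTuple


class Gtid(NamedTuple):
    """A MariaDB GTID: domain ID, server ID and sequence number."""

    domain: int
    server_id: int
    sequence: int

    @classmethod
    def parse(cls, text: str) -> "Gtid":
        """Parse a string such as "0-14-1245"."""
        domain, server_id, sequence = (int(part) for part in text.split("-"))
        return cls(domain, server_id, sequence)

    @classmethod
    def from_record(cls, record: dict) -> "Gtid":
        # Assumption: these three fields identify the record's transaction.
        return cls(record["domain"], record["server_id"], record["sequence"])

    def __str__(self) -> str:
        return "{}-{}-{}".format(self.domain, self.server_id, self.sequence)


print(Gtid.parse("0-14-1245").server_id)  # 14
record = {"event_number": 1, "event_type": "insert", "domain": 0, "sequence": 225,
          "server_id": 1, "timestamp": 1463037556, "id": 1111}
print(Gtid.from_record(record))  # 0-1-225
```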
How to Quickly Connect to the MaxScale CDC Protocol
The avrorouter comes with an example client program, cdc, written in Python 3. This client can connect to MaxScale configured with the CDC protocol and the avrorouter.
Before using this client, you will need to install the Python 3 interpreter and add users to the service with the cdc_users script. For more details about user creation, please refer to the CDC Protocol and CDC Users documentation.
The output of cdc --help provides a full list of supported options and a short usage description of the client program.
-bash-4.1$ python3 cdc --help
usage: cdc [--help] [-h HOST] [-P PORT] [-u USER] [-p PASSWORD]
           [-f {AVRO,JSON}] [-t READ_TIMEOUT]
           FILE [GTID]

CDC Binary consumer

positional arguments:
  FILE                  Requested table name in the following format:
                        DATABASE.TABLE[.VERSION]
  GTID                  Requested GTID position

optional arguments:
  --help                show this help message and exit
  -h HOST, --host HOST  Network address where the connection is made
  -P PORT, --port PORT  Port where the connection is made
  -u USER, --user USER  Username used when connecting
  -p PASSWORD, --password PASSWORD
                        Password used when connecting
  -f {AVRO,JSON}, --format {AVRO,JSON}
                        Data transmission format
  -t READ_TIMEOUT, --timeout READ_TIMEOUT
                        Read timeout
Here is an example of how to query data in JSON using the cdc Python script. It queries the table test.mytable for all change records.
# cdc --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 test.mytable
Adding --format=AVRO to the same command returns AVRO binary output instead, which looks like this:
Objavro.codenullavro.schema?{"type":"record","name":"ChangeRecord","namespace":"MaxScaleChangeDataSchema.avro","fields":[{"name":"domain","type":{"type":"int"}},{"name":"server_id","type":{"type":"int"}},{"name":"sequence","type":{"type":"int"}},{"name":"event_number","type":{"type":"int"}},{"name":"timestamp","type":{"type":"int"}},{"name":"event_type","type":{"type":"enum","name":"EVENT_TYPES","symbols":["insert","update_before","update_after","delete"]}},{"name":"id","type":{"type":"int"}}]}??7)??.?v?r???????
??7)??.?v?r???????
???7)??.?v?r???????
???7)??.?v?r????●?
??䗏?
Note: Each client can request data from only one AVRO file (each file contains a single schema).
To get all the events in a transaction that affects N tables, N connections (one per table) are needed.
The application stays in the read loop and receives additional events as they arrive. All connected clients are notified of new events.
Assume a transaction affects two tables, tbl1 and tbl2 in db1:
BEGIN;
INSERT into db1.tbl1 (id) VALUES (111);
INSERT into db1.tbl2 (id) VALUES (222);
COMMIT;
The user should start two clients:
# cdc --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 db1.tbl1
# cdc --user=myuser --password=mypasswd --host=127.0.0.1 --port=4001 db1.tbl2
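Once both clients are reading, their events can be recombined per transaction. The sketch below groups records from several per-table streams by domain, server_id and sequence and orders them by event_number. Both the grouping key and the ordering are assumptions based on the record fields shown earlier, the function name `group_by_transaction` is hypothetical, and the input records are made-up values shaped like the JSON output.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

GtidKey = Tuple[int, int, int]


def group_by_transaction(streams: Dict[str, Iterable[dict]]) -> Dict[GtidKey, List[Tuple[str, dict]]]:
    """Group records from per-table streams by transaction (hypothetical helper).

    Assumption: records sharing domain/server_id/sequence belong to the same
    transaction, and event_number orders them within it.
    """
    grouped: Dict[GtidKey, List[Tuple[str, dict]]] = defaultdict(list)
    for table, records in streams.items():
        for record in records:
            key = (record["domain"], record["server_id"], record["sequence"])
            grouped[key].append((table, record))
    for events in grouped.values():
        events.sort(key=lambda item: item[1]["event_number"])
    return dict(grouped)


# Made-up records for the two-table transaction above.
streams = {
    "db1.tbl1": [{"domain": 0, "server_id": 1, "sequence": 300, "event_number": 1,
                  "event_type": "insert", "id": 111}],
    "db1.tbl2": [{"domain": 0, "server_id": 1, "sequence": 300, "event_number": 2,
                  "event_type": "insert", "id": 222}],
}
for gtid, events in group_by_transaction(streams).items():
    print("{}-{}-{}".format(*gtid), [(table, rec["id"]) for table, rec in events])
# 0-1-300 [('db1.tbl1', 111), ('db1.tbl2', 222)]
```

The sketch works on finite lists. Live streams never end, so a real consumer would also need a rule for deciding when a transaction is complete, possibly by comparing against the event count reported by QUERY-TRANSACTION.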
Example client (Python code):
#!/usr/bin/env python3
import argparse
import binascii
import codecs
import hashlib
import json
import os
import selectors
import socket
import sys
import uuid
# Read data as JSON
def read_json():
    """Decode the stream into JSON objects and print one per line."""
    decoder = json.JSONDecoder()
    utf8 = codecs.getincrementaldecoder("utf-8")()
    text = ""
    sel = selectors.DefaultSelector()
    sel.register(sock, selectors.EVENT_READ)
    while True:
        if not sel.select(timeout=timeout):
            break  # read timeout expired
        try:
            buf = sock.recv(4096)
        except OSError:
            break  # connection error
        if not buf:
            break  # MaxScale closed the connection
        # Decode incrementally so multi-byte characters split across reads survive
        text += utf8.decode(buf)
        while True:
            text = text.lstrip()
            try:
                obj, end = decoder.raw_decode(text)
            except ValueError:
                break  # incomplete object: wait for more data
            text = text[end:]
            print(json.dumps(obj))
        sys.stdout.flush()
# Read data as Avro
def read_avro():
    """Copy the raw AVRO bytes to standard output."""
    sel = selectors.DefaultSelector()
    sel.register(sock, selectors.EVENT_READ)
    while True:
        if not sel.select(timeout=timeout):
            break  # read timeout expired
        try:
            buf = sock.recv(4096)
        except OSError:
            break  # connection error
        if not buf:
            break  # MaxScale closed the connection
        os.write(sys.stdout.fileno(), buf)
parser = argparse.ArgumentParser(description="CDC Binary consumer", conflict_handler="resolve")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", type=int, help="Port where the connection is made", default=4001)
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("-f", "--format", dest="format", help="Data transmission format", default="JSON", choices=["AVRO", "JSON"])
parser.add_argument("-t", "--timeout", dest="read_timeout", type=int, help="Read timeout", default=0)
parser.add_argument("FILE", help="Requested table name in the following format: DATABASE.TABLE[.VERSION]")
parser.add_argument("GTID", help="Requested GTID position", default=None, nargs='?')
opts = parser.parse_args(sys.argv[1:])
# A read timeout of 0 (the default) means wait forever
timeout = opts.read_timeout if opts.read_timeout > 0 else None
sock = socket.create_connection((opts.host, opts.port))
def check_response(step):
    """Read MaxScale's reply and exit if it reports ERR."""
    response = sock.recv(1024)
    if response.startswith(b"ERR"):
        sys.exit("{} failed: {}".format(step, response.decode(errors="replace")))
# Authentication: hex("user:") followed by the SHA1 hex digest of the password
auth_string = binascii.b2a_hex((opts.user + ":").encode())
auth_string += hashlib.sha1(opts.password.encode("utf_8")).hexdigest().encode()
sock.sendall(auth_string)
check_response("Authentication")
# Register as a client and request data in the chosen format
sock.sendall(("REGISTER UUID=" + str(uuid.uuid1()) + ", TYPE=" + opts.format).encode())
check_response("Registration")
# Request a data stream, optionally starting from a GTID
request = "REQUEST-DATA " + opts.FILE
if opts.GTID:
    request += " " + opts.GTID
sock.sendall(request.encode())
if opts.format == "JSON":
    read_json()
else:
    read_avro()
Additional example applications can be found here:
https://github.com/mariadb-corporation/MaxScale/tree/2.0/server/modules/routing/avro