Kafka to ColumnStore Data Adapter

Kafka-Avro Adapter Tutorial

This is a short tutorial on how to create a Java application that serializes data to Kafka in Avro format and how to stream this data into MariaDB ColumnStore via the Kafka-Avro Data Adapter.

Setting Up Kafka

Here's a simple Docker Compose file for creating a local setup with a Kafka broker, ZooKeeper, and the Confluent Schema Registry.

version: '3'
services:
    zookeeper:
        image: confluent/zookeeper
        container_name: zookeeper
        ports:
            - "2181:2181"
            - "2888:2888"
            - "3888:3888"

    kafka:
        image: confluent/kafka
        container_name: kafka
        ports:
            - "9092:9092"
        depends_on:
            - zookeeper
        environment:
            KAFKA_ADVERTISED_HOST_NAME: kafka

    schema-registry:
        image: confluent/schema-registry
        container_name: schema-registry
        ports:
            - "8081:8081"
        depends_on:
            - kafka

Using this configuration, run docker-compose up -d to get a local Kafka setup running. After that, add the following line to /etc/hosts.

127.0.0.1 kafka

This way, the hostname that the Kafka broker advertises resolves both inside the Docker containers and on the host system.
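
If you want to make sure the broker is reachable before going further, a few lines of Java with the Kafka AdminClient will do. This is only a sketch: it assumes the kafka-clients dependency listed later in this tutorial and connects through the kafka hostname that the hosts entry above makes resolvable.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Point the admin client at the advertised broker address
        Properties p = new Properties();
        p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

        AdminClient admin = AdminClient.create(p);

        // Listing topics only succeeds if the broker is reachable
        System.out.println("Topics: " + admin.listTopics().names().get());

        admin.close();
    }
}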

Setting Up ColumnStore

Clone this repository and follow the instructions on how to set it up. To make testing easier, add --network host to the docker command; this starts the ColumnStore container on the host system's network.

After the container is running, execute the following command to copy the ColumnStore configuration file out of the container. Replace mcs-container with the name of your container.

docker cp mcs-container:/usr/local/mariadb/columnstore/etc/Columnstore.xml .

The command copies the Columnstore.xml file that contains all the information needed to connect to ColumnStore.

Creating the Client Application

The following is a minimal client application that streams data into Kafka as Avro.

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaAvroGenerator {

    static final String SCHEMA_STRING = "{"
            + "\"namespace\": \"KafkaAvroGenerator\", "
            + "\"type\": \"record\", "
            + "\"name\": \"Record\", "
            + "\"fields\": ["
            + "{\"name\": \"user_id\", \"type\": \"int\"}, "
            + "{\"name\": \"data\", \"type\": \"string\"}"
            + "]}";

    public static void main(String[] args) {
        org.apache.log4j.BasicConfigurator.configure();

        // The Avro schema for the data
        Schema schema = new Schema.Parser().parse(SCHEMA_STRING);

        // Parameters for the Kafka Producer
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://127.0.0.1:8081");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        Producer<String, GenericRecord> producer = new KafkaProducer<>(p);

        for (int i = 0; i < 5; i++) {
            // Create a new record, fill it with data and send it with the producer to the 'hello.world' topic
            GenericRecord record = new GenericData.Record(schema);
            record.put("user_id", i);
            record.put("data", "hello world" + Integer.toString(i * i));
            producer.send(new ProducerRecord<>("hello.world", null, record));
        }

        producer.close();
    }
}
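
Before involving the adapter, you can check that the records actually land in the hello.world topic by reading them back with a small consumer. The following is only a sketch; it uses the same dependencies as the producer, and the avro-verifier group id is an arbitrary choice.

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaAvroReader {

    public static void main(String[] args) {
        // Parameters for the Kafka Consumer; the group id is arbitrary
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://127.0.0.1:8081");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-verifier");
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(p);
        consumer.subscribe(Collections.singletonList("hello.world"));

        // Poll once and print the records produced so far
        ConsumerRecords<String, GenericRecord> records = consumer.poll(5000);
        for (ConsumerRecord<String, GenericRecord> record : records) {
            System.out.println(record.value());
        }

        consumer.close();
    }
}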

You can pull in the required libraries by adding the following dependencies to your pom.xml.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0-cp1</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
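
Note that the io.confluent artifacts (and the Confluent-patched kafka-clients build) are distributed through Confluent's own Maven repository rather than Maven Central, so you may also need to add that repository to your pom.xml before the dependencies resolve.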

After that, build the client with the build tool of your choice.

Configuring the Adapter

The adapter is configured with a JSON configuration file. Write the following into a file named config.json located in /etc/mcs-kafka-adapter/.

{
    "options" : {
        "broker"   : "127.0.0.1:9092",
        "registry" : "127.0.0.1:8081",
        "config" : "/path/to/Columnstore.xml"
    },
    "streams" : [
        {
            "topic" : "hello.world",
            "database" : "test",
            "table" : "t1"
        }
    ]
}

The streams field defines an array of streams we want to connect to ColumnStore. The topic field defines the name of the topic to consume, and the database and table fields together define the target table where the data is stored.

The options field defines the adapter configuration. The broker field is the address of the Kafka broker, the registry field is the address of the Schema Registry, and the config field is the path to the Columnstore.xml file we copied earlier.

Once the configuration is created, the database table needs to be created in ColumnStore. The layout of the table must match the Avro schema of the consumed stream. Execute the following SQL to create a table that matches the schema of the test application.

CREATE TABLE test.t1(user_id INT, data VARCHAR(64));

Starting the Adapter

To start the adapter, run the following command.

kafka_to_avro -c config.json

The -c flag tells the adapter where the configuration file is located. If the flag is omitted, the adapter reads /etc/mcs-kafka-adapter/config.json by default.

Once the adapter is running, start the example Java client, which produces records and sends them to the Kafka broker. In a few moments, the data should show up in the ColumnStore table test.t1.
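
To confirm that the rows arrived, you can query the table over the ColumnStore SQL interface. The following sketch does it from Java with MariaDB Connector/J, which is not among this tutorial's dependencies; the host, port, user, and password in the connection details are placeholders that you need to adapt to your container.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ColumnStoreCheck {

    public static void main(String[] args) throws SQLException {
        // Placeholder connection details; adjust them to match your ColumnStore setup
        String url = "jdbc:mariadb://127.0.0.1:3306/test";

        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT user_id, data FROM t1")) {
            // Print every row the adapter has written so far
            while (rs.next()) {
                System.out.println(rs.getInt("user_id") + ": " + rs.getString("data"));
            }
        }
    }
}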
