Kafka is a popular distributed streaming platform that can be used for building real-time streaming data pipelines and applications. Kafka Connect is a tool that makes it easy to integrate Kafka with other systems. In this blog post, we’ll show you how to use Kafka Connect to connect Kafka with MySQL.

Setup

Before we can connect Kafka with MySQL, we need to set up the required services. We’ll use Docker and Docker Compose to do this. Create a file called docker-compose.yaml with the following contents:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_DEBUG: "true"

  mysql:
    image: mysql:5.7
    hostname: mysql
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: my_database

This will create four services:

  • zookeeper: A ZooKeeper service that Kafka depends on.
  • kafka: A Kafka service that will be used to send and receive messages.
  • schema-registry: A schema registry service that will be used to manage Avro schemas.
  • mysql: A MySQL service that will be used to store data.

Run the following command to start the services:

docker-compose up -d
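
Before moving on, it can help to confirm that the published ports are accepting connections. The following is a minimal sketch using only the Python standard library; the helper names port_open and check_services are made up for this example, and it assumes the ports are published on localhost as in the Compose file above. An open port only shows that the container is listening, not that the service has finished starting.

```python
import socket

# Ports published by the Compose file above (service name -> host port).
SERVICES = {
    "zookeeper": 2181,
    "kafka": 9092,
    "schema-registry": 8081,
    "mysql": 3306,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its published port accepts connections."""
    return {name: port_open(host, port) for name, port in SERVICES.items()}


# Example usage (run after `docker-compose up -d`):
# print(check_services())
```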

Configure Kafka Connect

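Now that the required services are running, let’s configure Kafka Connect to read from MySQL.

Create a file called mysql.properties with the following contents. A properties file like this is the form a standalone Connect worker reads; the distributed worker we start in the next step takes the same settings as JSON over its REST API, so treat this file as a reference for the values we submit later: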

name=mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://mysql:3306/my_database
connection.user=root
connection.password=root
topic.prefix=mysql-
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
validate.non.null=false

In this file, we have defined the following properties:

  • name: The name of the connector.
  • connector.class: The class of the connector to be used.
  • tasks.max: The number of tasks to be used by the connector.
  • connection.url: The JDBC URL for connecting to the MySQL instance running in our Docker container.
  • connection.user: The username to connect to the database.
  • connection.password: The password to connect to the database.
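  • topic.prefix: The prefix prepended to each table name to form the topic name, so the employees table is written to mysql-employees.
  • mode: The mode of the connector. In this case, we’re using the timestamp+incrementing mode, which uses a timestamp column to detect modified rows and a strictly incrementing column to detect new rows and to tell apart rows that share a timestamp.
  • incrementing.column.name: The name of the strictly incrementing column (here id) used to detect new rows.
  • timestamp.column.name: The name of the column (here updated_at) that records when a row was last modified.
  • validate.non.null: By default, the connector checks that the incrementing and timestamp columns are declared NOT NULL and fails to start if they are not. Setting this to false disables that check.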
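
To make the timestamp+incrementing mode more concrete, here is a small in-memory sketch of the selection rule it is based on: a row is picked up if its timestamp is later than the last one seen, or if the timestamp is equal and its id is larger. This is an illustration only, not the connector’s actual code; the function name poll_changes and the sample rows are made up for the example.

```python
from datetime import datetime
from typing import Dict, List, Tuple

Row = Dict[str, object]
Offset = Tuple[datetime, int]  # (last seen updated_at, last seen id)


def poll_changes(rows: List[Row], offset: Offset) -> Tuple[List[Row], Offset]:
    """Return rows newer than `offset`, ordered by (updated_at, id),
    together with the offset advanced to the last row returned."""
    last_ts, last_id = offset
    fresh = [
        r for r in rows
        if r["updated_at"] > last_ts
        or (r["updated_at"] == last_ts and r["id"] > last_id)
    ]
    fresh.sort(key=lambda r: (r["updated_at"], r["id"]))
    if fresh:
        offset = (fresh[-1]["updated_at"], fresh[-1]["id"])
    return fresh, offset


# Hypothetical sample data: two rows inserted at the same instant.
t1 = datetime(2023, 1, 1, 12, 0)
t2 = datetime(2023, 1, 1, 12, 5)
table = [{"id": 1, "updated_at": t1}, {"id": 2, "updated_at": t1}]

first_batch, offset = poll_changes(table, (datetime.min, -1))

# Row 1 is updated and row 3 is inserted; row 2 is unchanged.
table[0]["updated_at"] = t2
table.append({"id": 3, "updated_at": t2})
second_batch, offset = poll_changes(table, offset)
```

The first poll returns rows 1 and 2; the second returns only rows 1 and 3, because row 2 has neither a newer timestamp nor a larger id than the stored offset.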

Start the Kafka Connect service

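With the configuration ready, let’s start the Kafka Connect service. The container must join the same Docker network as the services above. Docker Compose names its default network after the project directory (for example, myproject_default), so either substitute that name for kafka-connect-network below or declare a network with this name in the Compose file; docker network ls lists the available networks. Run the following command:

docker run -d \
    --name=kafka-connect \
    --network=kafka-connect-network \
    -p 8083:8083 \
    -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
    -e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
    -e CONNECT_REST_PORT=8083 \
    -e CONNECT_GROUP_ID=kafka-connect-group \
    -e CONNECT_CONFIG_STORAGE_TOPIC=kafka-connect-configs \
    -e CONNECT_OFFSET_STORAGE_TOPIC=kafka-connect-offsets \
    -e CONNECT_STATUS_STORAGE_TOPIC=kafka-connect-status \
    -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
    -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
    -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
    -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter \
    -e CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -e CONNECT_PLUGIN_PATH=/usr/share/java \
    -v /path/to/mysql-connector-java-8.0.23.jar:/usr/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.23.jar \
    confluentinc/cp-kafka-connect:7.0.0

This command starts the Kafka Connect service and points it at the Kafka service we set up earlier. It publishes port 8083 so that the Connect REST API is reachable from the host, and it sets the value converter to Avro backed by the Schema Registry so that record values can be read as AVRO later in this post. We’ve also mounted the MySQL Connector/J JAR file under the plugin path so that Kafka Connect can load the MySQL driver. Note that recent cp-kafka-connect images may not bundle the JDBC connector itself; if Connect reports that JdbcSourceConnector cannot be found, install the connector into the plugin path first (for example, with the confluent-hub client).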

Create a Kafka topic

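The JDBC connector writes each table to a topic named after topic.prefix plus the table name, so the employees table goes to mysql-employees. With the broker’s default settings this topic would be created automatically on first write, but creating it up front lets us choose the partition count and replication factor. We can do this using the Kafka command line tool. Run the following command to create a topic called mysql-employees: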

docker exec kafka \
    kafka-topics \
    --create \
    --topic mysql-employees \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --bootstrap-server kafka:9092

This will create a new topic called mysql-employees with one partition and a replication factor of one.

Start streaming data

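Now that everything is set up, we can start streaming data from MySQL to Kafka. The connector expects my_database to contain a table (employees in this post) with an id column and an updated_at column, matching the incrementing and timestamp settings above. Run the following command to register the MySQL connector through the Kafka Connect REST API: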

curl -X POST \
    -H "Content-Type: application/json" \
    --data '{ "name": "mysql-source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://mysql:3306/my_database", "connection.user": "root", "connection.password": "root", "topic.prefix": "mysql-", "mode": "timestamp+incrementing", "incrementing.column.name": "id", "timestamp.column.name": "updated_at", "validate.non.null": false } }' \
    http://localhost:8083/connectors

This will start the MySQL connector. Because no table.whitelist is set, the connector considers every table in my_database, and rows from the employees table are written to the mysql-employees topic we created earlier.
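
The same registration can be scripted. The sketch below builds the JSON payload from the settings used in this post and posts it to the Connect REST API with the Python standard library. The function names (build_connector_payload, topic_for, register_connector) are made up for this example, and the default URL assumes the REST port is reachable at localhost:8083.

```python
import json
import urllib.request


def build_connector_payload(name: str = "mysql-source") -> dict:
    """Build the request body for POST /connectors, mirroring the curl call."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": "1",
            "connection.url": "jdbc:mysql://mysql:3306/my_database",
            "connection.user": "root",
            "connection.password": "root",
            "topic.prefix": "mysql-",
            "mode": "timestamp+incrementing",
            "incrementing.column.name": "id",
            "timestamp.column.name": "updated_at",
            "validate.non.null": "false",
        },
    }


def topic_for(table: str, prefix: str = "mysql-") -> str:
    """Topic name the connector writes a table to: prefix + table name."""
    return prefix + table


def register_connector(payload: dict,
                       connect_url: str = "http://localhost:8083") -> dict:
    """POST the payload to Kafka Connect and return the parsed response."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Example usage (requires a running Connect worker):
# register_connector(build_connector_payload())
```

Configuration values are written as strings here, which is how Kafka Connect stores them internally.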

Verify data is streaming

To verify that data is streaming from MySQL to Kafka, let’s consume the data from the mysql-employees topic. Run the following command to consume the data:

docker run --rm \
    --network=kafka-connect-network \
    confluentinc/cp-kafkacat \
    kafkacat -b kafka:9092 \
    -t mysql-employees \
    -C -q \
    -o beginning

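This command consumes the mysql-employees topic from the beginning. As with Kafka Connect, the --network value must be the network the other containers are attached to. If there is data in the table, you should see it stream to the console. Avro-encoded values appear as binary unless kafkacat is told to deserialize them, which recent versions can do with -s value=avro and -r pointing at the Schema Registry URL.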
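
If nothing shows up, the connector’s state is worth checking. Kafka Connect reports it at /connectors/<name>/status in its REST API. The sketch below fetches that document and reduces it to a short summary; the function names fetch_status and summarize_status are made up for this example, and the default URL again assumes the REST port is reachable at localhost:8083.

```python
import json
import urllib.request


def fetch_status(name: str = "mysql-source",
                 connect_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status and return the parsed JSON document."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def summarize_status(status: dict) -> dict:
    """Reduce a status document to connector state, task states and a flag
    that is True only when the connector and at least one task are RUNNING."""
    connector_state = status["connector"]["state"]
    task_states = [task["state"] for task in status.get("tasks", [])]
    healthy = (
        connector_state == "RUNNING"
        and bool(task_states)
        and all(state == "RUNNING" for state in task_states)
    )
    return {"connector": connector_state, "tasks": task_states, "healthy": healthy}


# Example usage (requires a running Connect worker):
# print(summarize_status(fetch_status()))
```

A task in the FAILED state usually points to a connection or schema problem, such as a missing driver or a table without the expected id and updated_at columns.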

ksqlDB

ksqlDB lets you process and query Kafka topics in real time using SQL statements.

Install

To run ksqlDB, we’ll add two services to the Compose setup. Because they need to reach kafka and schema-registry by name, add them under the services key of the docker-compose.yaml file we created earlier rather than in a separate file:

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.21.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.21.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

This defines two services: ksqldb-server and ksqldb-cli. ksqldb-server is the ksqlDB service, and ksqldb-cli is the command-line interface for interacting with it.

Both services join the same Compose network as Kafka, so they can reach the broker at kafka:9092, which is the address it advertises. Host networking (network_mode: host) is not a good fit here: it cannot be combined with published ports, and the kafka hostname does not resolve outside the Compose network.

Start ksqlDB

With the ksqlDB services defined, let’s start them. Run the following command from the directory that contains the Compose file:

docker-compose up -d

This command starts the ksqldb-server and ksqldb-cli services. Once they are running, you can open a ksqlDB prompt from the ksqldb-cli container by running docker exec -it ksqldb-cli ksql followed by the URL of the ksqlDB server (port 8088).

Connect ksqlDB to Kafka

Before we can use ksqlDB to process our Kafka streams, it needs to know where the Kafka brokers are, which is the bootstrap.servers setting. When ksqlDB runs from the Docker image, this is usually supplied as the KSQL_BOOTSTRAP_SERVERS environment variable on the ksqldb-server service. When the server is started from a properties file, the equivalent entry in ksql.properties is:

bootstrap.servers=kafka:9092

Either form tells ksqlDB to connect to the Kafka service at kafka:9092.

Create a ksqlDB stream

With ksqlDB connected to Kafka, we can now create a stream that represents the data in our MySQL table. Run the following statement at the ksqlDB prompt to create a ksqlDB stream:

CREATE STREAM employees_stream (
  id INT,
  name VARCHAR,
  age INT,
  salary DOUBLE,
  updated_at BIGINT
) WITH (
  KAFKA_TOPIC='mysql-employees',
  VALUE_FORMAT='AVRO'
);

With this statement, we’re creating a new stream called employees_stream over the mysql-employees topic with five columns: id, name, age, salary, and updated_at. The column list defines the stream’s schema, and VALUE_FORMAT='AVRO' tells ksqlDB to deserialize the record values as Avro using the schemas held in the Schema Registry.
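
Statements like this one can also be submitted without the CLI, through the ksqlDB server’s REST API: a POST to /ksql with a JSON body that carries the statement text. The sketch below shows one way to build and send that request with the Python standard library. The function names build_ksql_request and run_statement are made up for this example, and the default URL assumes the server’s port 8088 is reachable on localhost.

```python
import json
import urllib.request

# A stream definition over the mysql-employees topic, as used in this post.
CREATE_STREAM = """
CREATE STREAM employees_stream (
  id INT,
  name VARCHAR,
  age INT,
  salary DOUBLE,
  updated_at BIGINT
) WITH (
  KAFKA_TOPIC='mysql-employees',
  VALUE_FORMAT='AVRO'
);
""".strip()


def build_ksql_request(statement: str) -> bytes:
    """Encode a statement as the JSON body expected by POST /ksql."""
    body = {"ksql": statement, "streamsProperties": {}}
    return json.dumps(body).encode("utf-8")


def run_statement(statement: str,
                  server_url: str = "http://localhost:8088") -> list:
    """Send the statement to the ksqlDB server and return the parsed reply."""
    request = urllib.request.Request(
        f"{server_url}/ksql",
        data=build_ksql_request(statement),
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Example usage (requires a running ksqlDB server):
# print(run_statement(CREATE_STREAM))
```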

Query the ksqlDB stream

Now that we have a ksqlDB stream representing the data in our MySQL table, we can use ksqlDB to query and process the data in real time. Run the following query at the ksqlDB prompt:

SELECT name, age, salary FROM employees_stream EMIT CHANGES;

This push query selects the name, age, and salary fields from employees_stream. EMIT CHANGES keeps the query running and prints each new row as it arrives. By default, only rows that arrive after the query starts are shown; run SET 'auto.offset.reset' = 'earliest'; first to read the topic from the beginning.

Conclusion

In this tutorial, we’ve seen how to use Kafka Connect to stream data from MySQL to Kafka, and how to use ksqlDB to process and analyze that data in real time. Kafka Connect handles moving data from external systems into Kafka, and ksqlDB lets you query and transform the resulting topics with SQL.

Together, these tools let you build real-time data pipelines that process data as it streams into Kafka. Whether you’re building real-time analytics dashboards or processing large volumes of data, Kafka Connect and ksqlDB provide the building blocks you need to get started.

Written by Kumar Rohit
