Kafka is a popular distributed streaming platform that can be used for building real-time streaming data pipelines and applications. Kafka Connect is a tool that makes it easy to integrate Kafka with other systems. In this blog post, we’ll show you how to use Kafka Connect to connect Kafka with MySQL.
Setup
Before we can connect Kafka with MySQL, we need to set up the required services. We’ll use Docker and Docker Compose to do this. Create a file called docker-compose.yaml with the following contents:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_DEBUG: "true"
  mysql:
    image: mysql:5.7
    hostname: mysql
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: my_database
This will create four services:

- zookeeper: A ZooKeeper service that Kafka depends on.
- kafka: A Kafka service that will be used to send and receive messages.
- schema-registry: A schema registry service that will be used to manage Avro schemas.
- mysql: A MySQL service that will be used to store data.
Run the following command to start the services:
docker-compose up -d
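You can verify that all four containers came up with:

docker-compose ps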
Configure Kafka Connect
Now that we have set up the required services using Docker, let’s move on to configuring Kafka Connect to connect Kafka with MySQL.
Create a file called mysql.properties with the following contents:
name=mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://mysql:3306/my_database
connection.user=root
connection.password=root
topic.prefix=mysql-
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
validate.non.null=false
In this file, we have defined the following properties:

- name: The name of the connector.
- connector.class: The connector class to be used.
- tasks.max: The maximum number of tasks the connector may use.
- connection.url: The JDBC URL for connecting to the MySQL instance running in our Docker container.
- connection.user: The username to connect to the database.
- connection.password: The password to connect to the database.
- topic.prefix: The prefix for the topics the connector will write to; the table name is appended, so the employees table maps to a topic called mysql-employees.
- mode: The mode of the connector. In this case, we're using the timestamp+incrementing mode, which tracks both a timestamp column and an incrementing column to detect changes in the database (see the sketch after this list).
- incrementing.column.name: The name of the incrementing column used to detect newly inserted rows.
- timestamp.column.name: The name of the column used to track when rows were last modified.
- validate.non.null: Whether the connector should require the incrementing and timestamp columns to be declared NOT NULL in the database; we disable this check.
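To make the mode concrete: in timestamp+incrementing mode the connector repeatedly polls the table for rows it hasn't seen yet, conceptually along the lines of the query below (a sketch of the semantics, not the connector's literal SQL):

SELECT * FROM employees
WHERE updated_at > :last_seen_timestamp
   OR (updated_at = :last_seen_timestamp AND id > :last_seen_id)
ORDER BY updated_at, id ASC;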
Start the Kafka Connect service
With the configuration file ready, let’s start the Kafka Connect service. To do this, run the following command:
docker run -d \
  --name=kafka-connect \
  --network=kafka-connect-network \
  -p 8083:8083 \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID=kafka-connect-group \
  -e CONNECT_CONFIG_STORAGE_TOPIC=kafka-connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=kafka-connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=kafka-connect-status \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -e CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -v /path/to/mysql-connector-java-8.0.23.jar:/usr/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.23.jar \
  confluentinc/cp-kafka-connect:7.0.0
This command will start the Kafka Connect service and configure it to use the Kafka service we set up earlier. We publish the REST port (8083) so we can submit connector configuration from the host, configure Avro converters backed by our Schema Registry (required settings for this image, and they match the Avro format we'll use in ksqlDB later), and mount the MySQL Connector/J JAR file onto the plugin path so that Kafka Connect can load the JDBC driver. Note that the name passed to --network must be a network the Compose services are attached to; by default, Docker Compose creates a network named after the project directory, so either attach the Compose services to a network named kafka-connect-network or substitute your network's name (docker network ls will show it).
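Once the worker has started, you can confirm that it is reachable and that the JDBC source connector plugin is available:

curl -s http://localhost:8083/connector-plugins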
Create a Kafka topic
Before we can start streaming data from MySQL to Kafka, we need to create a Kafka topic. We can do this using the Kafka command line tool. Run the following command to create a topic called mysql-employees:
docker exec kafka \
  kafka-topics \
  --create \
  --topic mysql-employees \
  --partitions 1 \
  --replication-factor 1 \
  --if-not-exists \
  --bootstrap-server kafka:9092
This will create a new topic called mysql-employees with one partition and a replication factor of one. (Note that kafka-topics in this Kafka version takes --bootstrap-server; the old --zookeeper flag has been removed.)
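The rest of this tutorial assumes an employees table exists in my_database. If you don't have one yet, open a MySQL shell in the container:

docker exec -it mysql mysql -uroot -proot my_database

and create a table whose columns match the connector settings above, an auto-incrementing id and an updated_at timestamp (the exact schema here is just an illustration):

CREATE TABLE employees (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(100),
  age INT,
  salary FLOAT,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO employees (name, age, salary) VALUES ('Alice', 34, 85000.0);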
Start streaming data
Now that everything is set up, we can start streaming data from MySQL to Kafka. Since our Kafka Connect worker runs in distributed mode, we submit the connector configuration (the same settings as in mysql.properties) to its REST API. Run the following command to start the MySQL connector:
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "mysql-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "1",
      "connection.url": "jdbc:mysql://mysql:3306/my_database",
      "connection.user": "root",
      "connection.password": "root",
      "topic.prefix": "mysql-",
      "mode": "timestamp+incrementing",
      "incrementing.column.name": "id",
      "timestamp.column.name": "updated_at",
      "validate.non.null": "false"
    }
  }' \
  http://localhost:8083/connectors
This will start the MySQL connector and begin streaming data from the MySQL employees table to the Kafka topic we created earlier.
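You can check that the connector and its task are in the RUNNING state via the REST API:

curl -s http://localhost:8083/connectors/mysql-source/status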
Verify data is streaming
To verify that data is streaming from MySQL to Kafka, let's consume the data from the mysql-employees topic. Run the following command to consume the data:
docker run --rm \
  --network=kafka-connect-network \
  confluentinc/cp-kafkacat \
  kafkacat -b kafka:9092 \
  -t mysql-employees \
  -C -q \
  -o beginning
This command will start consuming data from the mysql-employees topic from the beginning. If there is data in the table, you should see it streaming in the console.
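Because the connector writes Avro, the raw kafkacat output will contain binary-encoded values. To see the records decoded against the registered schema, you can instead use the Avro console consumer that ships with the Schema Registry image:

docker exec schema-registry \
  kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic mysql-employees \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081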
ksqlDB
ksqlDB is a powerful tool that allows you to process and analyze Kafka streams in real time.
Install
To install ksqlDB, we'll create a second Docker Compose file for the ksqlDB services, kept separate from the docker-compose.yaml we wrote earlier. Create a file called (for example) docker-compose-ksqldb.yml with the following contents:
version: '3'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.21.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    network_mode: host
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
    volumes:
      - ./ksql-server:/etc/ksql
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.21.0
    container_name: ksqldb-cli
    network_mode: host
    tty: true
    depends_on:
      - ksqldb-server
This compose file defines two services: ksqldb-server and ksqldb-cli. ksqldb-server is the ksqlDB engine itself, and ksqldb-cli is the command-line interface for interacting with it.
Note that we're using the network_mode: host setting so the containers share the host's network stack and can reach the Kafka and Schema Registry ports we published earlier without any additional configuration. Host networking also means the ksqlDB server's port 8088 is available on the host directly, so no ports mapping is needed (publishing ports is incompatible with host mode anyway). Since this is a separate compose file, depends_on cannot reference the kafka service, so make sure the Kafka stack is already running first.
Start ksqlDB
With the docker-compose-ksqldb.yml file ready, let's start the ksqlDB service. Run the following command to start the services:
docker-compose -f docker-compose-ksqldb.yml up -d
This command will start the ksqldb-server service and the ksqldb-cli service. Once the services are running, you can interact with the ksqlDB server using the ksqldb-cli container.
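For example, you can open an interactive ksql session against the server (which listens on its default port, 8088):

docker exec -it ksqldb-cli ksql http://localhost:8088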
Connect ksqlDB to Kafka
Before we can use ksqlDB to process our Kafka streams, we need to point it at the Kafka service. To do this, we'll create a ksql.properties file that defines the Kafka bootstrap servers. Since the compose file mounts ./ksql-server into the container at /etc/ksql, create the file as ksql-server/ksql.properties with the following contents:

bootstrap.servers=kafka:9092
ksql.schema.registry.url=http://localhost:8081

The first line tells ksqlDB where to find the Kafka brokers; because the server runs with host networking, the kafka hostname must resolve on your host (for a local setup, mapping it to 127.0.0.1 in /etc/hosts is a simple workaround). The second line points ksqlDB at the Schema Registry, which it needs in order to read the Avro data our connector produces.
Create a ksqlDB stream
With ksqlDB connected to Kafka, we can now create a stream that represents the data in our MySQL table. Run the following statement in the ksql CLI to create a stream:
CREATE STREAM employees_stream (
  id INT,
  name VARCHAR,
  age INT,
  salary FLOAT,
  updated_at BIGINT
) WITH (
  KAFKA_TOPIC='mysql-employees',
  VALUE_FORMAT='AVRO'
);
With this statement, we're creating a new stream called employees_stream with five fields: id, name, age, salary, and updated_at, backed by the mysql-employees topic. Because the value format is AVRO, ksqlDB uses the Schema Registry to deserialize the records; we declare the columns explicitly so the shape of the stream is clear.
Query the ksqlDB stream
Now that we have a ksqlDB stream representing the data in our MySQL table, we can use ksqlDB to query and process the data in real time. Run the following query against the employees_stream:
SELECT name, age, salary FROM employees_stream EMIT CHANGES;
This push query selects the name, age, and salary fields from employees_stream; the EMIT CHANGES clause, which recent ksqlDB versions require for continuous queries, keeps the query running so new records print as they arrive. If there is data in the stream, you should see it streaming in the console.
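From here, you can also create persistent queries that continuously materialize derived results into new tables or streams. As a sketch (the aggregation and the avg_salary_by_age name are just an illustration):

CREATE TABLE avg_salary_by_age AS
  SELECT age, AVG(salary) AS avg_salary
  FROM employees_stream
  GROUP BY age
  EMIT CHANGES;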
Conclusion
In this tutorial, we've seen how to use Kafka Connect to stream data from MySQL to Kafka, and how to use ksqlDB to process and analyze that data in real time. Kafka Connect provides an easy way to connect data sources to Kafka, and ksqlDB provides a powerful tool for processing and analyzing Kafka streams.
By leveraging these tools, you can build powerful real-time data pipelines that let you process and analyze data as it streams into Kafka. Whether you're building real-time analytics dashboards or processing large volumes of data on the fly, Kafka Connect and ksqlDB provide the building blocks you need to get started.