Confluent Kafka Sink connector for MySQL

Nirav Langaliya
2 min readSep 26, 2019

##Download Confluent Community version.
https://www.confluent.io/confirmation/

## Install Confluent Kafka Local. (Untar setup file)

tar -xvf confluent-community-5.3.1–2.12.tar

##Setup Conflunet Cli.

curl -L https://cnfl.io/cli | sh -s — -b ~/confluent-5.3.1/bin

##Include Confluent bin into PATH.

export PATH=${PATH}:~/confluent-5.3.1/bin

## Start Conflunet

confluent local start
Confluent Start message.

##Download MySQL Jar file place under

cp mysql-connector-java-5.1.48.jar ~/confluent-5.3.1/share/java/kafka-connect-jdbc
cp mysql-connector-java-5.1.48-bin.jar ~/confluent-5.3.1/share/java/kafka-connect-jdbc
MySQL Jar Files

## Create sink MySQL property file for Confluent connector

sink-quickstart-mysql.properties and place it under ~/confluent-5.3.1/etc/kafka-connect-jdbc/

name=test_mysql_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=orders
connection.url=jdbc:mysql://localhost:3306/demo?user=mysqluser&password=pw
auto.create=true
value.converter.schema.registry.url=http://localhost:8081
key.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

## Load MySQL Connector : test_mysql_sink

confluent local load test_mysql_sink -- -d sink-quickstart-mysql.properties
Output Message.

##Check status of Connector.

confluent local status test_mysql_sink

##Now Produce Avro Messages:

Note: Change topic name at line #33 to orders from Test_MovieData

## Logon to MySQL and verify that the Order table is created under demo schema and records are loaded.

if you get Java Error while start confluent then install java 1.8

The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Current Java version ‘12’ is unsupported at this time. Confluent CLI will exit.ERROR: The Confluent CLI requires Java version 1.8 or 1.11.
See https://docs.confluent.io/current/installation/versions-interoperability.html .
If you have multiple versions of Java installed, you may need to set JAVA_HOME to the version you want Confluent to use.

##Setup JAVA_HOME

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

Reference:

https://www.ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.admin.doc/doc/admin_kafka_jdbc_sink.html

Confluent Kafka Python.

https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/

--

--