Debezium
Introduction to Debezium
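Debezium is considerably more capable than Canal and Maxwell, and it is built on top of Kafka.
Advantages: 1. Handles large data volumes. 2. Monitors multiple databases: MySQL, MongoDB, PostgreSQL,
SQL Server. Canal and Maxwell are limited to MySQL.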
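Option 1: Kafka Connect connectors
Source connector: captures data from a relational database
Sink connector: writes the data out (the data usually needs processing and analysis first, so this is generally not used)
Option 2: deploy as Debezium Server (originally it could only sink to Kinesis; newer releases also support other sinks such as Google Cloud Pub/Sub and Apache Pulsar)
Option 3: embedded engine (Flink CDC embeds Debezium this way)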
Documentation: https://debezium.io/documentation/reference/2.0/
MySQL connector
Download and install the MySQL connector
mkdir -p /opt/debezium/connector/
cd /opt/debezium/connector/
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
tar -xvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
cp /opt/debezium/connector/debezium-connector-mysql/*.jar /opt/kafka/kafka/libs/
Modify the Kafka Connect configuration
cd /opt/kafka/kafka/config/
vim connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/opt/debezium/connector/
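Before restarting, it can help to confirm that the edits were actually saved. The following is a minimal sketch, assuming the file uses plain `key=value` lines as shown above; `get_prop` and `CONNECT_PROPS` are hypothetical names introduced here, not part of Kafka.

```shell
# Hypothetical helper: print the value of one key from a Java-style .properties file.
# Assumes simple key=value lines; lines starting with # are treated as comments.
get_prop() {
  local file="$1" key="$2"
  awk -F= -v k="$key" '
    /^[ \t]*#/ { next }                                  # skip commented-out settings
    { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }     # trim the key
    name == k { sub(/^[^=]*=/, ""); val = $0; found = 1 } # keep the last match
    END { if (found) print val }
  ' "$file"
}

# Path taken from these notes; override it if Kafka lives elsewhere.
CONNECT_PROPS="${CONNECT_PROPS:-/opt/kafka/kafka/config/connect-distributed.properties}"

# Print the five settings edited above, if the file exists on this machine.
if [ -f "$CONNECT_PROPS" ]; then
  for key in bootstrap.servers group.id key.converter value.converter plugin.path; do
    echo "$key=$(get_prop "$CONNECT_PROPS" "$key")"
  done
fi
```

An empty value after the `=` suggests the setting is missing or still commented out.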
Restart Kafka
cd /opt/kafka/kafka/bin/
./kafka-server-stop.sh;./zookeeper-server-stop.sh;
./zookeeper-server-start.sh -daemon /opt/kafka/kafka/config/zookeeper.properties
./kafka-server-start.sh -daemon /opt/kafka/kafka/config/server.properties
### Start Kafka Connect
./connect-distributed.sh -daemon /opt/kafka/kafka/config/connect-distributed.properties
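Because `-daemon` returns immediately, the REST interface may not answer for a little while after the start command. A minimal sketch of a polling loop follows; `wait_for_connect` is a hypothetical helper, and the attempt count and delay are arbitrary defaults rather than recommended values.

```shell
# Hypothetical helper: poll a URL until it answers or the attempts run out.
# Arguments: URL, max attempts (default 30), seconds between attempts (default 2).
wait_for_connect() {
  local url="$1" attempts="${2:-30}" delay="${3:-2}" i=1
  while [ "$i" -le "$attempts" ]; do
    # -s silent, -f treat HTTP errors as failure, -o discard the response body
    if curl -sf --max-time 5 -o /dev/null "$url"; then
      echo "Kafka Connect answered after $i attempt(s)"
      return 0
    fi
    # Only sleep when another attempt is still to come.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  echo "Kafka Connect did not answer at $url" >&2
  return 1
}

# Usage (8083 is the REST port used by the checks in these notes):
# wait_for_connect http://localhost:8083/connectors
```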
Verify
curl -H "Accept:application/json" localhost:8083/connector-plugins
curl -H "Accept:application/json" localhost:8083/connectors
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"mysql_connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"localhost","database.port":"3306","database.user":"root","database.password":"123456","database.server.id":"184","database.server.name":"student","table.include.list":"test.student","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"dbhistory.student","include.schema.changes":"true"}}'
curl -i -X DELETE localhost:8083/connectors/mysql_connector
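Once a connector is registered, the Kafka Connect REST API also exposes its running state and a restart endpoint. The sketch below wraps both calls; the function names and the `CONNECT_URL` variable are hypothetical, and `mysql_connector` is the name used in the POST request above.

```shell
# Base URL of the Kafka Connect REST interface; 8083 matches the checks above.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Hypothetical helper: build the REST URL for one connector.
connector_url() {
  printf '%s/connectors/%s' "$CONNECT_URL" "$1"
}

# GET /connectors/<name>/status reports the state of the connector and its tasks.
connector_status() {
  curl -s -H "Accept:application/json" "$(connector_url "$1")/status"
}

# POST /connectors/<name>/restart restarts the connector, for example after a failure.
connector_restart() {
  curl -s -X POST "$(connector_url "$1")/restart"
}

# Usage:
# connector_status mysql_connector
# connector_restart mysql_connector
```

A healthy connector normally reports `RUNNING` for both the connector and its task in the status response.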
Helper commands
./kafka-topics.sh
./kafka-console-producer.sh
./kafka-console-consumer.sh
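The commands above are listed without arguments. As a hedged sketch of typical usage: the Debezium MySQL connector writes change events to topics named `<server name>.<database>.<table>`, so the events can be read with the console consumer. The helper names are hypothetical, the table name `student` in database `test` is an assumption based on the connector config in these notes, and `--bootstrap-server` on `kafka-topics.sh` assumes Kafka 2.2 or later.

```shell
# Paths and addresses taken from these notes; override them if your setup differs.
KAFKA_BIN="${KAFKA_BIN:-/opt/kafka/kafka/bin}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Debezium's MySQL connector names change topics <server name>.<database>.<table>.
debezium_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# List all topics (older Kafka releases use --zookeeper instead of --bootstrap-server).
list_topics() {
  "$KAFKA_BIN/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP" --list
}

# Print every change event recorded for one table, starting from the oldest.
consume_table() {
  "$KAFKA_BIN/kafka-console-consumer.sh" --bootstrap-server "$BOOTSTRAP" \
    --topic "$(debezium_topic "$1" "$2" "$3")" --from-beginning
}

# Usage:
# list_topics
# consume_table student test student
debezium_topic student test student   # prints student.test.student
```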
cd /opt/mysql/mysql/bin
./mysql -u root -p'123456'
use test;