搜索

利用kafka自带的zookeeper搭建kafka集群 利用kafka自带的zookeeper搭建kafka集群


发布时间: 2022-11-24 18:42:07    浏览次数:34 次

利用kafka自带的zookeeper搭建kafka集群 

 

搭建kafka集群是需要zookeeper的,可是kafka自身就已经带了一个zookeeper,所以不需要额外搭建zookeeper的集群,只需要将kafka自带的zookeeper配置成一个集群就可以。

目录
1、kafka的下载和安装
2、配置zookeeper
3、配置kafka
4、启动zookeeper集群
5、启动kafka集群

1、kafka的下载和安装

本次安装采用的kafka版本是2.3.0。

首先,下载安装包。
从kafka的官方网站上找到相应的下载链接之后,用wget下载(wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz),之后得到一个安装包kafka_2.12-2.3.0.tgz。

其次,安装。
预计将kafka安装在/usr/local/目录下,所以将下载的安装包解压到/usr/local/目录下(tar -xzf kafka_2.12-2.3.0.tgz -C /usr/local),解压后得到一个名为kafka_2.12-2.3.0的文件夹,这就是kafka的安装目录了,为了进入方便给安装目录做一个软连接ln -s kafka_2.12-2.3.0 kafka,这样以后cd kafka的时候,实际上就是进入了kafka安装目录kafka_2.12-2.3.0中。

第三,安装集群规划。
预计安装一个3节点的集群,3个节点的ip如下:

172.17.2.136
172.17.2.138
172.17.2.139

2、配置zookeeper

kafka自带的zookeeper的配置文件是kafka安装目录下config/zookeeper.properties。

第一步,建数据目录和日志目录。
为了配置zookeeper,要提前为zookeeper建好数据目录和日志目录。

cd /usr/local/kafka mkdir -p zookeeper/data zookeeper/log 

第二步,在配置文件中写入配置项。
要配置哪些配置项是提前考虑好的,如下所示:

# the directory where the snapshot is stored. dataDir=/usr/local/kafka/zookeeper/data #修改为自定义的zookeeper日志目录 dataLogDir=/usr/local/kafka/zookeeper/log # the port at which the clients will connect clientPort=2181 #注释掉 #maxClientCnxns=0 #设置连接参数,添加如下配置 #为zk的基本时间单元,毫秒 tickTime=2000 #Leader-Follower初始通信时限 tickTime*10 initLimit=10 #Leader-Follower同步通信时限 tickTime*5 syncLimit=5 #设置broker Id的服务地址 server.0=172.17.2.136:2888:3888 server.1=172.17.2.138:2888:3888 server.2=172.17.2.139:2888:3888 

第三步,建myid文件
在zookeeper的数据目录/usr/local/zookeeper/data下,建一个文本文件myid,内容为每个zookeeper节点的编号。因为是3个节点kafka集群,所以zookeeper集群也是3个节点,他们的编号分别是0、1、2.

经过以上3个步骤,一个节点的zookeeper就配置完成了。

3、配置kafka

kafka的配置文件是config/server.properties。

第一步,创建kafka数据日志目录。注意这里虽然叫日志目录,可不是真的kafka运行日志存放的目录,耳熟kafka中的消息数据存放的目录,因为kafka中消息是以write append log的形式存放的,所以这里的日志目录实际是数据目录,但是数据又的确是以日志的形式存放,所以就叫数据日志目录。

# 创建kafka的数据日志目录,命名为kfkwalog mkdir -p /usr/local/kafka/kfkwalog 

第二步,写入配置项。
kafka的配置项很多,需要更改的配置项如下:

# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://172.17.2.136:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://172.17.2.136:9092 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/usr/local/kafka/kfkwalog ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=172.17.2.136:2181,172.17.2.138:2181,172.17.2.139:2181 

经过以上3步骤,136一个kafka节点配置好了,然后将kafka安装目录打包,之后scp到138、139两个节点,同时在138和139两个节点做对应的修改,主要包括:
(1)是创建zookeeper的数据目录zookeeper/data和日志目录zookeeper/log
(2)创建zookeeper数据目录下的myid文件,138下myid内容是1,138下myid的内容是2。这样以后,136的myid是0,138的myid是2,139的myid是2。
(3)创建kafka的数据日志目录kfkwalog
(4)修改kafka配置文件中的broker.id,136的broker.id是0,138的broker.id是1,139的broker.id是2
(5)修改kafka配置文件中的listeners和advertised.listeners,将其中的ip修改为对应的节点ip。
就完成了kafka集群和zookeeper集群的配置工作了。

4、启动zookeeper集群

三个节点依次启动,首先启动136上的zookeeper,然后启动138、139的。

首先,启动采用如下的命令启动zookeeper。

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper/zookeeper.log 2>1 & 

然后,查看zookeeper是否启动成功。

[root@kfkby ~]# jps -l 130374 sun.tools.jps.Jps 111887 kafka.Kafka 47439 org.apache.zookeeper.server.quorum.QuorumPeerMain #如果jps有这行输出,说明zookeeper已经正常启动起来了 

5、启动kafka集群

启动kafka和查看kafka启动是否成功的命令如下:

nohup bin/kafka-server-start.sh config/server.properties > logs/kafka/kafka.log 2>1 & [root@kfkby ~]# jps -l 130374 sun.tools.jps.Jps 111887 kafka.Kafka # 如果jps有这一行输出,说明kafka已经正常启动起来了 47439 org.apache.zookeeper.server.quorum.QuorumPeerMain 用下面的命令测试kafka是否正常启动了: [root@THQ-DEV-5 bin]# ./kafka-topics.sh --bootstrap-server 192.168.0.10:9092 --list EDGE_QY_VEH_INFO V2x_V1_Screen_Config_Down __consumer_offsets edge-qyobu-up test v2x_v1_obu_cmd_down v2x_v1_obu_original_up [root@THQ-DEV-5 bin]# 
 

四、测试Kafka集群
1、创建topic:test

/home/erp/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --replication-factor 1 --partitions 1 --topic test

2、列出已创建的topic列表

/home/erp/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

3、模拟客户端去发送消息

/home/erp/kafka/bin/kafka-console-producer.sh --broker-list 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092 --topic test

4、模拟客户端去接受消息

/home/erp/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

搭建kafka集群是需要zookeeper的,可是kafka自身就已经带了一个zookeeper,所以不需要额外搭建zookeeper的集群,只需要将kafka自带的zookeeper配置成一个集群就可以。

目录
1、kafka的下载和安装
2、配置zookeeper
3、配置kafka
4、启动zookeeper集群
5、启动kafka集群

1、kafka的下载和安装

本次安装采用的kafka版本是2.3.0。

首先,下载安装包。
从kafka的官方网站上找到相应的下载链接之后,用wget下载(wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz),之后得到一个安装包kafka_2.12-2.3.0.tgz。

其次,安装。
预计将kafka安装在/usr/local/目录下,所以将下载的安装包解压到/usr/local/目录下(tar -xzf kafka_2.12-2.3.0.tgz -C /usr/local),解压后得到一个名为kafka_2.12-2.3.0的文件夹,这就是kafka的安装目录了,为了进入方便给安装目录做一个软连接ln -s kafka_2.12-2.3.0 kafka,这样以后cd kafka的时候,实际上就是进入了kafka安装目录kafka_2.12-2.3.0中。

第三,安装集群规划。
预计安装一个3节点的集群,3个节点的ip如下:

172.17.2.136
172.17.2.138
172.17.2.139

2、配置zookeeper

kafka自带的zookeeper的配置文件是kafka安装目录下config/zookeeper.properties。

第一步,建数据目录和日志目录。
为了配置zookeeper,要提前为zookeeper建好数据目录和日志目录。

cd /usr/local/kafka mkdir -p zookeeper/data zookeeper/log 

第二步,在配置文件中写入配置项。
要配置哪些配置项是提前考虑好的,如下所示:

# the directory where the snapshot is stored. dataDir=/usr/local/kafka/zookeeper/data #修改为自定义的zookeeper日志目录 dataLogDir=/usr/local/kafka/zookeeper/log # the port at which the clients will connect clientPort=2181 #注释掉 #maxClientCnxns=0 #设置连接参数,添加如下配置 #为zk的基本时间单元,毫秒 tickTime=2000 #Leader-Follower初始通信时限 tickTime*10 initLimit=10 #Leader-Follower同步通信时限 tickTime*5 syncLimit=5 #设置broker Id的服务地址 server.0=172.17.2.136:2888:3888 server.1=172.17.2.138:2888:3888 server.2=172.17.2.139:2888:3888 

第三步,建myid文件
在zookeeper的数据目录/usr/local/zookeeper/data下,建一个文本文件myid,内容为每个zookeeper节点的编号。因为是3个节点kafka集群,所以zookeeper集群也是3个节点,他们的编号分别是0、1、2.

经过以上3个步骤,一个节点的zookeeper就配置完成了。

3、配置kafka

kafka的配置文件是config/server.properties。

第一步,创建kafka数据日志目录。注意这里虽然叫日志目录,可不是真的kafka运行日志存放的目录,耳熟kafka中的消息数据存放的目录,因为kafka中消息是以write append log的形式存放的,所以这里的日志目录实际是数据目录,但是数据又的确是以日志的形式存放,所以就叫数据日志目录。

# 创建kafka的数据日志目录,命名为kfkwalog mkdir -p /usr/local/kafka/kfkwalog 

第二步,写入配置项。
kafka的配置项很多,需要更改的配置项如下:

# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://172.17.2.136:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://172.17.2.136:9092 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/usr/local/kafka/kfkwalog ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=172.17.2.136:2181,172.17.2.138:2181,172.17.2.139:2181 

经过以上3步骤,136一个kafka节点配置好了,然后将kafka安装目录打包,之后scp到138、139两个节点,同时在138和139两个节点做对应的修改,主要包括:
(1)是创建zookeeper的数据目录zookeeper/data和日志目录zookeeper/log
(2)创建zookeeper数据目录下的myid文件,138下myid内容是1,138下myid的内容是2。这样以后,136的myid是0,138的myid是2,139的myid是2。
(3)创建kafka的数据日志目录kfkwalog
(4)修改kafka配置文件中的broker.id,136的broker.id是0,138的broker.id是1,139的broker.id是2
(5)修改kafka配置文件中的listeners和advertised.listeners,将其中的ip修改为对应的节点ip。
就完成了kafka集群和zookeeper集群的配置工作了。

4、启动zookeeper集群

三个节点依次启动,首先启动136上的zookeeper,然后启动138、139的。

首先,启动采用如下的命令启动zookeeper。

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper/zookeeper.log 2>1 & 

然后,查看zookeeper是否启动成功。

[root@kfkby ~]# jps -l 130374 sun.tools.jps.Jps 111887 kafka.Kafka 47439 org.apache.zookeeper.server.quorum.QuorumPeerMain #如果jps有这行输出,说明zookeeper已经正常启动起来了 

5、启动kafka集群

启动kafka和查看kafka启动是否成功的命令如下:

nohup bin/kafka-server-start.sh config/server.properties > logs/kafka/kafka.log 2>1 & [root@kfkby ~]# jps -l 130374 sun.tools.jps.Jps 111887 kafka.Kafka # 如果jps有这一行输出,说明kafka已经正常启动起来了 47439 org.apache.zookeeper.server.quorum.QuorumPeerMain 用下面的命令测试kafka是否正常启动了: [root@THQ-DEV-5 bin]# ./kafka-topics.sh --bootstrap-server 192.168.0.10:9092 --list EDGE_QY_VEH_INFO V2x_V1_Screen_Config_Down __consumer_offsets edge-qyobu-up test v2x_v1_obu_cmd_down v2x_v1_obu_original_up [root@THQ-DEV-5 bin]# 
 

四、测试Kafka集群
1、创建topic:test

/home/erp/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --replication-factor 1 --partitions 1 --topic test

2、列出已创建的topic列表

/home/erp/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

3、模拟客户端去发送消息

/home/erp/kafka/bin/kafka-console-producer.sh --broker-list 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092 --topic test

4、模拟客户端去接受消息

/home/erp/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

免责声明 利用kafka自带的zookeeper搭建kafka集群 利用kafka自带的zookeeper搭建kafka集群,资源类别:文本, 浏览次数:34 次, 文件大小:-- , 由本站蜘蛛搜索收录2022-11-24 06:42:07。此页面由程序自动采集,只作交流和学习使用,本站不储存任何资源文件,如有侵权内容请联系我们举报删除, 感谢您对本站的支持。 原文链接:https://www.cnblogs.com/iancloud/p/16810510.html