前言:kafka集群搭建必须依赖于zookeeper。本次搭建选择的软件及版本
定义解释:
Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。
软件环境:
(服务器选取数量遵从 2*n+1 )
192.168.125.131 server1
192.168.125.132 server2
192.168.125.133 server3
1. Linux服务器一台、三台、五台、(2*n+1),Zookeeper集群的工作是超过半数才能对外提供服务,3台中超过两台超过半数,允许1台挂掉 ,是否可以用偶数,其实没必要。
如果有四台那么挂掉一台还剩下三台服务器,如果在挂掉一个就不行了,这里记住是超过半数。
2. jdk:zookeeper是用java写的所以他的需要JAVA环境,需要先配置jdk,本次选取jdk8。
3. Zookeeper的版本 apache-zookeeper-3.5.5-bin(一定用这个带bin的)
下载地址:
https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
4. kafka的版本kafka_2.12-2.3.0
下载地址:
https://mirrors.cnnic.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
Zookeeper集群搭建:
Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
dk8配置自行百度,别忘了配置环境变量
安装路径在/usr/local/
1.通过wget方式下载zookeeper包
[root@localhost local]# wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
2.解压tar包
[root@localhost local]# tar -zxvf kafka_2.12-2.3.0.tgz
3.修改包名称为zookeeper
[root@localhost local]# mv kafka_2.12-2.3.0 kafka
4.进入zookeeper中的conf文件下,zookeeper提供了zoo_sample.cfg,复制一个zoo_sample.cfg将其名称修改为zoo.cfg
[root@localhost local]# cd zookeeper/conf
5.修改 zoo.cfg(每个服务器的conf都按照这个改)
[root@localhost conf]# vim zoo.cfg
conf内容解释:
# 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
tickTime=2000
# initLimit表示用于从节点与主节点之间建立初始化连接的时间上限 10个心跳时间
initLimit=10
# syncLimit表示允许从节点与主节点之间处于不同步状态的时间上限
syncLimit=5
# 快照日志的存储路径 在zookeeper下创建zkdatalog、zkdata
dataDir=/usr/local/zookeeper/zkdata
# 事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
dataLogDir=/usr/local/zookeeper/zkdatalog
# 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
clientPort=2181
# zookeeper集群的配置 server.1要与myid中的值一样
server.1=192.168.125.133:12888:13888
server.2=192.168.125.131:12888:13888
server.3=192.168.125.132:12888:13888
创建myid(该文件放在zkdata文件夹下)
#server1
echo "1" > /usr/local/zookeeper/zkdata/myid
#server2
echo "2" > /usr/local/zookeeper/zkdata/myid
#server3
echo "3" > /usr/local/zookeeper/zkdata/myid
重要说明:
ZooKeeper server will not remove old snapshots and log files when using the default configuration (see > > > autopurge below), this is the responsibility of the operator
zookeeper 不会主动的清除旧的快照和日志文件,这个是操作者的责任。
但是可以通过命令去定期的清理。
#!/bin/bash
#snapshot file dir
dataDir=/usr/local/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/usr/local/zookeeper/zkdatalog/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
#zk log dir del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
其他方法:
第二种:使用ZK的工具类PurgeTxnLog,它的实现了一种简单的历史文件清理策略,可以在这里看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html
第三种:对于上面这个执行,ZK自己已经写好了脚本,在bin/zkCleanup.sh中,所以直接使用这个脚本也是可以执行清理工作的。
第四种:从3.4.0开始,zookeeper提供了自动清理snapshot和事务日志的功能,通过配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:
autopurge.purgeInterval 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。
autopurge.snapRetainCount 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。
推荐使用第一种方法,对于运维人员来说,将日志清理工作独立出来,便于统一管理也更可控。毕竟zk自带的一些工具并不怎么给力。
启动服务并查看
启动服务
#进入到Zookeeper的bin目录下
cd /usr/local/zookeeper/zookeeper-3.4.6/bin
#启动服务(3台都需要操作)
[root@localhost bin]# sh zkServer.sh start
检查服务状态
#检查服务器状态
[root@localhost bin]# sh zkServer.sh status
通过status就能看到状态:
[root@localhost bin]# sh zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。
可以用“jps”查看zk的进程,这个是zk的整个工程的main
#执行命令jps:该命令是java虚拟机中查看HotSpot虚拟机中进程的工具 在jdk/bin目录下
20348 Jps
4233 QuorumPeerMain #代表zookeeper已经启动成功
Kafka集群搭建
创建目录并下载安装软件
#下载软件
[root@localhost tarbag]# wget https://mirrors.cnnic.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#解压kafka
[root@localhost tarbag]# tar -zxvf kafka_2.12-2.3.0.tgz
# 移动到usr/local,并改名称为kafka 目录下
[root@localhost tarbag]# mv kafka_2.12-2.3.0 ../kafka
#修改server.properties文件
[root@localhost kafka]# cd config/
[root@localhost config]# vim server.properties
主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群
修改配置文件:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
#当前kafka对外提供服务的端口默认是9092
# 注意加 advertised 前缀,当kafka启动完成,发送消息时,9092端口监听不到。端口最好不要选用1024以下的,因为需要root权限启动kafka。
advertised.listeners = PLAINTEXT://192.168.125.133:9092
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafkalogs/ # kafka把消息都放在磁盘上,存放日志片段的目录都是有log。dirs指定的。他是用“,”逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段放到同一个路径下。要注意,broker会往拥有最少分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.125.131:2181,192.168.125.132:2181,192.168.125.133:2181 #设置zookeeper的连接端口
上面是参数的解释,实际的修改项为:
# 每台服务器的broker.id都不能相同
broker.id=0
# 监听端口
advertised.listeners = PLAINTEXT://192.168.125.133:9092
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.125.131:2181,192.168.125.132:2181,192.168.125.133:2181
启动Kafka集群并测试
启动服务
#从后台启动Kafka集群(3台都需要启动)
[root@localhost bin]# sh kafka-server-start.sh -daemon ../config/server.properties
# 检查服务是否启动
[root@localhost bin]# jps
32066 Jps
32042 Kafka
30571 QuorumPeerMain
创建Topic来验证是否创建成功
# 1.创建主题
[root@localhost bin]# ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic yshtest
#解释
--replication-factor 2 #复制两份
--partitions 1 #创建1个分区
--topic #主题为yshtest
# 2.如果我们运行list topic命令,我们现在可以看到该主题:
[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server localhost:9092
shuaige
test
test1
yshtest
# 在一台服务器上创建一个发布者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic yshtest
>123
>1233445
>usasdj
>assadfsdfsdafas
# 在一台服务器上创建一个订阅者
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yshtest --from-beginning
123
1233445
usasdj
assadfsdfsdafas
^CProcessed a total of 4 messages
完成!!!
参考文章:Kafka集群搭建 https://www.cnblogs.com/mikeguan/p/7079013.html
kafka官网 http://kafka.apache.org/documentation.html#quickstart
留言与评论(共有 0 条评论) |