Kafka集群搭建

前言:kafka集群搭建必须依赖于zookeeper。本次搭建选择的软件及版本

  1. apache-zookeeper-3.5.5-bin
  2. jdk8
  3. kafka_2.12-2.3.0

定义解释:

Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。

软件环境:

(服务器选取数量遵从 2*n+1 )

192.168.125.131 server1

192.168.125.132 server2

192.168.125.133 server3

1. Linux服务器一台、三台、五台、(2*n+1),Zookeeper集群的工作是超过半数才能对外提供服务,3台中超过两台超过半数,允许1台挂掉 ,是否可以用偶数,其实没必要。

如果有四台那么挂掉一台还剩下三台服务器,如果在挂掉一个就不行了,这里记住是超过半数。

2. jdk:zookeeper是用java写的所以他的需要JAVA环境,需要先配置jdk,本次选取jdk8。

3. Zookeeper的版本 apache-zookeeper-3.5.5-bin(一定用这个带bin的

下载地址:

https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz

4. kafka的版本kafka_2.12-2.3.0

下载地址:

https://mirrors.cnnic.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz


Zookeeper集群搭建:

Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。

dk8配置自行百度,别忘了配置环境变量

安装路径在/usr/local/
1.通过wget方式下载zookeeper包
[root@localhost local]# wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
2.解压tar包
[root@localhost local]# tar -zxvf kafka_2.12-2.3.0.tgz
3.修改包名称为zookeeper
[root@localhost local]# mv kafka_2.12-2.3.0 kafka
4.进入zookeeper中的conf文件下,zookeeper提供了zoo_sample.cfg,复制一个zoo_sample.cfg将其名称修改为zoo.cfg
[root@localhost local]# cd zookeeper/conf
5.修改 zoo.cfg(每个服务器的conf都按照这个改)
[root@localhost conf]# vim zoo.cfg

conf内容解释:

# 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
tickTime=2000

# initLimit表示用于从节点与主节点之间建立初始化连接的时间上限 10个心跳时间
initLimit=10

# syncLimit表示允许从节点与主节点之间处于不同步状态的时间上限
syncLimit=5

# 快照日志的存储路径 在zookeeper下创建zkdatalog、zkdata
dataDir=/usr/local/zookeeper/zkdata

# 事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
dataLogDir=/usr/local/zookeeper/zkdatalog


# 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
clientPort=2181

# zookeeper集群的配置 server.1要与myid中的值一样
server.1=192.168.125.133:12888:13888
server.2=192.168.125.131:12888:13888
server.3=192.168.125.132:12888:13888

创建myid(该文件放在zkdata文件夹下)

#server1
echo "1" > /usr/local/zookeeper/zkdata/myid
#server2
echo "2" > /usr/local/zookeeper/zkdata/myid
#server3
echo "3" > /usr/local/zookeeper/zkdata/myid

重要说明:

  1. myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。
  2. zoo.cfg 文件是zookeeper配置文件 在conf目录里。
  3. log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j,来进行管理。
  4. zkEnv.sh和zkServer.sh文件
  5. zkServer.sh 主的管理程序文件
  6. zkEnv.sh 是主要配置,zookeeper集群启动时配置环境变量的文件
  7. 还有一个需要注意

ZooKeeper server will not remove old snapshots and log files when using the default configuration (see > > > autopurge below), this is the responsibility of the operator

zookeeper 不会主动的清除旧的快照和日志文件,这个是操作者的责任。

但是可以通过命令去定期的清理。

#!/bin/bash 

#snapshot file dir
dataDir=/usr/local/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/usr/local/zookeeper/zkdatalog/version-2

#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f

#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。


#zk log dir del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f

其他方法:

第二种:使用ZK的工具类PurgeTxnLog,它的实现了一种简单的历史文件清理策略,可以在这里看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html

第三种:对于上面这个执行,ZK自己已经写好了脚本,在bin/zkCleanup.sh中,所以直接使用这个脚本也是可以执行清理工作的。

第四种:从3.4.0开始,zookeeper提供了自动清理snapshot和事务日志的功能,通过配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:

autopurge.purgeInterval 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。

autopurge.snapRetainCount 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。

推荐使用第一种方法,对于运维人员来说,将日志清理工作独立出来,便于统一管理也更可控。毕竟zk自带的一些工具并不怎么给力。

启动服务并查看

启动服务

#进入到Zookeeper的bin目录下
cd /usr/local/zookeeper/zookeeper-3.4.6/bin
#启动服务(3台都需要操作)
[root@localhost bin]# sh zkServer.sh start

检查服务状态

#检查服务器状态
[root@localhost bin]# sh zkServer.sh status

通过status就能看到状态:

[root@localhost bin]# sh zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

可以用“jps”查看zk的进程,这个是zk的整个工程的main

#执行命令jps:该命令是java虚拟机中查看HotSpot虚拟机中进程的工具 在jdk/bin目录下
20348 Jps
4233 QuorumPeerMain #代表zookeeper已经启动成功

Kafka集群搭建

创建目录并下载安装软件

#下载软件
[root@localhost tarbag]# wget https://mirrors.cnnic.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#解压kafka
[root@localhost tarbag]# tar -zxvf kafka_2.12-2.3.0.tgz
# 移动到usr/local,并改名称为kafka 目录下
[root@localhost tarbag]# mv kafka_2.12-2.3.0 ../kafka
#修改server.properties文件
[root@localhost kafka]# cd config/
[root@localhost config]# vim server.properties

主要关注:server.properties 这个文件即可,我们可以发现在目录下:

有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群

修改配置文件:

broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
#当前kafka对外提供服务的端口默认是9092
# 注意加 advertised 前缀,当kafka启动完成,发送消息时,9092端口监听不到。端口最好不要选用1024以下的,因为需要root权限启动kafka。
advertised.listeners = PLAINTEXT://192.168.125.133:9092

num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafkalogs/ # kafka把消息都放在磁盘上,存放日志片段的目录都是有log。dirs指定的。他是用“,”逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段放到同一个路径下。要注意,broker会往拥有最少分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.125.131:2181,192.168.125.132:2181,192.168.125.133:2181 #设置zookeeper的连接端口

上面是参数的解释,实际的修改项为:

 # 每台服务器的broker.id都不能相同
broker.id=0
# 监听端口
advertised.listeners = PLAINTEXT://192.168.125.133:9092
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.125.131:2181,192.168.125.132:2181,192.168.125.133:2181

启动Kafka集群并测试

启动服务

#从后台启动Kafka集群(3台都需要启动)
[root@localhost bin]# sh kafka-server-start.sh -daemon ../config/server.properties

# 检查服务是否启动
[root@localhost bin]# jps
32066 Jps
32042 Kafka
30571 QuorumPeerMain

创建Topic来验证是否创建成功

# 1.创建主题
[root@localhost bin]# ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic yshtest
#解释
--replication-factor 2 #复制两份
--partitions 1 #创建1个分区
--topic #主题为yshtest
# 2.如果我们运行list topic命令,我们现在可以看到该主题:
[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server localhost:9092
shuaige
test
test1
yshtest
# 在一台服务器上创建一个发布者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic yshtest
>123
>1233445
>usasdj
>assadfsdfsdafas
# 在一台服务器上创建一个订阅者
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yshtest --from-beginning
123
1233445
usasdj
assadfsdfsdafas
^CProcessed a total of 4 messages

完成!!!

参考文章:Kafka集群搭建 https://www.cnblogs.com/mikeguan/p/7079013.html

kafka官网 http://kafka.apache.org/documentation.html#quickstart

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();