1. 部署JAVA JDK
kafka需要依赖java,所以先确认已安装java
1.1. 检查并清空JDK
首先检测系统是否安装JDK,如已安装则卸载.
官方下载地址:https://www.oracle.com/java/technologies/javase-downloads.html
1 2 3 4 5 |
$ java -version #如果centos安装完毕后,就发现已经安装了jdk,那么极有可能是安装了OpenJDK,可以通过以下命令进一步查看JDK信息: $ rpm -qa | grep openjdk #卸载OpenJDK,执行以下操作: $ rpm -e --nodeps java-1.6.0-openjdk-xxx |
1.2. 安装JAVA JDK
1 2 |
$ wget https://download.oracle.com/otn-pub/java/jdk/16.0.2%2B7/d4a915d82b4c4fbb9bde534da945d746/jdk-16.0.2_linux-x64_bin.rpm $ rpm -ivh jdk-16.0.2_linux-x64_bin.rpm |
1.3. 验证JAVA JDK
1 2 3 4 5 6 7 |
#验证安装(出现如下提示表示安装成功) $ java $ javac $ java -version java version "16.0.2" 2021-07-20 Java(TM) SE Runtime Environment (build 16.0.2+7-67) Java HotSpot(TM) 64-Bit Server VM (build 16.0.2+7-67, mixed mode, sharing) |
1.4. 配置环境变量(/etc/profile)
1 2 3 4 5 6 7 |
$ vi /etc/profile JAVA_HOME=/usr/java/jdk-16.0.2 JRE_HOME=/usr/java/jdk-16.0.2//jre PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib export JAVA_HOME JRE_HOME PATH CLASSPATH |
使修改生效
1 2 3 |
$ source /etc/profile //使修改立即生效 #查看系统环境状态 $ echo $PATH |
2. 安装部署Kafka集群
2.1. 下载并安装
- 官方网站下载地址:http://kafka.apache.org/downloads
1 2 |
$ wget https://dlcdn.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz $ tar -zxvf kafka_2.13-2.8.0.tgz |
2.2. 修改配置文件
2.2.1. 修改zookeeper配置文件
因为kafka建立集群依赖zookeer集群进行选举,所以这里可以提前先配置zookeeper集群.
1 2 3 4 5 6 7 8 9 10 11 12 |
$ mkdir -p /data/zookeeper $ vi zookeeper.properties #修改如下内容 dataDir=/data/zookeeper #添加如下内容,配置端口 server.0=11.32.16.13:2888:3888 server.1=11.32.16.14:2888:3888 server.2=,11.32.16.15::2888:3888 tickTime=2000 initLimit=10 syncLimit=5 |
-
tickTime:基本事件单元,这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳;最小 的session过期时间为2倍tickTime
-
dataDir:存储内存中数据库快照的位置,除非另有说明,否则指向数据库更新的事务日志。注意:应该谨慎的选择日志存放的位置,使用专用的日志存储设备能够大大提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会很大程度上影像系统性能。
-
client:监听客户端连接的端口。
-
initLimit:允许follower连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
-
syncLimit:表示Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
- server.A=B:C:D
A:其中 A 是一个数字,表示这个是服务器的编号
B:是这个服务器的 ip 地址;
C:Zookeeper服务器之间的通信端口;
D:Leader选举的端口。
我们需要修改的第一个是 dataDir ,在指定的位置处创建好目录。官方文档连接
2.2.2. 创建zookeepera集群中node编号文件
在上一步 dataDir 指定的目录下,创建 myid 文件。
例如在/tmp/zookeeper中创建myid文件,其中内容为对应的主机ID号.
因为配置集群,所以在所有机子11.32.16.13 11.32.16.14 11.32.16.15上分别创建集群中node编号文件
1 2 |
$ cat /tmp/zookeeper/myid 1 |
2.2.3. 修改kafka broker配置文件
配置文件用于服务器节点的配置文件,配置服务节点, server.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
$ cd kafka_2.13-2.8.0/config $ mkdir -p /var/log/kafka #如果修改了broker.id,需要删除该路径所有内容/var/log/kafka $ vi server.properties #默认是0,这里自定义node-1:1 node-2:2 node-3:3 broker.id= 1 #配置监听端口,注释掉该内容,采用主机IP #listeners=PLAINTEXT://:9092 #新增hostname host.name=11.32.16.13 #delete.topic.enable=true #配置kafka日志文件位置 log.dirs=/var/log/kafka #/kafka zookeeper.connect=11.32.16.13:2181,11.32.16.14:2181,11.32.16.15:2181 |
地址可以先不用创建,在首次启动kafka的时候,会自动进行创建
2.3. 启动程序
2.3.1. 启动zookeeper
这个是在启动broker之前需要保证zookeeper集群是运行着的。
1 2 3 4 5 6 7 |
$ bin/zookeeper-shell.sh status /root/kafka/kafka_2.13-2.8.0/bin/kafka-run-class.sh: line 330: exec: java: not found #至此排查问题 $ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties # 稳定后台启动 $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties |
2.3.2. 开放端口并查看
1 2 3 4 5 6 7 8 9 |
#安全组放开zookeerper通信和选举端口 $ iptables -I INPUT -p tcp -m multiport --dports 2888 -j ACCEPT $ iptables -I INPUT -p tcp -m multiport --dports 3888 -j ACCEPT #安全组放开zookeerper服务端口和kafka端口 $ iptables -I INPUT -p tcp -m multiport --dports 2181 -j ACCEPT $ iptables -I INPUT -p tcp -m multiport --dports 9092 -j ACCEPT #查看本地安全组 $ netstat -tunlp | grep 9092 |
2.3.3. 启动kafka broker
先启动一下,开有没有报错。
1 2 3 4 |
$ ./bin/kafka-server-start.sh ./config/server.properties # 稳定后台启动 $ ./bin/kafka-server-start.sh -daemon ./config/server.properties |
-
问题1
Error creating broker listeners from 'vnet-kafka://:9092':
java.lang.IllegalArgumentException: Error creating broker listeners from 'vnet-kafka://:9092': No security protocol defined for listener VNET-KAFKA
配置文件中如果不采用SSL,需要采用PLAINTEXT://:9092;否则采用主机hostname
- 问题2
INFO Refusing session request for client /11.32.16.13:47997 as it has seen zxid 0x4000000aa our last zxid is 0x300000000 client must try another server (org.apache.zookeeper.server.ZooKeeperServer)
说明11.32.16.13中仍有程序给zookeeper发送信息,需要关闭它.
这里查看没有关闭kafka,所有关闭它. netstat -tunlp|grep 9092,并关闭它
- 问题3
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
2.4. 检查测试Zookeeper集群状态
连接kafka使用的时候,里面除了zookeeper之外还有其他内容,来查看一下。登录zookeeper(切换到zk的bin目录下),先连接zk
1 2 3 4 5 6 7 8 9 10 11 12 13 |
$ ./bin/zookeeper-shell.sh 11.32.16.13:2181 $ ls /brokers/ids # 获取集群ID $ ./bin/kafka-cluster.sh cluster-id --bootstrap-server 11.32.16.13:9092 # 创建一个topic以及它的主备数量,其中zookeeper为其中集群 $ ./bin/kafka-topics.sh --create --topic vnet_monitor_eip_statistic --partitions 1 --replication-factor 2 --zookeeper 11.32.16.13:2181,11.32.16.14:2181,11.32.16.15:2181 Created topic vnet_monitor_eip_statistic. # 查看kafka集群状态. $ ./bin/kafka-topics.sh --describe --zookeeper 11.32.16.15:2181 Topic: vnet_monitor_eip_statistic TopicId: rMkqsW23Rg6KPJszNBGMTQ PartitionCount: 1 ReplicationFactor: 2 Configs: Topic: vnet_monitor_eip_statistic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 |
3. 模拟使用Kafka
3.1. 创建topic(使用help帮助)
3.1.1. 创建一个topic
1 |
$ bin/kafka-topics.sh --create --topic vnet_monitor_eip_statistic --partitions 1 --replication-factor 1 --zookeeper 127.0.0.1:2181 |
3.1.2. 查看消息topic订阅状态
1 2 |
$ bin/kafka-topics.sh --describe --topic vnet_monitor_eip_statistic --zookeeper 127.0.0.1:2181 bin/kafka-topics.sh --describe --topic vneteip --zookeeper 127.0.0.1:2181 |
3.2. 生产者和消费者
3.2.1. 启动消息生产者
- 启动消息生产者,将消息发送到kafka的topic上
1 |
$ bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic vnet_monitor_eip_statistic |
3.2.2. 启动消息消费者
1 |
$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --group dtc_controller_test --topic vnet_monitor_eip_statistic --from-beginning |
4. 参考文档
Kafka官网:http://kafka.apache.org/
学习推荐 :http://orchome.com/kafka/index
官网下载 :http://kafka.apache.org/downloads
赞赏微信赞赏
支付宝赞赏