1. 部署JAVA JDK
kafka需要依赖java,所以先确认已安装java
1.1. 检查并清空JDK
首先检测系统是否安装JDK,如已安装则卸载.
官方下载地址:https://www.oracle.com/java/technologies/javase-downloads.html
$ java -version
#如果centos安装完毕后,就发现已经安装了jdk,那么极有可能是安装了OpenJDK,可以通过以下命令进一步查看JDK信息:
$ rpm -qa | grep openjdk
#卸载OpenJDK,执行以下操作:
$ rpm -e --nodeps java-1.6.0-openjdk-xxx
1.2. 安装JAVA JDK
$ wget https://download.oracle.com/otn-pub/java/jdk/16.0.2%2B7/d4a915d82b4c4fbb9bde534da945d746/jdk-16.0.2_linux-x64_bin.rpm
$ rpm -ivh jdk-16.0.2_linux-x64_bin.rpm
1.3. 验证JAVA JDK
#验证安装(出现如下提示表示安装成功)
$ java
$ javac
$ java -version
java version "16.0.2" 2021-07-20
Java(TM) SE Runtime Environment (build 16.0.2+7-67)
Java HotSpot(TM) 64-Bit Server VM (build 16.0.2+7-67, mixed mode, sharing)
1.4. 配置环境变量(/etc/profile)
$ vi /etc/profile
JAVA_HOME=/usr/java/jdk-16.0.2
JRE_HOME=/usr/java/jdk-16.0.2//jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
使修改生效
$ source /etc/profile //使修改立即生效
#查看系统环境状态
$ echo $PATH
2. 安装部署Kafka集群
2.1. 下载并安装
- 官方网站下载地址:http://kafka.apache.org/downloads
$ wget https://dlcdn.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
$ tar -zxvf kafka_2.13-2.8.0.tgz
2.2. 修改配置文件
2.2.1. 修改zookeeper配置文件
因为kafka建立集群依赖zookeer集群进行选举,所以这里可以提前先配置zookeeper集群.
$ mkdir -p /data/zookeeper
$ vi zookeeper.properties
#修改如下内容
dataDir=/data/zookeeper
#添加如下内容,配置端口
server.0=11.32.16.13:2888:3888
server.1=11.32.16.14:2888:3888
server.2=,11.32.16.15::2888:3888
tickTime=2000
initLimit=10
syncLimit=5
-
tickTime:基本事件单元,这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳;最小 的session过期时间为2倍tickTime
-
dataDir:存储内存中数据库快照的位置,除非另有说明,否则指向数据库更新的事务日志。注意:应该谨慎的选择日志存放的位置,使用专用的日志存储设备能够大大提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会很大程度上影像系统性能。
-
client:监听客户端连接的端口。
-
initLimit:允许follower连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
-
syncLimit:表示Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
- server.A=B:C:D
A:其中 A 是一个数字,表示这个是服务器的编号
B:是这个服务器的 ip 地址;
C:Zookeeper服务器之间的通信端口;
D:Leader选举的端口。
我们需要修改的第一个是 dataDir ,在指定的位置处创建好目录。官方文档连接
2.2.2. 创建zookeepera集群中node编号文件
在上一步 dataDir 指定的目录下,创建 myid 文件。
例如在/tmp/zookeeper中创建myid文件,其中内容为对应的主机ID号.
因为配置集群,所以在所有机子11.32.16.13 11.32.16.14 11.32.16.15上分别创建集群中node编号文件
$ cat /tmp/zookeeper/myid
1
2.2.3. 修改kafka broker配置文件
配置文件用于服务器节点的配置文件,配置服务节点, server.properties
$ cd kafka_2.13-2.8.0/config
$ mkdir -p /var/log/kafka
#如果修改了broker.id,需要删除该路径所有内容/var/log/kafka
$ vi server.properties
#默认是0,这里自定义node-1:1 node-2:2 node-3:3
broker.id= 1
#配置监听端口,注释掉该内容,采用主机IP
#listeners=PLAINTEXT://:9092
#新增hostname
host.name=11.32.16.13
#delete.topic.enable=true
#配置kafka日志文件位置
log.dirs=/var/log/kafka
#/kafka
zookeeper.connect=11.32.16.13:2181,11.32.16.14:2181,11.32.16.15:2181
地址可以先不用创建,在首次启动kafka的时候,会自动进行创建
2.3. 启动程序
2.3.1. 启动zookeeper
这个是在启动broker之前需要保证zookeeper集群是运行着的。
$ bin/zookeeper-shell.sh status
/root/kafka/kafka_2.13-2.8.0/bin/kafka-run-class.sh: line 330: exec: java: not found
#至此排查问题
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
# 稳定后台启动
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
2.3.2. 开放端口并查看
#安全组放开zookeerper通信和选举端口
$ iptables -I INPUT -p tcp -m multiport --dports 2888 -j ACCEPT
$ iptables -I INPUT -p tcp -m multiport --dports 3888 -j ACCEPT
#安全组放开zookeerper服务端口和kafka端口
$ iptables -I INPUT -p tcp -m multiport --dports 2181 -j ACCEPT
$ iptables -I INPUT -p tcp -m multiport --dports 9092 -j ACCEPT
#查看本地安全组
$ netstat -tunlp | grep 9092
2.3.3. 启动kafka broker
先启动一下,开有没有报错。
$ ./bin/kafka-server-start.sh ./config/server.properties
# 稳定后台启动
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
-
问题1
Error creating broker listeners from 'vnet-kafka://:9092':
java.lang.IllegalArgumentException: Error creating broker listeners from 'vnet-kafka://:9092': No security protocol defined for listener VNET-KAFKA
配置文件中如果不采用SSL,需要采用PLAINTEXT://:9092;否则采用主机hostname
- 问题2
INFO Refusing session request for client /11.32.16.13:47997 as it has seen zxid 0x4000000aa our last zxid is 0x300000000 client must try another server (org.apache.zookeeper.server.ZooKeeperServer)
说明11.32.16.13中仍有程序给zookeeper发送信息,需要关闭它.
这里查看没有关闭kafka,所有关闭它. netstat -tunlp|grep 9092,并关闭它
- 问题3
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
2.4. 检查测试Zookeeper集群状态
连接kafka使用的时候,里面除了zookeeper之外还有其他内容,来查看一下。登录zookeeper(切换到zk的bin目录下),先连接zk
$ ./bin/zookeeper-shell.sh 11.32.16.13:2181
$ ls /brokers/ids
# 获取集群ID
$ ./bin/kafka-cluster.sh cluster-id --bootstrap-server 11.32.16.13:9092
# 创建一个topic以及它的主备数量,其中zookeeper为其中集群
$ ./bin/kafka-topics.sh --create --topic vnet_monitor_eip_statistic --partitions 1 --replication-factor 2 --zookeeper 11.32.16.13:2181,11.32.16.14:2181,11.32.16.15:2181
Created topic vnet_monitor_eip_statistic.
# 查看kafka集群状态.
$ ./bin/kafka-topics.sh --describe --zookeeper 11.32.16.15:2181
Topic: vnet_monitor_eip_statistic TopicId: rMkqsW23Rg6KPJszNBGMTQ PartitionCount: 1 ReplicationFactor: 2 Configs:
Topic: vnet_monitor_eip_statistic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
3. 模拟使用Kafka
3.1. 创建topic(使用help帮助)
3.1.1. 创建一个topic
$ bin/kafka-topics.sh --create --topic vnet_monitor_eip_statistic --partitions 1 --replication-factor 1 --zookeeper 127.0.0.1:2181
3.1.2. 查看消息topic订阅状态
$ bin/kafka-topics.sh --describe --topic vnet_monitor_eip_statistic --zookeeper 127.0.0.1:2181
bin/kafka-topics.sh --describe --topic vneteip --zookeeper 127.0.0.1:2181
3.2. 生产者和消费者
3.2.1. 启动消息生产者
- 启动消息生产者,将消息发送到kafka的topic上
$ bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic vnet_monitor_eip_statistic
3.2.2. 启动消息消费者
$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --group dtc_controller_test --topic vnet_monitor_eip_statistic --from-beginning
4. 参考文档
Kafka官网:http://kafka.apache.org/