Kafka集群部署

  • MQ, Server
  • 1,281 clicked

1. 部署JAVA JDK

kafka需要依赖java,所以先确认已安装java

1.1. 检查并清空JDK

首先检测系统是否安装JDK,如已安装则卸载.

官方下载地址:https://www.oracle.com/java/technologies/javase-downloads.html

$ java -version
#如果centos安装完毕后,就发现已经安装了jdk,那么极有可能是安装了OpenJDK,可以通过以下命令进一步查看JDK信息:
$ rpm -qa | grep openjdk
#卸载OpenJDK,执行以下操作:
$ rpm -e --nodeps java-1.6.0-openjdk-xxx

1.2. 安装JAVA JDK

$ wget https://download.oracle.com/otn-pub/java/jdk/16.0.2%2B7/d4a915d82b4c4fbb9bde534da945d746/jdk-16.0.2_linux-x64_bin.rpm
$ rpm -ivh jdk-16.0.2_linux-x64_bin.rpm

1.3. 验证JAVA JDK

#验证安装(出现如下提示表示安装成功)
$ java
$ javac
$ java -version
java version "16.0.2" 2021-07-20
Java(TM) SE Runtime Environment (build 16.0.2+7-67)
Java HotSpot(TM) 64-Bit Server VM (build 16.0.2+7-67, mixed mode, sharing)

1.4. 配置环境变量(/etc/profile)

$ vi /etc/profile

JAVA_HOME=/usr/java/jdk-16.0.2
JRE_HOME=/usr/java/jdk-16.0.2//jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH

使修改生效

$ source /etc/profile   //使修改立即生效
#查看系统环境状态
$ echo $PATH

2. 安装部署Kafka集群

2.1. 下载并安装

$ wget https://dlcdn.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
$ tar -zxvf kafka_2.13-2.8.0.tgz

2.2. 修改配置文件

2.2.1. 修改zookeeper配置文件

因为kafka建立集群依赖zookeer集群进行选举,所以这里可以提前先配置zookeeper集群.

$ mkdir -p /data/zookeeper
$ vi zookeeper.properties
#修改如下内容
dataDir=/data/zookeeper
#添加如下内容,配置端口
server.0=11.32.16.13:2888:3888
server.1=11.32.16.14:2888:3888
server.2=,11.32.16.15::2888:3888

tickTime=2000
initLimit=10
syncLimit=5
  1. tickTime:基本事件单元,这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳;最小 的session过期时间为2倍tickTime

  2. dataDir:存储内存中数据库快照的位置,除非另有说明,否则指向数据库更新的事务日志。注意:应该谨慎的选择日志存放的位置,使用专用的日志存储设备能够大大提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会很大程度上影像系统性能。

  3. client:监听客户端连接的端口。

  4. initLimit:允许follower连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。

  5. syncLimit:表示Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。

  • server.A=B:C:D
        A:其中 A 是一个数字,表示这个是服务器的编号
        B:是这个服务器的 ip 地址;
        C:Zookeeper服务器之间的通信端口;
        D:Leader选举的端口。
    我们需要修改的第一个是 dataDir ,在指定的位置处创建好目录。官方文档连接

2.2.2. 创建zookeepera集群中node编号文件

在上一步 dataDir 指定的目录下,创建 myid 文件。
例如在/tmp/zookeeper中创建myid文件,其中内容为对应的主机ID号.
因为配置集群,所以在所有机子11.32.16.13 11.32.16.14 11.32.16.15上分别创建集群中node编号文件

$ cat /tmp/zookeeper/myid
1

2.2.3. 修改kafka broker配置文件

配置文件用于服务器节点的配置文件,配置服务节点, server.properties

$ cd kafka_2.13-2.8.0/config
$ mkdir -p /var/log/kafka
#如果修改了broker.id,需要删除该路径所有内容/var/log/kafka
$ vi server.properties

    #默认是0,这里自定义node-1:1 node-2:2 node-3:3
    broker.id= 1
    #配置监听端口,注释掉该内容,采用主机IP
    #listeners=PLAINTEXT://:9092
    #新增hostname
    host.name=11.32.16.13
     #delete.topic.enable=true

    #配置kafka日志文件位置
    log.dirs=/var/log/kafka

  #/kafka
    zookeeper.connect=11.32.16.13:2181,11.32.16.14:2181,11.32.16.15:2181

地址可以先不用创建,在首次启动kafka的时候,会自动进行创建

2.3. 启动程序

2.3.1. 启动zookeeper

这个是在启动broker之前需要保证zookeeper集群是运行着的。

$ bin/zookeeper-shell.sh status
/root/kafka/kafka_2.13-2.8.0/bin/kafka-run-class.sh: line 330: exec: java: not found
#至此排查问题
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

# 稳定后台启动
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

2.3.2. 开放端口并查看

#安全组放开zookeerper通信和选举端口
$ iptables -I INPUT -p tcp -m multiport --dports 2888 -j ACCEPT
$ iptables -I INPUT -p tcp -m multiport --dports 3888 -j ACCEPT
#安全组放开zookeerper服务端口和kafka端口
$ iptables -I INPUT -p tcp -m multiport --dports 2181 -j ACCEPT
$ iptables -I INPUT -p tcp -m multiport --dports 9092 -j ACCEPT

#查看本地安全组
$ netstat -tunlp | grep 9092

2.3.3. 启动kafka broker

先启动一下,开有没有报错。

$ ./bin/kafka-server-start.sh ./config/server.properties

# 稳定后台启动
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
  • 问题1

    Error creating broker listeners from 'vnet-kafka://:9092':

java.lang.IllegalArgumentException: Error creating broker listeners from 'vnet-kafka://:9092': No security protocol defined for listener VNET-KAFKA

配置文件中如果不采用SSL,需要采用PLAINTEXT://:9092;否则采用主机hostname

  • 问题2

INFO Refusing session request for client /11.32.16.13:47997 as it has seen zxid 0x4000000aa our last zxid is 0x300000000 client must try another server (org.apache.zookeeper.server.ZooKeeperServer)

说明11.32.16.13中仍有程序给zookeeper发送信息,需要关闭它.
这里查看没有关闭kafka,所有关闭它. netstat -tunlp|grep 9092,并关闭它

- 问题3

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient

2.4. 检查测试Zookeeper集群状态

  连接kafka使用的时候,里面除了zookeeper之外还有其他内容,来查看一下。登录zookeeper(切换到zk的bin目录下),先连接zk

$ ./bin/zookeeper-shell.sh 11.32.16.13:2181
$ ls /brokers/ids

# 获取集群ID
$ ./bin/kafka-cluster.sh cluster-id --bootstrap-server 11.32.16.13:9092
# 创建一个topic以及它的主备数量,其中zookeeper为其中集群
$ ./bin/kafka-topics.sh --create --topic vnet_monitor_eip_statistic --partitions 1 --replication-factor 2 --zookeeper 11.32.16.13:2181,11.32.16.14:2181,11.32.16.15:2181
  Created topic vnet_monitor_eip_statistic.

# 查看kafka集群状态.
$ ./bin/kafka-topics.sh --describe --zookeeper 11.32.16.15:2181
  Topic: vnet_monitor_eip_statistic TopicId: rMkqsW23Rg6KPJszNBGMTQ PartitionCount: 1   ReplicationFactor: 2   Configs: 
    Topic: vnet_monitor_eip_statistic   Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1,2


3. 模拟使用Kafka

3.1. 创建topic(使用help帮助)

3.1.1. 创建一个topic

$ bin/kafka-topics.sh --create --topic vnet_monitor_eip_statistic --partitions 1 --replication-factor 1 --zookeeper 127.0.0.1:2181

3.1.2. 查看消息topic订阅状态

$ bin/kafka-topics.sh --describe --topic vnet_monitor_eip_statistic --zookeeper 127.0.0.1:2181
bin/kafka-topics.sh --describe --topic vneteip --zookeeper 127.0.0.1:2181

3.2. 生产者和消费者

3.2.1. 启动消息生产者

  1. 启动消息生产者,将消息发送到kafka的topic上
$ bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic vnet_monitor_eip_statistic

3.2.2. 启动消息消费者

$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --group dtc_controller_test --topic vnet_monitor_eip_statistic --from-beginning 

4. 参考文档

Kafka官网:http://kafka.apache.org/

学习推荐 :http://orchome.com/kafka/index

官网下载 :http://kafka.apache.org/downloads

发表评论

邮箱地址不会被公开。 必填项已用*标注