安装Java

dnf install java -y

安装Kafka

环境说明

服务器ipnode.id端口
192.168.15.3119092/9093
192.168.15.3229092/9093
192.168.15.3339092/9093

准备工作

安装包下载、配置文件修改

# 下载安装包
cd /usr/local/src
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzvf kafka_2.13-3.4.0.tgz -C /usr/local

# 创建数据目录
mkdir -p /data/kafka	# 所有服务器均需操作

# 修改kafka配置文件,因为需要使用kraft模式的kafka,所以配置文件位置是:/usr/local/kafka_2.13-3.4.0/config/kraft
cd /usr/local/kafka_2.13-3.4.0/config/kraft
vim server.properties
# 需要修改的内容如下:
node.id=1
controller.quorum.voters=1@192.168.15.31:9093,2@192.168.15.32:9093,3@192.168.15.33:9093
advertised.listeners=PLAINTEXT://192.168.15.31:9092
log.dirs=/data/kafka

# 拷贝修改完成的文件到其余两台服务器,可以自己手动提前进行免密登陆配置
scp -r /usr/local/kafka_2.13-3.4.0 192.168.15.32:/usr/local/
scp -r /usr/local/kafka_2.13-3.4.0 192.168.15.33:/usr/local/

# 登陆其余两台服务器,对配置文件进行修改
# 192.168.15.32 /usr/local/kafka_2.13-3.4.0/config/kraft/server.properties需修改的内容:
node.id=2
advertised.listeners=PLAINTEXT://192.168.15.32:9092

# 192.168.15.33 /usr/local/kafka_2.13-3.4.0/config/kraft/server.properties需修改的内容:
node.id=3
advertised.listeners=PLAINTEXT://192.168.15.33:9092

初始化

# 生成uuid,随便选一台服务器就行
cd /usr/local/kafka_2.13-3.4.0/bin
./kafka-storage.sh random-uuid
# 返回结果如下:
06pSQXHFTy6Mad4XUE2ZnA

# 格式化存储路径,每台服务器均需执行
/usr/local/kafka_2.13-3.4.0/bin/kafka-storage.sh format -t 06pSQXHFTy6Mad4XUE2ZnA -c /usr/local/kafka_2.13-3.4.0/config/kraft/server.properties

启动服务

每台服务器都需要执行

/usr/local/kafka_2.13-3.4.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.4.0/config/kraft/server.properties

集群启停脚本

vim kafka_mgr.sh
#!/bin/bash
#kafka集群启动脚本,需要提前配置免密
case $1 in
 "start"){
    for i in 192.168.15.31 192.168.15.32 192.168.15.33
    do
       echo "--------启动 $i kafka with kraft-------"
       ssh $i "/usr/local/kafka_2.13-3.4.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.4.0/config/kraft/server.properties"
    done
};;
"stop"){
    for i in 192.168.15.31 192.168.15.32 192.168.15.33
    do
       echo "------停止 $i kafka--------"
       ssh $i "/usr/local/kafka_2.13-3.4.0/bin/kafka-server-stop.sh"
    done
};;
esac


chmod u+x kafka_mgr.sh

测试

创建topic

/usr/local/kafka_2.13-3.4.0/bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 3 --bootstrap-server 192.168.15.31:9092,192.168.15.32:9092,192.168.15.33:9092

查看topic

/usr/local/kafka_2.13-3.4.0/bin/kafka-topics.sh --list --bootstrap-server 192.168.15.31:9092,192.168.15.32:9092,192.168.15.33:9092

生产消息

# 选择一台服务器进行消息的创建
/usr/local/kafka_2.13-3.4.0/bin/kafka-console-producer.sh --broker-list 192.168.15.31:9092,192.168.15.32:9092,192.168.15.33:9092 --topic test
# 执行成功后会出现一个 > 符号,可以输入消息,输入消息回车后会在消费消息会话中看到内容

消费消息

# 另选一台服务器或者在打开一个会话进行消息的接收
/usr/local/kafka_2.13-3.4.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.31:9092,192.168.15.32:9092,192.168.15.33:9092 --topic test
# 执行成功后不会出现 > 符号,可以看到在生产消息会话中创建的信息

星霜荏苒 居诸不息