kafka高可用和负载均衡一次搞定
*文档中的所有IP复制使用请替换为自己的IP以便减少报错
*ZK 集群:是全局的「一主二从」(Leader/Follower),保证元数据的一致性。
*Kafka 集群:是「无主对等 Broker」+「分区级主从副本」,保证消息的高可用和负载均衡。
第一步:环境准备(所有节点执行)
# ========== 1. 安装JDK8(核心前提) ==========
# 1. 先检查系统是否已有Java(避免冲突)
java -version
# 如果提示"未找到命令",说明未安装,继续下一步
# 2. 安装JDK8(OpenJDK版本,和Oracle JDK功能一致)
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# 3. 配置环境变量(确保全局生效)
# 编辑profile文件
vi /etc/profile
# 在文件末尾添加以下内容
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
# 4. 使环境变量立即生效
source /etc/profile
# 5. 验证安装是否成功(关键步骤)
java -version
# 正常输出示例:
# openjdk version "1.8.0_392"
# OpenJDK Runtime Environment (build 1.8.0_392-b08)
# OpenJDK 64-Bit Server VM (build 25.392-b08, mixed mode)
# ========== 2. 关闭防火墙 ==========
systemctl stop firewalld && systemctl disable firewalld
# ========== 3. 下载并解压Kafka ==========
cd /usr/local
wget https://archive.apache.org/dist/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -zxvf kafka_2.13-3.5.0.tgz
ln -s kafka_2.13-3.5.0 kafka
cd kafka
第二步:配置 ZooKeeper 集群(所有节点执行)
1. 编辑 ZK 配置文件
vi config/zookeeper.properties
替换为以下内容(所有节点配置一致):
# 数据存储目录
dataDir=/tmp/zookeeper
# 客户端端口
clientPort=2181
# 心跳间隔
tickTime=2000
# 初始化连接超时
initLimit=10
# 同步超时
syncLimit=5
# ZK 集群节点(server.数字 对应 myid 文件内容)
server.0=192.168.200.201:2888:3888
server.1=192.168.200.202:2888:3888
server.2=192.168.200.203:2888:3888
# 允许所有地址访问
clientPortAddress=0.0.0.0
# 避免内存溢出
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# 启用 stat 等四字命令
4lw.commands.whitelist=*
# 先创建目录
mkdir -p /tmp/zookeeper
#master 再写入 myid
echo "0" > /tmp/zookeeper/myid
node1:
mkdir -p /tmp/zookeeper
echo "1" > /tmp/zookeeper/myid
node2:
mkdir -p /tmp/zookeeper
echo "2" > /tmp/zookeeper/myid
第三步:配置 Kafka 集群(每个节点单独配置)
1. master 节点(broker.id=0)
vi config/server.properties
broker.id=0 # Broker 唯一 ID(每个节点不同)
listeners=PLAINTEXT://0.0.0.0:9092 # 监听地址(0.0.0.0 允许所有地址访问)
advertised.listeners=PLAINTEXT://192.168.200.201:9092 # 对外暴露的地址(必须是节点实际 IP)
zookeeper.connect=192.168.200.201:2181,192.168.200.202:2181,192.168.200.203:2181 # ZK 集群连接地址
zookeeper.connection.timeout.ms=10000 # ZK 连接超时
log.dirs=/usr/local/kafka/kafka-logs # 日志存储目录
num.partitions=3 # 默认分区数
default.replication.factor=2 # 默认副本数
delete.topic.enable=true # 删除 topic 开关
2. node1 节点(broker.id=1)
仅修改以下差异配置,其余同 master:
broker.id=1
advertised.listeners=PLAINTEXT://192.168.200.202:9092
advertised.listeners=PLAINTEXT://192.168.200.2021:9092
3. node2 节点(broker.id=2)
仅修改以下差异配置,其余同 master:
broker.id=2
advertised.listeners=PLAINTEXT://192.168.200.203:9092
advertised.listeners=PLAINTEXT://192.168.200.203:9092
第四步:启动集群(按顺序执行)
1. 启动 ZooKeeper 集群(先启动所有 ZK,再启动 Kafka)
master 节点:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
node1 节点:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
node2 节点:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
2. 验证 ZK 集群状态(任意节点执行)
# 查看 ZK 角色(master 应为 leader,node1/node2 为 follower)
echo stat | nc localhost 2181
# 查看 ZK 集群节点
bin/zookeeper-shell.sh localhost:2181 ls /
#启动 Kafka 集群所有节点执行这个
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
#清空 Kafka 数据目录(所有节点)
rm -rf /usr/local/kafka/kafka-logs/*
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
jps
#显示这个两个都成功启动了
[root@node2 kafka]# jps
14532 Kafka
12805 QuorumPeerMain
14990 Jps
#检查 Broker 列表(核心验证)在任意节点(推荐 master)执行:
cd /usr/local/kafka
bin/kafka-broker-api-versions.sh –bootstrap-server 192.168.200.202:9092
#预期输出:会看到 3 个 Broker 信息,分别对应 id: 0、id: 1、id: 2。
#创建测试 Topic 验证功能
bin/kafka-topics.sh –create –topic test_cluster –bootstrap-server 192.168.200.202:9092 –partitions 3 –replication-factor 2 # 创建一个 3 分区、2 副本的测试 Topic
bin/kafka-topics.sh –describe –topic test_cluster –bootstrap-server 192.168.200.202:9092 # 查看 Topic 详情,确认副本分布在不同 Broker 上
#可以进一步测试集群的生产和消费能力:
#生产消息:
bin/kafka-console-producer.sh –topic test_cluster –bootstrap-server 192.168.200.202:9092
#输入几条消息,例如:Hello Kafka!、Cluster test passed!
#消费消息:
bin/kafka-console-consumer.sh –topic test_cluster –from-beginning –bootstrap-server 192.168.200.202:9092
#应该能看到刚才生产的所有消息
一件启停脚本
#!/bin/bash
# Kafka 集群一键启停管理脚本
# 适配:先启动ZK再启动Kafka,先停止Kafka再停止ZK,均使用-daemon守护进程模式
# ====================== 请根据你的集群环境修改以下配置 ======================
# 集群节点列表(主机名/IP,空格分隔)
CLUSTER_NODES=("192.168.200.201" "192.168.200.202" "192.168.200.203")
# Kafka 安装目录(所有节点必须一致)
KAFKA_HOME="/usr/local/kafka"
# Zookeeper 客户端端口(默认2181)
ZK_PORT=2181
# ===========================================================================
# 检查是否配置了免密登录(可选提示,非强制)
check_ssh() {
for node in "${CLUSTER_NODES[@]}"; do
if ! ssh -o ConnectTimeout=3 -q "$node" "echo '免密登录测试' > /dev/null"; then
echo "⚠️ 警告:节点 $node 未配置免密登录,执行时需要手动输入密码!"
echo " 建议在当前节点执行:ssh-copy-id root@$node 配置免密登录"
read -p "是否继续执行?(y/n):" choice
if [ "$choice" != "y" ]; then
exit 1
fi
break
fi
done
}
# 等待 Zookeeper 就绪
wait_for_zk() {
local node=$1
echo "⏳ 等待 Zookeeper 在 $node:$ZK_PORT 就绪..."
for i in {1..30}; do
if ssh "$node" "echo ruok | nc localhost $ZK_PORT" | grep -q "imok"; then
echo "✅ Zookeeper 在 $node 就绪"
return 0
fi
sleep 2
done
echo "❌ Zookeeper 在 $node 启动超时,请检查日志!"
return 1
}
# 启动集群(先ZK后Kafka)
start_cluster() {
echo -e "\n========== 开始启动 Kafka 集群 =========="
for node in "${CLUSTER_NODES[@]}"; do
echo -e "\n📌 操作节点:$node"
echo "1. 启动 Zookeeper(守护进程模式)..."
ssh "$node" "cd $KAFKA_HOME && bin/zookeeper-server-start.sh -daemon config/zookeeper.properties"
# 等待 ZK 就绪
if ! wait_for_zk "$node"; then
echo "❌ 跳过节点 $node 的 Kafka 启动"
continue
fi
echo "2. 启动 Kafka(守护进程模式)..."
# 启动 Kafka 并重定向输出到日志,方便排查
ssh "$node" "cd $KAFKA_HOME && nohup bin/kafka-server-start.sh -daemon config/server.properties > $KAFKA_HOME/kafka-start.log 2>&1 &"
sleep 5 # 等待 Kafka 进程拉起
# 检查 Kafka 是否启动成功
if ssh "$node" "jps | grep -q Kafka"; then
echo "✅ Kafka 在 $node 启动成功"
else
echo "❌ Kafka 在 $node 启动失败,请查看 $KAFKA_HOME/kafka-start.log 日志"
fi
done
echo -e "\n========== 集群启动完成 =========="
status_cluster # 启动后自动查看状态
}
# 停止集群(先Kafka后ZK)
stop_cluster() {
echo -e "\n========== 开始停止 Kafka 集群 =========="
for node in "${CLUSTER_NODES[@]}"; do
echo -e "\n📌 操作节点:$node"
echo "1. 停止 Kafka..."
ssh "$node" "cd $KAFKA_HOME && bin/kafka-server-stop.sh || true"
sleep 3
# 强制杀死残留的 Kafka 进程
ssh "$node" "kill -9 \$(jps | grep Kafka | awk '{print \$1}') 2>/dev/null || true"
echo "2. 停止 Zookeeper..."
ssh "$node" "cd $KAFKA_HOME && bin/zookeeper-server-stop.sh || true"
sleep 3
# 强制杀死残留的 ZK 进程
ssh "$node" "kill -9 \$(jps | grep QuorumPeerMain | awk '{print \$1}') 2>/dev/null || true"
echo "✅ $node 停止完成"
done
echo -e "\n========== 集群停止完成 =========="
status_cluster
}
# 查看集群状态(jps检查进程)
status_cluster() {
echo -e "\n========== 集群进程状态 =========="
for node in "${CLUSTER_NODES[@]}"; do
echo -e "\n📌 节点 $node 进程列表:"
ssh "$node" "jps | grep -E 'Kafka|QuorumPeerMain' || echo '❌ 无Kafka/ZK进程'"
done
echo -e "\n========== 状态查看完成 =========="
}
# 脚本用法提示
usage() {
echo -e "\n📚 Kafka 集群管理脚本用法:"
echo " $0 start - 启动整个集群(先ZK后Kafka)"
echo " $0 stop - 停止整个集群(先Kafka后ZK)"
echo " $0 status - 查看集群所有节点进程状态"
echo -e "\n示例:"
echo " ./kafka-cluster-manager.sh start"
echo " ./kafka-cluster-manager.sh stop"
}
# 主逻辑:判断输入参数
if [ $# -ne 1 ]; then
usage
exit 1
fi
# 先检查SSH免密(可选)
check_ssh
case "$1" in
start)
start_cluster
;;
stop)
stop_cluster
;;
status)
status_cluster
;;
*)
echo "❌ 无效参数:$1"
usage
exit 1
;;
esac
exit 0
打开脚本修改开头的 2 个核心配置,匹配你的集群环境:
vim /usr/local/kafka/kafka-cluster-manager.sh
# 1. 集群节点列表:替换为你的节点IP/主机名(比如 192.168.200.201 192.168.200.202 192.168.200.203)
CLUSTER_NODES=("master" "node1" "node2")
# 2. Kafka安装目录:确认所有节点的路径一致(默认 /usr/local/kafka,无需改)
KAFKA_HOME="/usr/local/kafka"
# 进入脚本目录
cd /usr/local/kafka
# 1. 启动整个集群(自动先启ZK,再启Kafka,启动后显示状态)
./kafka-cluster-manager.sh start
# 2. 停止整个集群(自动先停Kafka,再停ZK,强制清理残留进程,停止后显示状态)
./kafka-cluster-manager.sh stop
# 3. 查看集群所有节点的进程状态
./kafka-cluster-manager.sh status