kafka高可用和负载均衡一次搞定

*文档中的所有IP复制使用请替换为自己的IP以便减少报错

*ZK 集群:是全局的「一主二从」(Leader/Follower),保证元数据的一致性。
*Kafka 集群:是「无主对等 Broker」+「分区级主从副本」,保证消息的高可用和负载均衡。

第一步:环境准备(所有节点执行)

# ========== 1. 安装JDK8(核心前提) ==========

# 1. 先检查系统是否已有Java(避免冲突)
java -version
# 如果提示"未找到命令",说明未安装,继续下一步

# 2. 安装JDK8(OpenJDK版本,和Oracle JDK功能一致)
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# 3. 配置环境变量(确保全局生效)
# 编辑profile文件
vi /etc/profile
# 在文件末尾添加以下内容
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

# 4. 使环境变量立即生效
source /etc/profile

# 5. 验证安装是否成功(关键步骤)
java -version
# 正常输出示例:
# openjdk version "1.8.0_392"
# OpenJDK Runtime Environment (build 1.8.0_392-b08)
# OpenJDK 64-Bit Server VM (build 25.392-b08, mixed mode)

# ========== 2. 关闭防火墙 ==========

systemctl stop firewalld && systemctl disable firewalld

# ========== 3. 下载并解压Kafka ==========
cd /usr/local
wget https://archive.apache.org/dist/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -zxvf kafka_2.13-3.5.0.tgz
ln -s kafka_2.13-3.5.0 kafka
cd kafka

第二步:配置 ZooKeeper 集群(所有节点执行)

1. 编辑 ZK 配置文件
 
vi config/zookeeper.properties

替换为以下内容(所有节点配置一致):
 
# 数据存储目录
dataDir=/tmp/zookeeper
# 客户端端口
clientPort=2181
# 心跳间隔
tickTime=2000
# 初始化连接超时
initLimit=10
# 同步超时
syncLimit=5
# ZK 集群节点(server.数字 对应 myid 文件内容)
server.0=192.168.200.201:2888:3888
server.1=192.168.200.202:2888:3888
server.2=192.168.200.203:2888:3888
# 允许所有地址访问
clientPortAddress=0.0.0.0
# 避免内存溢出
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# 启用 stat 等四字命令
4lw.commands.whitelist=*

# 先创建目录
mkdir -p /tmp/zookeeper

#master 再写入 myid
echo "0" > /tmp/zookeeper/myid

node1:
 
mkdir -p /tmp/zookeeper
echo "1" > /tmp/zookeeper/myid

node2:
 
mkdir -p /tmp/zookeeper
echo "2" > /tmp/zookeeper/myid

第三步:配置 Kafka 集群(每个节点单独配置)

1. master 节点(broker.id=0)

vi config/server.properties

broker.id=0     # Broker 唯一 ID(每个节点不同)

listeners=PLAINTEXT://0.0.0.0:9092     # 监听地址(0.0.0.0 允许所有地址访问)

advertised.listeners=PLAINTEXT://192.168.200.201:9092     # 对外暴露的地址(必须是节点实际 IP)

zookeeper.connect=192.168.200.201:2181,192.168.200.202:2181,192.168.200.203:2181  # ZK 集群连接地址

zookeeper.connection.timeout.ms=10000    # ZK 连接超时

log.dirs=/usr/local/kafka/kafka-logs    # 日志存储目录

num.partitions=3    # 默认分区数

default.replication.factor=2    # 默认副本数

delete.topic.enable=true    # 删除 topic 开关

2. node1 节点(broker.id=1)

仅修改以下差异配置,其余同 master:
 
broker.id=1
advertised.listeners=PLAINTEXT://192.168.200.202:9092
advertised.listeners=PLAINTEXT://192.168.200.2021:9092
 
3. node2 节点(broker.id=2)
仅修改以下差异配置,其余同 master:
 
broker.id=2
advertised.listeners=PLAINTEXT://192.168.200.203:9092
advertised.listeners=PLAINTEXT://192.168.200.203:9092

第四步:启动集群(按顺序执行)
1. 启动 ZooKeeper 集群(先启动所有 ZK,再启动 Kafka)
master 节点:
 
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

node1 节点:
 
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

node2 节点:
 
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

2. 验证 ZK 集群状态(任意节点执行)
 
# 查看 ZK 角色(master 应为 leader,node1/node2 为 follower)

echo stat | nc localhost 2181

# 查看 ZK 集群节点

bin/zookeeper-shell.sh localhost:2181 ls /

#启动 Kafka 集群所有节点执行这个

nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

#清空 Kafka 数据目录(所有节点)

rm -rf /usr/local/kafka/kafka-logs/*

nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

jps

#显示这个两个都成功启动了

[root@node2 kafka]# jps
14532 Kafka
12805 QuorumPeerMain
14990 Jps

#检查 Broker 列表(核心验证)在任意节点(推荐 master)执行:

cd /usr/local/kafka

bin/kafka-broker-api-versions.sh –bootstrap-server 192.168.200.202:9092

#预期输出:会看到 3 个 Broker 信息,分别对应 id: 0、id: 1、id: 2。

#创建测试 Topic 验证功能

bin/kafka-topics.sh –create –topic test_cluster –bootstrap-server 192.168.200.202:9092 –partitions 3 –replication-factor 2                            # 创建一个 3 分区、2 副本的测试 Topic

bin/kafka-topics.sh –describe –topic test_cluster –bootstrap-server 192.168.200.202:9092    # 查看 Topic 详情,确认副本分布在不同 Broker 上

#可以进一步测试集群的生产和消费能力:

#生产消息:

bin/kafka-console-producer.sh –topic test_cluster –bootstrap-server 192.168.200.202:9092

#输入几条消息,例如:Hello Kafka!、Cluster test passed!

#消费消息:

bin/kafka-console-consumer.sh –topic test_cluster –from-beginning –bootstrap-server 192.168.200.202:9092
#应该能看到刚才生产的所有消息

一件启停脚本

#!/bin/bash
# Kafka 集群一键启停管理脚本
# 适配:先启动ZK再启动Kafka,先停止Kafka再停止ZK,均使用-daemon守护进程模式
# ====================== 请根据你的集群环境修改以下配置 ======================
# 集群节点列表(主机名/IP,空格分隔)
CLUSTER_NODES=("192.168.200.201" "192.168.200.202" "192.168.200.203")
# Kafka 安装目录(所有节点必须一致)
KAFKA_HOME="/usr/local/kafka"
# Zookeeper 客户端端口(默认2181)
ZK_PORT=2181
# ===========================================================================
# 检查是否配置了免密登录(可选提示,非强制)
check_ssh() {
    for node in "${CLUSTER_NODES[@]}"; do
        if ! ssh -o ConnectTimeout=3 -q "$node" "echo '免密登录测试' > /dev/null"; then
            echo "⚠️  警告:节点 $node 未配置免密登录,执行时需要手动输入密码!"
            echo "   建议在当前节点执行:ssh-copy-id root@$node 配置免密登录"
            read -p "是否继续执行?(y/n):" choice
            if [ "$choice" != "y" ]; then
                exit 1
            fi
            break
        fi
    done
}
# 等待 Zookeeper 就绪
wait_for_zk() {
    local node=$1
    echo "⏳ 等待 Zookeeper 在 $node:$ZK_PORT 就绪..."
    for i in {1..30}; do
        if ssh "$node" "echo ruok | nc localhost $ZK_PORT" | grep -q "imok"; then
            echo "✅ Zookeeper 在 $node 就绪"
            return 0
        fi
        sleep 2
    done
    echo "❌ Zookeeper 在 $node 启动超时,请检查日志!"
    return 1
}
# 启动集群(先ZK后Kafka)
start_cluster() {
    echo -e "\n========== 开始启动 Kafka 集群 =========="
    for node in "${CLUSTER_NODES[@]}"; do
        echo -e "\n📌 操作节点:$node"
        echo "1. 启动 Zookeeper(守护进程模式)..."
        ssh "$node" "cd $KAFKA_HOME && bin/zookeeper-server-start.sh -daemon config/zookeeper.properties"
        # 等待 ZK 就绪
        if ! wait_for_zk "$node"; then
            echo "❌ 跳过节点 $node 的 Kafka 启动"
            continue
        fi
        echo "2. 启动 Kafka(守护进程模式)..."
        # 启动 Kafka 并重定向输出到日志,方便排查
        ssh "$node" "cd $KAFKA_HOME && nohup bin/kafka-server-start.sh -daemon config/server.properties > $KAFKA_HOME/kafka-start.log 2>&1 &"
        sleep 5  # 等待 Kafka 进程拉起
        # 检查 Kafka 是否启动成功
        if ssh "$node" "jps | grep -q Kafka"; then
            echo "✅ Kafka 在 $node 启动成功"
        else
            echo "❌ Kafka 在 $node 启动失败,请查看 $KAFKA_HOME/kafka-start.log 日志"
        fi
    done
    echo -e "\n========== 集群启动完成 =========="
    status_cluster  # 启动后自动查看状态
}
# 停止集群(先Kafka后ZK)
stop_cluster() {
    echo -e "\n========== 开始停止 Kafka 集群 =========="
    for node in "${CLUSTER_NODES[@]}"; do
        echo -e "\n📌 操作节点:$node"
        echo "1. 停止 Kafka..."
        ssh "$node" "cd $KAFKA_HOME && bin/kafka-server-stop.sh || true"
        sleep 3
        # 强制杀死残留的 Kafka 进程
        ssh "$node" "kill -9 \$(jps | grep Kafka | awk '{print \$1}') 2>/dev/null || true"
        echo "2. 停止 Zookeeper..."
        ssh "$node" "cd $KAFKA_HOME && bin/zookeeper-server-stop.sh || true"
        sleep 3
        # 强制杀死残留的 ZK 进程
        ssh "$node" "kill -9 \$(jps | grep QuorumPeerMain | awk '{print \$1}') 2>/dev/null || true"
        echo "✅ $node 停止完成"
    done
    echo -e "\n========== 集群停止完成 =========="
    status_cluster
}
# 查看集群状态(jps检查进程)
status_cluster() {
    echo -e "\n========== 集群进程状态 =========="
    for node in "${CLUSTER_NODES[@]}"; do
        echo -e "\n📌 节点 $node 进程列表:"
        ssh "$node" "jps | grep -E 'Kafka|QuorumPeerMain' || echo '❌ 无Kafka/ZK进程'"
    done
    echo -e "\n========== 状态查看完成 =========="
}
# 脚本用法提示
usage() {
    echo -e "\n📚 Kafka 集群管理脚本用法:"
    echo "  $0 start   - 启动整个集群(先ZK后Kafka)"
    echo "  $0 stop    - 停止整个集群(先Kafka后ZK)"
    echo "  $0 status  - 查看集群所有节点进程状态"
    echo -e "\n示例:"
    echo "  ./kafka-cluster-manager.sh start"
    echo "  ./kafka-cluster-manager.sh stop"
}
# 主逻辑:判断输入参数
if [ $# -ne 1 ]; then
    usage
    exit 1
fi
# 先检查SSH免密(可选)
check_ssh
case "$1" in
    start)
        start_cluster
        ;;
    stop)
        stop_cluster
        ;;
    status)
        status_cluster
        ;;
    *)
        echo "❌ 无效参数:$1"
        usage
        exit 1
        ;;
esac
exit 0

打开脚本修改开头的 2 个核心配置,匹配你的集群环境:
 
vim /usr/local/kafka/kafka-cluster-manager.sh

# 1. 集群节点列表:替换为你的节点IP/主机名(比如 192.168.200.201 192.168.200.202 192.168.200.203)
CLUSTER_NODES=("master" "node1" "node2")

# 2. Kafka安装目录:确认所有节点的路径一致(默认 /usr/local/kafka,无需改)
KAFKA_HOME="/usr/local/kafka"

# 进入脚本目录

cd /usr/local/kafka

# 1. 启动整个集群(自动先启ZK,再启Kafka,启动后显示状态)

./kafka-cluster-manager.sh start

# 2. 停止整个集群(自动先停Kafka,再停ZK,强制清理残留进程,停止后显示状态)

./kafka-cluster-manager.sh stop

# 3. 查看集群所有节点的进程状态

./kafka-cluster-manager.sh status

© 版权声明

相关文章