Kafka KRaft 模式安装与使用指南(无需 Zookeeper)

以下是 完整、按流程顺序重写的 Kafka KRaft 集群搭建文档,包含从零开始的完整初始化流程。


Kafka KRaft 模式集群搭建完整流程(无需 Zookeeper)

适用版本:Kafka 2.13-4.0.0(KRaft 模式)
本文档详细介绍如何在 不依赖 Zookeeper 的情况下,使用 KRaft(Kafka Raft Metadata)模式 搭建一个 3 节点高可用 Kafka 集群。
包含 元数据初始化、格式化、启动、验证 全流程。


🌐 一、集群规划(3 节点示例)

主机名 IP 地址 node.id 角色
kafka1 172.31.7.107 1 broker + controller
kafka2 172.31.7.108 2 broker + controller
kafka3 172.31.7.109 3 broker + controller

✅ 所有节点均使用 混合角色模式(broker + controller)


🔧 二、环境准备

1. 安装 OpenJDK 17(推荐)

# Ubuntu/Debian
sudo apt update && sudo apt install openjdk-17-jdk -y

# CentOS/RHEL
sudo yum install java-17-openjdk-devel -y

验证:

java -version

⚠️ 注意:OpenJDK 11 在 Kafka 4.0.0 中可能出现兼容性问题,强烈推荐使用 OpenJDK 17


📦 三、下载与解压 Kafka

在所有节点执行:

cd /apps
wget https://downloads.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
ln -s kafka_2.13-4.0.0 kafka

设置环境变量(可选):

export KAFKA_HOME=/apps/kafka_2.13-4.0.0
export PATH=$KAFKA_HOME/bin:$PATH

⚙️ 四、配置文件:config/server.properties

在所有节点创建或编辑 $KAFKA_HOME/config/server.properties

############################# Server Basics #############################
process.roles=broker,controller
node.id=__NODE_ID__

############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://__IP__:9092,CONTROLLER://__IP__:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=PLAINTEXT

############################# Log Basics #############################
log.dirs=/apps/kafka_2.13-4.0.0/logs

############################# Controller Quorum #############################
# 三节点控制器投票配置
controller.quorum.voters=1@172.31.7.107:9093,2@172.31.7.108:9093,3@172.31.7.109:9093

############################# Internal Topics #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

🔁 __NODE_ID____IP__ 将由脚本自动替换。


🧰 五、初始化集群元数据(关键步骤!)

⚠️ 此步骤必须在首次启动前执行,且只需执行一次

Step 1:生成唯一的 cluster.id

在任意一台机器上生成集群 ID:

CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
echo $CLUSTER_ID

输出示例:

3hzu8188TC68nQcL5A-Y0g

✅ 记下这个 cluster.id,所有节点将使用相同的 ID。


Step 2:格式化存储目录(所有节点执行)

每台 Kafka 节点 上执行格式化命令,使用上一步生成的 cluster.id

bin/kafka-storage.sh format \
  -t 3hzu8188TC68nQcL5A-Y0g \
  -c /apps/kafka_2.13-4.0.0/config/server.properties \
  --cluster-id 3hzu8188TC68nQcL5A-Y0g

🔍 参数说明:

  • -t:指定 node.id(每台机器不同)
  • -c:配置文件路径
  • --cluster-id:上一步生成的全局唯一 ID

✅ 预期输出:

Formatting /apps/kafka_2.13-4.0.0/logs with cluster id 3hzu8188TC68nQcL5A-Y0g

✅ 此操作会在 log.dirs 目录下创建元数据日志,每个节点必须单独执行


🚀 六、启动脚本(支持动态 IP 替换)

创建管理脚本:bin/kafka-manager.sh

#!/bin/bash

# 自动检测 Java
if command -v java >/dev/null 2>&1; then
    JAVA_BIN=$(command -v java)
    JAVA_HOME=$(dirname "$(dirname "$JAVA_BIN")")
    export JAVA_HOME
    export PATH=$JAVA_HOME/bin:$PATH
else
    echo "❌ Java not found, please install OpenJDK 17+"
    exit 1
fi

KAFKA_HOME="/apps/kafka_2.13-4.0.0"
LOG_FILE="$KAFKA_HOME/logs/kafka.log"
PID_FILE="$KAFKA_HOME/logs/kafka.pid"
BOOTSTRAP_SERVER="$(hostname -I | awk '{print $1}'):9092"

get_pid() {
    if [[ -f "$PID_FILE" ]]; then
        pid=$(cat "$PID_FILE")
        if [[ -n "$pid" && -d "/proc/$pid" ]]; then echo "$pid"; else rm -f "$PID_FILE"; fi
    else
        pgrep -f "kafka\.Kafka" || echo ""
    fi
}

prepare_config() {
    IP=$(hostname -I | awk '{print $1}')
    sed -i "s|__IP__|$IP|g" config/server.properties
}

start() {
    pid=$(get_pid)
    [[ -n "$pid" ]] && { echo "❌ Kafka is already running (PID: $pid)"; return 1; }

    prepare_config
    echo "🚀 Starting Kafka server..."
    nohup bin/kafka-server-start.sh config/server.properties > "$LOG_FILE" 2>&1 &
    echo $! > "$PID_FILE"
    disown
    echo "✅ Kafka started, PID: $!"
}

stop() {
    pid=$(get_pid)
    [[ -z "$pid" ]] && { echo "❌ Kafka is not running."; return 1; }

    echo "🛑 Stopping Kafka (PID: $pid)..."
    kill -15 "$pid" && rm -f "$PID_FILE"
    for i in {1..30}; do
        ! kill -0 "$pid" 2>/dev/null && { echo "✅ Kafka stopped."; return 0; }
        sleep 1
    done
    echo "⚠️  Force killing..."
    kill -9 "$pid" && rm -f "$PID_FILE"
    echo "✅ Kafka killed."
}

status() {
    pid=$(get_pid)
    [[ -n "$pid" ]] && echo "🟢 Kafka is running (PID: $pid)" || echo "🔴 Kafka is not running."
}

controller_info() {
    echo "🧠 Controller Quorum Info:"
    bin/kafka-metadata-quorum.sh --bootstrap-server "$BOOTSTRAP_SERVER" describe --status
}

case "$1" in
    start) start ;;
    stop) stop ;;
    restart) stop; sleep 3; start ;;
    status) status ;;
    controller-info) controller_info ;;
    *)
        echo "📌 Usage: $0 {start|stop|restart|status|controller-info}"
        ;;
esac

赋予执行权限:

chmod +x bin/kafka-manager.sh

▶️ 七、启动集群(所有节点)

在每台机器上执行:

1. 设置 node.id

# 节点1
sed -i 's/__NODE_ID__/1/g' config/server.properties

# 节点2
sed -i 's/__NODE_ID__/2/g' config/server.properties

# 节点3
sed -i 's/__NODE_ID__/3/g' config/server.properties

2. 启动 Kafka

bin/kafka-manager.sh start

✅ 八、验证集群状态

1. 检查运行状态

bin/kafka-manager.sh status

2. 查看控制器信息(任一节点执行)

bin/kafka-manager.sh controller-info

✅ 实际输出示例(来自你的测试):

🧠 Controller Quorum Info:
ClusterId:              3hzu8188TC68nQcL5A-Y0g
LeaderId:               1
LeaderEpoch:            1
HighWatermark:          235
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [{"id": 1, "directoryId": null, "endpoints": ["CONTROLLER://172.31.7.107:9093"]}, {"id": 2, "directoryId": null, "endpoints": ["CONTROLLER://172.31.7.108:9093"]}, {"id": 3, "directoryId": null, "endpoints": ["CONTROLLER://172.31.7.109:9093"]}]
CurrentObservers:       []

健康标志

  • LeaderId 存在
  • MaxFollowerLag: 0
  • CurrentVoters 包含所有 3 个节点

📌 九、常见问题

问题 原因 解决
kafka-storage.sh: command not found 路径错误 使用 bin/kafka-storage.sh
格式化失败 node.idcluster.id 错误 检查参数
节点无法加入 网络不通或端口被占用 检查 9092/9093 端口
启动崩溃 OpenJDK 版本问题 升级到 OpenJDK 17

🎉 十、总结

KRaft 模式初始化完整流程

  1. 安装 OpenJDK 17
  2. 下载 Kafka 并配置 server.properties
  3. 生成 cluster.idbin/kafka-storage.sh random-uuid
  4. 格式化存储:bin/kafka-storage.sh format -t <node.id> --cluster-id <id>
  5. 设置 node.id 并启动
  6. 使用 kafka-metadata-quorum.sh describe --status 验证