Kafka KRaft 模式安装与使用指南(无需 Zookeeper)
2025-09-02
8 min read
以下是 完整、按流程顺序重写的 Kafka KRaft 集群搭建文档,包含从零开始的完整初始化流程。
Kafka KRaft 模式集群搭建完整流程(无需 Zookeeper)
适用版本:Kafka 2.13-4.0.0(KRaft 模式)
本文档详细介绍如何在 不依赖 Zookeeper 的情况下,使用 KRaft(Kafka Raft Metadata)模式 搭建一个 3 节点高可用 Kafka 集群。
包含 元数据初始化、格式化、启动、验证 全流程。
🌐 一、集群规划(3 节点示例)
主机名 | IP 地址 | node.id |
角色 |
---|---|---|---|
kafka1 | 172.31.7.107 | 1 | broker + controller |
kafka2 | 172.31.7.108 | 2 | broker + controller |
kafka3 | 172.31.7.109 | 3 | broker + controller |
✅ 所有节点均使用 混合角色模式(broker + controller)
🔧 二、环境准备
1. 安装 OpenJDK 17(推荐)
# Ubuntu/Debian
sudo apt update && sudo apt install openjdk-17-jdk -y
# CentOS/RHEL
sudo yum install java-17-openjdk-devel -y
验证:
java -version
⚠️ 注意:OpenJDK 11 在 Kafka 4.0.0 中可能出现兼容性问题,强烈推荐使用 OpenJDK 17。
📦 三、下载与解压 Kafka
在所有节点执行:
cd /apps
wget https://downloads.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
ln -s kafka_2.13-4.0.0 kafka
设置环境变量(可选):
export KAFKA_HOME=/apps/kafka_2.13-4.0.0
export PATH=$KAFKA_HOME/bin:$PATH
⚙️ 四、配置文件:config/server.properties
在所有节点创建或编辑 $KAFKA_HOME/config/server.properties
:
############################# Server Basics #############################
process.roles=broker,controller
node.id=__NODE_ID__
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://__IP__:9092,CONTROLLER://__IP__:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
############################# Log Basics #############################
log.dirs=/apps/kafka_2.13-4.0.0/logs
############################# Controller Quorum #############################
# 三节点控制器投票配置
controller.quorum.voters=1@172.31.7.107:9093,2@172.31.7.108:9093,3@172.31.7.109:9093
############################# Internal Topics #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
🔁
__NODE_ID__
和__IP__
将由脚本自动替换。
🧰 五、初始化集群元数据(关键步骤!)
⚠️ 此步骤必须在首次启动前执行,且只需执行一次
Step 1:生成唯一的 cluster.id
在任意一台机器上生成集群 ID:
CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
echo $CLUSTER_ID
输出示例:
3hzu8188TC68nQcL5A-Y0g
✅ 记下这个
cluster.id
,所有节点将使用相同的 ID。
Step 2:格式化存储目录(所有节点执行)
在 每台 Kafka 节点 上执行格式化命令,使用上一步生成的 cluster.id
:
bin/kafka-storage.sh format \
-t 3hzu8188TC68nQcL5A-Y0g \
-c /apps/kafka_2.13-4.0.0/config/server.properties \
--cluster-id 3hzu8188TC68nQcL5A-Y0g
🔍 参数说明:
-t
:指定node.id
(每台机器不同)-c
:配置文件路径--cluster-id
:上一步生成的全局唯一 ID
✅ 预期输出:
Formatting /apps/kafka_2.13-4.0.0/logs with cluster id 3hzu8188TC68nQcL5A-Y0g
✅ 此操作会在
log.dirs
目录下创建元数据日志,每个节点必须单独执行。
🚀 六、启动脚本(支持动态 IP 替换)
创建管理脚本:bin/kafka-manager.sh
#!/bin/bash
# 自动检测 Java
if command -v java >/dev/null 2>&1; then
JAVA_BIN=$(command -v java)
JAVA_HOME=$(dirname "$(dirname "$JAVA_BIN")")
export JAVA_HOME
export PATH=$JAVA_HOME/bin:$PATH
else
echo "❌ Java not found, please install OpenJDK 17+"
exit 1
fi
KAFKA_HOME="/apps/kafka_2.13-4.0.0"
LOG_FILE="$KAFKA_HOME/logs/kafka.log"
PID_FILE="$KAFKA_HOME/logs/kafka.pid"
BOOTSTRAP_SERVER="$(hostname -I | awk '{print $1}'):9092"
get_pid() {
if [[ -f "$PID_FILE" ]]; then
pid=$(cat "$PID_FILE")
if [[ -n "$pid" && -d "/proc/$pid" ]]; then echo "$pid"; else rm -f "$PID_FILE"; fi
else
pgrep -f "kafka\.Kafka" || echo ""
fi
}
prepare_config() {
IP=$(hostname -I | awk '{print $1}')
sed -i "s|__IP__|$IP|g" config/server.properties
}
start() {
pid=$(get_pid)
[[ -n "$pid" ]] && { echo "❌ Kafka is already running (PID: $pid)"; return 1; }
prepare_config
echo "🚀 Starting Kafka server..."
nohup bin/kafka-server-start.sh config/server.properties > "$LOG_FILE" 2>&1 &
echo $! > "$PID_FILE"
disown
echo "✅ Kafka started, PID: $!"
}
stop() {
pid=$(get_pid)
[[ -z "$pid" ]] && { echo "❌ Kafka is not running."; return 1; }
echo "🛑 Stopping Kafka (PID: $pid)..."
kill -15 "$pid" && rm -f "$PID_FILE"
for i in {1..30}; do
! kill -0 "$pid" 2>/dev/null && { echo "✅ Kafka stopped."; return 0; }
sleep 1
done
echo "⚠️ Force killing..."
kill -9 "$pid" && rm -f "$PID_FILE"
echo "✅ Kafka killed."
}
status() {
pid=$(get_pid)
[[ -n "$pid" ]] && echo "🟢 Kafka is running (PID: $pid)" || echo "🔴 Kafka is not running."
}
controller_info() {
echo "🧠 Controller Quorum Info:"
bin/kafka-metadata-quorum.sh --bootstrap-server "$BOOTSTRAP_SERVER" describe --status
}
case "$1" in
start) start ;;
stop) stop ;;
restart) stop; sleep 3; start ;;
status) status ;;
controller-info) controller_info ;;
*)
echo "📌 Usage: $0 {start|stop|restart|status|controller-info}"
;;
esac
赋予执行权限:
chmod +x bin/kafka-manager.sh
▶️ 七、启动集群(所有节点)
在每台机器上执行:
1. 设置 node.id
# 节点1
sed -i 's/__NODE_ID__/1/g' config/server.properties
# 节点2
sed -i 's/__NODE_ID__/2/g' config/server.properties
# 节点3
sed -i 's/__NODE_ID__/3/g' config/server.properties
2. 启动 Kafka
bin/kafka-manager.sh start
✅ 八、验证集群状态
1. 检查运行状态
bin/kafka-manager.sh status
2. 查看控制器信息(任一节点执行)
bin/kafka-manager.sh controller-info
✅ 实际输出示例(来自你的测试):
🧠 Controller Quorum Info:
ClusterId: 3hzu8188TC68nQcL5A-Y0g
LeaderId: 1
LeaderEpoch: 1
HighWatermark: 235
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 1, "directoryId": null, "endpoints": ["CONTROLLER://172.31.7.107:9093"]}, {"id": 2, "directoryId": null, "endpoints": ["CONTROLLER://172.31.7.108:9093"]}, {"id": 3, "directoryId": null, "endpoints": ["CONTROLLER://172.31.7.109:9093"]}]
CurrentObservers: []
✅ 健康标志:
LeaderId
存在MaxFollowerLag: 0
CurrentVoters
包含所有 3 个节点
📌 九、常见问题
问题 | 原因 | 解决 |
---|---|---|
kafka-storage.sh: command not found |
路径错误 | 使用 bin/kafka-storage.sh |
格式化失败 | node.id 或 cluster.id 错误 |
检查参数 |
节点无法加入 | 网络不通或端口被占用 | 检查 9092/9093 端口 |
启动崩溃 | OpenJDK 版本问题 | 升级到 OpenJDK 17 |
🎉 十、总结
✅ KRaft 模式初始化完整流程:
- 安装 OpenJDK 17
- 下载 Kafka 并配置
server.properties
- 生成
cluster.id
:bin/kafka-storage.sh random-uuid
- 格式化存储:
bin/kafka-storage.sh format -t <node.id> --cluster-id <id>
- 设置
node.id
并启动 - 使用
kafka-metadata-quorum.sh describe --status
验证