前言
在当今高并发、实时化、分布式的系统架构中,消息中间件已成为解耦服务、削峰填谷、异步处理和数据管道的核心基础设施。而Apache Kafka,凭借其高吞吐、低延迟、持久化和水平扩展能力,早已成为业界构建可靠数据流平台的首选。
然而,“会用Kafka” 和 “用好Kafka” 之间,隔着无数生产事故的坑:消息丢失、重复消费、集群雪崩、ISR频繁抖动……这些问题往往源于对Kafka底层机制理解不足,或配置调优缺乏经验。
本文将深入Kafka的核心原理——从分区与副本机制、日志存储结构、Controller选举,到生产者幂等性、消费者位移管理,并结合多年线上实践经验,总结出一套高可用、高性能、易运维的Kafka最佳实践。无论你是刚接触Kafka的开发者,还是负责生产环境稳定性的SRE,都能从中获得可落地的参考方案,真正打造出稳定、可靠、高效的消息中间件系统。

1.安装前准备
1.1 操作系统要求
Kafka可以在多种 [Linux 发行版](https://so.csdn.net/so/search?q=Linux 发行版&spm=1001.2101.3001.7020)上运行,本文以CentOS 7为例,其他发行版步骤类似,只需调整包管理命令。
1.2 java环境要求
Kafka基于Java开发,需安装 JDK 8 或以上版本:
java -version

1.3 安装JDK
下载 JDK
- Oracle 官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
- 或 OpenJDK 官网下载 Linux 版本
- 示例(OpenJDK 8):
wget https://download.java.net/openjdk/jdk8u41/ri/openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz
解压安装包
mkdir -p /usr/local/java
tar -zxvf openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz -C /usr/local/java
配置环境变量
在 /etc/profile 末尾追加:
export JAVA_HOME=/usr/local/java/jdk1.8.0_41
export PATH=$PATH:$JAVA_HOME/bin
使配置生效:
source /etc/profile
验证安装
java -version
2.安装 Kafka
2.1 下载 Kafka
- 官网:https://kafka.apache.org/downloads
- 示例版本:3.6.2
linux系统可以直接命令一键安装:
wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
tar -xzf kafka_2.13-3.9.1.tgz
mv kafka_2.13-3.9.1 kafka

2.2 创建数据日志目录
在kafka解压目录同一路径下:创建一个kafka_data,用于装kafka和zookeeper的log和数据等:
mkdir -p /opt/kafka_data
mkdir -p /opt/kafka_data/zookeeper
mkdir -p /opt/kafka_data/log
mkdir -p /opt/kafka_data/log/kafka
mkdir -p /opt/kafka_data/log/zookeeper

2.3 配置Kafka配置文件
编辑这个文件:
broker.id=0
port=9092
host.name=ip
log.dirs=/opt/kafka_data/log/kafka
zookeeper.connect=localhost:2181



2.4 配置zookeeper配置文件
dataDir=/opt/kafka_data/zookeeper
dataLogDir=/opt/kafka_data/log/zookeeper
clientPort=2181
maxClientCnxns=100
tickTimes=2000
initLimit=10
syncLimit=5

3.启动与停止Kafka
3.1开启ZooKeeper
开启ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties &

3.2启动Kafka:
./kafka-server-start.sh ../config/server.properties &

验证是否启动成功:
jps
输出应包含:
QuorumPeerMain
Kafka

3.3停止zookeeper
./zookeeper-server-stop.sh ../config/zookeeper.properties &
3.4停止kafkfa
./kafka-server-stop.sh ../config/server.properties &
4.创建生产者topic和消费者topic简单示例
在一个终端执行创建生产者: (推消息到shan)
cd /opt/bin/ #进入kafka目录
./kafka-console-producer.sh --broker-list 192.168.42.140:9092 --topic wd_test #wd_test你要建立的topic名

在一个终端执行创建消费者: (从shan上消费消息)
cd /opt/bin/ #进入kafka目录
./kafka-console-producer.sh --broker-list 192.168.42.140:9092 --topic wd_test #消费shan中topic消息

查看效果: 一个终端不断输入推送的消息,另一个终端则消费这个消息



查看当前主题:
./kafka-topics.sh --zookeeper localhost:2181 --list

你正在家里远程办公,突然接到任务:需要验证一个新业务模块的消息生产与消费逻辑。
但Kafka集群部署在公司内网测试环境,没有公网IP,防火墙也不开放9099/9092端口——你既无法连接Broker创建Topic,也无法从本地启动生产者或消费者进行调试。
传统的做法是:
- 提交代码到CI/CD触发部署(慢)
- 求运维临时开防火墙(麻烦)
- 或干脆去公司(不现实)
有没有更敏捷的方式?
有!借助内网穿透工具,我们可以将内网Kafka的9092端口安全暴露到公网。
只需一条隧道命令,你的本地开发机就能像在内网一样:
- 通过 kafka-topics.sh 创建测试 Topic
- 用 kafka-console-producer.sh 发送消息
- 用 kafka-console-consumer.sh 实时消费验证
整个过程无需改动 Kafka 配置、无需网络权限审批,5 分钟打通内外网,让开发调试回归高效。
跟我一起来操作吧~
5.安装cpolar内网穿透工具
cpolar 可以将你本地电脑中的服务(如 SSH、Web、数据库)映射到公网。即使你在家里或外出时,也可以通过公网地址连接回本地运行的开发环境。
❤️以下是安装cpolar步骤:
使用一键脚本安装命令:
sudo curl https://get.cpolar.sh | sh

安装完成后,执行下方命令查看cpolar服务状态:(如图所示即为正常启动)
sudo systemctl status cpolar

Cpolar安装和成功启动服务后,在浏览器上输入虚拟机主机IP加9200端口即:【ip:9200】访问Cpolar管理界面,使用Cpolar官网注册的账号登录,登录后即可看到cpolar web 配置界面,接下来在web 界面配置即可:
打开浏览器访问本地9200端口,使用cpolar账户密码登录即可,登录后即可对隧道进行管理。

6.配置公网地址
通过配置,你可以在本地 WSL 或 Linux 系统上运行 SSH 服务,并通过 Cpolar 将其映射到公网,从而实现从任意设备远程连接开发环境的目的。
- 隧道名称:可自定义,本例使用了:zookeeper,注意不要与已有的隧道名称重复
- 协议:tcp
- 本地地址:2181
- 端口类型:随机临时TCP端口
- 地区:China Top

创建成功后,打开左侧在线隧道列表,可以看到刚刚通过创建隧道生成了公网地址,接下来就可以在其他电脑或者移动端设备(异地)上,使用任意一个地址在终端中访问即可。
- tcp 表示使用的协议类型
-
2.tcp.cpolar.top是 Cpolar 提供的域名
-
13917是随机分配的公网端口号

通过Cpolar提供的公网地址和端口,Kafka就能从本地启动生产者或消费者进行调试啦!
生产:
./kafka-console-producer.sh --broker-list 2.tcp.cpolar.top:13917 --topic shan
消费:
./kafka-console-consumer.sh --bootstrap-server 2.tcp.cpolar.top:13917 --topic shan
7.保留固定TCP公网地址
使用cpolar为其配置TCP地址,该地址为固定地址,不会随机变化。

选择区域和描述:有一个下拉菜单,当前选择的是“China Top”。
右侧输入框,用于填写描述信息。
保留按钮:在右侧有一个橙色的“保留”按钮,点击该按钮可以保留所选的TCP地址。
列表中显示了一条已保留的TCP地址记录。
- 地区:显示为“China Top”。
-
地址:显示为“26.tcp.cpolar.top:13166”。

登录cpolar web UI管理界面,点击左侧仪表盘的隧道管理——隧道列表,找到所要配置的隧道Kafka,点击右侧的编辑。

修改隧道信息,将保留成功的TCP端口配置到隧道中。
- 端口类型:选择固定TCP端口
- 预留的TCP地址:填写保留成功的TCP地址
点击更新。

创建完成后,打开在线隧道列表,此时可以看到随机的公网地址已经发生变化,地址名称也变成了保留和固定的TCP地址。

最后就可以使用命令测试啦!
总结
Kafka的强大不仅在于其性能,更在于其围绕“可靠性”“可扩展性”“可观测性”构建的完整生态。掌握其核心原理,结合严谨的配置与运维规范,才能真正发挥Kafka在实时数据处理、事件驱动架构和微服务解耦中的价值,打造坚如磐石的消息基础设施。
感谢您对本篇文章的喜爱,有任何问题欢迎留言交流。cpolar官网-安全的内网穿透工具 | 无需公网ip | 远程访问 | 搭建网站



