Kafka+zookeeper集群介绍

工作流程图

Kafka 和 ZooKeeper 是现代分布式系统中常用的两个组件,它们各自承担着重要的角色,通常在一起使用以实现高效的数据流处理和系统协调。

Kafka 简介

Kafka 是一个分布式流处理平台,专门用于构建实时数据管道和流应用。其主要特点包括:

  • Broker:Kafka 集群由多个服务器(broker)组成,每个 broker 负责存储和管理消息。

  • Topic:消息以主题(topic)的形式组织,每个主题可以有多个分区(partition),以实现并发处理。

  • Partition:每个主题的分区是有序的消息序列,允许负载均衡和扩展。

  • Replica:为了提高可用性和容错性,分区可以有多个副本(replica),其中一个是主副本(leader)。

  • Producer 和 Consumer:生产者(producer)向主题发送消息,消费者(consumer)从主题读取消息,支持消费者组以实现负载均衡。

Kafka 的优点

  • 高吞吐量:能够处理大量数据流,适合实时数据处理。

  • 可扩展性:通过增加 broker 数量,轻松应对数据量和消费者数量的增长。

  • 持久性:消息持久化到磁盘,确保数据不丢失。

  • 容错性:通过数据复制提供高可用性。

  • 灵活的消费模式:支持实时和批量处理。

  • 强大的生态系统:包括 Kafka Streams、Kafka Connect 等工具,方便数据处理和集成。

Kafka 的缺点

  • 管理复杂性:集群的管理和监控可能比较复杂。

  • 消息顺序性:只能保证每个分区内的消息顺序,无法保证全局顺序。

  • 延迟:在高负载情况下,消息延迟可能增加。

  • 学习曲线:新手需要一定的时间来理解其概念和最佳实践。

  • 存储成本:持久化大量数据可能增加存储成本。

ZooKeeper 简介

ZooKeeper 是一个开源的分布式协调服务,主要用于管理大型分布式系统中的配置信息、命名、同步和组服务。其主要功能包括:

  • 配置管理:提供集中式的配置管理。

  • 命名服务:提供统一的命名空间。

  • 分布式同步:通过锁机制帮助节点间同步。

  • 组管理:支持动态的组成员管理。

  • 高可用性:采用主从架构,确保系统在节点故障时仍然正常工作。

结合总结

Kafka 和 ZooKeeper 通常结合使用,以实现高效的分布式数据处理和系统协调。Kafka 负责消息的高效传递和处理,而 ZooKeeper 则提供集群管理、配置管理和协调服务。尽管 Kafka 在某些版本中逐渐减少对 ZooKeeper 的依赖(如 Kafka KRaft 模式),但 ZooKeeper 依然在许多场景中发挥着重要作用,尤其是在需要高可靠性和一致性的分布式系统中。通过结合这两者,开发者能够构建出性能优越、可扩展且高可用的分布式应用。


IP地址

集群服务

192.168.1.1

kafka+zookeeper

192.168.1.2

kafka+zookeeper

192.168.1.3

kafka+zookeeper


安装环境的前置

yum -y install jave   #kafka以及zookeeper是使用的java底层运行,我们需要安装jave

yum -y install supervisor  #服务管理器,咱们使用supervisor启动,这样更好管理服务

yum -y install unzip   #安装解压包服务

部署zookeeper控制集群

PS:这里我用的是自己下载的zip包:zookeeper, 官方下载链接:zookeeper官方

上传zookeeper启动包
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

  • 部署位置:/opt/app

解压zookeeper启动包

unzip zookeeper.zip
设置zookeeper配置文件
  • 文件路径:/opt/app/zookeeper/conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000 
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zookeeper   #zookeeper myid集群优先级文件 所在位置
# the port at which the clients will connect
clientPort=2181   #zookeeper启动端口号
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter

server.1=192.168.1.1:2188:2888   #集群的IP地址,端口不需要动
server.2=192.168.1.2:2188:2888
server.3=192.168.1.3:2188:2888 
#forceSync=no
maxSessionTimeout=300000
zookeeper.connection.timeout.ms=60000
  • dataDir=/data/zookeeper :该路径可自行更改,更改后 myid 需要放入此路径!

设置集群的 myid
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

## 进入到 zookeeper myid集群优先级文件 所在位置

cd /data/zookeeper/

## 生成 myid
 
echo  "1"  > myid   #在 192.168.1.1 服务器
echo  "2"  > myid   #在 192.168.1.2 服务器
echo  "3"  > myid   #在 192.168.1.3 服务器
设置zookeeper启动 ini 文件
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

## 创建zookeeper日志文件目录

mkdir -p /var/log/zookeeper/

## 添加文件

vi /etc/supervisord.d/zookeeper.ini

[program:zookeeper]
command=/opt/app/zookeeper/bin/zkServer.sh start-foreground
process_name=%(program_name)s
numprocs=1
directory=/opt/app/zookeeper
autostart=true
autorestart=true
startsecs=5
startretries=3
stopsignal=QUIT
stopwaitsecs=5
user=root
redirect_stderr=true
stdout_logfile=/var/log/zookeeper/zookeeper_supervisor.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=5
;serverurl=AUTO
启动 zookeeper 服务
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

## 启动supervisord服务

systemctl  start supervisord

## 设置开机自启动

systemctl  enable supervisord

## 启动zookeeper服务

supervisorctl update  && supervisorctl start zookeeper

## 检查服务是否正常启动

supervisorctl  status zookeeper

部署kafka集群

PS:这里我用的是自己下载的zip包:kafka ,官方下载链接:kafka官方

上传kafka启动包
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

  • 部署位置:/opt/app

解压kafka启动包

unzip kafka.zip
设置kafka配置文件
  • 文件路径:/opt/app/kafka/config/server.properties

broker.id=1   #集群的id,集群中的每台服务器都不能一致
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.1.1:9092   #集群当前的的服务器IP地址 192.168.1.1 192.168.1.2 192.168.1.3
num.network.threads=25
num.io.threads=48
socket.send.buffer.bytes=10240000
socket.receive.buffer.bytes=10240000
socket.request.max.bytes=1048576000
log.dirs=/data/kafka   #kafka日志地址
num.partitions=48
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.bytes=10737418240
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181   # zookeeper集群IP地址+端口
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
default.replication.factor=2
message.max.bytes=1024000000
max.partition.fetch.bytes=200000000
log.cleaner.enable=true
log.cleanup.policy=delete
delete.topic.enable=true
auto.create.topics.enable=true
设置kafka启动 ini 文件
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

## 创建kafka日志文件目录

mkdir -p /var/log/kafka

## 添加文件

vi /etc/supervisord.d/kaf.ini

[program:kafka]
command=/opt/app/kafka/bin/kafka-server-start.sh /opt/app/kafka/config/server.properties 
process_name=%(program_name)s
numprocs=1
directory=/opt/app/kafka
autostart=true
autorestart=true
startsecs=5
startretries=3
stopsignal=QUIT
stopwaitsecs=5
user=root
redirect_stderr=true
stdout_logfile=/var/log/kafka/kafka_supervisor.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=5
;serverurl=AUTO
启动 kafka 服务
  • 部署服务器:192.168.1.1 192.168.1.2 192.168.1.3

## 启动supervisord服务

systemctl  start supervisord

## 设置开机自启动

systemctl  enable supervisord

## 启动kafka服务

supervisorctl update  && supervisorctl start kafka

## 检查服务是否正常启动

supervisorctl  status kafka

测试生产和消费

  • 生产主题:xiaopalu

  • 生产地址:/opt/app/kafka/bin

  • 测试生产服务器地址:192.168.1.1

./kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic xiaopalu
  • 消费主题:xiaopalu

  • 消费地址:/opt/app/kafka/bin

  • 测试生产服务器地址:192.168.1.3

./kafka-console-consumer.sh --bootstrap-server 192.168.1.3:9092 --topic xiaopalu --from-beginning