一、Kafka 简介:
Apache Kafka 是一个开源的分布式事件流平台,最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。它主要用于处理实时数据流,将数据从一个地方传输到另一个地方。Kafka 的设计理念是基于发布订阅模型,通过使用主题(topics)来组织和分类数据流。
Kafka 具有以下主要组件:
Producer(生产者): 用于将数据发布到 Kafka 主题,这些数据可以是日志、事件、指标等。
Broker: Kafka 集群由多个 Broker 组成,每个 Broker 是一个独立的服务器,负责存储数据和处理数据流。
Topic(主题): 数据流被组织成主题,生产者将数据发布到主题,消费者从主题订阅数据。
Consumer(消费者): 消费者从主题订阅数据,并进行相应的处理。消费者可以以不同的消费组(Consumer Group)来组织,以实现水平扩展和并行处理。
Zookeeper: 虽然 Kafka 从 0.10 版本开始逐步减少对 Zookeeper 的依赖,但在之前的版本中,Zookeeper 用于管理 Kafka 集群的元数据和状态。
二、Kafka 使用场景:
实时数据流处理: Kafka 可以用于构建实时数据流处理平台,用于收集、传输和处理大量的实时数据,如日志、事件、传感器数据等。
日志收集和分析: 很多组织使用 Kafka 来收集分布在不同系统上的日志数据,并将其传输到集中的存储或分析系统中。
事件驱动架构: Kafka 可以作为事件驱动架构的核心,不同的服务可以通过发布和订阅事件来实现解耦和扩展。
指标和监控数据传输: Kafka 可以用于传输各种系统指标和监控数据,以便进行实时监控和分析。
流式 ETL(Extract, Transform, Load): Kafka 可以用作数据源和数据目标,支持流式 ETL 过程,使数据在不同系统之间流动和转换。
消息队列: Kafka 可以作为高吞吐量的消息队列,用于在不同的应用程序和服务之间传递消息。
日志压缩存储: Kafka 可以用于长期存储数据,比如保存历史事件和数据备份。
总之,Kafka 在处理实时数据流、构建事件驱动架构以及实现可靠的数据传输方面具有广泛的应用场景。它的高吞吐量、可扩展性和持久性特征使其成为许多实时应用和大数据处理场景的理想选择。
三、Kafka 集群部署
3.1 实验环境
注意:以下三个节点已经提前部署了 zookeeper 集群(Kafka 集群依赖 zookeeper )
主机名 | IP | 系统 | 配置 |
---|---|---|---|
kafka-01 | 10.0.0.11 | CentOS 7.6 | 1U 2G |
kafka-02 | 10.0.0.12 | CentOS 7.6 | 1U 2G |
kafka-03 | 10.0.0.13 | CentOS 7.6 | 1U 2G |
3.2 配置 Java 环境
所有节点执行:
yum install java-1.8.0-openjdk -y
java -version
3.3 下载并解压安装包
下载地址:Apache Kafka
所有节点执行:
cd /usr/local/src
wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar xf kafka_2.12-2.1.0.tgz
ln -sv /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka
cd /usr/local/kafka/
# 根据需要修改配置文件
cat > config/server.properties << EOF
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF
参数解释:
这是一个 Kafka Broker 的配置示例,其中包含了一些常见的配置项。下面逐一解释每个配置项的含义:
broker.id=0: 设置 Broker 的唯一标识符。在一个 Kafka 集群中,每个 Broker 都应该有一个独一无二的 ID。
num.network.threads=3: 指定处理网络请求的线程数,用于接收和处理客户端连接和请求。
num.io.threads=8: 指定执行 I/O 操作的线程数,用于处理磁盘读写等操作。
socket.send.buffer.bytes=102400: 设置套接字发送缓冲区的大小,以字节为单位。
socket.receive.buffer.bytes=102400: 设置套接字接收缓冲区的大小,以字节为单位。
socket.request.max.bytes=104857600: 设置单个请求的最大字节数,用于限制单个请求的大小。
log.dirs=/usr/local/kafka/kafka-logs: 指定 Kafka 日志文件的存储路径。Kafka 会将主题数据以日志文件的形式存储在这些目录下。
num.partitions=1: 设置默认主题的分区数。每个主题可以被分为多个分区,这个配置项定义了新建主题时的默认分区数。
num.recovery.threads.per.data.dir=1: 指定在每个数据目录上运行的恢复线程数,用于在 Broker 启动时进行数据恢复。
offsets.topic.replication.factor=1: 设置存储消费者偏移量信息的内部主题的复制因子。这个值定义了偏移量信息的冗余备份数。
transaction.state.log.replication.factor=1: 设置存储事务状态的内部主题的复制因子。
transaction.state.log.min.isr=1: 设置事务状态日志的最小副本同步数。如果可用的副本数量低于这个值,就不允许写入事务状态。
log.retention.hours=168: 设置日志文件的保留时间,以小时为单位。超过这个时间的日志文件将被自动删除。
log.segment.bytes=1073741824: 设置日志段(log segment)的最大大小,以字节为单位。一旦日志段达到这个大小,Kafka 将创建一个新的日志段。
log.retention.check.interval.ms=300000: 设置日志保留策略的检查间隔,以毫秒为单位。Kafka 将在这个时间间隔内检查是否需要删除过期的日志文件。
zookeeper.connect=localhost:2181: 指定 Zookeeper 的连接地址,用于管理 Kafka 集群的元数据和状态。
zookeeper.connection.timeout.ms=6000: 设置与 Zookeeper 的连接超时时间,以毫秒为单位。
group.initial.rebalance.delay.ms=0: 设置消费者组初始再平衡的延迟时间,以毫秒为单位。设置为0表示消费者加入消费者组后立即进行再平衡。
这些配置项是 Kafka 集群中的 Broker 部分的配置,它们可以根据需求进行调整以满足性能、可靠性和资源管理等方面的要求。
3.3 修改集群配置
kafka-01 执行:
# 配置集群 ID
sed -i 's/broker\.id=0/broker\.id=1/g' config/server.properties
# 修改服务监听地址,默认是localhost:9092
echo "listeners=PLAINTEXT://10.0.0.11:9092" >> config/server.properties
kafka-02 执行:
sed -i 's/broker\.id=0/broker\.id=2/g' config/server.properties
echo "listeners=PLAINTEXT://10.0.0.12:9092" >> config/server.properties
kafka-03 执行:
sed -i 's/broker\.id=0/broker\.id=3/g' config/server.properties
echo "listeners=PLAINTEXT://10.0.0.13:9092" >> config/server.properties
3.4 启动服务
zookeeper-01 执行:
# 启动服务
cd /usr/local/kafka/
bin/kafka-server-start.sh -daemon config/server.properties
#查看端口
[root@kafka-01 kafka]# ss -ntl | grep 9092
LISTEN 0 50 :::9092 :::*
zookeeper-02 执行:
cd /usr/local/kafka/
bin/kafka-server-start.sh -daemon config/server.properties
#查看端口
[root@kafka-01 kafka]# ss -ntl | grep 9092
LISTEN 0 50 :::9092 :::*
zookeeper-03 执行:
cd /usr/local/kafka/
bin/kafka-server-start.sh -daemon config/server.properties
#查看端口
[root@kafka-01 kafka]# ss -ntl | grep 9092
LISTEN 0 50 :::9092 :::*
四、Kafka 数据读写
Kafka自带了一些用于操作和管理Kafka集群的Shell脚本。以下是一些常用的Kafka自带Shell脚本示例:
1. 创建主题:
使用kafka-topics.sh
脚本创建一个名为"test-topic"的主题,具有3个分区和1个副本:
./kafka-topics.sh --create --topic test-topic --zookeeper localhost:2181 --partitions 3 --replication-factor 1
2. 发送消息:
使用kafka-console-producer.sh
脚本发送消息到"test-topic"主题:
[root@kafka-01 bin]# ./kafka-console-producer.sh --topic test-topic --broker-list localhost:9092
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>hello
>world
>end
>^C
3. 接收消息:
使用kafka-console-consumer.sh
脚本从"test-topic"主题消费消息:
[root@kafka-01 bin]# ./kafka-consoconsumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
hello
world
end
4. 查看主题列表:
查看当前Kafka集群中的主题列表:
./kafka-topics.sh --list --zookeeper localhost:2181
5. 查看消费者组列表:
查看当前消费者组的列表:
./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
6. 描述主题:
获取关于主题"test-topic"的信息,如分区和副本分布情况:
./kafka-topics.sh --describe --topic test-topic --zookeeper localhost:2181
7. 查看消费者组偏移量:
查看消费者组"my-group"在主题"test-topic"上的消费偏移量:
./kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
当涉及删除主题和创建消费者组时,你可以使用Kafka自带的Shell脚本来执行这些操作。以下是删除主题和创建消费者组的示例:
8. 删除主题:
使用kafka-topics.sh
脚本删除名为"test-topic"的主题:
./kafka-topics.sh --delete --topic test-topic --zookeeper localhost:2181
请注意,删除主题会永久删除该主题的所有数据和配置,请谨慎操作。
9. 创建消费者组:
使用kafka-consumer-groups.sh
脚本创建一个名为"my-consumer-group"的消费者组,并将其订阅到"test-topic"主题:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --create --topic test-topic --group my-consumer-group
附录
Kafka基本原理详解(超详细!)kafka工作原理<一蓑烟雨任平生>的博客-CSDN博客