定义

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,主要应用于大数据实时处理领域。

架构图

image-20210515143843109

Zookeeper

kafka集群依赖zookeeper,创建zookeeper容器,kafka集群信息需要存在zookeeper中,如topic的信息(topic有几个,分区数、副本数、leader是谁,副本在哪台机器上等),Kafka 集群中会有一个 broker 会被选举为 controller,负责管理集群 broker 的上下线,所有topic的分区副本分配和 leader选举等工作。 controller的管理工作都是依赖于 Zookeeper 的。如下图,当前当选controller的broker为brokerid为0的kafka主机。(选举原则为先到先得)

image-20210516192537060

注 0.9之前版本消费者消费信息的偏移量也是存在zookeeper中,0.9及其之后的版本存在kafka集群中。(消费者可以直接连zookeeper,但会提示警告,过期)

安装运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#集群0
docker run -d --name kafka0 -p 9090:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.38.128:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.38.128:9090 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime -v /usr/local/docker_container_config/kafka/kafka0/config/:/opt/kafka/config/ -v /usr/local/docker_container_config/kafka/kafka0/data/:/kafka/ wurstmeister/kafka

#-e KAFKA_BROKER_ID=0  在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
#-e KAFKA_ZOOKEEPER_CONNECT=192.168.155.56:2181/kafka 配置zookeeper管理kafka的路径
#-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.38.128:9092 把kafka的地址端口注册给zookeeper,若需要远程访问要改成外网IP
#-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
#-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
#-v /usr/local/docker_container_config/kafka/kafka0/config/:/opt/kafka/config/ 配置文件
#-v /usr/local/docker_container_config/kafka/kafka0/data/:/kafka/
#-e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" 内存不够时使用


#集群1
docker run -d --name kafka1 -p 9091:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.38.128:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.38.128:9091 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime -v /usr/local/docker_container_config/kafka/kafka1/config/:/opt/kafka/config/ wurstmeister/kafka
#集群2
docker run -d --name kafka2 -p 9092:9092 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.38.128:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.38.128:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime -v /usr/local/docker_container_config/kafka/kafka2/config/:/opt/kafka/config/ wurstmeister/kafka

kafka管理工具kafka-manager

1
docker run -d --name kafka-manager  -p 9000:9000  --env ZK_HOSTS=192.168.38.128:2181 sheepkiller/kafka-manager

image-20210516164410776

主题

主题信息存在zookeeper中,所以直连zookeeper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#因为集群依赖于zookeeper,我的的操作几乎直接对zookeeper操作
#查看主题列表
kafka-topics.sh --zookeeper zkip:zkport[/要具体到zookeeper上的节点] --list

#创建主题
kafka-topics.sh --zookeeper zkip:zkport --create --topic 主题名 --partitions 分区数量 --replication-factor 副本数量
#./kafka-topics.sh --zookeeper 192.168.38.128:2181/kafka --topic topic1 --create --partitions 3 --replication-factor 2

#修改分区数
kafka-topics.sh --zookeeper zkip:zkport --alter --topic 主题名 --partitions 分区数

#删除主题
kafka-topics.sh --zookeeper zkip:zkport --delete --topic 主题名

#主题描述(详情)
kafka-topics.sh --zookeeper zkip:zkport --describe --topic 主题名

image-20210515031033422

__consumer_offsets

__consumer_offsets是默认的主题(位移主题,一般第一个consumer程序启动时会创建该主题),主要的作用是保存consumer的位移信息。存储的消息体格式可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。其存放机制是用消费组id的hash值对分区数取模得到。(消息体还保存了位移提交的一些其他元数据,如时间戳和用户自定义的数据等。为了 Kafka 执行各种各样后续的操作,比如删除过期位移消息等)kafka 默认为该 topic 创建了50个分区以降低多个consumer同时更新位移的读写压力

partition分区

kafka会为每个partition创建一个log文件,该log文件中存储的就是生产者发布的数据。生产者的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己 消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,每个 partition可分为多个segment。每个segment 对应两个文件——“.index”文件和“.log”文件。这些文件位于一个分区文件夹下,该文件夹的命名 规则为:topic 名称+分区序号(如下图的topic-0和topic-2)。“.log”文件存储数据。“.index”文件存储索引信息指向对应数据文件中message的物理偏移地址。(定位时使用的是二分查找算法)

log.dirs配置路径下

分区分配策略

  1. RoundRobin
  2. Range

删除策略

kafka使用Compact 策略来删除位移主题中的过期消息,后台线程``Log Cleaner`定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据

compact

ISR

partition分区中会有一个leader,若干个follower,Leader维护了一个动态的 in-syncreplicaset (ISR),意为和 leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间(默认10秒)未向leader同步数据 ,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms 参数设定。leader 发生故障之后,就会从 ISR 中选举新的 leader。

注:Unclean leader选举:由于ISR 是动态变化的,当ISR列表只有leader,且此时leader挂了,即此时ISR为空,此时Kafka就要重新选举出新的 leader。但ISR为空,Kafka 把不在 ISR 列表中的存活副本称为“非同步副本”,这些副本中的消息远远落后于 leader,如果选举这种副本作为 leader 的话就可能造成数据丢失。0.11版开始,Kafka提供了一个参数 unclean.leader.election.enable,用于控制是否允许非同步副本参与 leader 选举;如果开启,则当 ISR 为空时就会从这些副本中选举新的leader,这个过程称为 Unclean leader 选举。

0.9版本之前replica.lag.max.messages和replica.lag.time.max.ms两个参数用于判断副本是否需要被剔除。

0.9及其之后把replica.lag.max.messages剔除。详情见官网

Kafka的leader选举与副本

ack 应答/副本同步机制

为保证 producer 发送的数据可靠送到指定的topic,topic的每个 partition收到producer发送的数据后,都需要向 producer 回复ack,如果 producer收到ack,就会进行下一轮的发送,否则重新发送数据。

Kafka 为用户提供了三种可靠性级别ack应答机制。

0:producer 不等待 broker 的 ack,最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。

1:producer 等待 broker 的 ack,partition的leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader故障,那么将会丢失数据;

-1:producer 等待 broker 的 ack,partition的leader和follower(ISP中的follower,注:leader一定在ISR中)全部落盘成功后才返回ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会 造成数据重复。

故障恢复

LEO(Log End Offset):指的是每个副本最大的 offset; HW(High Watermark):指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

(1)follower 故障 follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘 记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加入 ISR 了。

(2)leader 故障 leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Kafka高效的原因

  1. 数据分区存储:方便集群扩展且以区为单位读写提高并发读写能力。
  2. 磁盘顺序写:数据写入log文件以追加的形式写入,省去磁针寻道时间。
  3. 零拷贝技术:省去数据从内核态到用户态之间的互相拷贝。

生产消费者

Kafka中消息是以 topic 进行分类的(一个主题类似一个队列),生产者生产消息,消费者消费消息,都是面向 topic 的。即创建生产者和消费者时都需指明topic。

注:Kafka2.4之前读写消息数据都是在Leader分区,Kafka2.4开始可以配置replica.selector.class允许从副本读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
#进入kafka的bin目录
#运行生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
#后面直接输入数据即可

#另开窗口同样进入bin目录
#开启消费者监听 --from-beginning 从头开始消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic --from-beginning

#kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic
#会提示过期
#Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
image-20210515231826192

消费者组

注:在同一消费者组中的消费者不能同时消费同一分区

修改config目录下的consumer.properties配置文件的group.id,创建消费者时带上–consumer.config 参数。

1
2
3
4
5
6
#consumer.properties中的配置group.id=czm-group
#生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

#消费者,创建两个窗口
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config consumer.properties(配置文件路径) --topic topic1

幂等性

Kafka在0.11 版本中引入了一项重大特性:幂等性。指 Producer不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。将Producer的参数中 enable.idompotence 设置为 true 即可开启。开启幂等性的 Producer 在初始化的时候会被分配一个PID,发往同一 Partition的消息会附带 Sequence Number。而Broker端会对做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。 (注:PID重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话)

事务

kafka在0.11版本后引入事务,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

Producer 事务

引入一个全局唯一的 Transaction ID,并将 Producer 获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 Transaction ID 获得原来的 PID,实现跨会话。 为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer 事务

Consume 的事务保证就会相对较弱,尤其时无法保证 commit 的信息被精确消费。这是由于 Consumer可以通过offset访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况(默认保存一周)。

其他参考链接

Java API使用

maven坐标

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.czm.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.Scanner;

/**
* @author CZM
* @create 2021/5/19
*/
@Slf4j
public class MyProducer {
public static final String topic = "topic1";
public static void main(String[] args) {
Properties properties = new Properties();
//kafka集群,broker-list
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "czm.com:9090,czm.com:9091,czm.com:9092");
//配置ack级别0,1,-1(all)
properties.put(ProducerConfig.ACKS_CONFIG, "all");
//重试次数
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//RecordAccumulator 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024);
//序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(properties);
Scanner in = new Scanner(System.in);
while (in.hasNextLine()) {
String value = in.nextLine();
if ("exit".equals(value)) {
break;
}

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, value);
producer.send(producerRecord, (RecordMetadata metadata, Exception exception) ->{
log.info("发送成功主题{}, 分区:{},偏移量:{}", metadata.topic(), metadata.partition(), metadata.offset());
});

}
producer.close();

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.czm.kafka.consumer;

import com.czm.kafka.producer.MyProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
* @author CZM
* @create 2021/5/19
*/
@Slf4j
public class MyConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "czm.com:9090,czm.com:9091,czm.com:9092");
//消费者组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "czm-group");
//关闭自动提交
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "false");
//自动提交时间间隔(ms)
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//反序列化类
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
try(Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties)){
//订阅主题
consumer.subscribe(Arrays.asList(MyProducer.topic));
while (true) {
ConsumerRecords<String, String> message = consumer.poll(Duration.ofSeconds(5));
message.forEach((e)->log.info("消息:topic: {}, partition:{}, offset:{}, value:{}",e.topic(), e.partition(), e.offset(), e.value()));
//同步提交
//consumer.commitAsync();

//异步提交
consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
if (exception != null) {
log.error("消息提交失败:{}", offsets);
}
});
}
} catch (Exception e) {
log.error("接收异常:{}", e.getMessage());
}


}
}

常见问题

leader选举

时间轮(TimingWheel)

Kafka Stream(可用于实时计算)

基础学习链接

原理学习链接

简单示例

流和状态

学习链接

时间窗口

学习链接

Kafka connect

学习链接

常见面试问题

参考链接

参考链接

END