最近生产上碰到一个奇怪的问题,现场人员反馈Kafka
工作不正常。经过开发人员检查,发现一个奇怪的现象:线上两台服务器竟然在同时消费一个Kafka
Topic
中的一个分区(Partition
)。
1. 背景
我们的程序要实现的功能是这样的:系统会定时扫描库存表,根据库存情况决定是否要执行出库操作。之前是采用定时任务处理,执行策略是1分钟一次,为了防止多台服务器并发执行,使用Quartz
集群模式确保定时任务只在一台服务器上执行。最近客户提出需求,某些时候客户希望响应更及时一些,譬如某项请求到达的时候,就要立即检查库存并决定是否需要出库。因此,开发人员对系统进行了调整,改为使用Kafka
Consumer
来触发扫描库存动作;定时任务和接口处同时负责发送触发库存扫描的消息。
相关的代码如下:
Consumer
用来处理扫描库存的功能:
1public class LibraryOutboundTriggerConsumer {
2 private final OutboundService outboundService;
3 private final BoxInfoService boxInfoService;
4
5 @KafkaListener(topics = "${fwm.core.topic.library-outbound-trigger}")
6 public void consume(ConsumerRecord<String, String> record) {
7 log.info("\n\n");
8 log.info("=====================开始执行出库扫描=====================");
9 if (boxInfoService.hasBoxUpdateTask()) {
10 log.info("=====================存在料箱更新任务,执行出库扫描结束=====================");
11 return;
12 }
13 Stopwatch stopwatch = Stopwatch.createStarted();
14 try {
15 boxInfoService.startOutboundTask(90);
16 outboundService.outboundTaskScan();
17 } finally {
18 boxInfoService.stopOutboundTask();
19 }
20
21 log.info("=====================执行出库扫描结束,总耗时{}ms=====================", stopwatch.elapsed(TimeUnit.MILLISECONDS));
22 }
23}
定时任务相关代码,用来发送触发扫描的Kafka
消息:
1public class LibraryScanJob extends QuartzJobBean {
2 private final KafkaTemplate<String, String> kafkaTemplate;
3 private final CoreProperties coreProperties;
4
5 @Override
6 protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
7 kafkaTemplate.send(coreProperties.getTopic().getLibraryOutboundTrigger(), "1", "Job Trigger");
8 }
9}
我们服务端部署的情况是:应用部署在两台服务器上server01
,server02
;Kafka
部署在三台服务器上Kafka01
,Kafka02
,Kafka03
。Consumer
负责消费library-outbound-trigger
这个Topic
,这个Topic
虽然被创建为三个分区,但是实际使用中只用了一个分区,这是因为在发送消息的时候指定了Key
值。我们知道,默认情况下Kafka
会调用Partitioner
对发送内容进行分区,默认的分区算法为DefaultPartitioner
,简单来说其算法是:如果Key
有值,则根据key
计算一个Hash值,然后针对分区数取余;如果Key
为空,则使用Round-Robin
策略进行分区。
DefaultPartitioner is a Partitioner that uses a 32-bit murmur2 hash to compute the partition for a record (with the key defined) or chooses a partition in a round-robin fashion (per the available partitions of the topic).
DefaultPartitioner is the default partitioning strategy (per ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property).
因为上述代码中我们指定了Key
固定为1
,所以它总会分配到同一个分区中,也就是虽然我们创建了三个分区,实际上应该只有一个分区有数据。通过工具软件查看Kafka
中的数据也确实证明了这一点。
因为实际上只有一个分区有数据,所以Consumer
应该只在一台服务器上才能够执行。但是经过观察日志发现,两台服务器上都能够看到执行出库扫描
这种日志。
2.问题分析
Kafka
中一个Partition
只能由一个Consumer Group
中的一个Consumer
消费,这是保证负载均衡的需要,也是Kafka
实现消息相对有序性的一个核心设计理念。用过Kafka
的人都应该见过这张图:
这张图描述了两个Consumer Group
同时在消费一个Topic
,不论哪个Consumer Group
,对于一个特定的Partition
来说,总是只有一个Consumer
在消费其中的消息。对我们的系统来说,只有一个分区有数据,按理说应该只有一台服务器消费才正确。
我们在生产环境安装了Kafka
管理工具Kowl
,经过查看Kafka
的运行状态,发现该Topic
经常出现没有Consumer
消费的情况,而且消费者也不固定;另外,该Topic出现了非常巨大的Lag
,消费Offset
始终没有更新。明显是Kafka
触发Rebalancing
导致的。但是因为服务端禁用了Rebalance
相关的日志,所以从日志中没有找到相关证据,只能通过分析代码来确认问题。
注意上面Consumer
中的代码,Stopwatch
计时的范围并不包括对hasBoxUpdateTask
的调用,该函数是用于防并发设置的,但是这个函数本身写的有问题,导致执行时间过长。而日志中看到的扫描时间并没有包含这个函数执行的时间,因此没有看出异常情况。hasBoxUpdateTask
的代码如下:
1 @Override
2 public boolean hasBoxUpdateTask() {
3 List<String> keys = new ArrayList<>();
4 redisTemplate.execute((RedisConnection connection) -> {
5 try (Cursor<byte[]> cursor = connection.scan(
6 ScanOptions.scanOptions().count(10).match(BOX_UPDATE_TASK_PREFIX + "*").build())) {
7 cursor.forEachRemaining(value -> {
8 keys.add(new String(value));
9 });
10 } catch (Exception e) {
11 log.error("hasBoxUpdateTask", e);
12 }
13 return keys;
14 });
15
16 return keys.size() > 0;
17 }
可以看到代码里面使用了Redis
中生产环境不建议用的Scan
指令。虽然Scan
命令比Keys *
要安全一些,但是同样有可能导致全库扫描:当全库键值很多,匹配数量又很少的时候,一次调用就会触发全库扫描,耗时会很长。经过实验,我们全库19w+,一次不匹配的扫描耗时大概15~20秒。
在Spring
中,我们对于Consumer
配置如下:
1spring:
2 kafka:
3 consumer:
4 enable-auto-commit: true
5 auto-commit-interval: 1000ms
按照上面的配置,期望的表现是每秒钟提交一次Offset
,那为什么会出现Offset
始终没有更新的情况呢?
3. 原因解读
以前一直知道Kafka Consumer
如果处理时间过长就可能导致问题。这次正好查阅一下文档来明确问题产生的原因及问题的表现。
经过研究,发现之前对于自动提交还有些理解不准确的地方。首先,看一下O'Reilly
经典的Kafka: The Definitive Guide
中关于自动提交的描述(第4章):
Automatic Commit
The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.
Kafka
的自动提交机制是依赖于poll
循环机制的:
1while(true) {
2 // 每次调用poll的时候判断是否需要提交offset
3 ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
4 processRetrievedRecords(records);
5}
也就是说: Kafka只有在发生一次poll的时候才会将上次处理完的Offset提交上去。 如果消息处理时间过长,就会出现Offset
没有来得及提交的情况。
而一次消费时间的长短又与max.poll.records
有关:Kafka
一次poll
可以拉取最多max.poll.records
条记录,然后缓存在本地进行处理。只有本地缓存的数据处理完毕,才会再次进行poll
。而这个值默认为500。
max.poll.records
The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll.
Type: int Default: 500
而判断处理时间是否超时则与max.poll.interval.ms
有关:max.poll.interval.ms
定义了两次poll
之间的时间间隔。
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.
Type: int Default: 300000 (5 minutes)
可以看到两次poll
之间的最长时间为5分钟,如果超过5分钟,则会被判定为Consumer LeaveGroup
,然后触发Rebalancing
。
综合起来看,这个问题出现的原因应该是这样:
Kafka
一次获取了大量的记录,例如500条;- 每条记录消费大概占用15~20秒;
- 记录还没处理完,时间就超过了5分钟,引起了
Rebalancing
; Rebalancing
之后,Consumer
由另一台服务器接管,进入上面同样的处理;而此时第一台服务器上的消息还没有处理完毕,也在继续循环处理;- 这样两台服务器都在处理数据,但是
Offset
却永远没有办法更新;看上去就像是两个Consumer
在消费同一个Partition
。
4. 修改及总结
修改代码,不再使用Redis
的Scan
指令,提高处理速度,问题得以解决。
两点教训:
- 在
Redis
生产环境中不能够使用类似Keys
,Scan
这种指令,这种指令很容易导致超长的处理时间; Kafka
的自动提交机制依赖于poll
循环,如果一次消息处理时间过长,就会出问题。最好的方法是提高处理速度;如果实在不行,可以降低每次拉取的记录数,修改max.poll.records
参数。在springboot
中可以通过spring.kafka.consumer.max-poll-records
参数进行修改。