Kafka未触发消费异常排查实录

2023-05-20 16:16:24 来源:博客园

前言:

最近生产环境系统发现一个疑难杂症,看了很久的问题但是始终无法定位到问题并处理,然后查阅了相关资料也是定位不到问题,不过资料查阅却给了个新的思路,以此为跳板最终解决了问题。

一、问题描述

功能介绍:“主计划拆分子计划”是APS系统很常见的功能,功能大概意思是用户可选多个主计划一次性进行“展开子计划”生成子计划,因单个主计划生成子计划的逻辑相对复杂,所以单个计划耗时不能算低,故这里的批量操作使用了异步进行,这里使用了Kafka进行生产及消费消息。


【资料图】

问题起因:功能完成之后上生产系统,然而偶尔会收到客户提出少量单据卡在中间状态,导致“展开”不了的问题,前前后后查了好久也没能找到具体问题并解决。

二、问题分析

分析数据:通过查看用户提供的单据,发现这些数据都是卡在了某一个中间状态,这个状态是作为中转状态使用的,一开始计划的状态为“未展开”,点击执行“展开子计划”的功能之后,将计划标识为“展开中”之后再推送到Kafka消费处理,Kafka消费者接收到生产者的消息之后,将计划进行处理,处理完成之后再将状态标识为“展开完成”。

所以从数据上分析,问题点应该出现在消息生产到消费过程这段期间,但是纵观代码,发现已经对业务逻辑做了异常处理,如果是消息消费过程发生异常,都会将错误过程记录下来,所以再次定位到问题出现在Kafka的生产消息及消费消息这个过程。

查看日志:根据以上分析结合日志监控的方式,确定问题数据实际上并未进行消费,所以猜想有两种情况:

  1. Kafka根本没有生产消息成功;
  2. 生产消息成功,但是Kafka未Poll到需要消费的消息。

三、进展

  1. 加addCallback回调方法
    • 生产端的kafkaTemplate对象中,封装发送消息的方法,将send()方法封装为通用方法,增加addCallback()回调方法,用于消息生产成功之后回调记录日志。以此确认生产消息是否成功.

2. 参考相关资料:

查问题过程中,看到大佬写的文章,文章里描述了造成消费不成功的问题是因为“Kafka 内存饱和”造成的,但是实际上内存饱和造成的问题是Kafka消费服务Poll消息时候超时,相应的错误信息在我们系统日志中搜不到,最终也确认不是因为改原因造成的问题。(文章下文参考)

  • 【AGP网关】Kafka 异常排查实(内网文章)
  • 记一次kafka消费者不消费,消费组被踢出问题(外网文章)

  1. 研究kafka消费原理

当前确认问题点应该是出现在消费端消费不了消息导致的,那么重新研究一下Kafka消费端的实现原理。

消费者是通过KafkaConsumer对象的poll方法从Kafka队列中将消息拉取出来进行消费,这个poll方法可传入poll超时时间,超过设置的时间则会报拉取超时的异常“due to consumer poll timeout has expired.”,上文中大佬出现的报错就是提示这种拉取超时的报错,超时时间可通过配置节点【max.poll.interval.ms】进行配置;

KafkaConsumer对象poll到数据之后取到ConsumerRecords对象,然后就可以对数据进行消费,直到取到的ConsumerRecords对象是空的(isEmpty()为true)才停止消费。

这里发现有一个隐患的地方,当ConsumerRecords对象取到空数据才停止消费,那么这个ConsumerRecords对象是否会取到多个数据进行消费,是如何进行消费的?!

查阅相关资料,发现Kafka的消费原理是:KafkaConsumer 对象是实时拉取消息的,但不是实时消费消息的。KafkaConsumer 在 poll() 方法中从 Kafka 集群中批量拉取数据,将多个消息封装在 ConsumerRecords 对象中返回。这些消息可以在消费者应用程序的时间间隔内处理,但poll() 方法返回的消息不是立即消费的。只有在 ConsumerRecords 中的所有消息都被处理后,才会发送下一个拉取请求。如果在处理消息时发生错误,可以根据实际需要重新处理这些消息或跳过这些消息。

反观项目代码,发现在KafkaListener监听器拉取到数据之后,项目中仅仅只是取第一条数据进行消费,这里是不是有问题呢?(参考下面代码块)

/**     * 计划发布SAP-KTWKZ     *     * @param records     * @param consumer     */    @KafkaListener(topics = {KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC},            id = KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC, containerFactory = "batchFactory")    public void dayPlanReleaseToSapKTWKZTopic(ConsumerRecords records, Consumer consumer) {        String data = ApsKafkaUtils.getFirstRecordValues();        log.info("mq消费-KT万颗子计划发布SAP数据:\n{}", data);        try {            SystemPostDTO systemPostDTO = JsonUtils.fromJson(data, SystemPostDTO.class);            String postContent = systemPostDTO.getData();            if (StrUtil.isEmpty(postContent)) {                log.warn("mq消费-KT万颗子计划发布SAP数据异常警告,systemPostDTO.data数据不能为空");                return;            }            pushRecordService.consumePushProductionOrderKtWKZ(postContent);        } catch (Exception e) {            log.error("mq消费-KT万颗子计划发布SAP数据异常:", e);        }    }

四、问题重现及

重现:

从第三点的分析中,暂时确定可能是ConsumerRecords对象接收到多条消息,但是消费端仅仅消费了第一条消息导致的问题,那么通过写demo来测试批量生产消息是否会导致ConsumerRecords一次性拉取到多条消息。

生产:

@ApiOperation("测试Kafka poll消息机制")    @PostMapping("/v99/schedule/kafka/test")    public ResponseEntity testKafkaPollMessage(            @RequestParam("testData") String testData) {        String topic = "my-test-topic";        for (int i = 0; i < 100; i++) {            Thread.sleep(0);//参数有0,10,100            SubassemblyOpenPlanListInfoRespDTO dto = kafkaDemo.sendMessage(testData);        }        return Results.success("Success");    }

消费:

@KafkaListener(topics = {"my-test-topic"},            id = "my-test-topic", containerFactory = "batchFactory")    public void myTestKafka(ConsumerRecords records, Consumer consumer) {        List recordValues = ApsKafkaUtils.getRecordValues();        try {            log.info("接收到的数据为【{}】{}", recordValues.size(), JsonUtils.toJson(recordValues));        } catch (Exception e) {            log.error("接收到的数据为异常:", e);        }    }

以上接口,通过调用发现确实出现ConsumerRecords对象poll到多条消息的情况:

其中,for循环中执行等待时间越长,出现一个ConsumerRecords对象拉取到多条数据的情况越少:

那么分析为什么在实际使用过程中,【主计划拆分子计划】这个功能是偶然出现消费失败的问题,而不是稳定出现呢?

再次通过代码ReView的方式去回顾一下这个功能,发现当前代码中,是使用了for循环将一批次计划单循环推送给MQ进行消费,单个循环里执行了一次读库一次写库的操作,一次循环耗时大概几十毫秒,与上述demo的Thread.sleep(10)场景类似,所以基本确定偶发这种问题的原因出现在这里。

解决:

其实解决该问题很简单,只需要在消费端获取到ConsumerRecords对象之后,将拉取到的所有消息列表循环消费而不是只消费单条消息即可,之前的仅消费单条消息的场景经过沟通确认只存在某些特殊场景才需要使用,暂时不再保证该种场景。

五、总结

本案例中,通过日志、业务场景、写Demo使用并发工具等方式来分析及重现问题,将一个生产上的疑难杂症处理掉,其中也通过参考大佬的文章,虽然问题描述和大佬描述的基本一致,也和网络上的Blog描述一致,但是产生的问题却并不一样。

总的来说,其实解决问题不难,重要的是要了解问题,了解原理以及了解到解决问题的步骤,建议从多个方面一起查看问题。从其他参考文章描述中,可以从业务、日志、内存环境等查看问题,我这里补充一点,也可以多多结合业务来适当写demo去测试问题,可能也会有意外收获。

其他

大家有没有遇到其他的生产上的疑难杂症呢,大家都是怎么遇到问题,最后怎么解决问题呢,这里大家不妨进行讨论,也可以列出多多的跟进方案或者工具,大家一起学习进步。

关键词:

相关文章

热文推荐

Kafka未触发消费异常排查实录
Kafka未触发消费异常排查实录

前言:最近生产环境系统发现一个疑难杂症,看了很久的......更多>

70美刀成标配?《真人快打1》成为switch第2个70刀游戏_今日热文
70美刀成标配?《真人快打1》成为switch第2个70刀游戏_今日热文

NetherRealmStudios正式宣布了真人快打的新作《真人快......更多>

广西壮族自治区三江侗族自治县发布雷电黄色预警
广西壮族自治区三江侗族自治县发布雷电黄色预警

三江县气象台2023年5月20日6时2分发布雷电黄色预警信......更多>

全球实时:如何清洁窗纱_怎么清洗纱窗
全球实时:如何清洁窗纱_怎么清洗纱窗

如何清洁窗纱,怎么清洗纱窗很多人还不知道,现在让我......更多>

排行推荐

湘菜馆取名_湘菜馆取名有什么 环球观速讯
湘菜馆取名_湘菜馆取名有什么 环球观速讯
欢迎观看本篇文章,小升来为大家解答以上问题。湘菜馆... 更多>
天天热点!普通人能够拥有的最好的资产
天天热点!普通人能够拥有的最好的资产
《富爸爸穷爸爸》一书中这样描述资产:资产就是能够源... 更多>
金地集团记录
金地集团记录
$金地集团(SH600383)$$龙湖集团(00960)$本来买入完毕... 更多>
新资讯:也来说说贵州茅台参与设立产业基金的事
新资讯:也来说说贵州茅台参与设立产业基金的事
5月18日,贵州茅台发布一系列公告,其中花费100亿参与... 更多>
【新视野】重金求购的投资秘技,打开一看…
经常在开盘之前,股神们都会给出一个当天金股列表啥的... 更多>
世界简讯:美国共和党谈判代表离开现场 债务上限谈判突然中止
美国共和党债务上限谈判代表格雷夫斯周五上午与白宫代... 更多>
本周个人现金净资产超500万元 世界热文
周五又买股票了。这几天胡思乱想的多了,因为个人现金... 更多>
因停车距离纠纷乘客拒绝下车达28小时 警方:两名乘客被行拘 天天微动态
近日,一则“因停车距离纠纷,乘客拒绝下车已达28小时... 更多>
奋达科技:人工智能音箱暂未接入大模型 董事长肖奋直言减持系还贷所需 天天看热讯
“友谊的小船说翻就翻”“本人也不舍减持股票”,不同... 更多>
创5年新高,糖价逼近7000元/吨,供需缺口约660万吨
今年以来,国内外糖价持续走高,当前国内期现货糖价均... 更多>
碳酸锂未来售价预计在20万至35万区间波动|天天速讯
对于未来价格趋势,隆众资讯锂分析师曲音飞认为,此轮... 更多>
万达回应裁员:的确在进行优化 但没有大规模
近日市场消息称“万达将大规模裁员,涉及比例达30%以... 更多>
离离原上草 野火烧不尽春风吹又生是什么意思
今天来聊聊关于离离原上草,野火烧不尽春风吹又生是什... 更多>
锦州烧烤主打一个啥都能烤!_今日最新
锦州烧烤啥都能考,而且各个部位均能整出专门店,鸡头... 更多>

【播资讯】追忆左晖:何谓难而正确

南昌,我想对你说!-微资讯

【热闻】东微半导:2022年业绩亮眼

《济南市灵活就业人员参加住房公积

全球头条:广东省地方金融监管局倪

广东省科技厅副厅长梁勤儒:推动科

【新视野】万达回应裁员:的确在进

环球热讯:今年的公募基金怎么了?

你需要把假认知转真认知…|天天即时

济南出台住房公积金新政:灵活就业