加入收藏 | 设为首页 | 会员中心 | 我要投稿 应用网_丽江站长网 (http://www.0888zz.com/)- 科技、建站、数据工具、云上网络、机器学习!
当前位置: 首页 > 站长资讯 > 动态 > 正文

又出问题了!

发布时间:2021-04-06 10:06:26 所属栏目:动态 来源:互联网
导读:面两种情况我们可以人为的避免,在实际工作过程中,对于Kafka发生Rebalance最常见的原因是消费组成员的变化。 消费者成员正常的添加和停掉导致Rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 已停止 从而被踢

面两种情况我们可以人为的避免,在实际工作过程中,对于Kafka发生Rebalance最常见的原因是消费组成员的变化。

消费者成员正常的添加和停掉导致Rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group,导致Rebalance。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经 “死” 了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。这个时间可以通过Consumer 端的参数 session.timeout.ms 进行配置。默认值是 10 秒。

除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。

通过上面的分析,我们可以看一下那些rebalance是可以避免的:

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,最好将该参数值设置得大一点,比下游最大处理时间稍长一点。

总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 。

拉取偏移量与提交偏移量

kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。

如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。

所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。

异常日志提示的方案

其实,说了这么多,Kafka消费者输出的异常日志中也给出了相应的解决方案。

接下来,我们说说Kafka中的拉取偏移量和提交偏移量。

其实,从输出的日志信息中,也大概给出了解决问题的方式,简单点来说,就是可以通过增加 max.poll.interval.ms 时长和 session.timeout.ms时长,减少 max.poll.records的配置值,并且消费端在处理完消息时要及时提交偏移量。

问题解决

(编辑:应用网_丽江站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读