kafka重平衡问题-golang

这里只讨论"github.com/IBM/sarama"的用法

这里将会从触发重平衡的原因,影响重平衡的因素去介绍重平衡

1. 重平衡带来的问题

重平衡设计原意是帮助开发者,实现动态负载均衡节点异常管理的好方法。但实际使用过程中,因为却带来了大量了的问题。kafka设计之初是为了实现海量数据下的低延迟(毫秒)处理能力,所以默认参数也都是为了做低延迟保障。所以如果对kafka进行优雅调参正确心跳变得非常重要。
大家可能遇到问题:

  1. 少量任务执行慢,但导致全部分区都重新分配。
  2. 大量任务处理慢,一直循环重平衡,导致重平衡期间,消费者会处于闲置状态,无法处理任务,任务积压。对已经拿到重平衡后结果的消费者已经执行完的任务时,重平衡变量,导致自己提交失败。
  3. 同时启动多个消费者时,能正常消费,一旦扩缩容,就会一直重平衡,很难稳定下来。

2. 触发重平衡的原因

1. 消费组中新增消费者

因为消费者一旦启动,就会请求broker,所以消费者新增会立即被kakfa识别到,立即触发重平衡(需要和消费者离开的场景区分开)

2. 消费者主动离开

例如,对机器缩容的时候,关闭掉了消费者。这里不会立即触发重平衡,因为kafka判断消费者的存活机制是心跳上报,消费者上报有间隔的上报心跳(config.Consumer.Group.Heartbeat.Interval),kafka在一段时间内发现有心跳上报(Consumer.Group.Session.Timeout),就认为消费者仍然存活。
所以当消费者离开后,broker等待一个Session.Timeout时间,发现没有上报,才会触发重平衡
所以触发缩容的时候,会可能导致消息大于 Session.Timeout的延迟。

3. 消费者无心跳

这里golang的java是client实现略有不同

1. java有两种识别方式:

1.认为节点死亡: config.Consumer.Group.Heartbeat.Interval和Consumer.Group.Session.Timeout
2. 认为节点处理速度过慢max.poll.interval.ms,当消费者会拉取一批数据cache到本地(ChannelBufferSize),一遍执行一遍拉数据,当cache满了,且就会停止拉新数据,直到缓冲区空出新的位置。当某个消费者执行时间过长,导致缓冲区填满且长时间无法空闲出来去容纳新数据,就停止拉数据。当通知拉数据时间>max.poll.interval.ms会认为这个节点处理能力不行,从而触发重平衡,把分配给这个消费者的分区分给其他消费者

2.golang:

主要依赖于 config.Consumer.Group.Heartbeat.Interval和Consumer.Group.Session.Timeout。然而心跳上报和消费执行时绑定在一起的,互相堵塞,即执行完任务才会上报心跳。所以一旦执行时间大于Session.Timeout,就会导致心跳超时,被踢出消费组

3. 影响重平衡因素

1. 心跳检测

  • config.Consumer.Group.Heartbeat.IntervalConsumer.Group.Session.Timeout
    前面讲到了心跳检测失败,就会触发重平衡,所以根据任务时间调整这两个任务,就能不符合预期的重平衡事件。
    Consumer.Group.Session.Timeout>单个消息执行时间
    config.Consumer.Group.Heartbeat.Interval如果任务时间较长,配置为一批任务时间的1/3
    当前这个参数受控于client端和broker端,client端需要配置在broker允许的范围呢
    这对应broker上的group.min.session.timeout.ms and group.max.session.timeout.ms,一般默认min是6s,max是30min。最终值参见broker配置。这俩参数也是可以配置修改的,可以根据实际业务场景去配置。
    不过不建议配置过长,意义不是很大。对于耗时较长的任务,例如一次执行要1h或者5h,那这种任务更重要的是减少任务恢复代价,不建议仅仅通过offset去简单从一个任务从头重启。而需要引入第三方db去记录任务状态

2. 缓冲区消息数量

  • config.ChannelBufferSize 这个参数决定了任务的数量,一批消息的执行时间=单个消息*config.ChannelBufferSize 。如果任务执行很慢,建议参数配置足够小,避免重平衡后导致大量的消息提交失败。

3. 重平衡超时时间

config.Consumer.Group.Rebalance.Timeout,看到1后,是否任务只要把任务超时时间配置足够大就可以了呢?例如任务执行一次小于5min,Consumer.Group.Session.Timeout=6min,config.Consumer.Group.Heartbeat.Interval=3s。那是否足够安全呢?大部分场景是正常。但是如果所有节点正在处理数据中,这时出现了节点扩缩容,或者出现了节点异常被踢出消费组,就会出现循环重平衡。默认重平衡超时时间是1min。重平衡期间任务中的消费者将无法发送心跳去join消费组,导致被丢弃。当任务执行完,发送心跳时,broker又以检测到新消费者再次重平衡。
完整流程如下:

kafka重平衡问题-golang


所以如果想要规避这种情况有两个方案

  1. 重平衡时间>任务执行时间,触发重平衡时,每个消费者都可以有一次机会去发出心跳
  2. 任务执行和kafka重平衡检测并行,一但检测到重平衡,则立即中断任务,发出心跳给broker。

4. 重平衡检测

重平衡期间,消费者处于闲置状态不再处理任何数据,直到任务结束。如果不愿意接受闲置时间过长,可以使用通过将任务处理协程异步出去,主流程一边等任务执行,一边等待kafka检测当前的session状态,当重平衡时session.Context().Done()
会接收到消息。这里直接结束,发出心跳。

func (h *Handler) handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) (err error) {
	defer func() {
		if recovererr := recover(); recovererr != nil {
			errstack := errpkg.GetErrStack()
			err = errors.Errorf("handle panic,recovererr:%v,errstack:%v", recovererr, errstack)
			slog.Error("handle failed", "err", err)
		}
	}()
	signal := make(chan error)
	go func() {
		err := h.handleSigle(session, msg)
		signal <- err
	}()
	for {
		select {
		case err := <-signal:
			slog.Error("handleSigle failed", "err", err)
			return nil
		case <-session.Context().Done():
			slog.Info("end hanle ,session is done")
			return nil
		}
	}
}

完整代码

package kafka
import (
	"fmt"
	"log/slog"
	"sync"
	"github.com/IBM/sarama"
	"github.com/pkg/errors"
	errpkg "inc-nlp-llm-clue-dispatch-online/err"
)
type MessageHandler interface {
	Handle(name string, topic string, partition int32, offset int64, key string, value string) error
}
type Handler struct {
	name           string
	mermberID      string
	handler        MessageHandler
	processingLock sync.Mutex // 限制单个消费者实例的并行度
}
// Setup 在消费组会话开始前运行一次
func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
	h.mermberID = session.MemberID()
	slog.Info("Consumer Setup", "name", h.name,
		"mermberID", h.mermberID,
		"generationID", session.GenerationID(),
		"partion", session.Claims(),
		"h", fmt.Sprintf("%p", h),
	)
	return nil
}
// Cleanup 在消费组会话结束后运行一次
func (h *Handler) Cleanup(session sarama.ConsumerGroupSession) error {
	slog.Info("Consumer Cleanup", "name", h.name,
		"mermberID", h.mermberID,
		"generationID", session.GenerationID(),
		"partion", session.Claims(),
	)
	return nil
}
// ConsumeClaim 从 Kafka 的分区中拉取消息并处理
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg := <-claim.Messages():
			err := h.handle(session, msg)
			if err != nil {
				slog.Error("ConsumeClaim failed", "err", err)
			}
		case <-session.Context().Done():
			return nil
		}
	}
}
func (h *Handler) handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) (err error) {
	defer func() {
		if recovererr := recover(); recovererr != nil {
			errstack := errpkg.GetErrStack()
			err = errors.Errorf("handle panic,recovererr:%v,errstack:%v", recovererr, errstack)
			slog.Error("handle failed", "err", err)
		}
	}()
	signal := make(chan error)
	go func() {
		err := h.handleSigle(session, msg)
		signal <- err
	}()
	for {
		select {
		case err := <-signal:
			slog.Error("handleSigle failed", "err", err)
			return nil
		case <-session.Context().Done():
			slog.Info("end hanle ,session is done")
			return nil
		}
	}
}
func (h *Handler) handleSigle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
	// 串行处理该消费者实例的所有分区消息
	h.processingLock.Lock()
	defer h.processingLock.Unlock()
	err := h.handler.Handle(h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
	if err != nil {
		slog.Error("Message handle failed", "err", err)
	} else {
		slog.Info("Message handle finish",
			"topic", msg.Topic,
			"partition", msg.Partition,
			"offset", msg.Offset,
			"key", string(msg.Key))
	}
	session.MarkMessage(msg, "")
	session.Commit()
	return nil
}

5. 重平衡策略

  • config.Consumer.Group.Rebalance.Strategy kafka重平衡策略
  • kafka提供了三种重平衡策略,range,roundrobin,sticky.
    如果是为了维持任务稳定性,减少任务重平衡带来的开销,这里建议使用sticky,他会最大限度,尽量少的修改分区分配的变动,例如4分区,4消费者。新增1个,最终不会调整分区分配。

5. 如何解决

到这里,主要触发重平衡问题的原因就是

1. 任务原本执行时间长,

没有合理地配置心跳超时时间和重平衡时间

  1. 用sticky,最小限度地调整分区分配情况
  2. 增大Consumer.Group.Session.Timeout为略大于最大任务执行时间,config.Consumer.Group.Heartbeat.Interval=Consumer.Group.Session.Timeout/3
  3. 选择增大config.Consumer.Group.Rebalance.Timeout,或者通过session检测去监控重平衡触发,及时上报心跳

2. 异常的任务耗时,突发的延时过长

  • 机器节点异常了,如果节点的网络,cpu,内存问题,应当触发重平衡,直到该节点状态恢复
  • io延迟超预期,合理控制所有的io超时限制,避免完全不可控的任务执行时间。

5. 应用场景

所有产品都是为对应的应用场景所设计的,kafka原本设计是为了海量低延迟。快速重平衡也是为了对节点异常下的快速切流保障。
但kafka也保留了开发者自由调整参数的空间,去适配不同的应用场景。
最后,可以适配不同场景不等于所有场景,对于超长延时的任务,如果小时级别的任务,建议还是加上任务恢复机制,不纯依赖kafka进行任务恢复

6. 其他

最后kafka是为流式场景设计,如果kafka有这些问题,为啥很多流式场景在用呢

  1. 常见的使用还是海量数据,低延迟的为主
  2. flink的流式使用帮忙解决了这些问题,flink是for流式的大数据任务处理,天然就有任务长,数据大的特性。
    为了保证任务稳定和可恢复性强,引入了checkpoint机制,定时为整个数据流做快照,记录下此时的任务状态(包含计算状态),这里也会记录kafka的消费情况。所以就算任务执行时间长,flink自己做了状态管理,不受限预任务的任务时间,避免kafka反复重平衡。

如果有问题,欢迎指出交流哈,也可以在go的git仓库里去查找或者提出issues
https://github.com/IBM/sarama/issues

© 版权声明

相关文章