博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
新增多个 Flume 实例后,Kafka 数据重复消费问题处理
阅读量:3938 次
发布时间:2019-05-23

本文共 2625 字,大约阅读时间需要 8 分钟。

文章转载自:https://blog.csdn.net/hejiangtju/article/details/80364388

我们使用 Flume 将数据从 Kafka 加载到 Hive 中。

由于启动一个 Flume 实例时,数据加载的速度只能达到 10MB/秒 (每条Kafka记录100B)。于是我们计划启动多个 Flume 实例 (指定同一个消费者组名称)。

我们知道 Kafka 数据消费是以 Partition 为单位的,即一个 Partition 只能被一个 Flume 实例消费。当启动第二个 Flume 实例时,Kafka 会把已经分配给第一个 Flume 实例的 Partition 回收(revoke)后,然后重新分配(完整过程在这里 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal )。

另一方面,Flume 为了满足事务语义,需要等每条 Kafka Record 真正放到 Channel 后,才能向 Kafka 确认 (调用 consumer.commitSync)。 在 Kafka 回收,分配过程 (rebalance) 中,已经接收到 Flume 端但还未被 Flume 放到 Channel 的 Records (自然也就没有向 Kafka 确认) 就被Kafka 认为未被消费;特别是属于将来被分配给其他 Flume 实例的 Partition 上的 Records,就会再次被新的 Flume 实例消费,造成重复加载。

Kafka 提供 ConsumerRebalanceListener 接口允许消费者侦听这个回收(onPartitionsRevoked)、分配(onPartitionsAssigned)过程。Flume 中 Kafka Source 在侦测到rebalance时的代码如下:

// this flag is set to true in a callback when some partitions are revoked.      // If there are any records we commit them.      if (rebalanceFlag.get()) {        rebalanceFlag.set(false);        break;      }

这个措施很简单,遇到这种情况立即跳出去处理(放到 Channel 并向 kafka 确认)。从代码 (https://github.com/apache/flume/blob/flume-1.8/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java) 上下文可以看到这里有几个缺陷:

  1. 由于把 event 放到 channel 是需要时间的 (执行 interceptor 和 channel selector),不一定能够在 partition 分配给别的 Flume 实例之前完成并提交 Kafka (调研 consumer.commitSync)

  2. 那些已经读取到 records (本地) 中,但是没有放到 eventList 中的记录没有处理

所以我们修改了 Flume 源码,对这两个缺陷进行补救:

// If there are any records we commit them.      if (rebalanceFlag.get()) {        rebalanceFlag.set(false);        // invalidate remaining records        it = null;        // and drop processed events        eventList.clear();        tpAndOffsetMetadata.clear();        // and seek to committed offsets        for (Map.Entry
> es : consumer.listTopics().entrySet()) { for (PartitionInfo pi : es.getValue()) { TopicPartition tp = new TopicPartition(pi.topic(), pi.partition()); try { OffsetAndMetadata oam = consumer.committed(tp); if (oam != null) { consumer.seek(tp, oam.offset()); } } catch (Exception e) { // log.warn("ignore seeking exception, {}", e); } } } log.info("read-ahead records have been dropped."); break; }

详细请参考 https://github.com/hejiang2000/flume/blob/hejiang-kafka-source/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 。

经测试,以上处理完全修正 Kafka Source 数据重复消费问题。

文章转载自:https://blog.csdn.net/hejiangtju/article/details/80364388

你可能感兴趣的文章
项目风险管理脉络
查看>>
项目采购管理脉络
查看>>
项目管理总结
查看>>
java内存区域的分布
查看>>
Java跨平台的构思分析
查看>>
linux目录结构名称对照
查看>>
设计的理念
查看>>
多线程专题 - 脉络图
查看>>
javascript 函数,BOM
查看>>
javascript 客户端能力检测
查看>>
javascript DOM详解之DOM1
查看>>
javascript DOM扩展
查看>>
矛盾论读书笔记
查看>>
规则 - 利用CDN缓存
查看>>
什么是统计学中的 Standard Error ( SE )?
查看>>
统计学中的标准差(SD)和 平均值的标准误差(SEM)的区别
查看>>
[数据挖掘与预测分析] 单变量统计分析思考问题
查看>>
[统计学笔记] (十三)指数分析(2)
查看>>
Data Science 到底是什么?
查看>>
机器学习(Machine Learning)和传统的数据统计分析(Data Statistics)有什么区别?
查看>>