本文共 2625 字,大约阅读时间需要 8 分钟。
文章转载自:https://blog.csdn.net/hejiangtju/article/details/80364388
我们使用 Flume 将数据从 Kafka 加载到 Hive 中。
由于启动一个 Flume 实例时,数据加载的速度只能达到 10MB/秒 (每条Kafka记录100B)。于是我们计划启动多个 Flume 实例 (指定同一个消费者组名称)。
我们知道 Kafka 数据消费是以 Partition 为单位的,即一个 Partition 只能被一个 Flume 实例消费。当启动第二个 Flume 实例时,Kafka 会把已经分配给第一个 Flume 实例的 Partition 回收(revoke)后,然后重新分配(完整过程在这里 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal )。
另一方面,Flume 为了满足事务语义,需要等每条 Kafka Record 真正放到 Channel 后,才能向 Kafka 确认 (调用 consumer.commitSync)。 在 Kafka 回收,分配过程 (rebalance) 中,已经接收到 Flume 端但还未被 Flume 放到 Channel 的 Records (自然也就没有向 Kafka 确认) 就被Kafka 认为未被消费;特别是属于将来被分配给其他 Flume 实例的 Partition 上的 Records,就会再次被新的 Flume 实例消费,造成重复加载。
Kafka 提供 ConsumerRebalanceListener 接口允许消费者侦听这个回收(onPartitionsRevoked)、分配(onPartitionsAssigned)过程。Flume 中 Kafka Source 在侦测到rebalance时的代码如下:
// this flag is set to true in a callback when some partitions are revoked. // If there are any records we commit them. if (rebalanceFlag.get()) { rebalanceFlag.set(false); break; }
这个措施很简单,遇到这种情况立即跳出去处理(放到 Channel 并向 kafka 确认)。从代码 (https://github.com/apache/flume/blob/flume-1.8/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java) 上下文可以看到这里有几个缺陷:
由于把 event 放到 channel 是需要时间的 (执行 interceptor 和 channel selector),不一定能够在 partition 分配给别的 Flume 实例之前完成并提交 Kafka (调研 consumer.commitSync)
那些已经读取到 records (本地) 中,但是没有放到 eventList 中的记录没有处理
所以我们修改了 Flume 源码,对这两个缺陷进行补救:
// If there are any records we commit them. if (rebalanceFlag.get()) { rebalanceFlag.set(false); // invalidate remaining records it = null; // and drop processed events eventList.clear(); tpAndOffsetMetadata.clear(); // and seek to committed offsets for (Map.Entry> es : consumer.listTopics().entrySet()) { for (PartitionInfo pi : es.getValue()) { TopicPartition tp = new TopicPartition(pi.topic(), pi.partition()); try { OffsetAndMetadata oam = consumer.committed(tp); if (oam != null) { consumer.seek(tp, oam.offset()); } } catch (Exception e) { // log.warn("ignore seeking exception, {}", e); } } } log.info("read-ahead records have been dropped."); break; }
详细请参考 https://github.com/hejiang2000/flume/blob/hejiang-kafka-source/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 。
经测试,以上处理完全修正 Kafka Source 数据重复消费问题。
文章转载自:https://blog.csdn.net/hejiangtju/article/details/80364388