Winse Blog

Stop and go, hustle and bustle, always busy, not knowing what there is to fear.

Spark 2.0 & Kafka 0.10.1: Subscribing to Multiple Partitions but Reading Only One

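A colleague using Spark-Kafka-Streaming ran into a case where only one partition could be read, and eventually tracked down the cause. I am writing it down here. Put simply, Spark-2.0.0 uses Kafka-0.10.0.1 by default, and swapping in a different client version yourself is risky!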

The key to the problem

  • Kafka-0.10.1.0
org/apache/kafka/clients/consumer/KafkaConsumer.java
    private void updateFetchPositions(Set<TopicPartition> partitions) {
        // lookup any positions for partitions which are awaiting reset (which may be the
        // case if the user called seekToBeginning or seekToEnd. We do this check first to
        // avoid an unnecessary lookup of committed offsets (which typically occurs when
        // the user is manually assigning partitions and managing their own offsets).
        fetcher.resetOffsetsIfNeeded(partitions);

        if (!subscriptions.hasAllFetchPositions()) {
            // if we still don't have offsets for all partitions, then we should either seek
            // to the last committed position or reset using the auto reset policy

            // first refresh commits for all assigned partitions
            coordinator.refreshCommittedOffsetsIfNeeded();

            // then do any offset lookups in case some positions are not known
            fetcher.updateFetchPositions(partitions);
        }
    }
  • Kafka-0.10.0.1
org.apache.kafka.clients.consumer.KafkaConsumer#updateFetchPositions
    private void updateFetchPositions(Set<TopicPartition> partitions) {
        // refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();

        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }

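Problem description and explanation

When subscribing to multiple partitions of the same topic, Spark Streaming looks up, for every batch, the offsets that the batch should process.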

org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#latestOffsets
  protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    c.poll(0)
    val parts = c.assignment().asScala

    // make sure new partitions are reflected in currentOffsets
    val newPartitions = parts.diff(currentOffsets.keySet)
    // position for new partitions determined by auto.offset.reset if no commit
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    // don't want to consume messages, so pause
    c.pause(newPartitions.asJava)
    // find latest available offsets
    c.seekToEnd(currentOffsets.keySet.asJava)
    parts.map(tp => tp -> c.position(tp)).toMap
  }
  
  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    val rdd = new KafkaRDD[K, V](
      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
... 
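
To make the effect of these offsets concrete, here is a small self-contained Java sketch (not Spark's code) of how a per-partition range follows from the "from" offset (currentOffsets) and the "until" offset (the result of latestOffsets). The names `OffsetRangeSketch`, `Range` and `ranges`, and the sample offsets, are made up for illustration. The point is that a partition whose until offset equals its from offset yields an empty range, so nothing is read from it in that batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetRangeSketch {
    /** Hypothetical stand-in for an offset range: [from, until) on one partition. */
    static final class Range {
        final int partition;
        final long from;
        final long until;

        Range(int partition, long from, long until) {
            this.partition = partition;
            this.from = from;
            this.until = until;
        }

        /** Number of records the batch would read from this partition. */
        long count() {
            return until - from;
        }
    }

    /** Pairs each partition's current offset with its until offset. */
    static List<Range> ranges(Map<Integer, Long> current, Map<Integer, Long> until) {
        List<Range> out = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : until.entrySet()) {
            out.add(new Range(e.getKey(), current.get(e.getKey()), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Long> current = new LinkedHashMap<>();
        Map<Integer, Long> until = new LinkedHashMap<>();
        current.put(0, 100L); until.put(0, 100L); // until == from: nothing to read
        current.put(1, 100L); until.put(1, 150L); // 50 records to read
        for (Range r : ranges(current, until)) {
            System.out.println("partition " + r.partition + ": " + r.count() + " records");
        }
    }
}
```

So if the consumer reports a stale "latest" offset for a partition, that partition silently contributes no data, even though the job keeps running.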

With kafka-0.10.1.0, seekToEnd resets the position held in the client's per-partition state to null.

org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd
    public void seekToEnd(Collection<TopicPartition> partitions) {
        acquire();
        try {
            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
            for (TopicPartition tp : parts) {
                log.debug("Seeking to end of partition {}", tp);
                subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
            }
        } finally {
            release();
        }
    }
org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(TopicPartition, OffsetResetStrategy)
    public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
        assignedState(partition).awaitReset(offsetResetStrategy);
    } 
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#awaitReset
        private void awaitReset(OffsetResetStrategy strategy) {
            this.resetStrategy = strategy;
            this.position = null;
        }

After that, position is called for each partition in turn to get the latest offset.

org.apache.kafka.clients.consumer.KafkaConsumer#position
    public long position(TopicPartition partition) {
        acquire();
        try {
            if (!this.subscriptions.isAssigned(partition))
                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
            Long offset = this.subscriptions.position(partition);
            if (offset == null) {
                updateFetchPositions(Collections.singleton(partition));
                offset = this.subscriptions.position(partition);
            }
            return offset;
        } finally {
            release();
        }
    }

    private void updateFetchPositions(Set<TopicPartition> partitions) {
        // lookup any positions for partitions which are awaiting reset (which may be the
        // case if the user called seekToBeginning or seekToEnd. We do this check first to
        // avoid an unnecessary lookup of committed offsets (which typically occurs when
        // the user is manually assigning partitions and managing their own offsets).
        fetcher.resetOffsetsIfNeeded(partitions);

        if (!subscriptions.hasAllFetchPositions()) {
            // if we still don't have offsets for all partitions, then we should either seek
            // to the last committed position or reset using the auto reset policy

            // first refresh commits for all assigned partitions
            coordinator.refreshCommittedOffsetsIfNeeded();

            // then do any offset lookups in case some positions are not known
            fetcher.updateFetchPositions(partitions);
        }
    } 

org.apache.kafka.clients.consumer.internals.Fetcher#resetOffsetsIfNeeded
    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
            if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
                resetOffset(tp);
        }
    }
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#seek
        private void seek(long offset) {
            this.position = offset;
            this.resetStrategy = null;
        } 

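The new version of KafkaConsumer first updates the position (resetOffsetsIfNeeded), which ultimately calls seek to set position and clear resetStrategy.

But the new version then adds an extra check!! It looks at all assigned partitions, and as long as any one of them still has no position, the position is fetched again. That is definitely wrong! updateFetchPositions is still called afterwards, but by then the state has already changed!! As a result, with multiple partitions only one partition's data is read.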
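
The flow described above can be replayed with a toy model. The Java sketch below is not Kafka code: `PositionBugSketch` and all its members are hypothetical, and the `lookup` step encodes an assumption about what the fetcher does for a partition that no longer has a pending reset (fall back to the committed offset), since that method is not quoted in this post. Under those assumptions it shows why only the last partition queried ends up with the real end offset.

```java
import java.util.Arrays;

public class PositionBugSketch {
    private final Long[] position;       // null = position unknown
    private final boolean[] needsReset;  // true = a seekToEnd is pending
    private final long[] committed;      // last committed offset per partition
    private final long[] logEnd;         // real end offset per partition
    private final boolean newFlow;       // true = model the 0.10.1.0 call order

    PositionBugSketch(long[] committed, long[] logEnd, boolean newFlow) {
        this.committed = committed;
        this.logEnd = logEnd;
        this.newFlow = newFlow;
        this.position = new Long[logEnd.length];
        this.needsReset = new boolean[logEnd.length];
    }

    /** Like seekToEnd: only marks the partitions, nothing is resolved yet. */
    void seekToEnd() {
        for (int p = 0; p < position.length; p++) {
            position[p] = null;
            needsReset[p] = true;
        }
    }

    private boolean hasAllPositions() {
        for (Long pos : position) {
            if (pos == null) return false;
        }
        return true;
    }

    private void resetIfNeeded(int p) {
        if (needsReset[p]) {
            position[p] = logEnd[p];
            needsReset[p] = false;
        }
    }

    /** ASSUMPTION: honour a pending reset, otherwise use the committed offset. */
    private void lookup(int p) {
        if (needsReset[p]) resetIfNeeded(p);
        else position[p] = committed[p];
    }

    long position(int p) {
        if (position[p] == null) {
            if (newFlow) {
                resetIfNeeded(p);                  // position is now the end offset
                if (!hasAllPositions()) lookup(p); // other partitions still null: overwritten
            } else {
                lookup(p);
            }
        }
        return position[p];
    }

    /** Mimics seekToEnd on all partitions followed by position() on each one. */
    static long[] latestOffsets(boolean newFlow) {
        PositionBugSketch c = new PositionBugSketch(
            new long[] {100, 100, 100}, new long[] {150, 150, 150}, newFlow);
        c.seekToEnd();
        long[] out = new long[3];
        for (int p = 0; p < out.length; p++) out[p] = c.position(p);
        return out;
    }

    public static void main(String[] args) {
        System.out.println("old flow: " + Arrays.toString(latestOffsets(false))); // [150, 150, 150]
        System.out.println("new flow: " + Arrays.toString(latestOffsets(true)));  // [100, 100, 150]
    }
}
```

In the new-flow run, the first two partitions report their committed offset as the "latest" one, so their ranges are empty. Only the last partition passes the all-partitions check and keeps the end offset.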

With the cause found, the fix is simply to use the old client.
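
One way to avoid drifting onto a different client by accident is a fail-fast check at job startup. The Java sketch below is only an illustration: `KafkaClientVersionGuard` and `requireExpected` are hypothetical names, and the version string is passed in as an argument to keep the example free of dependencies. In a real job it would have to be read from the kafka-clients jar on the classpath; how to do that is left as an assumption in the comment.

```java
public class KafkaClientVersionGuard {
    /** Client version this post reports as working with Spark-2.0.0. */
    static final String EXPECTED = "0.10.0.1";

    /** Throws if the given kafka-clients version is not the expected one. */
    static void requireExpected(String actual) {
        if (!EXPECTED.equals(actual)) {
            throw new IllegalStateException(
                "kafka-clients " + actual + " on classpath, expected " + EXPECTED);
        }
    }

    public static void main(String[] args) {
        // ASSUMPTION: a real job would obtain the version from the kafka-clients
        // jar (for example via AppInfoParser.getVersion(), to be verified against
        // the client version in use). Here it comes from the command line.
        requireExpected(args.length > 0 ? args[0] : EXPECTED);
        System.out.println("kafka-clients version OK");
    }
}
```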

–END
