kafka KafkaStream的iterator()方法不会执行
谢邀!KafkaStream#iter返回的是ConsumerIterator,而后者本质上是个迭代器,底层使用阻塞队列,因此在遍历的时候如果队列中没有可以获取的FetchedDataChunk,那么就会阻塞住的。默认情况下构建的KafaStream是阻塞的,你可以传递一个consumerTimeoutMs来看下和默认情况有何不同。
■网友
Scala 版本的Consumer 本身是一个多线程的客户端:Consumer本身有后台线程不断地从Broker里面取数据然后放到一个阻塞队列里面,直到队列塞满则被Block在那里;而用户线程调用ConsumerIterator的过程其实就是从这个队列里面取数据,直到队列变空则用户调用的iterator就会变成一个Blocking Call。
■网友
【kafka KafkaStream的iterator()方法不会执行】 因为阻塞了,你可以再开启一个producer发布数据,就能看到了。设置 consumer.timeout.ms 之后会抛出ConsumerTimeoutException
■网友
kafka底层迭代时候,有一个BlockQueue
会阻塞住,直到收到新的消息
kafka consumer基本使用及 ConsumerIterator如何遍历message
推荐阅读
- kafka同步咋解决乱序问题
- Kafka manager, Burrow等几款Kafka Monitor的流行程度咋样
- kafka启用kerberos后,新producer和consumer都报了这个错误,跪求大神解决
- kafka使用high api怎样确保不丢失消息,不重复发送,消息只读取一次
- Java为啥ArrayList#Iterator#next()要复制elementData字段的引用
- SparkStreaming读取kafka流 rdd的partition数目
- 咋将kafka与elasticsearch连通,将kafka的数据传入elasticsearch
- 公司内使用kafka,redis等基础组件时,是否有必要对官方client包再加一次封装
- Oracle数据库数据变更同步到kafka
- 在0.10的kafka中,使用bootstrap.servers启动控制台消费者,无法消费消息为啥
