Kafka实时API探秘( 二 )

因为 Kafka 是持久地存储事件的,所以当我们想要使用事件时,它们是可用的,直到我们使用完后才过期(这个可以根据主题来配置) 。
事件被写入 Kafka 主题后,就可供一个或多个消费者读取 。消费者可以采用传统的发布/订阅方式,并在新事件到达时接收它们,也可以根据应用程序的需要选择重新消费之前某个时间点的事件 。Kafka 的这种回放功能要归功于它的持久和可伸缩的存储层,这给很多重要的实际应用场景提供了巨大优势,如机器学习和 A/B 测试,这些场景同时需要历史数据和实时数据 。在受监管的行业中,数据必须保留多年才符合法律规定 。传统的消息传递系统如 RabbitMQ、ActiveMQ 无法支持这样的要求 。
package mainimport ("fmt""gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() {topic := "test_topic"cm := kafka.ConfigMap{"bootstrap.servers":"localhost:9092","go.events.channel.enable": true,"group.id":"rmoff_01"}c, _ := kafka.NewConsumer(&cm)defer c.Close()c.Subscribe(topic, nil)for {select {case ev := <-c.Events():switch ev.(type) {case *kafka.Message:km := ev.(*kafka.Message)fmt.Printf("? Message '%v' received from topic '%v'n", string(km.Value), string(*km.TopicPartition.Topic))}}}}当一个消费者连接到 Kafka 时,它会提供一个消费者群组标识符 。消费者群组支持两种功能 。首先,Kafka 用它跟踪消费者读取主题的偏移量,当消费者重新连接时,可以从之前的位置继续读取 。第二,消费者应用程序可能希望使用多个消费者读取数据,形成一个消费者群组,从而并行处理数据 。Kafka 将事件分配给群组内的每一个消费者,如果随后有成员离开或加入(例如当一个消费者实例发生崩溃时),会主动管理好群组 。
这意味着多个服务可以使用相同的数据,而它们之间没有任何相互依赖关系 。同样的数据也可以使用 Kafka Connect API 路由到其他数据存储中 。
Kafka 提供了 JAVA、C/C++、Go、Python 和 Node.js 等语言的生产者和消费者 API 。不过,如果你的应用程序想要使用 HTTP 而不是原生的 Kafka 协议呢?这个时候可以使用 REST 代理 。
在 Kafka 中使用 REST API假设我们正在为智能停车场的设备开发一个应用程序 。记录汽车停车位的事件的有效载荷可能看起来像这样:
{"name": "NCP Sheffield","space": "A42","occupied": true}我们可以把这个事件发送到 Kafka 主题上,它也会将记录事件的时间作为事件元数据的一部分 。使用Confluent REST Proxy向 Kafka 生成数据只需要进行一个简单的 REST 调用:
curl -X POST-H "Content-Type: Application/vnd.kafka.json.v2+json"-H "Accept: application/vnd.kafka.v2+json"--data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}'"http://localhost:8082/topics/carpark"任何一个应用程序都可以使用之前介绍的原生消费者 API 或使用 REST API 来消费这个主题 。与原生消费者 API 一样,使用 REST API 的消费者也是消费者群组的成员,这个时候叫作订阅 。因此,对于 REST API,必须首先声明消费者和订阅:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json"--data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}'http://localhost:8082/consumers/rmoff_consumercurl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}'http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription完成这些之后,就可以读取事件了:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records[{"topic": "carpark","key": null,"value": {"name": "Sheffield NCP","space": "A42","occupied": true},"partition": 0,"offset": 0}]如果有多个事件要接收,可以通过批次获取 。如果想要检查新事件,需要再次进行 REST 调用 。
我们已经介绍了如何向 Kafka 写入数据和从 Kafka 主题获取数据 。但是,很多时候,我们想做的不只是简单的 Pub/Sub 。我们想基于事件流看到更大的图景——所有汽车来来去去的情况、现在有多少空车位或者某个特定停车场的更新流 。
条件通知、流式处理和物化视图如果你认为 Kafka 只是一个提供 Pub/Sub 功能的系统,就跟认为 iphone 只是一个用来拨打和接收电话的设备一样 。我的意思是,如果把 Pub/Sub 看成 Kafka 提供的众多能力当中的一个,这是没有错的……它的作用确实远远不止于此 。Kafka 通过 Kafka Streams API 提供了流式处理能力 。这是一个功能丰富的 Java 客户端库,用于在 Kafka 中大规模和跨多台机器对数据进行有状态的流式处理 。Kafka Streams 被沃尔玛、Ticketmaster 和 Bloomberg 等公司广泛应用,它还是 ksqlDB 的基础 。


推荐阅读