一文带您快速入门Kafka(四)


Application.ymlspring:application:name: spring-boot-kafkaprofiles:active: devserver:port: 8080 application-dev.ymlspring:datasource:url: "jdbc:mysql://***:***/***?useSSL=false&useUnicode=true&characterEncoding=utf8&ApplicationName=spring-boot-demo&serverTimezone=UTC&allowMultiQueries=true"username: "***"password: "***"kafka:bootstrap-servers: "192.168.226.140:9092" # 访问Kafka服务端的地址consumer:group-id: ${spring.application.name}-${spring.profiles.active} # 一条消息只会被订阅了该主题的同一个分组内的一个消费者消费mybatis-plus:configuration:# 打印sqllog-impl: org.apache.ibatis.logging.stdout.StdOutImpl logback.xml<?xml version="1.0" encoding="UTF-8"?><configuration><property name="LOG_PATH_HOME" value=https://www.isolves.com/it/cxkf/bk/2023-07-07/"./logs/spring-boot-kafka"/>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36}(%L) - [%X{traceId}] %msg%n${LOG_PATH_HOME}/log.%d{yyyy-MM-dd}.%i.log200MB%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36}(%L) - [%X{traceId}] %msg%n ProducerDemoimport lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;/** * @author CaiZhuliang * @date 2023/6/18 */@Slf4j@Component@RequiredArgsConstructorpublic class ProducerDemo {private final KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息* @param topic 主题* @param msg 消息* @param callback 钩子*/public void send(String topic, String msg, ListenableFutureCallback<SendResult<String, String>> callback) {log.info("发送Kafka消息 - topic : {}, msg : {}", topic, msg);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);if (null != callback) {future.addCallback(callback);}}} ConsumerDemopackage com.example.czl.kafka.kafka.producer.consumer;import lombok.RequiredArgsConstructor;import 
lombok.extern.slf4j.Slf4j;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;/** * @author CaiZhuliang * @date 2023/6/18 */@Slf4j@Component@RequiredArgsConstructorpublic class ConsumerDemo {@KafkaListener(topics = "test-topic-1")public void receivingMsg(String msg) {log.info("接收到Kafka消息 - msg : {}", msg);}} TestControllerpackage com.example.czl.kafka.controller;import com.example.czl.kafka.kafka.producer.ProducerDemo;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author CaiZhuliang * @date 2023/6/18 */@Slf4j@RestController@RequiredArgsConstructor@RequestMapping("/test")public class TestController {private final ProducerDemo producerDemo;@GetMapping("/send/kafka_msg")public Long sendMsg(String msg) {log.info("测试发送kafka消息 - msg : {}", msg);producerDemo.send("test-topic-1", msg, null);return System.currentTimeMillis();}} postman请求测试如下:

一文带您快速入门Kafka

文章插图
控制台信息如下:
一文带您快速入门Kafka

文章插图
作者介绍:蔡柱梁,51CTO社区编辑,从事Java后端开发8年,做过传统项目广电BOSS系统,后投身互联网电商,负责过订单、TMS、中间件等。
 
【一文带您快速入门Kafka】


推荐阅读