Application.ymlspring:application:name: spring-boot-kafkaprofiles:active: devserver:port: 8080
application-dev.ymlspring:datasource:url: "jdbc:mysql://***:***/***?useSSL=false&useUnicode=true&characterEncoding=utf8&ApplicationName=spring-boot-demo&serverTimezone=UTC&allowMultiQueries=true"username: "***"password: "***"kafka:bootstrap-servers: "192.168.226.140:9092" # 访问Kafka服务端的地址consumer:group-id: ${spring.application.name}-${spring.profiles.active} # 一条消息只会被订阅了该主题的同一个分组内的一个消费者消费mybatis-plus:configuration:# 打印sqllog-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logback.xml<?xml version="1.0" encoding="UTF-8"?><configuration><property name="LOG_PATH_HOME" value=https://www.isolves.com/it/cxkf/bk/2023-07-07/"./logs/spring-boot-kafka"/>
ProducerDemoimport lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;/** * @author CaiZhuliang * @date 2023/6/18 */@Slf4j@Component@RequiredArgsConstructorpublic class ProducerDemo {private final KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息* @param topic 主题* @param msg 消息* @param callback 钩子*/public void send(String topic, String msg, ListenableFutureCallback<SendResult<String, String>> callback) {log.info("发送Kafka消息 - topic : {}, msg : {}", topic, msg);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);if (null != callback) {future.addCallback(callback);}}}
ConsumerDemopackage com.example.czl.kafka.kafka.producer.consumer;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;/** * @author CaiZhuliang * @date 2023/6/18 */@Slf4j@Component@RequiredArgsConstructorpublic class ConsumerDemo {@KafkaListener(topics = "test-topic-1")public void receivingMsg(String msg) {log.info("接收到Kafka消息 - msg : {}", msg);}}
TestControllerpackage com.example.czl.kafka.controller;import com.example.czl.kafka.kafka.producer.ProducerDemo;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author CaiZhuliang * @date 2023/6/18 */@Slf4j@RestController@RequiredArgsConstructor@RequestMapping("/test")public class TestController {private final ProducerDemo producerDemo;@GetMapping("/send/kafka_msg")public Long sendMsg(String msg) {log.info("测试发送kafka消息 - msg : {}", msg);producerDemo.send("test-topic-1", msg, null);return System.currentTimeMillis();}}
postman请求测试如下:
文章插图
控制台信息如下:
文章插图
作者介绍蔡柱梁 , 51CTO社区编辑 , 从事Java后端开发8年 , 做过传统项目广电BOSS系统 , 后投身互联网电商 , 负责过订单 , TMS , 中间件等 。
【一文带您快速入门Kafka】
推荐阅读
- 如何通乳
- 草鱼习性详解,掌握后可搞清鱼道,快速诱鱼进窝
- 如何快速提高英语口语水平
- 孕妇肚子胀气怎么办快速解决方法
- 翡翠中的纹、裂、棉,一文搞懂
- 快速打通任督二脉的方法 打通任督二脉的方法
- 如何快速瘦腿瘦屁股
- 一文搞懂CSS line-height和vertical-align
- 什么是商用密码?数字时代商用密码为何重要?一文解读
- “一统美国”进程加快?特斯拉充电技术正快速发展成为美国行业标准