spring-boot2.x整合Kafka步骤

news/2024/8/26 13:49:11 标签: kafka, 分布式, spring boot

1.pom依赖添加

<properties>
		<java.version>1.8</java.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.18</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<exclusions>
				<exclusion>
					<groupId>ch.qos.logback</groupId>
					<artifactId>logback-core</artifactId>
				</exclusion>
				<exclusion>
					<groupId>ch.qos.logback</groupId>
					<artifactId>logback-classic</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context-support</artifactId>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-autoconfigure</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
	</dependencies>

2. 配置文件添加配置:

server:
  port: 8080

spring:
  application:
    name: application-kafka
  kafka:
    bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #这个是kafka的地址,对应你server.properties中配置的
    producer:
      batch-size: 16384 #批量大小
      acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      retries: 10 # 消息发送重试次数
      #transaction-id-prefix: transaction
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger:
          ms: 2000 #提交延迟
        #partitioner: #指定分区器
          #class: com.test.config.CustomerPartitionHandler
    consumer:
      group-id: testGroup,testg2 #默认的消费组ID
      enable-auto-commit: true #是否自动提交offset
      auto-commit-interval: 2000 #提交offset延时
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      max-poll-records: 500 #单次拉取消息的最大条数
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session:
          timeout:
            ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
        request:
          timeout:
            ms: 18000 # 消费请求的超时时间
    listener:
      missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batch


logging:
  config: classpath:log4j2.xml

3. 日志配置

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">

    <!--全局参数-->
    <Properties>
        <Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
<!--         <Property name="logDir">/data/logs/logViewer</Property> -->
        <Property name="logDir">logs</Property>
    </Properties>

    <Appenders>
        <!-- 定义输出到控制台 -->
        <Console name="console" target="SYSTEM_OUT" follow="true">
            <!--控制台只输出level及以上级别的信息-->
<!--             <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> -->
            <PatternLayout>
                <Pattern>${pattern}</Pattern>
            </PatternLayout>
        </Console>
        <!-- 同一来源的Appender可以定义多个RollingFile,定义按天存储日志 -->
        <RollingFile name="rolling_file"
                     fileName="${logDir}/logViewer.log"
                     filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log">
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout>
                <Pattern>${pattern}</Pattern>
            </PatternLayout>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
            </Policies>
            <!-- 日志保留策略,配置只保留七天 -->
            <DefaultRolloverStrategy>
                <Delete basePath="${logDir}/" maxDepth="1">
                    <IfFileName glob="logViewer_*.log" />
                    <IfLastModified age="7d" />
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>
    </Appenders>
    
      <Loggers>
        <Root level="INFO">
            <AppenderRef ref="console"/>
            <AppenderRef ref="rolling_file"/>
        </Root>
    </Loggers>
</Configuration>

4. controller入口类,其它应用通过该接口直接将数据写入kafka

@RequestMapping(value="/kafka")
@Controller
public class ProducerController {
    @Autowired
    private KafkaTemplate kafkaTemplate;
 
//    模拟发送消息
    @RequestMapping(value = "/send",method = RequestMethod.GET)
    public String sendMessage(@PathParam(value = "msg") String msg) {
    	System.out.println("收到get请求。。。");
        kafkaTemplate.send("test",msg);
        return "成功";
    }

5. kafka回调方法(需要回调通知时使用该方式):

    @GetMapping("/kafka/callbackTwo/{message}")
    public void sendCallbackTwoMessage(@PathVariable("message") String message) {
        kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败2:"+throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }

6.kafka消费者注册

@Component
public class KafkaMessageListener {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
 
    /**
     * containerFactory
     * 消息过滤器
	消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
	配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
     * @param record
     */
    @KafkaListener(topics = {"test","test2"},groupId = "testGroup")
    public void listenTestStatus(ConsumerRecord<?, ?> record) {
    	LOGGER.info("接收到消息:开始业务处理。。。。。");
    	if (null == record || null == record.value()) {
			LOGGER.info("接收到空数据,跳过");
		}else {
			LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());
		}
    }
    @KafkaListener(topics = {"test"},groupId = "testg2")
    public void listenTest2(ConsumerRecord<?, ?> record) {
    	LOGGER.info("###listenTest2接收到消息:开始业务处理。。。。。");
    	if (null == record || null == record.value()) {
    		LOGGER.info("接收到空数据,跳过");
    	}else {
    		LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());
    	}
    }
    /**
     * id:消费者ID
		groupId:消费组ID
		topics:监听的topic,可监听多个
		topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区
     * @param records
     */
  //批量消费
    @KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler")
    public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {
        System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
        for (ConsumerRecord<String, Object> record : records) {
            System.out.println(record.value());
        }
    }

7.非spring-boot环境下使用java原生API手写kafka生产消息:

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //单向发送:不关心服务端的应答。
            producer.send(record);
            System.out.println("message "+i+" sended");
        }
        //消息处理完才停止发送者。
        producer.close();
    }

8.非spring-boot环境下使用java原生API手写java手写kafka消费者:

 public static void main(String[] args) {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        //kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);
        //每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
 
        while (true) {
            //PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            //PART3:处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
            }
 
 
            //提交offset,消息就不会重复推送。
            consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }

9.非spring-boot环境下使用java原生API手写异步发送kafka

 public static void main(String[] args) throws ExecutionException, InterruptedException {
	        //PART1:设置发送者相关属性
	        Properties props = new Properties();
	        // 此处配置的是kafka的端口
	        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);
	        // 配置key的序列化类
	        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	        // 配置value的序列化类
	        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	 
	        Producer<String,String> producer = new KafkaProducer<>(props);
	        CountDownLatch latch = new CountDownLatch(5);
	        for(int i = 0; i < 5; i++) {
	            //Part2:构建消息
	            ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);
	            //Part3:发送消息
	            //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
	            producer.send(record, new Callback() {
	                @Override
	                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
	                    if(null != e){
	                        System.out.println("消息发送失败,"+e.getMessage());
	                        e.printStackTrace();
	                    }else{
	                        String topic = recordMetadata.topic();
	                        long offset = recordMetadata.offset();
	                        String message = recordMetadata.toString();
	                        System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
	                    }
	                    latch.countDown();
	                }
	            });
	        }
	        //消息处理完才停止发送者。
	        latch.await();
	        //消息处理完才停止发送者。
	        producer.close();
	    }


http://www.niftyadmin.cn/n/5558220.html

相关文章

英语语法第二课之简单句

简单句一共分为 1、陈述句 2、疑问句 3、感叹句 4、祈使句 下面根据这4类进行讲解 1、陈述句 1.1、陈述句的两种形式 肯定式 I have money 肯定变否定的方式&#xff0c; 助动词&#xff08;do&#xff09;后面加否定词&#xff08;not&#xff09; 否定式 I don’t …

环境配置|PyCharm——Pycharm本地项目打包上传到Github仓库的操作步骤

一、Pycharm端的设置操作 通过Ctrl+Alt+S快捷组合键的方式,打开设置,导航到版本控制一栏中的Git,在Git可执行文件路径中,输入Git.exe。 按照下图顺序,依次点击,完成测试。输出如图标④的结果,即可完成测试。 输出下图结果,配置Git成功,如本地未安装Git,需自行安装。

JavaWeb-【2】CSS和JavaScript

笔记系列持续更新,真正做到详细!!本次系列重点讲解后端,那么第一阶段先讲解前端【续上篇HTML】 目录 一、CSS 1、CSS介绍 2、CSS快速入门 3、CSS语法 4、字体颜色和边框 5、背景颜色和字体样式 6、div和文本居中 7、超链接去下划线和表格细线 8、无序列表去掉样式…

Springboot 3.x - Reactive programming (2)

三、WebFlux Blocking Web vs. Reactive Web Blocking Web (Servlet) and Reactive Web (WebFlux) have significant differences in several aspects. 1. Front Controller Servlet-Blocking Web: Uses DispatcherServlet as the front controller to handle all HTTP req…

前端学习常用技术栈

前端基础&#xff1a;HTML、CSS、JavaScript 前端高级&#xff1a;HTML5、CSS3、JavaScript 语法规范&#xff1a;TypeScript、ECMAScrpit、Eslint、Prettier 前端热门框架&#xff1a;Vue.js、React.js、Angular.js、Bootstrap、Nuxt.js、Svelte.js、Solid.js、Preact.js、Tai…

Docker缩小镜像体积与搭建LNMP架构

镜像加速地址 {"registry-mirrors": ["https://docker.m.daocloud.io","https://docker.1panel.live"] } daemon.json 配置文件里面 bip 配置项中可以配置docker 的网段 {"graph": "/data/docker", #数据目录&#xff0…

认识sm1,sm2,sm3,sm4以及如何在Node.js实现

概述 国密即国家密码局认定的国产密码算法。主要有SM1&#xff0c;SM2&#xff0c;SM3&#xff0c;SM4。密钥长度和分组长度均为128位。 国密算法是指国家密码管理局认定的一系列国产密码算法&#xff0c;包括SM1-SM9以及ZUC等。其中 SM1、SM4、SM5、SM6、SM7、SM8、ZUC等属于…

vue3小程序 中封装组件 但是想在页面中直接获取使用的话可以 通过这样的方式

我们经常会封装许多模那种 loading 或者toast 这种内容&#xff0c;如果在页面上大量写的话 也会感觉代码很乱 所以我们可以简单封装一个 vue2 版本 或者 vue3 版本 1. 创建一个 toast.js 文件 export default function shortToast(title, icon none) {if (typeof title !…