Files
2026-03-22 15:24:40 +08:00

134 lines
5.7 KiB
Java
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 自然写互动课堂教学管理云平台软件 V1.0
*
* Kafka 消息队列配置
* 配置笔迹数据流处理的Kafka生产者和消费者
*/
package com.writech.cloud.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka 配置类
*
* 消息主题定义:
* - writech-stroke-topic:笔迹原始数据(网关/算力盒 → 云平台)
* - writech-recognition-topicAI识别请求(云平台 → AI引擎)
* - writech-result-topic:识别结果(AI引擎 → 云平台)
* - writech-notification-topic:通知消息(云平台 → 终端)
* - writech-stroke-dlq:笔迹数据死信队列(处理失败的消息)
*
* 数据流向:
* 点阵笔 → 网关/算力盒 → Kafka(stroke-topic) → 云平台数据接收服务
* → MongoDB存储 → Kafka(recognition-topic) → AI引擎处理
* → Kafka(result-topic) → 结果回写 → WebSocket推送终端
*/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id:writech-cloud-group}")
private String consumerGroupId;
/**
* Kafka 生产者配置
* 用于发送AI识别请求和通知消息
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 消息可靠性配置
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试3次
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
// 批量发送配置(提升笔迹数据吞吐量)
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 延迟10ms
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB缓冲
// 幂等性(防止重复消息)
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Kafka 消费者配置
* 用于消费笔迹数据和识别结果
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 消费者配置
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每批最多500条
configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 最少1KB
configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200); // 最大等待200ms
return new DefaultKafkaConsumerFactory<>(configProps);
}
/**
* Kafka 监听器容器工厂
* 配置并发消费者数量和批量消费模式
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 并发消费者数量(对应Topic的分区数)
factory.setConcurrency(8);
// 启用批量消费模式
factory.setBatchListener(true);
// 手动确认模式
factory.getContainerProperties().setAckMode(
org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/**
* 笔迹数据Topic名称常量
*/
public static class Topics {
/** 笔迹原始数据 */
public static final String STROKE_DATA = "writech-stroke-topic";
/** AI识别请求 */
public static final String RECOGNITION_REQUEST = "writech-recognition-topic";
/** AI识别结果 */
public static final String RECOGNITION_RESULT = "writech-result-topic";
/** 通知消息 */
public static final String NOTIFICATION = "writech-notification-topic";
/** 笔迹数据死信队列 */
public static final String STROKE_DLQ = "writech-stroke-dlq";
/** 设备状态上报 */
public static final String DEVICE_STATUS = "writech-device-status-topic";
private Topics() {} // 禁止实例化
}
}