/** * 自然写互动课堂教学管理云平台软件 V1.0 * * 笔迹数据处理服务 * 负责笔迹数据的Kafka消费、存储、AI引擎调度 */ package com.writech.cloud.service; import com.writech.cloud.model.StrokeData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; /** * 笔迹数据服务 * * 数据流处理管道: * 1. 网关/算力盒通过MQTT上报笔迹数据到云平台 * 2. 云平台接收服务将数据推入Kafka消息队列 * 3. 本服务作为Kafka消费者接收并处理数据 * 4. 原始笔迹数据存入MongoDB(高写入吞吐量) * 5. 触发AI引擎异步识别(OCR/数学/笔顺) * 6. 识别结果回写MongoDB,推送至各终端 */ @Service public class StrokeService { @Autowired private MongoTemplate mongoTemplate; @Autowired private KafkaTemplate kafkaTemplate; /** AI引擎调用线程池 */ private final ExecutorService aiExecutor = Executors.newFixedThreadPool(16); /** AI引擎服务地址 */ private static final String AI_ENGINE_URL = "http://ai-engine-service:8001"; /** 笔迹数据MongoDB集合名 */ private static final String STROKE_COLLECTION = "stroke_data"; /** 识别结果MongoDB集合名 */ private static final String RESULT_COLLECTION = "recognition_result"; /** * Kafka消费者:接收笔迹数据 * 监听 writech-stroke-topic 主题,批量消费笔迹数据 * * @param message JSON格式的笔迹数据 */ @KafkaListener(topics = "writech-stroke-topic", groupId = "stroke-consumer-group") public void consumeStrokeData(String message) { try { // 解析笔迹数据JSON StrokeData strokeData = parseStrokeData(message); if (strokeData == null) return; // 数据预处理(坐标校验、时间戳排序、去重) preprocessStrokeData(strokeData); // 写入MongoDB存储 saveToMongoDB(strokeData); // 判断是否需要触发AI识别 if (shouldTriggerRecognition(strokeData)) { // 异步调用AI引擎 submitRecognitionTask(strokeData); } } catch (Exception e) { // 处理失败的消息发送到死信队列 kafkaTemplate.send("writech-stroke-dlq", message); } } /** * 保存笔迹数据到MongoDB * 使用批量写入提升性能,每批最多500条 */ public void saveToMongoDB(StrokeData strokeData) { strokeData.setCreateTime(LocalDateTime.now()); strokeData.setProcessingStatus("received"); mongoTemplate.save(strokeData, STROKE_COLLECTION); } /** * 批量保存笔迹数据 * 用于网关批量上传场景,提升写入吞吐量 */ public void batchSave(List strokeDataList) { if (strokeDataList == null || strokeDataList.isEmpty()) return; LocalDateTime now = LocalDateTime.now(); for (StrokeData data : strokeDataList) { data.setCreateTime(now); data.setProcessingStatus("received"); } // MongoDB批量插入 mongoTemplate.insertAll(strokeDataList); } /** * 查询学生笔迹数据 * * @param studentId 学生ID * @param assignmentId 作业ID(可选) * @param startTime 开始时间(可选) * @param endTime 结束时间(可选) * @return 笔迹数据列表 */ public List queryStrokes(String studentId, String assignmentId, LocalDateTime startTime, LocalDateTime endTime) { Query query = new Query(); query.addCriteria(Criteria.where("studentId").is(studentId)); if (assignmentId != null) { query.addCriteria(Criteria.where("assignmentId").is(assignmentId)); } if (startTime != null && endTime != null) { query.addCriteria(Criteria.where("timestamp") .gte(startTime).lte(endTime)); } // 按时间戳排序(回放场景需要) query.with(org.springframework.data.domain.Sort.by( org.springframework.data.domain.Sort.Direction.ASC, "timestamp")); return mongoTemplate.find(query, StrokeData.class, STROKE_COLLECTION); } /** * 提交AI识别任务 * 将笔迹数据异步发送至AI引擎进行识别 */ private void submitRecognitionTask(StrokeData strokeData) { aiExecutor.submit(() -> { try { // 根据作业题目类型选择识别方式 String recognitionType = determineRecognitionType(strokeData); // 调用AI引擎REST API Map requestBody = new HashMap<>(); requestBody.put("strokeId", strokeData.getId()); requestBody.put("studentId", strokeData.getStudentId()); requestBody.put("strokes", strokeData.getStrokes()); requestBody.put("type", recognitionType); // String apiUrl = AI_ENGINE_URL + "/api/v1/ocr/recognize"; // RestTemplate restTemplate = new RestTemplate(); // ResponseEntity response = restTemplate.postForEntity( // apiUrl, requestBody, String.class); // 保存识别结果 // saveRecognitionResult(strokeData.getId(), response.getBody()); // 更新笔迹数据处理状态 updateProcessingStatus(strokeData.getId(), "completed"); } catch (Exception e) { updateProcessingStatus(strokeData.getId(), "failed"); } }); } /** * 笔迹数据预处理 * - 坐标范围校验(过滤异常值) * - 时间戳排序 * - 重复数据去重 * - 坐标归一化(适配不同纸面规格) */ private void preprocessStrokeData(StrokeData strokeData) { if (strokeData.getStrokes() == null) return; List> processed = strokeData.getStrokes().stream() // 过滤无效坐标点 .filter(point -> { int x = ((Number) point.getOrDefault("x", -1)).intValue(); int y = ((Number) point.getOrDefault("y", -1)).intValue(); return x >= 0 && x <= 65535 && y >= 0 && y <= 65535; }) // 按时间戳排序 .sorted((a, b) -> { long ta = ((Number) a.getOrDefault("timestamp", 0L)).longValue(); long tb = ((Number) b.getOrDefault("timestamp", 0L)).longValue(); return Long.compare(ta, tb); }) .collect(Collectors.toList()); // 去重(相同时间戳的重复点) List> deduplicated = new ArrayList<>(); long lastTimestamp = -1; for (Map point : processed) { long ts = ((Number) point.getOrDefault("timestamp", 0L)).longValue(); if (ts != lastTimestamp) { deduplicated.add(point); lastTimestamp = ts; } } strokeData.setStrokes(deduplicated); } /** * 判断是否需要触发AI识别 * - 抬笔事件(笔画结束)触发单字识别 * - 作业提交事件触发整页识别 * - 超过5秒无新数据触发段落识别 */ private boolean shouldTriggerRecognition(StrokeData strokeData) { // 如果关联了作业ID,则需要识别 if (strokeData.getAssignmentId() != null) { return true; } // 检查是否有抬笔标记 if (strokeData.getStrokes() != null) { return strokeData.getStrokes().stream() .anyMatch(p -> Boolean.TRUE.equals(p.get("penUp"))); } return false; } /** 确定识别类型 */ private String determineRecognitionType(StrokeData strokeData) { // 根据作业题目类型确定:ocr/math/stroke_order/essay return "ocr"; } /** 解析笔迹数据JSON */ private StrokeData parseStrokeData(String json) { // JSON反序列化 return null; } /** 更新处理状态 */ private void updateProcessingStatus(String strokeId, String status) { Query query = new Query(Criteria.where("_id").is(strokeId)); org.springframework.data.mongodb.core.query.Update update = new org.springframework.data.mongodb.core.query.Update(); update.set("processingStatus", status); update.set("processedTime", LocalDateTime.now()); mongoTemplate.updateFirst(query, update, STROKE_COLLECTION); } }