Files
2026-03-22 15:24:40 +08:00

257 lines
9.1 KiB
Java

/**
* 自然写互动课堂教学管理云平台软件 V1.0
*
* 笔迹数据处理服务
* 负责笔迹数据的Kafka消费、存储、AI引擎调度
*/
package com.writech.cloud.service;
import com.writech.cloud.model.StrokeData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 笔迹数据服务
*
* 数据流处理管道:
* 1. 网关/算力盒通过MQTT上报笔迹数据到云平台
* 2. 云平台接收服务将数据推入Kafka消息队列
* 3. 本服务作为Kafka消费者接收并处理数据
* 4. 原始笔迹数据存入MongoDB(高写入吞吐量)
* 5. 触发AI引擎异步识别(OCR/数学/笔顺)
* 6. 识别结果回写MongoDB,推送至各终端
*/
@Service
public class StrokeService {
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/** AI引擎调用线程池 */
private final ExecutorService aiExecutor = Executors.newFixedThreadPool(16);
/** AI引擎服务地址 */
private static final String AI_ENGINE_URL = "http://ai-engine-service:8001";
/** 笔迹数据MongoDB集合名 */
private static final String STROKE_COLLECTION = "stroke_data";
/** 识别结果MongoDB集合名 */
private static final String RESULT_COLLECTION = "recognition_result";
/**
* Kafka消费者:接收笔迹数据
* 监听 writech-stroke-topic 主题,批量消费笔迹数据
*
* @param message JSON格式的笔迹数据
*/
@KafkaListener(topics = "writech-stroke-topic", groupId = "stroke-consumer-group")
public void consumeStrokeData(String message) {
try {
// 解析笔迹数据JSON
StrokeData strokeData = parseStrokeData(message);
if (strokeData == null) return;
// 数据预处理(坐标校验、时间戳排序、去重)
preprocessStrokeData(strokeData);
// 写入MongoDB存储
saveToMongoDB(strokeData);
// 判断是否需要触发AI识别
if (shouldTriggerRecognition(strokeData)) {
// 异步调用AI引擎
submitRecognitionTask(strokeData);
}
} catch (Exception e) {
// 处理失败的消息发送到死信队列
kafkaTemplate.send("writech-stroke-dlq", message);
}
}
/**
* 保存笔迹数据到MongoDB
* 使用批量写入提升性能,每批最多500条
*/
public void saveToMongoDB(StrokeData strokeData) {
strokeData.setCreateTime(LocalDateTime.now());
strokeData.setProcessingStatus("received");
mongoTemplate.save(strokeData, STROKE_COLLECTION);
}
/**
* 批量保存笔迹数据
* 用于网关批量上传场景,提升写入吞吐量
*/
public void batchSave(List<StrokeData> strokeDataList) {
if (strokeDataList == null || strokeDataList.isEmpty()) return;
LocalDateTime now = LocalDateTime.now();
for (StrokeData data : strokeDataList) {
data.setCreateTime(now);
data.setProcessingStatus("received");
}
// MongoDB批量插入
mongoTemplate.insertAll(strokeDataList);
}
/**
* 查询学生笔迹数据
*
* @param studentId 学生ID
* @param assignmentId 作业ID(可选)
* @param startTime 开始时间(可选)
* @param endTime 结束时间(可选)
* @return 笔迹数据列表
*/
public List<StrokeData> queryStrokes(String studentId, String assignmentId,
LocalDateTime startTime, LocalDateTime endTime) {
Query query = new Query();
query.addCriteria(Criteria.where("studentId").is(studentId));
if (assignmentId != null) {
query.addCriteria(Criteria.where("assignmentId").is(assignmentId));
}
if (startTime != null && endTime != null) {
query.addCriteria(Criteria.where("timestamp")
.gte(startTime).lte(endTime));
}
// 按时间戳排序(回放场景需要)
query.with(org.springframework.data.domain.Sort.by(
org.springframework.data.domain.Sort.Direction.ASC, "timestamp"));
return mongoTemplate.find(query, StrokeData.class, STROKE_COLLECTION);
}
/**
* 提交AI识别任务
* 将笔迹数据异步发送至AI引擎进行识别
*/
private void submitRecognitionTask(StrokeData strokeData) {
aiExecutor.submit(() -> {
try {
// 根据作业题目类型选择识别方式
String recognitionType = determineRecognitionType(strokeData);
// 调用AI引擎REST API
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("strokeId", strokeData.getId());
requestBody.put("studentId", strokeData.getStudentId());
requestBody.put("strokes", strokeData.getStrokes());
requestBody.put("type", recognitionType);
// String apiUrl = AI_ENGINE_URL + "/api/v1/ocr/recognize";
// RestTemplate restTemplate = new RestTemplate();
// ResponseEntity<String> response = restTemplate.postForEntity(
// apiUrl, requestBody, String.class);
// 保存识别结果
// saveRecognitionResult(strokeData.getId(), response.getBody());
// 更新笔迹数据处理状态
updateProcessingStatus(strokeData.getId(), "completed");
} catch (Exception e) {
updateProcessingStatus(strokeData.getId(), "failed");
}
});
}
/**
* 笔迹数据预处理
* - 坐标范围校验(过滤异常值)
* - 时间戳排序
* - 重复数据去重
* - 坐标归一化(适配不同纸面规格)
*/
private void preprocessStrokeData(StrokeData strokeData) {
if (strokeData.getStrokes() == null) return;
List<Map<String, Object>> processed = strokeData.getStrokes().stream()
// 过滤无效坐标点
.filter(point -> {
int x = ((Number) point.getOrDefault("x", -1)).intValue();
int y = ((Number) point.getOrDefault("y", -1)).intValue();
return x >= 0 && x <= 65535 && y >= 0 && y <= 65535;
})
// 按时间戳排序
.sorted((a, b) -> {
long ta = ((Number) a.getOrDefault("timestamp", 0L)).longValue();
long tb = ((Number) b.getOrDefault("timestamp", 0L)).longValue();
return Long.compare(ta, tb);
})
.collect(Collectors.toList());
// 去重(相同时间戳的重复点)
List<Map<String, Object>> deduplicated = new ArrayList<>();
long lastTimestamp = -1;
for (Map<String, Object> point : processed) {
long ts = ((Number) point.getOrDefault("timestamp", 0L)).longValue();
if (ts != lastTimestamp) {
deduplicated.add(point);
lastTimestamp = ts;
}
}
strokeData.setStrokes(deduplicated);
}
/**
* 判断是否需要触发AI识别
* - 抬笔事件(笔画结束)触发单字识别
* - 作业提交事件触发整页识别
* - 超过5秒无新数据触发段落识别
*/
private boolean shouldTriggerRecognition(StrokeData strokeData) {
// 如果关联了作业ID,则需要识别
if (strokeData.getAssignmentId() != null) {
return true;
}
// 检查是否有抬笔标记
if (strokeData.getStrokes() != null) {
return strokeData.getStrokes().stream()
.anyMatch(p -> Boolean.TRUE.equals(p.get("penUp")));
}
return false;
}
/** 确定识别类型 */
private String determineRecognitionType(StrokeData strokeData) {
// 根据作业题目类型确定:ocr/math/stroke_order/essay
return "ocr";
}
/** 解析笔迹数据JSON */
private StrokeData parseStrokeData(String json) {
// JSON反序列化
return null;
}
/** 更新处理状态 */
private void updateProcessingStatus(String strokeId, String status) {
Query query = new Query(Criteria.where("_id").is(strokeId));
org.springframework.data.mongodb.core.query.Update update =
new org.springframework.data.mongodb.core.query.Update();
update.set("processingStatus", status);
update.set("processedTime", LocalDateTime.now());
mongoTemplate.updateFirst(query, update, STROKE_COLLECTION);
}
}