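/**
 * 自然写互动课堂教学管理云平台软件 V1.0
 * (Interactive classroom teaching management cloud platform software, V1.0)
 *
 * Message push service.
 * Delivers real-time messages to multiple terminals over WebSocket.
 * Supports new-assignment notifications, grading-complete notifications,
 * classroom interaction commands, and similar events.
 */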
package com.writech.cloud.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;
import org.springframework.web.socket.config.annotation.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
|
|
* 消息服务类
|
|
*
|
|
* WebSocket实时消息通道:/ws/v1/notify
|
|
*
|
|
* 消息类型:
|
|
* - ASSIGNMENT_NEW:新作业通知
|
|
* - ASSIGNMENT_GRADED:批改完成通知
|
|
* - STROKE_REALTIME:实时笔迹数据推送
|
|
* - CLASSROOM_INTERACTION:课堂互动指令
|
|
* - SYSTEM_NOTIFICATION:系统公告
|
|
*/
|
|
@Service
|
|
public class MessageService extends TextWebSocketHandler implements WebSocketConfigurer {
|
|
|
|
@Autowired
|
|
private StringRedisTemplate redisTemplate;
|
|
|
|
/** 在线用户WebSocket会话映射(userId → session列表,支持多终端同时在线) */
|
|
private final ConcurrentHashMap<String, List<WebSocketSession>> userSessions =
|
|
new ConcurrentHashMap<>();
|
|
|
|
/** 教室频道会话映射(classroomId → session列表) */
|
|
private final ConcurrentHashMap<String, List<WebSocketSession>> classroomChannels =
|
|
new ConcurrentHashMap<>();
|
|
|
|
/**
|
|
* WebSocket端点注册
|
|
*/
|
|
@Override
|
|
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
|
registry.addHandler(this, "/ws/v1/notify")
|
|
.setAllowedOrigins("*");
|
|
}
|
|
|
|
/**
|
|
* WebSocket连接建立
|
|
* 从Token中解析用户ID,注册到在线会话映射
|
|
*/
|
|
@Override
|
|
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
|
String userId = extractUserIdFromSession(session);
|
|
if (userId != null) {
|
|
// 注册用户会话
|
|
userSessions.computeIfAbsent(userId, k -> new ArrayList<>()).add(session);
|
|
// 更新在线状态
|
|
updateOnlineStatus(userId, true);
|
|
// 推送离线期间的未读消息
|
|
pushOfflineMessages(userId, session);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* WebSocket消息接收
|
|
* 处理客户端发送的消息(心跳、课堂互动指令等)
|
|
*/
|
|
@Override
|
|
protected void handleTextMessage(WebSocketSession session, TextMessage message)
|
|
throws Exception {
|
|
String payload = message.getPayload();
|
|
Map<String, Object> msg = parseMessage(payload);
|
|
|
|
String type = (String) msg.get("type");
|
|
if (type == null) return;
|
|
|
|
switch (type) {
|
|
case "HEARTBEAT":
|
|
// 回复心跳
|
|
session.sendMessage(new TextMessage("{\"type\":\"HEARTBEAT_ACK\"}"));
|
|
break;
|
|
case "JOIN_CLASSROOM":
|
|
// 加入教室频道(课堂互动场景)
|
|
String classroomId = (String) msg.get("classroomId");
|
|
joinClassroomChannel(classroomId, session);
|
|
break;
|
|
case "LEAVE_CLASSROOM":
|
|
// 离开教室频道
|
|
String leaveClassroom = (String) msg.get("classroomId");
|
|
leaveClassroomChannel(leaveClassroom, session);
|
|
break;
|
|
case "CLASSROOM_COMMAND":
|
|
// 教师发送课堂控制指令(广播至教室内所有终端)
|
|
broadcastToClassroom(msg);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* WebSocket连接断开
|
|
*/
|
|
@Override
|
|
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
|
|
throws Exception {
|
|
String userId = extractUserIdFromSession(session);
|
|
if (userId != null) {
|
|
// 移除会话
|
|
List<WebSocketSession> sessions = userSessions.get(userId);
|
|
if (sessions != null) {
|
|
sessions.remove(session);
|
|
if (sessions.isEmpty()) {
|
|
userSessions.remove(userId);
|
|
updateOnlineStatus(userId, false);
|
|
}
|
|
}
|
|
}
|
|
// 从教室频道移除
|
|
classroomChannels.values().forEach(list -> list.remove(session));
|
|
}
|
|
|
|
/**
|
|
* 向指定用户推送消息
|
|
* 支持多终端同时推送(手机/Pad/PC同时在线时都能收到)
|
|
*
|
|
* @param userId 目标用户ID
|
|
* @param messageType 消息类型
|
|
* @param data 消息数据
|
|
*/
|
|
public void pushToUser(String userId, String messageType, Map<String, Object> data) {
|
|
Map<String, Object> message = new HashMap<>();
|
|
message.put("type", messageType);
|
|
message.put("data", data);
|
|
message.put("timestamp", System.currentTimeMillis());
|
|
|
|
String json = toJson(message);
|
|
List<WebSocketSession> sessions = userSessions.get(userId);
|
|
|
|
if (sessions != null && !sessions.isEmpty()) {
|
|
// 在线推送
|
|
for (WebSocketSession session : sessions) {
|
|
try {
|
|
if (session.isOpen()) {
|
|
session.sendMessage(new TextMessage(json));
|
|
}
|
|
} catch (IOException e) {
|
|
// 发送失败,记录日志
|
|
}
|
|
}
|
|
} else {
|
|
// 离线存储(用户上线后推送)
|
|
storeOfflineMessage(userId, json);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 向班级所有学生推送消息
|
|
*
|
|
* @param classId 班级ID
|
|
* @param messageType 消息类型
|
|
* @param data 消息数据
|
|
*/
|
|
public void pushToClass(String classId, String messageType, Map<String, Object> data) {
|
|
// 查询班级学生列表
|
|
// List<String> studentIds = classService.getStudentIds(classId);
|
|
List<String> studentIds = new ArrayList<>();
|
|
for (String studentId : studentIds) {
|
|
pushToUser(studentId, messageType, data);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 向教室频道广播消息
|
|
* 用于课堂互动场景,将消息推送至教室内所有终端(黑板/PC/电视/Pad)
|
|
*/
|
|
public void broadcastToClassroom(Map<String, Object> message) {
|
|
String classroomId = (String) message.get("classroomId");
|
|
if (classroomId == null) return;
|
|
|
|
String json = toJson(message);
|
|
List<WebSocketSession> sessions = classroomChannels.get(classroomId);
|
|
if (sessions != null) {
|
|
for (WebSocketSession session : sessions) {
|
|
try {
|
|
if (session.isOpen()) {
|
|
session.sendMessage(new TextMessage(json));
|
|
}
|
|
} catch (IOException e) {
|
|
// 发送失败处理
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 推送作业发布通知
|
|
*/
|
|
public void pushAssignmentNotification(String classId, String title, String assignmentId) {
|
|
Map<String, Object> data = new HashMap<>();
|
|
data.put("assignmentId", assignmentId);
|
|
data.put("title", title);
|
|
data.put("message", "教师发布了新作业: " + title);
|
|
pushToClass(classId, "ASSIGNMENT_NEW", data);
|
|
}
|
|
|
|
/**
|
|
* 推送批改完成通知
|
|
*/
|
|
public void pushGradingNotification(String studentId, String assignmentTitle,
|
|
double score) {
|
|
Map<String, Object> data = new HashMap<>();
|
|
data.put("title", assignmentTitle);
|
|
data.put("score", score);
|
|
data.put("message", "作业\"" + assignmentTitle + "\"批改完成,得分: " + score);
|
|
pushToUser(studentId, "ASSIGNMENT_GRADED", data);
|
|
}
|
|
|
|
/**
|
|
* 推送实时笔迹数据至教室大屏
|
|
* 低延迟推送,用于黑板/电视大屏实时展示学生书写过程
|
|
*/
|
|
public void pushRealtimeStroke(String classroomId, String studentId,
|
|
List<Map<String, Object>> strokePoints) {
|
|
Map<String, Object> data = new HashMap<>();
|
|
data.put("studentId", studentId);
|
|
data.put("points", strokePoints);
|
|
|
|
Map<String, Object> message = new HashMap<>();
|
|
message.put("type", "STROKE_REALTIME");
|
|
message.put("classroomId", classroomId);
|
|
message.put("data", data);
|
|
|
|
broadcastToClassroom(message);
|
|
}
|
|
|
|
// ==================== 内部方法 ====================
|
|
|
|
/** 加入教室频道 */
|
|
private void joinClassroomChannel(String classroomId, WebSocketSession session) {
|
|
classroomChannels.computeIfAbsent(classroomId, k -> new ArrayList<>()).add(session);
|
|
}
|
|
|
|
/** 离开教室频道 */
|
|
private void leaveClassroomChannel(String classroomId, WebSocketSession session) {
|
|
List<WebSocketSession> sessions = classroomChannels.get(classroomId);
|
|
if (sessions != null) {
|
|
sessions.remove(session);
|
|
}
|
|
}
|
|
|
|
/** 从WebSocket会话中提取用户ID */
|
|
private String extractUserIdFromSession(WebSocketSession session) {
|
|
// 从URL参数或握手头中的Token解析用户ID
|
|
String query = session.getUri() != null ? session.getUri().getQuery() : null;
|
|
if (query != null && query.contains("token=")) {
|
|
// 解析Token获取userId
|
|
return "extracted_user_id";
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/** 更新用户在线状态 */
|
|
private void updateOnlineStatus(String userId, boolean online) {
|
|
String key = "writech:user:online:" + userId;
|
|
if (online) {
|
|
redisTemplate.opsForValue().set(key, "1");
|
|
} else {
|
|
redisTemplate.delete(key);
|
|
}
|
|
}
|
|
|
|
/** 存储离线消息 */
|
|
private void storeOfflineMessage(String userId, String message) {
|
|
String key = "writech:offline:msg:" + userId;
|
|
redisTemplate.opsForList().rightPush(key, message);
|
|
// 最多保留100条离线消息
|
|
redisTemplate.opsForList().trim(key, -100, -1);
|
|
}
|
|
|
|
/** 推送离线期间积累的未读消息 */
|
|
private void pushOfflineMessages(String userId, WebSocketSession session)
|
|
throws IOException {
|
|
String key = "writech:offline:msg:" + userId;
|
|
List<String> messages = redisTemplate.opsForList().range(key, 0, -1);
|
|
if (messages != null) {
|
|
for (String msg : messages) {
|
|
session.sendMessage(new TextMessage(msg));
|
|
}
|
|
redisTemplate.delete(key);
|
|
}
|
|
}
|
|
|
|
/** JSON序列化(简化版本) */
|
|
private String toJson(Map<String, Object> map) {
|
|
StringBuilder sb = new StringBuilder("{");
|
|
boolean first = true;
|
|
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
|
if (!first) sb.append(",");
|
|
sb.append("\"").append(entry.getKey()).append("\":");
|
|
Object value = entry.getValue();
|
|
if (value instanceof String) {
|
|
sb.append("\"").append(value).append("\"");
|
|
} else {
|
|
sb.append(value);
|
|
}
|
|
first = false;
|
|
}
|
|
sb.append("}");
|
|
return sb.toString();
|
|
}
|
|
|
|
/** JSON解析(简化版本) */
|
|
private Map<String, Object> parseMessage(String json) {
|
|
return new HashMap<>();
|
|
}
|
|
|
|
/**
|
|
* 获取在线用户统计
|
|
*/
|
|
public Map<String, Integer> getOnlineStats() {
|
|
Map<String, Integer> stats = new HashMap<>();
|
|
stats.put("totalOnlineUsers", userSessions.size());
|
|
stats.put("totalSessions", userSessions.values().stream()
|
|
.mapToInt(List::size).sum());
|
|
stats.put("activeClassrooms", classroomChannels.size());
|
|
return stats;
|
|
}
|
|
}
|