/// 自然写互动课堂手机端应用软件 V1.0 /// WebSocket实时通信服务 - 接收云端实时推送通知 /// /// 功能说明: /// 1. WebSocket长连接管理(建立、维持、重连) /// 2. 心跳机制(30秒间隔,检测连接存活性) /// 3. 消息类型分发(新作业、批改完成、课堂互动、家校消息) /// 4. 指数退避重连策略(断线后自动重连,逐步增加间隔) /// 5. 消息ACK确认(确保重要消息不丢失) /// 6. 离线消息补发(重连后请求离线期间的消息) import 'dart:async'; import 'dart:convert'; /* ========== 消息类型定义 ========== */ /// WebSocket消息类型枚举 enum WsMessageType { heartbeat, // 心跳包 heartbeatAck, // 心跳响应 newAssignment, // 新作业通知 gradeComplete, // 批改完成通知 classroomEvent, // 课堂互动事件(发题/收卷等) parentMessage, // 家校沟通消息 systemNotice, // 系统公告 strokeRealtime, // 实时笔迹数据(课堂模式) offlineSync, // 离线消息同步 ack, // 消息确认 } /// WebSocket消息模型 class WsMessage { final String id; // 消息唯一ID final WsMessageType type; // 消息类型 final Map data; // 消息内容 final int timestamp; // 服务端时间戳 final bool requireAck; // 是否需要ACK确认 WsMessage({ required this.id, required this.type, required this.data, required this.timestamp, this.requireAck = false, }); /// 从JSON反序列化 factory WsMessage.fromJson(Map json) { return WsMessage( id: json['id'] ?? '', type: _parseMessageType(json['type'] ?? ''), data: Map.from(json['data'] ?? {}), timestamp: json['timestamp'] ?? 0, requireAck: json['require_ack'] ?? false, ); } /// 序列化为JSON Map toJson() => { 'id': id, 'type': type.name, 'data': data, 'timestamp': timestamp, }; /// 解析消息类型字符串 static WsMessageType _parseMessageType(String typeStr) { switch (typeStr) { case 'heartbeat': return WsMessageType.heartbeat; case 'heartbeat_ack': return WsMessageType.heartbeatAck; case 'new_assignment': return WsMessageType.newAssignment; case 'grade_complete': return WsMessageType.gradeComplete; case 'classroom_event': return WsMessageType.classroomEvent; case 'parent_message': return WsMessageType.parentMessage; case 'system_notice': return WsMessageType.systemNotice; case 'stroke_realtime': return WsMessageType.strokeRealtime; case 'offline_sync': return WsMessageType.offlineSync; case 'ack': return WsMessageType.ack; default: return WsMessageType.systemNotice; } } } /* ========== WebSocket连接状态 ========== */ /// 连接状态枚举 enum WsConnectionState { disconnected, // 未连接 connecting, // 正在连接 connected, // 已连接 reconnecting, // 重连中 } /* ========== WebSocket服务实现 ========== */ /// WebSocket实时通信服务 /// 维护与云平台的长连接,接收实时推送通知 class WebSocketService { /// WebSocket服务器地址 static const String _wsUrl = 'wss://ws.writech.com/v1/notify'; /// 心跳间隔(秒) static const int heartbeatIntervalSec = 30; /// 心跳超时时间(秒,超过此时间未收到心跳响应则认为连接断开) static const int heartbeatTimeoutSec = 45; /// 最大重连间隔(秒,指数退避上限) static const int maxReconnectIntervalSec = 60; /// WebSocket实例 dynamic _webSocket; // WebSocket /// 连接状态 WsConnectionState _state = WsConnectionState.disconnected; /// 当前认证Token String _authToken = ''; /// 心跳定时器 Timer? _heartbeatTimer; /// 心跳超时定时器 Timer? _heartbeatTimeoutTimer; /// 重连定时器 Timer? _reconnectTimer; /// 当前重连尝试次数(用于指数退避计算) int _reconnectAttempts = 0; /// 最后收到消息的时间戳(用于离线消息补发) int _lastMessageTimestamp = 0; /// 消息分发回调注册表 final Map> _handlers = {}; /// 连接状态变化回调 final List _stateListeners = []; /// 待ACK的消息队列(消息ID -> 超时Timer) final Map _pendingAcks = {}; /// 获取当前连接状态 WsConnectionState get state => _state; /// 设置认证Token(登录成功后调用) void setAuthToken(String token) { _authToken = token; } /// 注册消息处理器 /// 同一类型可注册多个处理器,按注册顺序依次执行 void on(WsMessageType type, Function(WsMessage) handler) { _handlers.putIfAbsent(type, () => []); _handlers[type]!.add(handler); } /// 移除消息处理器 void off(WsMessageType type, Function(WsMessage) handler) { _handlers[type]?.remove(handler); } /// 监听连接状态变化 void onStateChange(Function(WsConnectionState) listener) { _stateListeners.add(listener); } /// 建立WebSocket连接 /// 附带认证Token和最后消息时间戳(用于离线消息补发) Future connect() async { if (_state == WsConnectionState.connected || _state == WsConnectionState.connecting) { return; } _updateState(WsConnectionState.connecting); try { // 构造带认证参数的WebSocket URL final url = '$_wsUrl?token=$_authToken&last_ts=$_lastMessageTimestamp'; // 建立WebSocket连接 // 实际实现: _webSocket = await WebSocket.connect(url); print('[WebSocket] 正在连接: $_wsUrl'); // 模拟连接成功 await Future.delayed(const Duration(milliseconds: 300)); _updateState(WsConnectionState.connected); _reconnectAttempts = 0; // 重置重连计数 // 启动心跳机制 _startHeartbeat(); // 监听消息流 // _webSocket.listen(_onMessage, onDone: _onDisconnected, onError: _onError); print('[WebSocket] 连接成功'); } catch (e) { print('[WebSocket] 连接失败: $e'); _updateState(WsConnectionState.disconnected); _scheduleReconnect(); } } /// 处理接收到的WebSocket消息 void _onMessage(dynamic rawData) { try { final json = jsonDecode(rawData as String) as Map; final message = WsMessage.fromJson(json); // 更新最后消息时间戳 if (message.timestamp > _lastMessageTimestamp) { _lastMessageTimestamp = message.timestamp; } // 处理心跳响应 if (message.type == WsMessageType.heartbeatAck) { _onHeartbeatAck(); return; } // 处理ACK确认 if (message.type == WsMessageType.ack) { _onAckReceived(message.data['ack_id'] ?? ''); return; } // 如果消息需要ACK,发送确认 if (message.requireAck) { _sendAck(message.id); } // 分发消息到注册的处理器 _dispatchMessage(message); } catch (e) { print('[WebSocket] 消息解析失败: $e'); } } /// 分发消息到对应类型的处理器 void _dispatchMessage(WsMessage message) { final handlers = _handlers[message.type]; if (handlers != null && handlers.isNotEmpty) { for (final handler in handlers) { try { handler(message); } catch (e) { print('[WebSocket] 消息处理器异常: $e'); } } } else { print('[WebSocket] 未注册的消息类型: ${message.type}'); } } /// 发送消息确认(ACK) void _sendAck(String messageId) { _send({ 'type': 'ack', 'data': {'ack_id': messageId}, 'timestamp': DateTime.now().millisecondsSinceEpoch, }); } /// 处理收到的ACK确认 void _onAckReceived(String messageId) { _pendingAcks[messageId]?.cancel(); _pendingAcks.remove(messageId); } /// 启动心跳机制 /// 每30秒发送一次心跳包,45秒内未收到响应则断开重连 void _startHeartbeat() { _stopHeartbeat(); _heartbeatTimer = Timer.periodic( Duration(seconds: heartbeatIntervalSec), (_) => _sendHeartbeat(), ); } /// 发送心跳包 void _sendHeartbeat() { _send({ 'type': 'heartbeat', 'timestamp': DateTime.now().millisecondsSinceEpoch, }); // 设置心跳超时检测 _heartbeatTimeoutTimer?.cancel(); _heartbeatTimeoutTimer = Timer( Duration(seconds: heartbeatTimeoutSec), () { print('[WebSocket] 心跳超时,断开连接'); _onDisconnected(); }, ); } /// 收到心跳响应,取消超时计时器 void _onHeartbeatAck() { _heartbeatTimeoutTimer?.cancel(); } /// 停止心跳 void _stopHeartbeat() { _heartbeatTimer?.cancel(); _heartbeatTimer = null; _heartbeatTimeoutTimer?.cancel(); _heartbeatTimeoutTimer = null; } /// 发送JSON数据 void _send(Map data) { if (_state != WsConnectionState.connected) return; try { final jsonStr = jsonEncode(data); // 实际调用: _webSocket.add(jsonStr); print('[WebSocket] 发送: ${data['type']}'); } catch (e) { print('[WebSocket] 发送失败: $e'); } } /// 连接断开处理 void _onDisconnected() { _stopHeartbeat(); _updateState(WsConnectionState.disconnected); print('[WebSocket] 连接已断开'); _scheduleReconnect(); } /// 连接错误处理 void _onError(dynamic error) { print('[WebSocket] 连接错误: $error'); _onDisconnected(); } /// 安排自动重连(指数退避策略) /// 间隔: 1s, 2s, 4s, 8s, 16s, 32s, 60s(上限) void _scheduleReconnect() { _reconnectTimer?.cancel(); final interval = _calculateReconnectInterval(); _updateState(WsConnectionState.reconnecting); print('[WebSocket] ${interval}秒后尝试重连 (第${_reconnectAttempts + 1}次)'); _reconnectTimer = Timer(Duration(seconds: interval), () { _reconnectAttempts++; connect(); }); } /// 计算重连间隔(指数退避,上限60秒) int _calculateReconnectInterval() { final interval = 1 << _reconnectAttempts; // 2^n return interval > maxReconnectIntervalSec ? maxReconnectIntervalSec : interval; } /// 更新连接状态并通知监听器 void _updateState(WsConnectionState newState) { if (_state == newState) return; _state = newState; for (final listener in _stateListeners) { try { listener(newState); } catch (e) { print('[WebSocket] 状态监听器异常: $e'); } } } /// 主动重连(应用前台恢复时调用) void reconnect() { if (_state == WsConnectionState.connected) return; _reconnectAttempts = 0; connect(); } /// 断开连接并释放资源 void disconnect() { _reconnectTimer?.cancel(); _reconnectTimer = null; _stopHeartbeat(); // 取消所有待ACK的超时计时器 for (final timer in _pendingAcks.values) { timer.cancel(); } _pendingAcks.clear(); // 关闭WebSocket连接 // 实际调用: _webSocket?.close(); _webSocket = null; _updateState(WsConnectionState.disconnected); print('[WebSocket] 已主动断开连接'); } /// 销毁服务(释放所有资源和回调) void dispose() { disconnect(); _handlers.clear(); _stateListeners.clear(); } }