Files
2026-03-22 15:24:40 +08:00

483 lines
16 KiB
Kotlin

/**
* 自然写互动课堂电视端应用软件 V1.0
* WebSocket管理器 - 实时接收笔迹数据流和课堂互动指令
*
* 功能说明:
* 1. WebSocket长连接管理(建立、维持、自动重连)
* 2. 实时笔迹数据接收(从网关/算力盒推送的学生笔迹坐标流)
* 3. 课堂互动指令接收(发题、收卷、分组展示等)
* 4. 心跳机制(30秒间隔,检测连接存活性)
* 5. 指数退避重连策略(断线后自动重连)
* 6. 消息分帧处理(大数据包拆分接收)
* 7. 局域网优先连接(优先连接网关WebSocket,备选连接云端)
*/
package com.writech.tv.network
import android.os.Handler
import android.os.Looper
import android.util.Log
import org.json.JSONArray
import org.json.JSONObject
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
/**
* WebSocket消息类型定义
*/
object WsMessageTypes {
const val HEARTBEAT = "heartbeat"
const val HEARTBEAT_ACK = "heartbeat_ack"
const val STROKE_DATA = "stroke_data" // 笔迹坐标数据
const val STROKE_BATCH = "stroke_batch" // 批量笔迹数据
const val PEN_DOWN = "pen_down" // 落笔事件
const val PEN_UP = "pen_up" // 抬笔事件
const val CLASSROOM_START = "classroom_start" // 课堂开始
const val CLASSROOM_END = "classroom_end" // 课堂结束
const val QUIZ_START = "quiz_start" // 发题
const val QUIZ_SUBMIT = "quiz_submit" // 学生提交答案
const val QUIZ_STATS = "quiz_stats" // 答题统计结果
const val STUDENT_JOIN = "student_join" // 学生上线
const val STUDENT_LEAVE = "student_leave" // 学生离线
const val DISPLAY_MODE = "display_mode" // 切换显示模式(全班/分组/个人)
}
/**
* 笔迹数据回调接口
*/
interface StrokeDataListener {
/** 收到笔迹坐标数据 */
fun onStrokeData(studentId: String, x: Float, y: Float, pressure: Float, timestamp: Long)
/** 学生落笔事件 */
fun onPenDown(studentId: String, pageId: Int)
/** 学生抬笔事件 */
fun onPenUp(studentId: String)
}
/**
* 课堂事件回调接口
*/
interface ClassroomEventListener {
/** 课堂开始 */
fun onClassroomStart(classId: String, className: String)
/** 课堂结束 */
fun onClassroomEnd(classId: String)
/** 学生上线/离线 */
fun onStudentStatusChange(studentId: String, studentName: String, online: Boolean)
/** 答题事件 */
fun onQuizEvent(eventType: String, data: JSONObject)
/** 显示模式切换 */
fun onDisplayModeChange(mode: String, targetStudentIds: List<String>)
}
/**
* WebSocket连接管理器
* 管理与网关或云端的WebSocket长连接
*/
class WebSocketManager {
companion object {
private const val TAG = "WsManager"
/** 心跳间隔(毫秒) */
private const val HEARTBEAT_INTERVAL = 30_000L
/** 心跳超时(毫秒) */
private const val HEARTBEAT_TIMEOUT = 45_000L
/** 最大重连间隔(毫秒) */
private const val MAX_RECONNECT_INTERVAL = 60_000L
/** 最大重连次数(超过后停止重连) */
private const val MAX_RECONNECT_ATTEMPTS = 100
}
/** 连接状态 */
enum class State {
DISCONNECTED, CONNECTING, CONNECTED, RECONNECTING
}
/** 当前连接状态 */
@Volatile
var state: State = State.DISCONNECTED
private set
/** WebSocket实例 */
private var webSocket: Any? = null // OkHttp WebSocket实例
/** 当前连接URL */
private var currentUrl: String = ""
/** 认证Token */
private var authToken: String = ""
/** 心跳定时器 */
private var heartbeatTimer: Timer? = null
/** 心跳超时定时器 */
private var heartbeatTimeoutTimer: Timer? = null
/** 重连定时器 */
private var reconnectTimer: Timer? = null
/** 重连尝试次数 */
private val reconnectAttempts = AtomicInteger(0)
/** 是否主动断开(主动断开不触发重连) */
private val intentionalDisconnect = AtomicBoolean(false)
/** 最后收到消息时间戳 */
@Volatile
private var lastMessageTimestamp: Long = 0
/** 主线程Handler */
private val mainHandler = Handler(Looper.getMainLooper())
/** 笔迹数据监听器列表 */
private val strokeListeners = CopyOnWriteArrayList<StrokeDataListener>()
/** 课堂事件监听器列表 */
private val classroomListeners = CopyOnWriteArrayList<ClassroomEventListener>()
/** 注册笔迹数据监听器 */
fun addStrokeListener(listener: StrokeDataListener) {
strokeListeners.add(listener)
}
/** 移除笔迹数据监听器 */
fun removeStrokeListener(listener: StrokeDataListener) {
strokeListeners.remove(listener)
}
/** 注册课堂事件监听器 */
fun addClassroomListener(listener: ClassroomEventListener) {
classroomListeners.add(listener)
}
/** 移除课堂事件监听器 */
fun removeClassroomListener(listener: ClassroomEventListener) {
classroomListeners.remove(listener)
}
/**
* 连接WebSocket服务器
* @param url WebSocket服务器地址(网关局域网地址或云端地址)
* @param token 认证Token
*/
fun connect(url: String, token: String) {
if (state == State.CONNECTED || state == State.CONNECTING) {
Log.w(TAG, "WebSocket已连接或正在连接中")
return
}
currentUrl = url
authToken = token
intentionalDisconnect.set(false)
state = State.CONNECTING
Log.i(TAG, "正在连接WebSocket: $url")
// 使用OkHttp建立WebSocket连接
// 实际实现:
// val request = Request.Builder().url("$url?token=$token&device_type=tv").build()
// val client = OkHttpClient.Builder().pingInterval(30, TimeUnit.SECONDS).build()
// webSocket = client.newWebSocket(request, wsListener)
// 模拟连接成功
mainHandler.postDelayed({
onConnected()
}, 200)
}
/** 连接成功回调 */
private fun onConnected() {
state = State.CONNECTED
reconnectAttempts.set(0)
Log.i(TAG, "WebSocket连接成功")
// 启动心跳
startHeartbeat()
// 请求补发离线消息
sendOfflineSyncRequest()
}
/** 处理接收到的WebSocket文本消息 */
fun onMessageReceived(text: String) {
try {
val json = JSONObject(text)
val type = json.optString("type", "")
val data = json.optJSONObject("data") ?: JSONObject()
val timestamp = json.optLong("timestamp", System.currentTimeMillis())
lastMessageTimestamp = timestamp
when (type) {
WsMessageTypes.HEARTBEAT_ACK -> onHeartbeatAck()
WsMessageTypes.STROKE_DATA -> handleStrokeData(data)
WsMessageTypes.STROKE_BATCH -> handleStrokeBatch(data)
WsMessageTypes.PEN_DOWN -> handlePenDown(data)
WsMessageTypes.PEN_UP -> handlePenUp(data)
WsMessageTypes.CLASSROOM_START -> handleClassroomStart(data)
WsMessageTypes.CLASSROOM_END -> handleClassroomEnd(data)
WsMessageTypes.STUDENT_JOIN -> handleStudentJoin(data)
WsMessageTypes.STUDENT_LEAVE -> handleStudentLeave(data)
WsMessageTypes.QUIZ_START -> handleQuizEvent("quiz_start", data)
WsMessageTypes.QUIZ_SUBMIT -> handleQuizEvent("quiz_submit", data)
WsMessageTypes.QUIZ_STATS -> handleQuizEvent("quiz_stats", data)
WsMessageTypes.DISPLAY_MODE -> handleDisplayModeChange(data)
else -> Log.w(TAG, "未知消息类型: $type")
}
} catch (e: Exception) {
Log.e(TAG, "消息解析失败: ${e.message}")
}
}
/* ========== 笔迹数据处理 ========== */
/** 处理单个笔迹坐标数据 */
private fun handleStrokeData(data: JSONObject) {
val studentId = data.optString("student_id", "")
val x = data.optDouble("x", 0.0).toFloat()
val y = data.optDouble("y", 0.0).toFloat()
val pressure = data.optDouble("pressure", 0.5).toFloat()
val timestamp = data.optLong("timestamp", 0)
for (listener in strokeListeners) {
listener.onStrokeData(studentId, x, y, pressure, timestamp)
}
}
/** 处理批量笔迹数据(一次传输多个坐标点,减少消息频率) */
private fun handleStrokeBatch(data: JSONObject) {
val studentId = data.optString("student_id", "")
val pointsArray = data.optJSONArray("points") ?: return
for (i in 0 until pointsArray.length()) {
val point = pointsArray.optJSONObject(i) ?: continue
val x = point.optDouble("x", 0.0).toFloat()
val y = point.optDouble("y", 0.0).toFloat()
val pressure = point.optDouble("pressure", 0.5).toFloat()
val timestamp = point.optLong("timestamp", 0)
for (listener in strokeListeners) {
listener.onStrokeData(studentId, x, y, pressure, timestamp)
}
}
}
/** 处理落笔事件 */
private fun handlePenDown(data: JSONObject) {
val studentId = data.optString("student_id", "")
val pageId = data.optInt("page_id", 0)
for (listener in strokeListeners) {
listener.onPenDown(studentId, pageId)
}
}
/** 处理抬笔事件 */
private fun handlePenUp(data: JSONObject) {
val studentId = data.optString("student_id", "")
for (listener in strokeListeners) {
listener.onPenUp(studentId)
}
}
/* ========== 课堂事件处理 ========== */
/** 处理课堂开始事件 */
private fun handleClassroomStart(data: JSONObject) {
val classId = data.optString("class_id", "")
val className = data.optString("class_name", "")
mainHandler.post {
for (listener in classroomListeners) {
listener.onClassroomStart(classId, className)
}
}
Log.i(TAG, "课堂已开始: $className")
}
/** 处理课堂结束事件 */
private fun handleClassroomEnd(data: JSONObject) {
val classId = data.optString("class_id", "")
mainHandler.post {
for (listener in classroomListeners) {
listener.onClassroomEnd(classId)
}
}
Log.i(TAG, "课堂已结束")
}
/** 处理学生上线事件 */
private fun handleStudentJoin(data: JSONObject) {
val studentId = data.optString("student_id", "")
val name = data.optString("student_name", "")
mainHandler.post {
for (listener in classroomListeners) {
listener.onStudentStatusChange(studentId, name, true)
}
}
}
/** 处理学生离线事件 */
private fun handleStudentLeave(data: JSONObject) {
val studentId = data.optString("student_id", "")
val name = data.optString("student_name", "")
mainHandler.post {
for (listener in classroomListeners) {
listener.onStudentStatusChange(studentId, name, false)
}
}
}
/** 处理答题相关事件 */
private fun handleQuizEvent(eventType: String, data: JSONObject) {
mainHandler.post {
for (listener in classroomListeners) {
listener.onQuizEvent(eventType, data)
}
}
}
/** 处理显示模式切换 */
private fun handleDisplayModeChange(data: JSONObject) {
val mode = data.optString("mode", "all") // all / group / single
val studentIds = mutableListOf<String>()
val idsArray = data.optJSONArray("student_ids")
if (idsArray != null) {
for (i in 0 until idsArray.length()) {
studentIds.add(idsArray.optString(i, ""))
}
}
mainHandler.post {
for (listener in classroomListeners) {
listener.onDisplayModeChange(mode, studentIds)
}
}
}
/* ========== 心跳机制 ========== */
/** 启动心跳定时器 */
private fun startHeartbeat() {
stopHeartbeat()
heartbeatTimer = Timer("ws-heartbeat")
heartbeatTimer?.scheduleAtFixedRate(object : TimerTask() {
override fun run() { sendHeartbeat() }
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL)
}
/** 发送心跳包 */
private fun sendHeartbeat() {
val msg = JSONObject().apply {
put("type", WsMessageTypes.HEARTBEAT)
put("timestamp", System.currentTimeMillis())
}
sendMessage(msg.toString())
// 设置心跳超时检测
heartbeatTimeoutTimer?.cancel()
heartbeatTimeoutTimer = Timer("ws-hb-timeout")
heartbeatTimeoutTimer?.schedule(object : TimerTask() {
override fun run() {
Log.w(TAG, "心跳超时,断开连接")
handleDisconnect()
}
}, HEARTBEAT_TIMEOUT)
}
/** 收到心跳响应 */
private fun onHeartbeatAck() {
heartbeatTimeoutTimer?.cancel()
}
/** 停止心跳 */
private fun stopHeartbeat() {
heartbeatTimer?.cancel()
heartbeatTimer = null
heartbeatTimeoutTimer?.cancel()
heartbeatTimeoutTimer = null
}
/* ========== 重连机制 ========== */
/** 处理连接断开 */
private fun handleDisconnect() {
stopHeartbeat()
state = State.DISCONNECTED
if (!intentionalDisconnect.get() && reconnectAttempts.get() < MAX_RECONNECT_ATTEMPTS) {
scheduleReconnect()
}
}
/** 安排自动重连(指数退避策略) */
private fun scheduleReconnect() {
val attempt = reconnectAttempts.get()
val interval = minOf(1000L * (1L shl minOf(attempt, 6)), MAX_RECONNECT_INTERVAL)
state = State.RECONNECTING
Log.i(TAG, "${interval}ms后尝试重连 (第${attempt + 1}次)")
reconnectTimer?.cancel()
reconnectTimer = Timer("ws-reconnect")
reconnectTimer?.schedule(object : TimerTask() {
override fun run() {
reconnectAttempts.incrementAndGet()
connect(currentUrl, authToken)
}
}, interval)
}
/** 请求补发离线期间的消息 */
private fun sendOfflineSyncRequest() {
if (lastMessageTimestamp > 0) {
val msg = JSONObject().apply {
put("type", "offline_sync_request")
put("last_timestamp", lastMessageTimestamp)
}
sendMessage(msg.toString())
}
}
/** 发送WebSocket文本消息 */
fun sendMessage(text: String) {
if (state != State.CONNECTED) {
Log.w(TAG, "WebSocket未连接,无法发送消息")
return
}
// 实际调用: webSocket?.send(text)
Log.d(TAG, "发送消息: ${text.take(100)}")
}
/** 主动断开连接 */
fun disconnect() {
intentionalDisconnect.set(true)
stopHeartbeat()
reconnectTimer?.cancel()
// 实际调用: webSocket?.close(1000, "Client disconnect")
webSocket = null
state = State.DISCONNECTED
Log.i(TAG, "WebSocket已主动断开")
}
/** 释放所有资源 */
fun release() {
disconnect()
strokeListeners.clear()
classroomListeners.clear()
}
}