Files
2026-03-22 15:24:40 +08:00

327 lines
9.9 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* 自然写互动课堂教学管理网关软件 V1.0
* mqtt_client.c - MQTT通信客户端(TLS加密)
*
* 功能说明:
* 1. MQTT 3.1.1协议实现(基于mosquitto库)
* 2. TLS/SSL加密通信
* 3. 自动重连与会话恢复
* 4. 主题订阅管理(控制指令下发)
* 5. 笔迹数据批量发布
* 6. 遗嘱消息(设备离线通知)
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <syslog.h>
#include <errno.h>
#include <time.h>
/* Mosquitto MQTT库 */
#include <mosquitto.h>
/* 模块头文件 */
#include "mqtt_client.h"
#include "gateway_config.h"
/* ========== Constants ========== */
/* MQTT QoS levels */
#define MQTT_QOS_AT_MOST_ONCE 0
#define MQTT_QOS_AT_LEAST_ONCE 1
/* MQTT keepalive interval, in seconds */
#define MQTT_KEEPALIVE_SEC 60
/* Initial reconnect delay, in seconds */
#define MQTT_RECONNECT_SEC 5
/* Maximum reconnect delay, in seconds (upper bound for exponential backoff) */
#define MQTT_MAX_RECONNECT_SEC 120
/* Buffer size for batched message publishing (not referenced in the visible part of this file) */
#define PUBLISH_BATCH_SIZE 32
/* Prefix prepended to every topic built in mqtt_client_init() */
#define TOPIC_PREFIX "writech/gateway/"
/* ========== Data structures ========== */
/* MQTT client state: one instance (g_mqtt) holds everything this module needs */
typedef struct {
struct mosquitto *mosq; /* Mosquitto client instance */
char gateway_id[64]; /* Unique gateway ID, also used as the MQTT client ID */
char broker_host[256]; /* Broker address */
int broker_port; /* Broker port */
int is_connected; /* 1 after a successful connect callback, 0 otherwise */
int reconnect_count; /* Unexpected disconnects since the last successful connect */
pthread_mutex_t pub_mutex; /* Publish lock */
/* Topics */
char topic_stroke_data[128]; /* Stroke data upload topic */
char topic_device_status[128]; /* Device status upload topic (also the will topic) */
char topic_cmd_subscribe[128]; /* Subscription filter for downstream commands */
char topic_ota[128]; /* OTA upgrade notification topic */
/* TLS certificate paths */
char ca_cert_path[256]; /* CA certificate */
char client_cert_path[256]; /* Client certificate */
char client_key_path[256]; /* Client private key */
/* Statistics */
unsigned long msgs_published; /* Incremented by the publish-complete callback */
unsigned long msgs_received; /* Incremented by the message callback */
unsigned long bytes_sent; /* Sum of stroke payload sizes accepted for publishing */
} MQTTState;
/* Module-wide client state; zeroed and populated by mqtt_client_init() */
static MQTTState g_mqtt;
/*
 * Command callback invoked by on_message() for every received message.
 * NULL until mqtt_client_set_cmd_callback() installs a handler.
 */
static void (*g_cmd_callback)(const char *topic, const uint8_t *payload,
int payload_len) = NULL;
/* ========== MQTT回调函数 ========== */
/**
* 连接成功回调
*/
static void on_connect(struct mosquitto *mosq, void *userdata, int rc) {
(void)userdata;
if (rc == 0) {
g_mqtt.is_connected = 1;
g_mqtt.reconnect_count = 0;
syslog(LOG_INFO, "MQTT: 已连接到 %s:%d", g_mqtt.broker_host, g_mqtt.broker_port);
/* 订阅控制指令主题 */
mosquitto_subscribe(mosq, NULL, g_mqtt.topic_cmd_subscribe, MQTT_QOS_AT_LEAST_ONCE);
/* 订阅OTA升级通知主题 */
mosquitto_subscribe(mosq, NULL, g_mqtt.topic_ota, MQTT_QOS_AT_LEAST_ONCE);
/* 发布上线状态 */
publish_status("online");
} else {
syslog(LOG_ERR, "MQTT: 连接失败,返回码=%d", rc);
g_mqtt.is_connected = 0;
}
}
/**
 * Disconnect callback.
 *
 * Clears the connected flag, logs the reason code, and counts the event as
 * a reconnect attempt when the disconnect was not requested by the client
 * (rc != 0).
 *
 * @param mosq     unused
 * @param userdata unused
 * @param rc       disconnect reason; 0 means the client asked to disconnect
 */
static void on_disconnect(struct mosquitto *mosq, void *userdata, int rc) {
    const int unexpected = (rc != 0);

    (void)userdata;
    (void)mosq;

    g_mqtt.is_connected = 0;
    syslog(LOG_WARNING, "MQTT: 连接断开,原因=%d", rc);

    /* Only unexpected disconnects count; per the original author's note,
     * the connection is expected to be re-established automatically. */
    if (unexpected) {
        g_mqtt.reconnect_count += 1;
    }
}
/**
 * Message callback for subscribed topics.
 *
 * Counts the message, logs its topic and length, and forwards it to the
 * registered command callback if one has been installed.
 *
 * @param mosq     unused
 * @param userdata unused
 * @param msg      received message (topic, payload pointer, payload length)
 */
static void on_message(struct mosquitto *mosq, void *userdata,
                       const struct mosquitto_message *msg) {
    (void)userdata;
    (void)mosq;

    g_mqtt.msgs_received += 1;
    syslog(LOG_DEBUG, "MQTT: 收到消息 [%s] 长度=%d", msg->topic, msg->payloadlen);

    /* Nothing to dispatch to until a handler has been registered */
    if (g_cmd_callback == NULL) {
        return;
    }

    const uint8_t *body = (const uint8_t *)msg->payload;
    g_cmd_callback(msg->topic, body, msg->payloadlen);
}
/**
 * Publish-complete callback: bumps the published-message counter.
 *
 * @param mosq     unused
 * @param userdata unused
 * @param mid      message id of the completed publish (unused)
 */
static void on_publish(struct mosquitto *mosq, void *userdata, int mid) {
    (void)mid;
    (void)userdata;
    (void)mosq;

    g_mqtt.msgs_published += 1;
}
/* ========== Initialization ========== */
/**
 * Initialize the MQTT client and start connecting to the broker.
 *
 * Resets the module state, derives a gateway ID from the current time,
 * builds the topic names, creates the Mosquitto instance, registers the
 * callbacks, sets the will message, configures TLS, starts an asynchronous
 * connect and launches the Mosquitto network thread.
 *
 * The mutex and the Mosquitto library are initialized before any check that
 * can fail, so mqtt_client_cleanup() remains balanced after a failed init.
 * On every failure path the Mosquitto instance (if created) is destroyed and
 * g_mqtt.mosq is left NULL.
 *
 * NOTE(review): the gateway ID is derived from time(NULL), so it differs on
 * every call; the topic names and the retained status message follow it.
 *
 * @param host MQTT broker address (must be a non-empty string)
 * @param port MQTT broker port (8883 = TLS)
 * @return 0 on success, -1 on failure
 */
int mqtt_client_init(const char *host, int port) {
    memset(&g_mqtt, 0, sizeof(g_mqtt));
    pthread_mutex_init(&g_mqtt.pub_mutex, NULL);

    /* Initialize the Mosquitto library */
    mosquitto_lib_init();

    /* A NULL host would be dereferenced by the copy below */
    if (host == NULL || host[0] == '\0') {
        syslog(LOG_ERR, "MQTT: 服务器地址无效");
        return -1;
    }

    snprintf(g_mqtt.broker_host, sizeof(g_mqtt.broker_host), "%s", host);
    g_mqtt.broker_port = port;

    /* Generate the gateway ID */
    snprintf(g_mqtt.gateway_id, sizeof(g_mqtt.gateway_id),
             "writech-gw-%08x", (unsigned int)time(NULL));

    /* Build the topic names */
    snprintf(g_mqtt.topic_stroke_data, sizeof(g_mqtt.topic_stroke_data),
             "%s%s/stroke", TOPIC_PREFIX, g_mqtt.gateway_id);
    snprintf(g_mqtt.topic_device_status, sizeof(g_mqtt.topic_device_status),
             "%s%s/status", TOPIC_PREFIX, g_mqtt.gateway_id);
    snprintf(g_mqtt.topic_cmd_subscribe, sizeof(g_mqtt.topic_cmd_subscribe),
             "%s%s/cmd/#", TOPIC_PREFIX, g_mqtt.gateway_id);
    snprintf(g_mqtt.topic_ota, sizeof(g_mqtt.topic_ota),
             "%s%s/ota", TOPIC_PREFIX, g_mqtt.gateway_id);

    /* Create the Mosquitto client instance (clean session) */
    g_mqtt.mosq = mosquitto_new(g_mqtt.gateway_id, true, NULL);
    if (g_mqtt.mosq == NULL) {
        syslog(LOG_ERR, "MQTT: 创建客户端失败");
        return -1;
    }

    /* Register callbacks */
    mosquitto_connect_callback_set(g_mqtt.mosq, on_connect);
    mosquitto_disconnect_callback_set(g_mqtt.mosq, on_disconnect);
    mosquitto_message_callback_set(g_mqtt.mosq, on_message);
    mosquitto_publish_callback_set(g_mqtt.mosq, on_publish);

    /* Will message: published by the broker if the gateway drops off
     * unexpectedly. Best effort: a failure is logged but does not abort. */
    char will_payload[128];
    snprintf(will_payload, sizeof(will_payload),
             "{\"gatewayId\":\"%s\",\"status\":\"offline\"}", g_mqtt.gateway_id);
    int will_ret = mosquitto_will_set(g_mqtt.mosq, g_mqtt.topic_device_status,
                                      (int)strlen(will_payload), will_payload,
                                      MQTT_QOS_AT_LEAST_ONCE, true);
    if (will_ret != MOSQ_ERR_SUCCESS) {
        syslog(LOG_WARNING, "MQTT: 遗嘱消息设置失败: %s", mosquitto_strerror(will_ret));
    }

    /* Configure TLS */
    const char *ca_cert = gateway_config_get_string("mqtt.ca_cert", "/etc/writech/ca.pem");
    const char *client_cert = gateway_config_get_string("mqtt.client_cert", "/etc/writech/client.pem");
    const char *client_key = gateway_config_get_string("mqtt.client_key", "/etc/writech/client.key");
    snprintf(g_mqtt.ca_cert_path, sizeof(g_mqtt.ca_cert_path), "%s", ca_cert);
    snprintf(g_mqtt.client_cert_path, sizeof(g_mqtt.client_cert_path), "%s", client_cert);
    snprintf(g_mqtt.client_key_path, sizeof(g_mqtt.client_key_path), "%s", client_key);

    int tls_ret = mosquitto_tls_set(g_mqtt.mosq, ca_cert, NULL,
                                    client_cert, client_key, NULL);
    if (tls_ret != MOSQ_ERR_SUCCESS) {
        /* NOTE(review): existing behavior is to continue without TLS. That
         * sends traffic in clear text; confirm this fallback is acceptable
         * or turn it into a hard failure. */
        syslog(LOG_WARNING, "MQTT: TLS配置失败,将使用非加密连接");
    }

    /* Automatic reconnect with exponential backoff */
    mosquitto_reconnect_delay_set(g_mqtt.mosq, MQTT_RECONNECT_SEC,
                                  MQTT_MAX_RECONNECT_SEC, true);

    /* Start the connection */
    int ret = mosquitto_connect_async(g_mqtt.mosq, host, port, MQTT_KEEPALIVE_SEC);
    if (ret != MOSQ_ERR_SUCCESS) {
        syslog(LOG_ERR, "MQTT: 连接发起失败: %s", mosquitto_strerror(ret));
        goto fail_destroy;
    }

    /* Start the Mosquitto network loop thread; without it nothing is ever
     * sent or received, so a failure here must not be reported as success. */
    ret = mosquitto_loop_start(g_mqtt.mosq);
    if (ret != MOSQ_ERR_SUCCESS) {
        syslog(LOG_ERR, "MQTT: 网络线程启动失败: %s", mosquitto_strerror(ret));
        goto fail_destroy;
    }

    syslog(LOG_INFO, "MQTT客户端初始化完成,网关ID=%s", g_mqtt.gateway_id);
    return 0;

fail_destroy:
    /* Release the instance so a retry of mqtt_client_init() does not leak it */
    mosquitto_destroy(g_mqtt.mosq);
    g_mqtt.mosq = NULL;
    return -1;
}
/* ========== Data publishing ========== */
/**
 * Publish stroke data for one pen.
 *
 * The message goes to "<stroke topic>/<pen_mac>" at QoS 0, not retained.
 * The publish call and the bytes_sent counter update are both done while
 * holding pub_mutex.
 *
 * @param pen_mac  pen MAC address, appended to the stroke topic
 * @param data     binary stroke data (may be NULL only when data_len is 0)
 * @param data_len payload length in bytes, must be >= 0
 * @return 0 on success, -1 if not connected, -2 if the arguments are
 *         invalid, the topic does not fit, or the publish call fails
 */
int mqtt_publish_stroke(const char *pen_mac, const uint8_t *data, int data_len) {
    if (!g_mqtt.is_connected) {
        return -1;
    }

    /* Reject inputs that would otherwise be dereferenced or copied blindly */
    if (pen_mac == NULL || data_len < 0 || (data == NULL && data_len > 0)) {
        syslog(LOG_WARNING, "MQTT: 发布参数无效");
        return -2;
    }

    /* Build the full topic including the pen MAC; refuse to publish to a
     * silently truncated topic. */
    char topic[256];
    int topic_len = snprintf(topic, sizeof(topic), "%s/%s",
                             g_mqtt.topic_stroke_data, pen_mac);
    if (topic_len < 0 || (size_t)topic_len >= sizeof(topic)) {
        syslog(LOG_WARNING, "MQTT: 主题过长,已放弃发布");
        return -2;
    }

    pthread_mutex_lock(&g_mqtt.pub_mutex);
    int ret = mosquitto_publish(g_mqtt.mosq, NULL, topic,
                                data_len, data, MQTT_QOS_AT_MOST_ONCE, false);
    if (ret == MOSQ_ERR_SUCCESS) {
        /* Updated under the lock so concurrent publishers do not race */
        g_mqtt.bytes_sent += (unsigned long)data_len;
    }
    pthread_mutex_unlock(&g_mqtt.pub_mutex);

    if (ret != MOSQ_ERR_SUCCESS) {
        syslog(LOG_WARNING, "MQTT: 发布失败: %s", mosquitto_strerror(ret));
        return -2;
    }
    return 0;
}
/**
* 发布网关/设备状态
*/
static void publish_status(const char *status) {
char payload[512];
snprintf(payload, sizeof(payload),
"{\"gatewayId\":\"%s\",\"status\":\"%s\","
"\"uptime\":%lu,\"penCount\":%d,"
"\"msgsSent\":%lu,\"msgsRecv\":%lu}",
g_mqtt.gateway_id, status,
(unsigned long)time(NULL),
0, /* pen count to be filled */
g_mqtt.msgs_published,
g_mqtt.msgs_received);
mosquitto_publish(g_mqtt.mosq, NULL, g_mqtt.topic_device_status,
strlen(payload), payload, MQTT_QOS_AT_LEAST_ONCE, true);
}
/* ========== Public interface ========== */
/**
 * Report the connection flag maintained by the connect/disconnect callbacks.
 *
 * @return the current value of g_mqtt.is_connected (non-zero when connected)
 */
int mqtt_client_is_connected(void) {
    const int connected = g_mqtt.is_connected;
    return connected;
}
/**
 * Return the socket descriptor of the Mosquitto instance.
 *
 * @return whatever mosquitto_socket() reports for g_mqtt.mosq
 */
int mqtt_client_get_fd(void) {
    const int sock_fd = mosquitto_socket(g_mqtt.mosq);
    return sock_fd;
}
void mqtt_client_process_read(void) {
mosquitto_loop_read(g_mqtt.mosq, 1);
}
void mqtt_client_process_write(void) {
mosquitto_loop_write(g_mqtt.mosq, 1);
}
/**
 * Install the handler that on_message() calls for every received message.
 *
 * @param cb handler taking (topic, payload, payload length); passing NULL
 *           disables dispatching
 */
void mqtt_client_set_cmd_callback(void (*cb)(const char *, const uint8_t *, int)) {
    void (*handler)(const char *, const uint8_t *, int) = cb;

    g_cmd_callback = handler;
}
/**
 * Shut the MQTT client down and release its resources.
 *
 * If a Mosquitto instance exists, publishes an "offline" status, disconnects,
 * stops the network thread and destroys the instance. The instance pointer
 * and the connected flag are then cleared so later calls into this module do
 * not touch freed memory or report a stale connection. Finally the Mosquitto
 * library and the publish mutex are torn down.
 *
 * NOTE(review): the "offline" status is queued immediately before the
 * disconnect and the thread is stopped with force=true, so its delivery is
 * presumably not guaranteed; confirm whether that matters to consumers.
 */
void mqtt_client_cleanup(void) {
    if (g_mqtt.mosq != NULL) {
        publish_status("offline");
        mosquitto_disconnect(g_mqtt.mosq);
        mosquitto_loop_stop(g_mqtt.mosq, true);

        /* Clear the flag before the instance goes away so publishers that
         * check it stop using the handle. */
        g_mqtt.is_connected = 0;
        mosquitto_destroy(g_mqtt.mosq);
        g_mqtt.mosq = NULL; /* avoid a dangling pointer after destroy */
    }
    g_mqtt.is_connected = 0;

    mosquitto_lib_cleanup();
    pthread_mutex_destroy(&g_mqtt.pub_mutex);
    syslog(LOG_INFO, "MQTT客户端已清理");
}