Files
2026-03-22 15:24:40 +08:00

432 lines
13 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 自然写教室智能算力盒边缘计算软件 V1.0
* NPU/GPU硬件调度模块 - 硬件加速资源管理与任务分配
*
* 管理算力盒上的NPU/GPU计算资源
* 支持多种硬件平台:NVIDIA GPU(CUDA)、瑞芯微NPU(RKNN)、通用GPU(OpenCL)
* 根据任务类型和硬件负载动态选择最优推理路径
*/
#ifndef NPU_SCHEDULER_H
#define NPU_SCHEDULER_H
#include <string>
#include <vector>
#include <memory>
#include <mutex>
#include <atomic>
#include <chrono>
#include <queue>
#include <functional>
#include <unordered_map>
#include <thread>
#include <condition_variable>
#include <cstring>
// ==================== 硬件设备抽象 ====================
/** Kinds of compute backend a device entry can be tagged with. */
enum class AcceleratorType {
CPU_ONLY = 0, // CPU only (fallback when no accelerator is available)
NVIDIA_GPU = 1, // NVIDIA GPU (CUDA/TensorRT)
ROCKCHIP_NPU = 2, // Rockchip NPU (RKNN)
AMLOGIC_NPU = 3, // Amlogic NPU
GENERIC_OPENCL = 4 // Generic OpenCL GPU
};
/**
 * Description of a single hardware accelerator device.
 *
 * Every scalar field has a default member initializer so that a
 * default-constructed instance never carries indeterminate values. The type
 * stays an aggregate under C++14 and later, so brace initialization with an
 * explicit value list keeps working.
 */
struct AcceleratorDevice {
    AcceleratorType type = AcceleratorType::CPU_ONLY;  // accelerator kind
    int device_id = 0;                  // device number
    std::string name;                   // device name
    std::string driver_version;         // driver version
    size_t total_memory_mb = 0;         // total device/system memory (MB)
    size_t free_memory_mb = 0;          // free device/system memory (MB)
    float compute_capability = 0.0f;    // compute capability figure
    float current_utilization = 0.0f;   // current utilization (0-1)
    float temperature_celsius = 0.0f;   // current temperature
    float max_temperature = 0.0f;       // maximum safe temperature
    bool is_available = false;          // whether the device can be used
};
/**
 * Resources an inference task asks for.
 *
 * All fields have defaults so a default-constructed requirement is well
 * defined: no memory demand, no precision constraint and no device
 * preference. The type remains an aggregate under C++14 and later.
 */
struct TaskResourceRequirement {
    size_t memory_mb = 0;            // device memory needed (MB)
    float estimated_time_ms = 0.0f;  // estimated inference time (ms)
    bool requires_fp16 = false;      // whether FP16 support is required
    bool requires_int8 = false;      // whether INT8 support is required
    int preferred_device = -1;       // preferred device ID (-1 means no preference)
};
// ==================== 硬件检测器 ====================
/**
 * Hardware accelerator detector.
 *
 * Builds the list of devices the scheduler can use. The vendor probes in this
 * file are placeholders that report nothing, so the result currently always
 * consists of the single CPU fallback entry.
 */
class HardwareDetector {
public:
    /**
     * Scans for accelerator devices.
     *
     * Probes run in the order NVIDIA GPU, then Rockchip NPU. When no probe
     * added a device, one CPU entry is appended as the fallback.
     *
     * @return The detected devices; never empty.
     */
    std::vector<AcceleratorDevice> detect_devices() {
        std::vector<AcceleratorDevice> devices;

        // Each probe appends whatever it finds to `devices`; the boolean
        // result is not needed here because emptiness is checked below.
        detect_nvidia_gpu(devices);
        detect_rockchip_npu(devices);

        if (devices.empty()) {
            // Value-initialize so that the fields not assigned below
            // (compute capability, utilization, temperatures) are zero
            // instead of indeterminate.
            AcceleratorDevice cpu_dev{};
            cpu_dev.type = AcceleratorType::CPU_ONLY;
            cpu_dev.device_id = 0;
            cpu_dev.name = "CPU";
            cpu_dev.total_memory_mb = get_system_memory_mb();
            cpu_dev.free_memory_mb = get_free_memory_mb();
            cpu_dev.is_available = true;
            devices.push_back(cpu_dev);
        }
        return devices;
    }

private:
    /**
     * NVIDIA GPU probe (placeholder).
     *
     * Intended implementation, per the original notes: check that
     * /dev/nvidia0 exists, then query NVML:
     *   nvmlInit();
     *   nvmlDeviceGetCount(&count);
     *   for (int i = 0; i < count; i++) {
     *       nvmlDeviceGetHandleByIndex(i, &device);
     *       nvmlDeviceGetName(device, name, sizeof(name));
     *       nvmlDeviceGetMemoryInfo(device, &mem);
     *       nvmlDeviceGetUtilizationRates(device, &util);
     *       nvmlDeviceGetTemperature(device, NVML_TEMPERATURE_GPU, &temp);
     *   }
     *
     * @return Always false for now; nothing is appended.
     */
    bool detect_nvidia_gpu(std::vector<AcceleratorDevice>& /*devices*/) {
        return false;
    }

    /**
     * Rockchip NPU probe (placeholder).
     *
     * Intended implementation, per the original notes: check for /dev/rknpu
     * or /sys/class/misc/rknpu and read the NPU load from
     * /sys/kernel/debug/rknpu/load.
     *
     * @return Always false for now; nothing is appended.
     */
    bool detect_rockchip_npu(std::vector<AcceleratorDevice>& /*devices*/) {
        return false;
    }

    /** Total system memory in MB. Placeholder: fixed 4096 until /proc/meminfo is read. */
    size_t get_system_memory_mb() {
        return 4096;
    }

    /** Free system memory in MB. Placeholder: fixed 2048. */
    size_t get_free_memory_mb() {
        return 2048;
    }
};
// ==================== 设备负载监控 ====================
/**
 * Periodic device load monitor.
 *
 * Runs a background thread that calls collect_metrics() at a fixed interval
 * and serves the most recent sample per device id. Access to the sample table
 * is serialized by an internal mutex.
 *
 * Lifetime: the destructor stops and joins the background thread, so a
 * monitor may be destroyed while running. stop() must not be called from the
 * sampling thread itself.
 */
class DeviceLoadMonitor {
public:
    /** One sample of a device's load figures. */
    struct DeviceMetrics {
        int device_id;
        float utilization;   // utilization (0-1)
        float memory_usage;  // device memory usage ratio (0-1)
        float temperature;   // temperature (degrees Celsius)
        float power_watts;   // power draw (watts)
        int inference_qps;   // current inference QPS
        std::chrono::steady_clock::time_point timestamp;
    };

    DeviceLoadMonitor() : running_(false) {}

    /** Stops sampling and joins the thread; a joinable std::thread must not be destroyed. */
    ~DeviceLoadMonitor() { stop(); }

    // The object owns a thread and mutexes; copying is not meaningful.
    DeviceLoadMonitor(const DeviceLoadMonitor&) = delete;
    DeviceLoadMonitor& operator=(const DeviceLoadMonitor&) = delete;

    /**
     * Starts background sampling.
     *
     * Calling start() while already running is a no-op (the interval of the
     * first call stays in effect).
     *
     * @param interval_ms Pause between samples in milliseconds; values below
     *                    1 are treated as 1 to avoid a busy loop.
     */
    void start(int interval_ms = 1000) {
        // exchange() both tests and sets the flag, so a repeated start()
        // never assigns over a joinable thread (which would terminate).
        if (running_.exchange(true)) {
            return;
        }
        const std::chrono::milliseconds period(interval_ms > 0 ? interval_ms : 1);
        try {
            monitor_thread_ = std::thread([this, period]() {
                while (running_.load()) {
                    collect_metrics();
                    // Sleep for one period, but wake immediately when stop()
                    // clears the flag and notifies.
                    std::unique_lock<std::mutex> wake_lock(wake_mutex_);
                    wake_cv_.wait_for(wake_lock, period,
                                      [this]() { return !running_.load(); });
                }
            });
        } catch (...) {
            running_ = false;  // thread creation failed; allow a later retry
            throw;
        }
    }

    /**
     * Returns the latest sample for a device.
     *
     * @param device_id Device to look up.
     * @return The stored sample, or a zero-initialized DeviceMetrics when the
     *         device has no sample yet.
     */
    DeviceMetrics get_metrics(int device_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = latest_metrics_.find(device_id);
        if (it != latest_metrics_.end()) {
            return it->second;
        }
        return DeviceMetrics{};
    }

    /**
     * Returns the latest sample of every known device.
     * The order follows the internal hash map and is therefore unspecified.
     */
    std::vector<DeviceMetrics> get_all_metrics() {
        std::lock_guard<std::mutex> lock(mutex_);
        std::vector<DeviceMetrics> result;
        result.reserve(latest_metrics_.size());
        for (const auto& entry : latest_metrics_) {
            result.push_back(entry.second);
        }
        return result;
    }

    /** Stops sampling and joins the thread. Safe to call repeatedly or without start(). */
    void stop() {
        {
            // Clear the flag under the wake mutex so the sampling thread
            // cannot miss the notification between its check and its wait.
            std::lock_guard<std::mutex> wake_lock(wake_mutex_);
            running_ = false;
        }
        wake_cv_.notify_all();
        if (monitor_thread_.joinable()) {
            monitor_thread_.join();
        }
    }

private:
    /**
     * Takes one sample of all devices (placeholder: only acquires the lock).
     * Intended sources, per the original notes:
     *   NVIDIA GPU:   nvmlDeviceGetUtilizationRates + nvmlDeviceGetTemperature
     *   Rockchip NPU: /sys/kernel/debug/rknpu/load
     *   CPU:          /proc/stat
     */
    void collect_metrics() {
        std::lock_guard<std::mutex> lock(mutex_);
    }

    std::unordered_map<int, DeviceMetrics> latest_metrics_;  // latest sample per device id
    std::mutex mutex_;                 // guards latest_metrics_
    std::mutex wake_mutex_;            // pairs with wake_cv_ for the interruptible sleep
    std::condition_variable wake_cv_;  // signalled by stop()
    std::atomic<bool> running_;
    std::thread monitor_thread_;
};
// ==================== 调度策略 ====================
/**
* Abstract interface for inference task scheduling strategies.
* An implementation picks a device for a task from the task's resource
* requirement, the known devices and the sampled device metrics.
*/
class SchedulingPolicy {
public:
virtual ~SchedulingPolicy() = default;
/**
* Selects the device that should run an inference task.
* @param requirement Resources the task asks for.
* @param devices Candidate devices.
* @param metrics Sampled load figures for the devices.
* @return An int identifying the chosen device; the policies defined in this
* file return a position within `devices`.
*/
virtual int select_device(const TaskResourceRequirement& requirement,
const std::vector<AcceleratorDevice>& devices,
const std::vector<DeviceLoadMonitor::DeviceMetrics>& metrics) = 0;
};
/**
* 最小负载调度策略
* 优先选择当前利用率最低的设备
*/
class MinLoadPolicy : public SchedulingPolicy {
public:
int select_device(const TaskResourceRequirement& requirement,
const std::vector<AcceleratorDevice>& devices,
const std::vector<DeviceLoadMonitor::DeviceMetrics>& metrics) override {
int best_device = 0;
float min_load = 1.0f;
for (size_t i = 0; i < devices.size(); i++) {
if (!devices[i].is_available) continue;
if (devices[i].free_memory_mb < requirement.memory_mb) continue;
float load = (i < metrics.size()) ? metrics[i].utilization : 0.0f;
if (load < min_load) {
min_load = load;
best_device = static_cast<int>(i);
}
}
return best_device;
}
};
/**
* 温度感知调度策略
* 除了负载外还考虑设备温度,防止过热降频
*/
class ThermalAwarePolicy : public SchedulingPolicy {
public:
ThermalAwarePolicy(float temp_threshold = 80.0f) : temp_threshold_(temp_threshold) {}
int select_device(const TaskResourceRequirement& requirement,
const std::vector<AcceleratorDevice>& devices,
const std::vector<DeviceLoadMonitor::DeviceMetrics>& metrics) override {
int best_device = 0;
float best_score = -1.0f;
for (size_t i = 0; i < devices.size(); i++) {
if (!devices[i].is_available) continue;
if (devices[i].free_memory_mb < requirement.memory_mb) continue;
float load = (i < metrics.size()) ? metrics[i].utilization : 0.0f;
float temp = (i < metrics.size()) ? metrics[i].temperature : 0.0f;
// 综合评分:负载权重0.6 + 温度权重0.4
float load_score = 1.0f - load;
float temp_score = (temp < temp_threshold_) ? 1.0f : (1.0f - (temp - temp_threshold_) / 20.0f);
float score = load_score * 0.6f + temp_score * 0.4f;
if (score > best_score) {
best_score = score;
best_device = static_cast<int>(i);
}
}
return best_device;
}
private:
float temp_threshold_;
};
// ==================== NPU调度器(核心) ====================
/**
 * NPU/GPU hardware scheduler.
 *
 * Owns the detected device list, a DeviceLoadMonitor and a SchedulingPolicy,
 * and maps inference tasks to devices through the policy.
 *
 * Lifetime: the destructor calls shutdown(), so the monitor thread is always
 * stopped before the object goes away.
 *
 * NOTE(review): devices_ and initialized_ are not synchronized; concurrent
 * calls to get_device_status() and schedule_task() would race. Confirm the
 * intended threading model.
 */
class NpuScheduler {
public:
    NpuScheduler() : initialized_(false) {}

    /** Stops the load monitor if it is still running. */
    ~NpuScheduler() { shutdown(); }

    /**
     * Initializes the scheduler: detects devices, starts load monitoring
     * with a 1000 ms interval and installs the temperature-aware policy with
     * an 80 degree threshold.
     *
     * Calling it again while initialized is a no-op that returns true.
     *
     * @return false when device detection yields no device, true otherwise.
     */
    bool initialize() {
        if (initialized_) {
            return true;  // do not start the monitor a second time
        }
        HardwareDetector detector;
        devices_ = detector.detect_devices();
        if (devices_.empty()) {
            return false;
        }
        load_monitor_.start(1000);
        policy_ = std::make_unique<ThermalAwarePolicy>(80.0f);
        initialized_ = true;
        return true;
    }

    /**
     * Picks a device for an inference task via the installed policy and
     * counts the task as scheduled.
     *
     * @param requirement Resources the task asks for.
     * @return The policy's choice; 0 when the scheduler is not initialized.
     */
    int schedule_task(const TaskResourceRequirement& requirement) {
        if (!initialized_ || !policy_) return 0;
        const auto metrics = load_monitor_.get_all_metrics();
        const int chosen = policy_->select_device(requirement, devices_, metrics);
        tasks_scheduled_.fetch_add(1);
        return chosen;
    }

    /**
     * Returns the device list after refreshing utilization and temperature
     * from the latest monitor samples (matched by device_id).
     */
    std::vector<AcceleratorDevice> get_device_status() {
        const auto metrics = load_monitor_.get_all_metrics();
        for (auto& dev : devices_) {
            for (const auto& sample : metrics) {
                if (sample.device_id == dev.device_id) {
                    dev.current_utilization = sample.utilization;
                    dev.temperature_celsius = sample.temperature;
                }
            }
        }
        return devices_;
    }

    /** Scheduling statistics snapshot. */
    struct SchedulerStats {
        long total_tasks_scheduled;
        long total_tasks_completed;
        long total_tasks_failed;
        float avg_inference_ms;
        float gpu_avg_utilization;
        float gpu_temperature;
        int active_devices;
    };

    /**
     * Builds a statistics snapshot.
     *
     * All fields start at zero. avg_inference_ms stays 0 because no
     * inference timing is recorded in this class, and the completed/failed
     * counters are never incremented by the code visible here.
     * gpu_avg_utilization is the mean over all monitor samples;
     * gpu_temperature is taken from the first sample returned by the
     * monitor. Both stay 0 when there are no samples. active_devices is the
     * number of detected devices.
     */
    SchedulerStats get_stats() {
        SchedulerStats stats{};  // value-initialize: no indeterminate fields
        stats.total_tasks_scheduled = tasks_scheduled_.load();
        stats.total_tasks_completed = tasks_completed_.load();
        stats.total_tasks_failed = tasks_failed_.load();
        stats.active_devices = static_cast<int>(devices_.size());
        const auto metrics = load_monitor_.get_all_metrics();
        if (!metrics.empty()) {
            float total_util = 0.0f;
            for (const auto& sample : metrics) total_util += sample.utilization;
            stats.gpu_avg_utilization = total_util / static_cast<float>(metrics.size());
            stats.gpu_temperature = metrics[0].temperature;
        }
        return stats;
    }

    /** Stops load monitoring and marks the scheduler uninitialized. Safe to call repeatedly. */
    void shutdown() {
        load_monitor_.stop();
        initialized_ = false;
    }

private:
    std::vector<AcceleratorDevice> devices_;   // devices found by initialize()
    DeviceLoadMonitor load_monitor_;           // background load sampling
    std::unique_ptr<SchedulingPolicy> policy_; // installed by initialize()
    bool initialized_;
    std::atomic<long> tasks_scheduled_{0};
    std::atomic<long> tasks_completed_{0};
    std::atomic<long> tasks_failed_{0};
};
// ==================== 配置管理 ====================
/**
* Configuration values for the edge compute box, with built-in defaults.
* Per the original author's note, values are meant to be loaded from a JSON
* configuration file and environment variables, with runtime hot updates via
* remote MQTT commands; none of that loading code is part of this struct.
*/
struct EdgeBoxConfiguration {
// Inference settings
int max_concurrent_inferences = 4; // maximum number of concurrent inferences
int inference_queue_size = 256; // inference queue size
int default_timeout_ms = 500; // default inference timeout
// NPU/GPU settings
float gpu_memory_fraction = 0.8f; // upper bound on the fraction of GPU memory used
float thermal_throttle_temp = 80.0f; // thermal throttling temperature threshold
bool enable_fp16 = true; // enable FP16 inference
bool enable_int8 = false; // enable INT8 quantization
// Network settings
std::string grpc_listen = "0.0.0.0:50052";
std::string mqtt_broker = "ssl://mqtt.writech.com:8883";
bool enable_mtls = true;
// Storage settings
std::string models_dir = "/opt/models";
std::string cache_dir = "/var/lib/writech/cache";
int offline_cache_max_mb = 256;
// Cluster settings
bool enable_cluster = true;
std::string cluster_discovery = "mdns";
};
#endif // NPU_SCHEDULER_H