# 自然写手写识别与AI分析引擎软件 V1.0 # 笔迹预处理模块 - 笔迹数据预处理管道 """ 笔迹预处理模块 提供笔迹坐标数据的完整预处理管道: 去噪 → 坐标归一化 → 笔画分割 → 特征增强 → 张量转换 预处理结果作为AI推理模型的标准化输入 """ import math import logging import numpy as np from typing import List, Dict, Tuple, Optional from dataclasses import dataclass logger = logging.getLogger(__name__) # ==================== 数据结构 ==================== @dataclass class RawStrokePoint: """原始笔迹坐标点(来自点阵笔/网关的原始数据)""" x: float # X坐标(点阵单位) y: float # Y坐标(点阵单位) pressure: float # 压力值 (0.0-1.0) timestamp: int # 采集时间戳(毫秒) pen_up: bool = False # 抬笔标记 @dataclass class ProcessedStroke: """预处理后的笔画数据""" points: np.ndarray # 归一化坐标数组 (N, 3) [x, y, pressure] stroke_index: int = 0 # 笔画序号 point_count: int = 0 # 采样点数 length: float = 0.0 # 笔画长度 duration_ms: int = 0 # 书写耗时 # ==================== 去噪滤波器 ==================== class NoiseFilter: """ 笔迹去噪滤波器 去除采集过程中的抖动噪声和异常点 采用多级滤波策略: 1. 异常点剔除(超出合理范围的坐标) 2. 中值滤波(消除脉冲噪声) 3. 高斯平滑(减少抖动) """ def __init__(self, max_jump_distance: float = 50.0, median_window: int = 3, gaussian_sigma: float = 1.0): self._max_jump = max_jump_distance self._median_window = median_window self._gaussian_sigma = gaussian_sigma def remove_outliers(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]: """ 剔除异常跳跃点 当相邻点的距离超过阈值时,移除该异常点 常见于点阵笔摄像头短暂遮挡导致的坐标跳跃 """ if len(points) < 3: return points filtered = [points[0]] for i in range(1, len(points)): dx = points[i].x - points[i-1].x dy = points[i].y - points[i-1].y dist = math.sqrt(dx*dx + dy*dy) if dist <= self._max_jump: filtered.append(points[i]) else: logger.debug(f"剔除异常点: index={i}, distance={dist:.1f}") return filtered def median_filter(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]: """ 一维中值滤波 对X和Y坐标分别进行中值滤波,有效消除脉冲噪声 同时保留笔画的尖角特征不被过度平滑 """ if len(points) < self._median_window: return points half_w = self._median_window // 2 filtered = [] for i in range(len(points)): start = max(0, i - half_w) end = min(len(points), i + half_w + 1) window = points[start:end] median_x = sorted([p.x for p in window])[len(window) // 2] median_y = sorted([p.y for p in window])[len(window) // 2] filtered.append(RawStrokePoint( x=median_x, y=median_y, pressure=points[i].pressure, timestamp=points[i].timestamp, pen_up=points[i].pen_up )) return filtered def gaussian_smooth(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]: """ 高斯平滑滤波 使用一维高斯核对坐标序列进行卷积平滑 有效减少书写抖动,使笔画更流畅 """ if len(points) < 3: return points # 构造高斯核 kernel_size = max(3, int(self._gaussian_sigma * 4) | 1) # 确保奇数 half_k = kernel_size // 2 kernel = np.array([ math.exp(-0.5 * ((i - half_k) / self._gaussian_sigma) ** 2) for i in range(kernel_size) ]) kernel = kernel / kernel.sum() # 归一化 xs = np.array([p.x for p in points]) ys = np.array([p.y for p in points]) # 边界填充后卷积 padded_x = np.pad(xs, half_k, mode='edge') padded_y = np.pad(ys, half_k, mode='edge') smooth_x = np.convolve(padded_x, kernel, mode='valid') smooth_y = np.convolve(padded_y, kernel, mode='valid') filtered = [] for i in range(len(points)): filtered.append(RawStrokePoint( x=float(smooth_x[i]), y=float(smooth_y[i]), pressure=points[i].pressure, timestamp=points[i].timestamp, pen_up=points[i].pen_up )) return filtered def apply(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]: """执行完整的去噪流程""" result = self.remove_outliers(points) result = self.median_filter(result) result = self.gaussian_smooth(result) return result # ==================== 坐标归一化器 ==================== class CoordinateNormalizer: """ 坐标归一化器 将不同分辨率、不同纸张尺寸的点阵坐标统一归一化到标准范围 支持多种归一化策略:Min-Max归一化、Z-Score标准化、比例缩放 """ def __init__(self, target_range: Tuple[float, float] = (0.0, 1.0), preserve_aspect_ratio: bool = True): self._target_min = target_range[0] self._target_max = target_range[1] self._preserve_aspect = preserve_aspect_ratio def min_max_normalize(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]: """ Min-Max归一化 将坐标映射到[0, 1]范围,保持长宽比 """ if not points: return points xs = [p.x for p in points] ys = [p.y for p in points] min_x, max_x = min(xs), max(xs) min_y, max_y = min(ys), max(ys) # 选择统一的缩放因子以保持长宽比 if self._preserve_aspect: range_x = max_x - min_x range_y = max_y - min_y scale = max(range_x, range_y) if scale < 1e-6: scale = 1.0 else: scale = 1.0 # 分别归一化 target_range = self._target_max - self._target_min normalized = [] for p in points: if self._preserve_aspect: nx = self._target_min + (p.x - min_x) / scale * target_range ny = self._target_min + (p.y - min_y) / scale * target_range else: rx = max_x - min_x if max_x > min_x else 1.0 ry = max_y - min_y if max_y > min_y else 1.0 nx = self._target_min + (p.x - min_x) / rx * target_range ny = self._target_min + (p.y - min_y) / ry * target_range normalized.append(RawStrokePoint( x=nx, y=ny, pressure=p.pressure, timestamp=p.timestamp, pen_up=p.pen_up )) return normalized def center_normalize(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]: """ 中心归一化 将笔迹的重心平移至原点,坐标除以标准差进行缩放 适用于笔迹特征提取和模板匹配 """ if not points: return points xs = np.array([p.x for p in points]) ys = np.array([p.y for p in points]) cx, cy = np.mean(xs), np.mean(ys) std = max(np.std(np.concatenate([xs, ys])), 1e-6) normalized = [] for p in points: normalized.append(RawStrokePoint( x=(p.x - cx) / std, y=(p.y - cy) / std, pressure=p.pressure, timestamp=p.timestamp, pen_up=p.pen_up )) return normalized # ==================== 笔画分割器 ==================== class StrokeSegmenter: """ 笔画分割器 将连续的坐标点流按抬笔事件分割为独立笔画 """ def __init__(self, min_stroke_points: int = 3, penup_time_threshold_ms: int = 200): self._min_points = min_stroke_points self._penup_threshold = penup_time_threshold_ms def segment(self, points: List[RawStrokePoint]) -> List[List[RawStrokePoint]]: """将点序列分割为笔画列表""" if not points: return [] strokes = [] current = [points[0]] for i in range(1, len(points)): # 检测抬笔条件 is_penup = points[i].pen_up time_gap = points[i].timestamp - points[i-1].timestamp is_time_break = time_gap > self._penup_threshold if (is_penup or is_time_break) and len(current) >= self._min_points: strokes.append(current) current = [] if not is_penup: current.append(points[i]) if len(current) >= self._min_points: strokes.append(current) logger.debug(f"笔画分割完成: {len(points)}点 -> {len(strokes)}笔画") return strokes # ==================== 预处理管道 ==================== class StrokePreprocessor: """ 笔迹预处理管道(整合所有预处理步骤) 流程:原始坐标 → 去噪 → 归一化 → 笔画分割 → 张量转换 输出标准化的numpy数组,可直接送入AI推理模型 """ def __init__(self): self._noise_filter = NoiseFilter() self._normalizer = CoordinateNormalizer() self._segmenter = StrokeSegmenter() logger.info("笔迹预处理管道初始化完成") def process(self, raw_points: List[RawStrokePoint], target_size: Tuple[int, int] = (64, 64)) -> Dict: """ 执行完整预处理管道 返回预处理后的笔画数据和生成的图像张量 """ if not raw_points: return {"strokes": [], "image": np.zeros(target_size)} # 第一步:去噪滤波 denoised = self._noise_filter.apply(raw_points) # 第二步:坐标归一化 normalized = self._normalizer.min_max_normalize(denoised) # 第三步:笔画分割 stroke_groups = self._segmenter.segment(normalized) # 第四步:构造ProcessedStroke对象 processed_strokes = [] for idx, group in enumerate(stroke_groups): points_array = np.array([[p.x, p.y, p.pressure] for p in group], dtype=np.float32) length = sum( math.sqrt((group[i].x - group[i-1].x)**2 + (group[i].y - group[i-1].y)**2) for i in range(1, len(group)) ) duration = group[-1].timestamp - group[0].timestamp if len(group) > 1 else 0 processed_strokes.append(ProcessedStroke( points=points_array, stroke_index=idx, point_count=len(group), length=length, duration_ms=duration )) # 第五步:渲染为图像张量(用于CNN模型输入) image = self._render_to_image(normalized, target_size) logger.debug( f"预处理完成: {len(raw_points)}原始点 → {len(denoised)}去噪 → " f"{len(processed_strokes)}笔画 → {target_size}图像" ) return { "strokes": processed_strokes, "image": image, "total_points": len(denoised), "stroke_count": len(processed_strokes) } def _render_to_image(self, points: List[RawStrokePoint], size: Tuple[int, int]) -> np.ndarray: """ 将笔迹坐标渲染为灰度图像 使用Bresenham直线算法连接相邻坐标点 生成的图像可直接作为CNN模型输入 """ w, h = size image = np.zeros((h, w), dtype=np.float32) for i in range(1, len(points)): if points[i].pen_up: continue # Bresenham直线栅格化 x0 = int(points[i-1].x * (w - 1)) y0 = int(points[i-1].y * (h - 1)) x1 = int(points[i].x * (w - 1)) y1 = int(points[i].y * (h - 1)) # 裁剪到图像范围 x0 = max(0, min(w - 1, x0)) y0 = max(0, min(h - 1, y0)) x1 = max(0, min(w - 1, x1)) y1 = max(0, min(h - 1, y1)) dx = abs(x1 - x0) dy = abs(y1 - y0) sx = 1 if x0 < x1 else -1 sy = 1 if y0 < y1 else -1 err = dx - dy while True: # 根据压力值设置像素灰度 pressure = (points[i-1].pressure + points[i].pressure) / 2 image[y0, x0] = max(image[y0, x0], pressure) if x0 == x1 and y0 == y1: break e2 = 2 * err if e2 > -dy: err -= dy x0 += sx if e2 < dx: err += dx y0 += sy return image