software copyright
This commit is contained in:
@@ -0,0 +1,392 @@
|
||||
# 自然写手写识别与AI分析引擎软件 V1.0
|
||||
# 笔迹预处理模块 - 笔迹数据预处理管道
|
||||
|
||||
"""
|
||||
笔迹预处理模块
|
||||
提供笔迹坐标数据的完整预处理管道:
|
||||
去噪 → 坐标归一化 → 笔画分割 → 特征增强 → 张量转换
|
||||
预处理结果作为AI推理模型的标准化输入
|
||||
"""
|
||||
|
||||
import math
|
||||
import logging
|
||||
import numpy as np
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ==================== 数据结构 ====================
|
||||
|
||||
@dataclass
|
||||
class RawStrokePoint:
|
||||
"""原始笔迹坐标点(来自点阵笔/网关的原始数据)"""
|
||||
x: float # X坐标(点阵单位)
|
||||
y: float # Y坐标(点阵单位)
|
||||
pressure: float # 压力值 (0.0-1.0)
|
||||
timestamp: int # 采集时间戳(毫秒)
|
||||
pen_up: bool = False # 抬笔标记
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessedStroke:
|
||||
"""预处理后的笔画数据"""
|
||||
points: np.ndarray # 归一化坐标数组 (N, 3) [x, y, pressure]
|
||||
stroke_index: int = 0 # 笔画序号
|
||||
point_count: int = 0 # 采样点数
|
||||
length: float = 0.0 # 笔画长度
|
||||
duration_ms: int = 0 # 书写耗时
|
||||
|
||||
|
||||
# ==================== 去噪滤波器 ====================
|
||||
|
||||
class NoiseFilter:
|
||||
"""
|
||||
笔迹去噪滤波器
|
||||
去除采集过程中的抖动噪声和异常点
|
||||
采用多级滤波策略:
|
||||
1. 异常点剔除(超出合理范围的坐标)
|
||||
2. 中值滤波(消除脉冲噪声)
|
||||
3. 高斯平滑(减少抖动)
|
||||
"""
|
||||
|
||||
def __init__(self, max_jump_distance: float = 50.0,
|
||||
median_window: int = 3, gaussian_sigma: float = 1.0):
|
||||
self._max_jump = max_jump_distance
|
||||
self._median_window = median_window
|
||||
self._gaussian_sigma = gaussian_sigma
|
||||
|
||||
def remove_outliers(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]:
|
||||
"""
|
||||
剔除异常跳跃点
|
||||
当相邻点的距离超过阈值时,移除该异常点
|
||||
常见于点阵笔摄像头短暂遮挡导致的坐标跳跃
|
||||
"""
|
||||
if len(points) < 3:
|
||||
return points
|
||||
|
||||
filtered = [points[0]]
|
||||
for i in range(1, len(points)):
|
||||
dx = points[i].x - points[i-1].x
|
||||
dy = points[i].y - points[i-1].y
|
||||
dist = math.sqrt(dx*dx + dy*dy)
|
||||
|
||||
if dist <= self._max_jump:
|
||||
filtered.append(points[i])
|
||||
else:
|
||||
logger.debug(f"剔除异常点: index={i}, distance={dist:.1f}")
|
||||
|
||||
return filtered
|
||||
|
||||
def median_filter(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]:
|
||||
"""
|
||||
一维中值滤波
|
||||
对X和Y坐标分别进行中值滤波,有效消除脉冲噪声
|
||||
同时保留笔画的尖角特征不被过度平滑
|
||||
"""
|
||||
if len(points) < self._median_window:
|
||||
return points
|
||||
|
||||
half_w = self._median_window // 2
|
||||
filtered = []
|
||||
|
||||
for i in range(len(points)):
|
||||
start = max(0, i - half_w)
|
||||
end = min(len(points), i + half_w + 1)
|
||||
window = points[start:end]
|
||||
|
||||
median_x = sorted([p.x for p in window])[len(window) // 2]
|
||||
median_y = sorted([p.y for p in window])[len(window) // 2]
|
||||
|
||||
filtered.append(RawStrokePoint(
|
||||
x=median_x, y=median_y,
|
||||
pressure=points[i].pressure,
|
||||
timestamp=points[i].timestamp,
|
||||
pen_up=points[i].pen_up
|
||||
))
|
||||
|
||||
return filtered
|
||||
|
||||
def gaussian_smooth(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]:
|
||||
"""
|
||||
高斯平滑滤波
|
||||
使用一维高斯核对坐标序列进行卷积平滑
|
||||
有效减少书写抖动,使笔画更流畅
|
||||
"""
|
||||
if len(points) < 3:
|
||||
return points
|
||||
|
||||
# 构造高斯核
|
||||
kernel_size = max(3, int(self._gaussian_sigma * 4) | 1) # 确保奇数
|
||||
half_k = kernel_size // 2
|
||||
kernel = np.array([
|
||||
math.exp(-0.5 * ((i - half_k) / self._gaussian_sigma) ** 2)
|
||||
for i in range(kernel_size)
|
||||
])
|
||||
kernel = kernel / kernel.sum() # 归一化
|
||||
|
||||
xs = np.array([p.x for p in points])
|
||||
ys = np.array([p.y for p in points])
|
||||
|
||||
# 边界填充后卷积
|
||||
padded_x = np.pad(xs, half_k, mode='edge')
|
||||
padded_y = np.pad(ys, half_k, mode='edge')
|
||||
|
||||
smooth_x = np.convolve(padded_x, kernel, mode='valid')
|
||||
smooth_y = np.convolve(padded_y, kernel, mode='valid')
|
||||
|
||||
filtered = []
|
||||
for i in range(len(points)):
|
||||
filtered.append(RawStrokePoint(
|
||||
x=float(smooth_x[i]), y=float(smooth_y[i]),
|
||||
pressure=points[i].pressure,
|
||||
timestamp=points[i].timestamp,
|
||||
pen_up=points[i].pen_up
|
||||
))
|
||||
return filtered
|
||||
|
||||
def apply(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]:
|
||||
"""执行完整的去噪流程"""
|
||||
result = self.remove_outliers(points)
|
||||
result = self.median_filter(result)
|
||||
result = self.gaussian_smooth(result)
|
||||
return result
|
||||
|
||||
|
||||
# ==================== 坐标归一化器 ====================
|
||||
|
||||
class CoordinateNormalizer:
|
||||
"""
|
||||
坐标归一化器
|
||||
将不同分辨率、不同纸张尺寸的点阵坐标统一归一化到标准范围
|
||||
支持多种归一化策略:Min-Max归一化、Z-Score标准化、比例缩放
|
||||
"""
|
||||
|
||||
def __init__(self, target_range: Tuple[float, float] = (0.0, 1.0),
|
||||
preserve_aspect_ratio: bool = True):
|
||||
self._target_min = target_range[0]
|
||||
self._target_max = target_range[1]
|
||||
self._preserve_aspect = preserve_aspect_ratio
|
||||
|
||||
def min_max_normalize(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]:
|
||||
"""
|
||||
Min-Max归一化
|
||||
将坐标映射到[0, 1]范围,保持长宽比
|
||||
"""
|
||||
if not points:
|
||||
return points
|
||||
|
||||
xs = [p.x for p in points]
|
||||
ys = [p.y for p in points]
|
||||
min_x, max_x = min(xs), max(xs)
|
||||
min_y, max_y = min(ys), max(ys)
|
||||
|
||||
# 选择统一的缩放因子以保持长宽比
|
||||
if self._preserve_aspect:
|
||||
range_x = max_x - min_x
|
||||
range_y = max_y - min_y
|
||||
scale = max(range_x, range_y)
|
||||
if scale < 1e-6:
|
||||
scale = 1.0
|
||||
else:
|
||||
scale = 1.0 # 分别归一化
|
||||
|
||||
target_range = self._target_max - self._target_min
|
||||
normalized = []
|
||||
for p in points:
|
||||
if self._preserve_aspect:
|
||||
nx = self._target_min + (p.x - min_x) / scale * target_range
|
||||
ny = self._target_min + (p.y - min_y) / scale * target_range
|
||||
else:
|
||||
rx = max_x - min_x if max_x > min_x else 1.0
|
||||
ry = max_y - min_y if max_y > min_y else 1.0
|
||||
nx = self._target_min + (p.x - min_x) / rx * target_range
|
||||
ny = self._target_min + (p.y - min_y) / ry * target_range
|
||||
normalized.append(RawStrokePoint(
|
||||
x=nx, y=ny, pressure=p.pressure,
|
||||
timestamp=p.timestamp, pen_up=p.pen_up
|
||||
))
|
||||
return normalized
|
||||
|
||||
def center_normalize(self, points: List[RawStrokePoint]) -> List[RawStrokePoint]:
|
||||
"""
|
||||
中心归一化
|
||||
将笔迹的重心平移至原点,坐标除以标准差进行缩放
|
||||
适用于笔迹特征提取和模板匹配
|
||||
"""
|
||||
if not points:
|
||||
return points
|
||||
|
||||
xs = np.array([p.x for p in points])
|
||||
ys = np.array([p.y for p in points])
|
||||
|
||||
cx, cy = np.mean(xs), np.mean(ys)
|
||||
std = max(np.std(np.concatenate([xs, ys])), 1e-6)
|
||||
|
||||
normalized = []
|
||||
for p in points:
|
||||
normalized.append(RawStrokePoint(
|
||||
x=(p.x - cx) / std,
|
||||
y=(p.y - cy) / std,
|
||||
pressure=p.pressure,
|
||||
timestamp=p.timestamp,
|
||||
pen_up=p.pen_up
|
||||
))
|
||||
return normalized
|
||||
|
||||
|
||||
# ==================== 笔画分割器 ====================
|
||||
|
||||
class StrokeSegmenter:
|
||||
"""
|
||||
笔画分割器
|
||||
将连续的坐标点流按抬笔事件分割为独立笔画
|
||||
"""
|
||||
|
||||
def __init__(self, min_stroke_points: int = 3,
|
||||
penup_time_threshold_ms: int = 200):
|
||||
self._min_points = min_stroke_points
|
||||
self._penup_threshold = penup_time_threshold_ms
|
||||
|
||||
def segment(self, points: List[RawStrokePoint]) -> List[List[RawStrokePoint]]:
|
||||
"""将点序列分割为笔画列表"""
|
||||
if not points:
|
||||
return []
|
||||
|
||||
strokes = []
|
||||
current = [points[0]]
|
||||
|
||||
for i in range(1, len(points)):
|
||||
# 检测抬笔条件
|
||||
is_penup = points[i].pen_up
|
||||
time_gap = points[i].timestamp - points[i-1].timestamp
|
||||
is_time_break = time_gap > self._penup_threshold
|
||||
|
||||
if (is_penup or is_time_break) and len(current) >= self._min_points:
|
||||
strokes.append(current)
|
||||
current = []
|
||||
|
||||
if not is_penup:
|
||||
current.append(points[i])
|
||||
|
||||
if len(current) >= self._min_points:
|
||||
strokes.append(current)
|
||||
|
||||
logger.debug(f"笔画分割完成: {len(points)}点 -> {len(strokes)}笔画")
|
||||
return strokes
|
||||
|
||||
|
||||
# ==================== 预处理管道 ====================
|
||||
|
||||
class StrokePreprocessor:
|
||||
"""
|
||||
笔迹预处理管道(整合所有预处理步骤)
|
||||
流程:原始坐标 → 去噪 → 归一化 → 笔画分割 → 张量转换
|
||||
输出标准化的numpy数组,可直接送入AI推理模型
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._noise_filter = NoiseFilter()
|
||||
self._normalizer = CoordinateNormalizer()
|
||||
self._segmenter = StrokeSegmenter()
|
||||
logger.info("笔迹预处理管道初始化完成")
|
||||
|
||||
def process(self, raw_points: List[RawStrokePoint],
|
||||
target_size: Tuple[int, int] = (64, 64)) -> Dict:
|
||||
"""
|
||||
执行完整预处理管道
|
||||
返回预处理后的笔画数据和生成的图像张量
|
||||
"""
|
||||
if not raw_points:
|
||||
return {"strokes": [], "image": np.zeros(target_size)}
|
||||
|
||||
# 第一步:去噪滤波
|
||||
denoised = self._noise_filter.apply(raw_points)
|
||||
|
||||
# 第二步:坐标归一化
|
||||
normalized = self._normalizer.min_max_normalize(denoised)
|
||||
|
||||
# 第三步:笔画分割
|
||||
stroke_groups = self._segmenter.segment(normalized)
|
||||
|
||||
# 第四步:构造ProcessedStroke对象
|
||||
processed_strokes = []
|
||||
for idx, group in enumerate(stroke_groups):
|
||||
points_array = np.array([[p.x, p.y, p.pressure] for p in group], dtype=np.float32)
|
||||
length = sum(
|
||||
math.sqrt((group[i].x - group[i-1].x)**2 + (group[i].y - group[i-1].y)**2)
|
||||
for i in range(1, len(group))
|
||||
)
|
||||
duration = group[-1].timestamp - group[0].timestamp if len(group) > 1 else 0
|
||||
|
||||
processed_strokes.append(ProcessedStroke(
|
||||
points=points_array,
|
||||
stroke_index=idx,
|
||||
point_count=len(group),
|
||||
length=length,
|
||||
duration_ms=duration
|
||||
))
|
||||
|
||||
# 第五步:渲染为图像张量(用于CNN模型输入)
|
||||
image = self._render_to_image(normalized, target_size)
|
||||
|
||||
logger.debug(
|
||||
f"预处理完成: {len(raw_points)}原始点 → {len(denoised)}去噪 → "
|
||||
f"{len(processed_strokes)}笔画 → {target_size}图像"
|
||||
)
|
||||
|
||||
return {
|
||||
"strokes": processed_strokes,
|
||||
"image": image,
|
||||
"total_points": len(denoised),
|
||||
"stroke_count": len(processed_strokes)
|
||||
}
|
||||
|
||||
def _render_to_image(self, points: List[RawStrokePoint],
|
||||
size: Tuple[int, int]) -> np.ndarray:
|
||||
"""
|
||||
将笔迹坐标渲染为灰度图像
|
||||
使用Bresenham直线算法连接相邻坐标点
|
||||
生成的图像可直接作为CNN模型输入
|
||||
"""
|
||||
w, h = size
|
||||
image = np.zeros((h, w), dtype=np.float32)
|
||||
|
||||
for i in range(1, len(points)):
|
||||
if points[i].pen_up:
|
||||
continue
|
||||
|
||||
# Bresenham直线栅格化
|
||||
x0 = int(points[i-1].x * (w - 1))
|
||||
y0 = int(points[i-1].y * (h - 1))
|
||||
x1 = int(points[i].x * (w - 1))
|
||||
y1 = int(points[i].y * (h - 1))
|
||||
|
||||
# 裁剪到图像范围
|
||||
x0 = max(0, min(w - 1, x0))
|
||||
y0 = max(0, min(h - 1, y0))
|
||||
x1 = max(0, min(w - 1, x1))
|
||||
y1 = max(0, min(h - 1, y1))
|
||||
|
||||
dx = abs(x1 - x0)
|
||||
dy = abs(y1 - y0)
|
||||
sx = 1 if x0 < x1 else -1
|
||||
sy = 1 if y0 < y1 else -1
|
||||
err = dx - dy
|
||||
|
||||
while True:
|
||||
# 根据压力值设置像素灰度
|
||||
pressure = (points[i-1].pressure + points[i].pressure) / 2
|
||||
image[y0, x0] = max(image[y0, x0], pressure)
|
||||
|
||||
if x0 == x1 and y0 == y1:
|
||||
break
|
||||
e2 = 2 * err
|
||||
if e2 > -dy:
|
||||
err -= dy
|
||||
x0 += sx
|
||||
if e2 < dx:
|
||||
err += dx
|
||||
y0 += sy
|
||||
|
||||
return image
|
||||
Reference in New Issue
Block a user