329 lines
10 KiB
Python
329 lines
10 KiB
Python
# 自然写教学数据分析与学情诊断系统软件 V1.0
|
|
# main.py - 服务启动入口(FastAPI + 定时任务调度)
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import asyncio
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
from contextlib import asynccontextmanager
|
|
|
|
from fastapi import FastAPI, Request, Response
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.middleware.trustedhost import TrustedHostMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
import uvicorn
|
|
|
|
# ============================================================
|
|
# 日志配置
|
|
# ============================================================
|
|
|
|
# Shared record layout: timestamp | padded level | logger:line | message.
LOG_FORMAT = (
    "%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s"
)


def setup_logging(
    log_level: str = "INFO",
    log_file: str = "logs/analytics.log",
) -> None:
    """Configure root logging to write to stdout and to an append-mode file.

    Args:
        log_level: Name of a standard logging level, case-insensitive.
            Names that do not resolve to a numeric level fall back to INFO.
        log_file: Path of the UTF-8 log file. Its parent directory is
            created when it does not exist yet.

    Raises:
        OSError: If the log directory or the log file cannot be created.
    """
    level = getattr(logging, log_level.upper(), logging.INFO)
    if not isinstance(level, int):
        # e.g. "basic_format" resolves to a string attribute of the logging
        # module, which is not a usable level.
        level = logging.INFO

    # FileHandler opens the file immediately, so the directory must exist
    # before the handler is constructed.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logging.basicConfig(
        level=level,
        format=LOG_FORMAT,
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(log_file, encoding="utf-8", mode="a"),
        ],
    )


# Module-wide logger used by the rest of this file.
logger = logging.getLogger("writech.analytics")
|
|
|
|
|
|
# ============================================================
|
|
# 全局配置
|
|
# ============================================================
|
|
|
|
class AnalyticsConfig:
    """Process-wide settings for the learning-analytics service.

    All values are class attributes evaluated once, when the class body
    runs. Entries backed by an environment variable use the default shown
    here when that variable is unset.
    """

    # --- Service basics ---
    SERVICE_NAME: str = "writech-learning-analytics"
    SERVICE_VERSION: str = "1.0.0"
    HOST: str = os.environ.get("ANALYTICS_HOST", "0.0.0.0")
    PORT: int = int(os.environ.get("ANALYTICS_PORT", "8300"))
    # Only the literal "true" (any letter case) switches debug mode on.
    DEBUG: bool = os.environ.get("ANALYTICS_DEBUG", "false").lower() == "true"

    # --- ClickHouse connection ---
    CLICKHOUSE_HOST: str = os.environ.get("CH_HOST", "localhost")
    CLICKHOUSE_PORT: int = int(os.environ.get("CH_PORT", "9000"))
    CLICKHOUSE_DB: str = os.environ.get("CH_DB", "writech_analytics")
    CLICKHOUSE_USER: str = os.environ.get("CH_USER", "default")
    CLICKHOUSE_PASSWORD: str = os.environ.get("CH_PASSWORD", "")

    # --- MySQL connection ---
    MYSQL_HOST: str = os.environ.get("MYSQL_HOST", "localhost")
    MYSQL_PORT: int = int(os.environ.get("MYSQL_PORT", "3306"))
    MYSQL_DB: str = os.environ.get("MYSQL_DB", "writech_analytics")
    MYSQL_USER: str = os.environ.get("MYSQL_USER", "root")
    MYSQL_PASSWORD: str = os.environ.get("MYSQL_PASSWORD", "")

    # --- Neo4j knowledge-graph connection ---
    NEO4J_URI: str = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
    NEO4J_USER: str = os.environ.get("NEO4J_USER", "neo4j")
    NEO4J_PASSWORD: str = os.environ.get("NEO4J_PASSWORD", "")

    # --- Kafka ---
    KAFKA_BROKERS: str = os.environ.get("KAFKA_BROKERS", "localhost:9092")
    KAFKA_TOPIC_STROKE: str = "writech.stroke.raw"
    KAFKA_TOPIC_GRADE: str = "writech.grade.result"
    KAFKA_GROUP_ID: str = "analytics-consumer-group"

    # --- Report generation ---
    REPORT_OUTPUT_DIR: str = os.environ.get("REPORT_DIR", "/data/reports")
    REPORT_TEMPLATE_DIR: str = os.environ.get("TEMPLATE_DIR", "/data/templates")

    # --- JWT authentication (secret shared with the cloud platform) ---
    # NOTE(review): the fallback below is a fixed, source-visible secret;
    # confirm that every deployment sets JWT_SECRET explicitly.
    JWT_SECRET: str = os.environ.get("JWT_SECRET", "writech-jwt-secret-key")
    JWT_ALGORITHM: str = "HS256"

    # --- Scheduled jobs (cron expressions) ---
    DAILY_REPORT_CRON: str = "0 2 * * *"  # every day at 02:00
    WEEKLY_REPORT_CRON: str = "0 3 * * 1"  # every Monday at 03:00
|
|
|
|
|
|
# ============================================================
|
|
# 应用生命周期管理
|
|
# ============================================================
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage service resources across application startup and shutdown.

    The section before ``yield`` runs at startup and the ``finally`` clause
    runs at shutdown. The component initialisers and finalisers are still
    placeholders (see the TODO markers); only the log lines are active.

    Args:
        app: The FastAPI application this context is attached to (not used
            inside the function).

    Raises:
        Exception: Whatever the startup section raised, re-raised after the
            failure has been logged with its traceback.
    """
    # In this file logging is configured only under ``__main__``. A process
    # that imports the module (as ``uvicorn.run("main:app", ...)`` workers
    # and reloaders are expected to do) would otherwise have no handlers, and
    # the INFO-level records emitted here and by the audit middleware would
    # be dropped. Skip when the root logger has already been configured.
    if not logging.getLogger().handlers:
        os.makedirs("logs", exist_ok=True)
        setup_logging("DEBUG" if AnalyticsConfig.DEBUG else "INFO")

    logger.info(
        "正在启动 %s v%s ...",
        AnalyticsConfig.SERVICE_NAME,
        AnalyticsConfig.SERVICE_VERSION,
    )

    # Startup: bring up each backing component in turn.
    try:
        # ClickHouse connection pool
        logger.info(
            "初始化ClickHouse连接: %s:%d",
            AnalyticsConfig.CLICKHOUSE_HOST,
            AnalyticsConfig.CLICKHOUSE_PORT,
        )
        # TODO: await init_clickhouse_pool()

        # MySQL connection pool
        logger.info(
            "初始化MySQL连接: %s:%d",
            AnalyticsConfig.MYSQL_HOST,
            AnalyticsConfig.MYSQL_PORT,
        )
        # TODO: await init_mysql_pool()

        # Neo4j driver
        logger.info("初始化Neo4j连接: %s", AnalyticsConfig.NEO4J_URI)
        # TODO: await init_neo4j_driver()

        # Kafka consumer threads
        logger.info("启动Kafka消费者: %s", AnalyticsConfig.KAFKA_BROKERS)
        # TODO: start_kafka_consumers()

        # Scheduled report jobs
        logger.info("注册定时报告生成任务")
        # TODO: register_cron_jobs()

        logger.info("所有服务组件初始化完成")
    except Exception as e:
        # logger.exception keeps the traceback next to the message.
        logger.exception("服务初始化失败: %s", e)
        raise

    try:
        yield
    finally:
        # Shutdown: runs even if an exception is thrown in at the yield
        # point, so resources acquired above are always released.
        logger.info("正在关闭服务...")
        # TODO: await close_clickhouse_pool()
        # TODO: await close_mysql_pool()
        # TODO: await close_neo4j_driver()
        # TODO: stop_kafka_consumers()
        logger.info("服务已安全关闭")
|
|
|
|
|
|
# ============================================================
|
|
# FastAPI应用创建
|
|
# ============================================================
|
|
|
|
# Origins allowed to call the API cross-origin (the management front ends).
_CORS_ALLOWED_ORIGINS = [
    "https://admin.writech.com",
    "https://teacher.writech.com",
]
_CORS_ALLOWED_METHODS = ["GET", "POST", "PUT"]

# Host header patterns accepted by the trusted-host check.
_TRUSTED_HOSTS = ["*.writech.com", "localhost"]

# The ASGI application; startup and shutdown are delegated to ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    version=AnalyticsConfig.SERVICE_VERSION,
    title="自然写教学数据分析与学情诊断系统",
    description="对学生书写及答题数据进行大数据分析,生成学情诊断报告",
)

# CORS middleware: credentialed cross-origin access for the listed origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ALLOWED_ORIGINS,
    allow_methods=_CORS_ALLOWED_METHODS,
    allow_headers=["*"],
    allow_credentials=True,
)

# Trusted-host validation, registered after the CORS middleware.
app.add_middleware(TrustedHostMiddleware, allowed_hosts=_TRUSTED_HOSTS)
|
|
|
|
|
|
# ============================================================
|
|
# 全局中间件
|
|
# ============================================================
|
|
|
|
@app.middleware("http")
async def audit_logging_middleware(request: Request, call_next):
    """Write an audit log record for every request under ``/api/v1/``.

    Each record carries the request id, method, path, response status,
    elapsed time in milliseconds and the caller's user id. Requests whose
    downstream handler raises are recorded with status 500 before the
    exception is re-raised.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The response produced by ``call_next``, unchanged.
    """
    # Function-scope import keeps this block self-contained.
    import time

    # perf_counter is monotonic, so the measured duration is not affected by
    # wall-clock adjustments during the request.
    started = time.perf_counter()
    request_id = request.headers.get("X-Request-ID", "")
    is_audited = request.url.path.startswith("/api/v1/")

    def write_record(status_code: int) -> None:
        """Emit one AUDIT line for an audited path; no-op otherwise."""
        if not is_audited:
            return
        duration_ms = (time.perf_counter() - started) * 1000
        logger.info(
            "AUDIT | %s | %s %s | status=%d | %.1fms | user=%s",
            request_id,
            request.method,
            request.url.path,
            status_code,
            duration_ms,
            # NOTE(review): taken verbatim from a request header; confirm an
            # upstream gateway sets it, otherwise it is client-controlled.
            request.headers.get("X-User-ID", "anonymous"),
        )

    try:
        response: Response = await call_next(request)
    except Exception:
        # Failed operations must still appear in the audit trail.
        write_record(500)
        raise

    write_record(response.status_code)
    return response
|
|
|
|
|
|
@app.middleware("http")
async def data_permission_middleware(request: Request, call_next):
    """Placeholder for role-based data scoping.

    The function currently forwards every request unchanged; none of the
    rules below are enforced here yet.

    Planned flow (not implemented):
        1. Read the bearer token from the ``Authorization`` header.
        2. Decode the JWT to obtain the user's role and permission scope.
        3. Filter by role:
           - teacher: only data within its ``class_ids``
           - parent: only data within its ``student_ids``
           - admin: all data of its own school
           - super_admin: unrestricted

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The downstream response, unchanged.
    """
    # TODO: token = request.headers.get("Authorization", "").replace("Bearer ", "")
    # TODO: user_info = decode_jwt(token)
    # TODO: role = user_info.get("role", "")
    return await call_next(request)
|
|
|
|
|
|
# ============================================================
|
|
# 路由注册
|
|
# ============================================================
|
|
|
|
# 导入并注册各API路由模块
|
|
# from api.profile_api import router as profile_router
|
|
# from api.report_api import router as report_router
|
|
# from api.growth_api import router as growth_router
|
|
#
|
|
# app.include_router(profile_router, prefix="/api/v1/profile")
|
|
# app.include_router(report_router, prefix="/api/v1/report")
|
|
# app.include_router(growth_router, prefix="/api/v1/growth")
|
|
|
|
|
|
# ============================================================
|
|
# 健康检查接口
|
|
# ============================================================
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness endpoint, intended for the Kubernetes liveness probe.

    Returns:
        A dict with a fixed "healthy" status, the service name and version,
        and the current local time in ISO 8601 format.
    """
    payload = dict(
        status="healthy",
        service=AnalyticsConfig.SERVICE_NAME,
        version=AnalyticsConfig.SERVICE_VERSION,
        timestamp=datetime.now().isoformat(),
    )
    return payload
|
|
|
|
|
|
@app.get("/ready")
async def readiness_check():
    """Readiness endpoint reporting whether every dependency is available.

    Returns:
        A JSON response with ``ready`` (overall flag) and ``checks``
        (per-dependency flags); HTTP 200 when all checks pass, otherwise 503.
    """
    # Every dependency starts as "not ready".
    checks = dict.fromkeys(("clickhouse", "mysql", "neo4j", "kafka"), False)

    # NOTE(review): no probe is wired in yet, so every flag stays False and
    # this endpoint always answers 503 as written.
    # TODO: ClickHouse probe
    #     try:
    #         await clickhouse_ping()
    #         checks["clickhouse"] = True
    #     except Exception:
    #         pass

    all_ready = all(checks.values())
    if all_ready:
        status_code = 200
    else:
        status_code = 503

    body = {"ready": all_ready, "checks": checks}
    return JSONResponse(status_code=status_code, content=body)
|
|
|
|
|
|
# ============================================================
|
|
# 全局异常处理
|
|
# ============================================================
|
|
|
|
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any unhandled exception into a uniform HTTP 500 JSON response.

    Args:
        request: The request being processed when the exception escaped.
        exc: The unhandled exception.

    Returns:
        A JSON response with ``code`` 500, a generic ``message`` and a
        ``detail`` field that carries the exception text only when debug
        mode is enabled (``None`` otherwise).
    """
    logger.error(
        "未处理异常 | %s %s | %s: %s",
        request.method,
        request.url.path,
        type(exc).__name__,
        str(exc),
        # Attach the traceback; the message alone does not show where the
        # failure originated.
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={
            "code": 500,
            "message": "服务内部错误",
            # Exception text is exposed to clients only in debug mode.
            "detail": str(exc) if AnalyticsConfig.DEBUG else None,
        },
    )
|
|
|
|
|
|
# ============================================================
|
|
# 启动入口
|
|
# ============================================================
|
|
|
|
if __name__ == "__main__":
    debug_mode = AnalyticsConfig.DEBUG

    # Make sure the log and report directories exist before they are used.
    for directory in ("logs", AnalyticsConfig.REPORT_OUTPUT_DIR):
        os.makedirs(directory, exist_ok=True)

    setup_logging("DEBUG" if debug_mode else "INFO")
    logger.info("启动学情诊断系统服务...")

    # Debug mode: auto-reload with a single worker; otherwise four workers.
    uvicorn.run(
        "main:app",
        host=AnalyticsConfig.HOST,
        port=AnalyticsConfig.PORT,
        reload=debug_mode,
        workers=1 if debug_mode else 4,
        log_level="info",
    )
|