2026-05-04
Python
0

目录

🏭 当设备"哑巴"了,你怎么办?
🤔 为什么选SQLite,而不是MySQL?
📐 数据库表结构设计
🗂️ 设备基础信息表
📊 实时状态表
🚨 报警记录表
📈 日统计汇总表
🔧 数据库操作封装
💾 核心业务操作实现
🖥️ 与CustomTkinter的集成要点
🧹 数据维护:别让数据库撑死
🎯 几个实战踩坑提醒
📝 写在最后

🏭 当设备"哑巴"了,你怎么办?

车间里有台注塑机,昨天还好好的,今天一早就停了。操作工说"不知道啥时候停的",班组长说"我没收到报警",工程师打开电脑——Excel表格,上次更新是三天前。

这个场景,做工控软件的朋友应该不陌生。

设备状态监控,听起来是个"小需求",实际上坑深得很。数据怎么存?历史怎么查?报警怎么触发?界面怎么刷新不卡顿?每一个问题单独拎出来都不简单,凑在一起更是让人头大。

本文就从一个真实的生产线监控项目出发,聊聊用CustomTkinter + SQLite搭建设备状态监控系统时,数据库这块该怎么设计——不是照搬教科书,是实际踩过坑之后的经验总结。


🤔 为什么选SQLite,而不是MySQL?

先把这个问题说清楚,不然后面的设计决策没法理解。

工厂现场的监控软件,有几个特点:单机部署为主数据量中等(不是互联网那种亿级)、离线可用运维人员技术水平参差不齐

MySQL当然强,但你得装服务、配权限、维护连接池——出了问题,现场工程师大概率搞不定。SQLite呢?一个.db文件,拷贝走就是备份,零配置,嵌进Python程序里开箱即用。对于单台工控机跑的监控软件,它绰绰有余。

当然,SQLite也有局限:高并发写入会锁表,不适合多进程同时写。但在我们这个场景里——一个主进程采集数据、一个UI线程展示——完全没问题,后面会专门处理线程安全的问题。


📐 数据库表结构设计

好的表结构是整个系统的地基。这里我设计了四张核心表,每张表的存在都有明确理由。

🗂️ 设备基础信息表

sql
CREATE TABLE IF NOT EXISTS devices ( device_id TEXT PRIMARY KEY, -- 设备唯一编号,如 "INJ-001" device_name TEXT NOT NULL, -- 设备名称 device_type TEXT NOT NULL, -- 类型:注塑机/传送带/检测仪 location TEXT, -- 产线位置,如 "A线-3号位" install_date TEXT, -- 安装日期 is_active INTEGER DEFAULT 1 -- 是否启用(1=是,0=否) );

这张表基本上是静态数据,设备上线时录入,几乎不改。device_id用有意义的字符串而不是自增整数——原因很简单,现场沟通时"INJ-001停了"比"设备ID=7停了"直观多了。

📊 实时状态表

sql
CREATE TABLE IF NOT EXISTS device_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, status TEXT NOT NULL, -- running/stopped/warning/error temperature REAL, -- 温度(℃) pressure REAL, -- 压力(MPa) speed REAL, -- 转速(rpm) timestamp TEXT NOT NULL, -- ISO格式时间戳 FOREIGN KEY (device_id) REFERENCES devices(device_id) ); -- 查询性能关键:时间戳和设备ID的联合索引 CREATE INDEX IF NOT EXISTS idx_status_device_time ON device_status(device_id, timestamp DESC);

注意这里的timestamp用TEXT存ISO格式字符串(2026-04-02T09:23:45),不用DATETIME类型。为啥?SQLite的DATETIME支持其实很弱,用字符串反而更灵活,而且ISO格式字符串排序和时间排序完全一致,查"最近1小时数据"用字符串比较就行,不需要额外转换。

🚨 报警记录表

sql
CREATE TABLE IF NOT EXISTS alarms ( alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, alarm_type TEXT NOT NULL, -- overheat/pressure_high/comm_lost 等 alarm_level TEXT NOT NULL, -- info/warning/critical message TEXT, -- 报警描述 trigger_time TEXT NOT NULL, -- 触发时间 ack_time TEXT, -- 确认时间(NULL=未确认) ack_user TEXT, -- 确认人 FOREIGN KEY (device_id) REFERENCES devices(device_id) );

ack_time为NULL就代表报警未处理——这个设计比单独加个status字段更简洁,查"未确认报警"直接WHERE ack_time IS NULL,一目了然。

📈 日统计汇总表

sql
CREATE TABLE IF NOT EXISTS daily_stats ( stat_id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, stat_date TEXT NOT NULL, -- 日期,格式 "2026-04-02" running_minutes INTEGER DEFAULT 0, stop_count INTEGER DEFAULT 0, alarm_count INTEGER DEFAULT 0, avg_temperature REAL, max_temperature REAL, UNIQUE(device_id, stat_date) -- 每台设备每天只有一条记录 );

这张表是个"预计算缓存"。历史数据查询,尤其是"上个月各设备运行率"这种报表,如果每次都扫device_status的原始数据,数据量一大就很慢。提前汇总到这张表,查报表时直接读,快得多。


🔧 数据库操作封装

表结构设计完,接下来是Python代码层面的封装。这里最关键的一个问题:线程安全

CustomTkinter的UI跑在主线程,数据采集通常在后台线程,两个线程同时操作SQLite,不处理好就会碰到ProgrammingError: SQLite objects created in a thread can only be used in that same thread这个经典报错。

解决方案有几种,我倾向于用连接池 + 线程本地存储的方式:

python
import sqlite3 import threading import logging from datetime import datetime from contextlib import contextmanager from typing import Optional, List, Dict, Any logger = logging.getLogger(__name__) class DeviceDatabase: """ 设备状态数据库管理类 使用线程本地连接,确保多线程安全 """ def __init__(self, db_path: str = "device_monitor.db"): self.db_path = db_path self._local = threading.local() # 每个线程独立的连接 self._init_database() def _get_connection(self) -> sqlite3.Connection: """获取当前线程的数据库连接(没有则创建)""" if not hasattr(self._local, 'conn') or self._local.conn is None: self._local.conn = sqlite3.connect( self.db_path, timeout=10, # 等锁超时10秒 check_same_thread=False ) self._local.conn.row_factory = sqlite3.Row # 结果可按列名访问 # 开启WAL模式,读写并发性能更好 self._local.conn.execute("PRAGMA journal_mode=WAL") self._local.conn.execute("PRAGMA synchronous=NORMAL") return self._local.conn @contextmanager def get_cursor(self): """上下文管理器,自动处理事务提交和回滚""" conn = self._get_connection() cursor = conn.cursor() try: yield cursor conn.commit() except Exception as e: conn.rollback() logger.error(f"数据库操作失败,已回滚: {e}") raise finally: cursor.close() def _init_database(self): """初始化数据库,创建所有表""" with self.get_cursor() as cursor: cursor.executescript(""" CREATE TABLE IF NOT EXISTS devices ( device_id TEXT PRIMARY KEY, device_name TEXT NOT NULL, device_type TEXT NOT NULL, location TEXT, install_date TEXT, is_active INTEGER DEFAULT 1 ); CREATE TABLE IF NOT EXISTS device_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, status TEXT NOT NULL, temperature REAL, pressure REAL, speed REAL, timestamp TEXT NOT NULL, FOREIGN KEY (device_id) REFERENCES devices(device_id) ); CREATE INDEX IF NOT EXISTS idx_status_device_time ON device_status(device_id, timestamp DESC); CREATE TABLE IF NOT EXISTS alarms ( alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, alarm_type TEXT NOT NULL, alarm_level TEXT NOT NULL, message TEXT, trigger_time TEXT NOT NULL, ack_time TEXT, ack_user TEXT, FOREIGN KEY (device_id) REFERENCES devices(device_id) ); CREATE TABLE IF NOT EXISTS daily_stats ( stat_id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, stat_date TEXT NOT NULL, running_minutes INTEGER DEFAULT 0, stop_count INTEGER DEFAULT 0, alarm_count INTEGER DEFAULT 0, avg_temperature REAL, max_temperature REAL, UNIQUE(device_id, stat_date) ); """) logger.info(f"数据库初始化完成: {self.db_path}")

PRAGMA journal_mode=WAL这行值得单独说一下。WAL(Write-Ahead Logging)模式下,读操作不会阻塞写操作——UI线程查历史数据的同时,采集线程可以继续写入,互不干扰。对于我们这种"频繁写入、偶尔查询"的场景,开启WAL是个必要操作。


💾 核心业务操作实现

有了基础封装,再实现几个高频操作:

python
# ---- 写入操作 ---- def insert_status(self, device_id: str, status: str, temperature: float = None, pressure: float = None, speed: float = None) -> bool: """写入一条设备状态记录""" timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" INSERT INTO device_status (device_id, status, temperature, pressure, speed, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, (device_id, status, temperature, pressure, speed, timestamp)) return True except Exception as e: logger.error(f"写入状态失败 [{device_id}]: {e}") return False def trigger_alarm(self, device_id: str, alarm_type: str, level: str, message: str) -> Optional[int]: """触发一条报警记录,返回alarm_id""" trigger_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" INSERT INTO alarms (device_id, alarm_type, alarm_level, message, trigger_time) VALUES (?, ?, ?, ?, ?) """, (device_id, alarm_type, level, message, trigger_time)) return cursor.lastrowid except Exception as e: logger.error(f"触发报警失败 [{device_id}]: {e}") return None def acknowledge_alarm(self, alarm_id: int, user: str) -> bool: """确认报警""" ack_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" UPDATE alarms SET ack_time = ?, ack_user = ? WHERE alarm_id = ? AND ack_time IS NULL """, (ack_time, user, alarm_id)) return cursor.rowcount > 0 except Exception as e: logger.error(f"确认报警失败 [alarm_id={alarm_id}]: {e}") return False # ---- 查询操作 ---- def get_latest_status(self, device_id: str) -> Optional[Dict]: """获取设备最新状态""" try: with self.get_cursor() as cursor: cursor.execute(""" SELECT * FROM device_status WHERE device_id = ? ORDER BY timestamp DESC LIMIT 1 """, (device_id,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"查询最新状态失败 [{device_id}]: {e}") return None def get_status_history(self, device_id: str, hours: int = 1) -> List[Dict]: """获取最近N小时的状态历史""" from datetime import timedelta since = (datetime.now() - timedelta(hours=hours) ).strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" SELECT * FROM device_status WHERE device_id = ? AND timestamp >= ? ORDER BY timestamp ASC """, (device_id, since)) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"查询历史失败 [{device_id}]: {e}") return [] def get_pending_alarms(self) -> List[Dict]: """获取所有未确认报警""" try: with self.get_cursor() as cursor: cursor.execute(""" SELECT a.*, d.device_name, d.location FROM alarms a JOIN devices d ON a.device_id = d.device_id WHERE a.ack_time IS NULL ORDER BY a.trigger_time DESC """) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"查询未确认报警失败: {e}") return []

🖥️ 与CustomTkinter的集成要点

数据库层写好了,怎么和UI对接?这里有个绝对不能犯的错误:在UI事件回调里直接执行数据库查询。

UI主线程是单线程的。你在按钮点击回调里查了个"最近30天历史数据",查了500毫秒,界面就冻住500毫秒。用户体验直接崩。

正确做法是把耗时操作丢到后台线程,查完再通过after()回调更新UI:

python
import sqlite3 import threading import logging import queue import random import numpy as np from datetime import datetime, timedelta from contextlib import contextmanager from typing import Optional, List, Dict, Any import time import customtkinter as ctk from PIL import Image, ImageDraw import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure import io # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class DeviceDatabase: """设备状态数据库管理类""" def __init__(self, db_path: str = "device_monitor.db"): self.db_path = db_path self._lock = threading.RLock() self._local = threading.local() self._init_database() self._load_initial_data() def _get_connection(self) -> sqlite3.Connection: """获取当前线程的数据库连接""" if not hasattr(self._local, 'conn') or self._local.conn is None: try: self._local.conn = sqlite3.connect( self.db_path, timeout=10, check_same_thread=True ) self._local.conn.row_factory = sqlite3.Row self._local.conn.execute("PRAGMA journal_mode=WAL") self._local.conn.execute("PRAGMA synchronous=NORMAL") self._local.conn.execute("PRAGMA busy_timeout = 5000") logger.debug(f"[Thread {threading.current_thread().name}] 数据库连接已创建") except Exception as e: logger.error(f"数据库连接失败: {e}") raise return self._local.conn @contextmanager def get_cursor(self): """上下文管理器,自动处理事务""" conn = self._get_connection() cursor = conn.cursor() try: yield cursor conn.commit() except Exception as e: conn.rollback() logger.error(f"数据库操作失败,已回滚: {e}") raise finally: cursor.close() def _init_database(self): """初始化数据库结构""" with self.get_cursor() as cursor: cursor.executescript(""" CREATE TABLE IF NOT EXISTS devices ( device_id TEXT PRIMARY KEY, device_name TEXT NOT NULL, device_type TEXT NOT NULL, location TEXT, install_date TEXT, is_active INTEGER DEFAULT 1 ); CREATE TABLE IF NOT EXISTS device_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, status TEXT NOT NULL, temperature REAL, pressure REAL, speed REAL, timestamp TEXT NOT NULL, FOREIGN KEY (device_id) REFERENCES devices(device_id) ); CREATE INDEX IF NOT EXISTS idx_status_device_time ON device_status(device_id, timestamp DESC); CREATE TABLE IF NOT EXISTS alarms ( alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, alarm_type TEXT NOT NULL, alarm_level TEXT NOT NULL, message TEXT, trigger_time TEXT NOT NULL, ack_time TEXT, ack_user TEXT, FOREIGN KEY (device_id) REFERENCES devices(device_id) ); CREATE TABLE IF NOT EXISTS daily_stats ( stat_id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, stat_date TEXT NOT NULL, running_minutes INTEGER DEFAULT 0, stop_count INTEGER DEFAULT 0, alarm_count INTEGER DEFAULT 0, avg_temperature REAL, max_temperature REAL, UNIQUE(device_id, stat_date) ); """) logger.info(f"数据库初始化完成: {self.db_path}") def _load_initial_data(self): """加载初始测试数据""" try: with self.get_cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM devices") if cursor.fetchone()[0] == 0: devices = [ ("INJ-001", "注塑机#1", "注塑机", "A车间", "2024-01-15"), ("INJ-002", "注塑机#2", "注塑机", "A车间", "2024-01-15"), ("CONV-001", "传送带#1", "传送设备", "B车间", "2024-02-01"), ] cursor.executemany(""" INSERT INTO devices (device_id, device_name, device_type, location, install_date) VALUES (?, ?, ?, ?, ?) """, devices) now = datetime.now() device_data = [ ("INJ-001", "运行中", 85.5, 150.0, 45.2), ("INJ-002", "待机", 72.1, 140.0, 38.5), ("CONV-001", "运行中", 62.3, 120.0, 52.1), ] for device_id, status, temp, pressure, speed in device_data: for i in range(100): # 增加到100条历史记录 ts = (now - timedelta(seconds=i * 10)).strftime("%Y-%m-%dT%H:%M:%S") cursor.execute(""" INSERT INTO device_status (device_id, status, temperature, pressure, speed, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, (device_id, status, temp + i * 0.05, pressure, speed, ts)) alarms_data = [ ("INJ-002", "温度超高", "warning", "温度达到92℃,接近报警值"), ("INJ-001", "压力异常", "warning", "压力波动较大,需要检查"), ] for device_id, alarm_type, level, message in alarms_data: ts = (now - timedelta( minutes=alarms_data.index((device_id, alarm_type, level, message)))).strftime( "%Y-%m-%dT%H:%M:%S") cursor.execute(""" INSERT INTO alarms (device_id, alarm_type, alarm_level, message, trigger_time) VALUES (?, ?, ?, ?, ?) """, (device_id, alarm_type, level, message, ts)) logger.info("初始测试数据已加载") except Exception as e: logger.error(f"加载初始数据失败: {e}") def insert_status(self, device_id: str, status: str, temperature: float = None, pressure: float = None, speed: float = None) -> bool: """写入设备状态记录""" timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" INSERT INTO device_status (device_id, status, temperature, pressure, speed, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, (device_id, status, temperature, pressure, speed, timestamp)) logger.debug(f"状态记录已写入 [{device_id}]: 温度={temperature}℃") return True except Exception as e: logger.error(f"写入状态失败 [{device_id}]: {e}") return False def trigger_alarm(self, device_id: str, alarm_type: str, level: str, message: str) -> Optional[int]: """触发报警记录""" trigger_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" INSERT INTO alarms (device_id, alarm_type, alarm_level, message, trigger_time) VALUES (?, ?, ?, ?, ?) """, (device_id, alarm_type, level, message, trigger_time)) alarm_id = cursor.lastrowid logger.info(f"报警已触发 [alarm_id={alarm_id}, device={device_id}, level={level}]") return alarm_id except Exception as e: logger.error(f"触发报警失败 [{device_id}]: {e}") return None def acknowledge_alarm(self, alarm_id: int, user: str) -> bool: """确认报警""" ack_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" UPDATE alarms SET ack_time = ?, ack_user = ? WHERE alarm_id = ? AND ack_time IS NULL """, (ack_time, user, alarm_id)) success = cursor.rowcount > 0 if success: logger.info(f"报警已确认 [alarm_id={alarm_id}, user={user}]") return success except Exception as e: logger.error(f"确认报警失败 [alarm_id={alarm_id}]: {e}") return False def get_latest_status(self, device_id: str) -> Optional[Dict]: """获取设备最新状态""" try: with self.get_cursor() as cursor: cursor.execute(""" SELECT * FROM device_status WHERE device_id = ? ORDER BY timestamp DESC LIMIT 1 """, (device_id,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"查询最新状态失败 [{device_id}]: {e}") return None def get_status_history(self, device_id: str, limit: int = 100) -> List[Dict]: """获取最近N条状态历史""" try: with self.get_cursor() as cursor: cursor.execute(""" SELECT * FROM device_status WHERE device_id = ? ORDER BY timestamp DESC LIMIT ? """, (device_id, limit)) rows = cursor.fetchall() # 反转列表,使得最老的数据在前面 return [dict(row) for row in reversed(rows)] except Exception as e: logger.error(f"查询历史失败 [{device_id}]: {e}") return [] def get_pending_alarms(self) -> List[Dict]: """获取所有未确认报警""" try: with self.get_cursor() as cursor: cursor.execute(""" SELECT a.*, d.device_name, d.location FROM alarms a JOIN devices d ON a.device_id = d.device_id WHERE a.ack_time IS NULL ORDER BY a.trigger_time DESC """) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"查询未确认报警失败: {e}") return [] def get_all_devices(self) -> List[Dict]: """获取所有设备""" try: with self.get_cursor() as cursor: cursor.execute("SELECT * FROM devices WHERE is_active = 1") return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"查询设备列表失败: {e}") return [] # ============ 后台数据写入线程 ============ class DataWriterThread(threading.Thread): """后台线程:定期更新设备状态数据到数据库""" def __init__(self, db: DeviceDatabase, interval: int = 3): super().__init__(daemon=True, name="DataWriter") self.db = db self.interval = interval self._stop_event = threading.Event() self.device_states = { "INJ-001": {"status": "运行中", "temp": 85.5, "pressure": 150.0, "speed": 45.2}, "INJ-002": {"status": "待机", "temp": 72.1, "pressure": 140.0, "speed": 38.5}, "CONV-001": {"status": "运行中", "temp": 62.3, "pressure": 120.0, "speed": 52.1}, } def run(self): """线程主循环""" logger.info("后台数据写入线程启动") while not self._stop_event.is_set(): try: self._update_all_devices() self._check_alarms() time.sleep(self.interval) except Exception as e: logger.error(f"数据写入线程出错: {e}") time.sleep(self.interval) def _update_all_devices(self): """更新所有设备的状态数据""" for device_id, state in self.device_states.items(): temp_change = random.uniform(-0.5, 0.5) new_temp = state["temp"] + temp_change state["temp"] = max(50, min(100, new_temp)) pressure_change = random.uniform(-2, 2) new_pressure = state["pressure"] + pressure_change state["pressure"] = max(100, min(200, new_pressure)) speed_change = random.uniform(-1, 1) new_speed = state["speed"] + speed_change state["speed"] = max(20, min(60, new_speed)) if random.random() < 0.1: state["status"] = random.choice(["运行中", "待机", "维护中"]) self.db.insert_status( device_id, state["status"], temperature=state["temp"], pressure=state["pressure"], speed=state["speed"] ) def _check_alarms(self): """检查并触发报警条件""" for device_id, state in self.device_states.items(): if state["temp"] > 90: self.db.trigger_alarm( device_id, "过温", "critical", f"温度{state['temp']:.1f}℃,超过安全值90℃" ) elif state["temp"] > 85: if random.random() < 0.05: self.db.trigger_alarm( device_id, "温度预警", "warning", f"温度{state['temp']:.1f}℃,建议检查" ) if state["pressure"] < 100 or state["pressure"] > 200: if random.random() < 0.1: self.db.trigger_alarm( device_id, "压力异常", "warning", f"压力{state['pressure']:.1f},超出正常范围" ) def stop(self): """停止线程""" self._stop_event.set() logger.info("后台数据写入线程已停止") # ============ 自定义 CustomTkinter Canvas 显示 Matplotlib 图表 ============ class MatplotlibCanvas(ctk.CTkFrame): """在 CustomTkinter 中嵌入 Matplotlib 图表""" def __init__(self, master, **kwargs): super().__init__(master, **kwargs) self.figure = None self.canvas = None self._create_figure() def _create_figure(self): """创建初始图表""" # 设置中文字体,避免乱码问题 plt.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下显示中文 plt.rcParams['axes.unicode_minus'] = False self.figure = Figure(figsize=(12, 3.5), dpi=100) self.figure.patch.set_facecolor('#212121') # 深灰色背景 self.ax = self.figure.add_subplot(111) self.ax.set_facecolor('#2a2a2a') self.ax.grid(True, alpha=0.2, color='white') self.ax.set_xlabel('数据点', color='white', fontsize=10) self.ax.set_ylabel('温度 ℃', color='white', fontsize=10) self.ax.tick_params(colors='white') self.canvas = FigureCanvasTkAgg(self.figure, master=self) self.canvas.get_tk_widget().pack(fill="both", expand=True) def update_plot(self, data: List[Dict]): """更新图表数据 Args: data: 状态数据列表,包含 'temperature' 字段 """ if not data: return temperatures = [d.get('temperature', 0) for d in data] x_points = list(range(len(temperatures))) self.ax.clear() self.ax.set_facecolor('#2a2a2a') self.ax.grid(True, alpha=0.2, color='white') # 绘制线条 self.ax.plot(x_points, temperatures, color='#00BFFF', linewidth=2, label='温度') # 填充区域 self.ax.fill_between(x_points, temperatures, alpha=0.3, color='#00BFFF') # 标记最大值和最小值 if temperatures: max_temp = max(temperatures) min_temp = min(temperatures) max_idx = temperatures.index(max_temp) min_idx = temperatures.index(min_temp) self.ax.plot(max_idx, max_temp, 'ro', markersize=8) self.ax.plot(min_idx, min_temp, 'go', markersize=8) self.ax.set_xlabel('数据点', color='white', fontsize=10) self.ax.set_ylabel('温度 ℃', color='white', fontsize=10) self.ax.set_ylim(50, 100) self.ax.tick_params(colors='white') self.ax.legend(loc='upper left', facecolor='#2a2a2a', edgecolor='white') self.figure.tight_layout() self.canvas.draw() # ============ 主应用 ============ class DeviceMonitorApp(ctk.CTk): def __init__(self): super().__init__() self.db = DeviceDatabase("device_monitor.db") # 启动后台写入线程 self.writer_thread = DataWriterThread(self.db, interval=3) self.writer_thread.start() self.title("生产线设备状态监控") self.geometry("1280x900") self.resizable(True, True) self.current_device_id = "INJ-001" self._build_ui() # 立即加载一次数据 self.after(100, self._load_data) def _build_ui(self): """构建UI布局""" # 顶部:标题和设备选择 top_frame = ctk.CTkFrame(self, fg_color="transparent") top_frame.pack(fill="x", padx=20, pady=10) title = ctk.CTkLabel( top_frame, text="生产线设备状态监控系统", font=("微软雅黑", 20, "bold") ) title.pack(side="left") devices = self.db.get_all_devices() device_options = [f"{d['device_name']} ({d['device_id']})" for d in devices] self.device_var = ctk.StringVar(value=device_options[0] if device_options else "无设备") device_menu = ctk.CTkComboBox( top_frame, values=device_options, variable=self.device_var, command=self._on_device_changed, width=250 ) device_menu.pack(side="right", padx=10) # 中间:设备状态显示 status_frame = ctk.CTkFrame(self) status_frame.pack(fill="both", expand=True, padx=20, pady=10) self.status_label = ctk.CTkLabel( status_frame, text="加载中...", font=("微软雅黑", 16, "bold"), text_color="green" ) self.status_label.pack(anchor="w", pady=10) self.temp_label = ctk.CTkLabel( status_frame, text="温度: --℃", font=("微软雅黑", 14) ) self.temp_label.pack(anchor="w", pady=5) # 报警列表 alarm_title = ctk.CTkLabel( status_frame, text="待处理报警", font=("微软雅黑", 14, "bold") ) alarm_title.pack(anchor="w", pady=(20, 10)) self.alarm_frame = ctk.CTkScrollableFrame( status_frame, height=100, fg_color="gray20" ) self.alarm_frame.pack(fill="x", padx=5, pady=(0, 10)) # 温度曲线图 chart_title = ctk.CTkLabel( status_frame, text="温度趋势(最近100个数据点)", font=("微软雅黑", 14, "bold") ) chart_title.pack(anchor="w", pady=(10, 5)) self.chart_canvas = MatplotlibCanvas(status_frame, fg_color="gray10") self.chart_canvas.pack(fill="both", expand=True, padx=5) # 底部:刷新状态 bottom_frame = ctk.CTkFrame(self, fg_color="transparent") bottom_frame.pack(fill="x", padx=20, pady=10) self.refresh_label = ctk.CTkLabel( bottom_frame, text="自动刷新: 正常", font=("微软雅黑", 11), text_color="gray" ) self.refresh_label.pack(side="left") def _on_device_changed(self, value): """设备选择改变时的回调""" device_id = value.split("(")[-1].rstrip(")") self.current_device_id = device_id logger.info(f"切换设备: {device_id}") self._load_data() def _load_data(self): """在后台线程加载数据""" def _fetch(): try: latest = self.db.get_latest_status(self.current_device_id) alarms = self.db.get_pending_alarms() history = self.db.get_status_history(self.current_device_id, limit=100) self.after(0, lambda: self._update_ui(latest, alarms, history)) except Exception as e: logger.error(f"数据加载失败: {e}") self.after(0, lambda: self.refresh_label.configure( text="自动刷新: 出错", text_color="red" )) thread = threading.Thread(target=_fetch, daemon=True) thread.start() def _update_ui(self, latest_status, alarms, history): """在主线程更新UI""" if latest_status: status_text = f"设备 {self.current_device_id} - 状态: {latest_status['status']}" self.status_label.configure( text=status_text, text_color="green" if latest_status['status'] == "运行中" else "orange" ) temp = latest_status.get('temperature') if temp: temp_text = f"温度: {temp:.1f}℃ | 压力: {latest_status.get('pressure', '--'):.1f} | 速度: {latest_status.get('speed', '--'):.1f}" self.temp_label.configure(text=temp_text) else: self.status_label.configure(text="设备状态: 无数据", text_color="gray") self.temp_label.configure(text="温度: --℃") # 更新报警列表 for widget in self.alarm_frame.winfo_children(): widget.destroy() if alarms: for alarm in alarms[:5]: # 只显示最新5条 alarm_text = ( f"[{alarm['alarm_level'].upper()}] " f"{alarm['device_name']} - {alarm['message']}" ) color = "red" if alarm['alarm_level'] == 'critical' else "orange" label = ctk.CTkLabel( self.alarm_frame, text=alarm_text, text_color=color, wraplength=600 ) label.pack(anchor="w", padx=10, pady=3, fill="x") else: label = ctk.CTkLabel( self.alarm_frame, text="✓ 暂无报警", text_color="green", font=("微软雅黑", 12, "bold") ) label.pack(expand=True) # 更新温度曲线 if history: self.chart_canvas.update_plot(history) self.refresh_label.configure( text=f"自动刷新: {datetime.now().strftime('%H:%M:%S')}", text_color="gray" ) def _schedule_refresh(self): """定时刷新""" self._load_data() self.after(2000, self._schedule_refresh) def on_closing(self): """关闭应用前的清理""" logger.info("应用关闭中...") self.writer_thread.stop() self.destroy() if __name__ == "__main__": app = DeviceMonitorApp() app.after(2000, app._schedule_refresh) app.protocol("WM_DELETE_WINDOW", app.on_closing) app.mainloop()

image.png

daemon=True这个参数别忘了设置——它保证主程序退出时后台线程自动结束,不会出现关了窗口程序还在后台跑的情况。


🧹 数据维护:别让数据库撑死

生产线24小时不停,每5秒采一次数据,一台设备一天就是17280条记录,10台设备一年下来……你算算。

原始数据不可能无限保留,需要定期清理。我的做法是保留最近30天的原始数据,更早的数据只保留日统计汇总:

python
def cleanup_old_data(self, keep_days: int = 30): """清理超过keep_days天的原始状态数据""" from datetime import timedelta cutoff = (datetime.now() - timedelta(days=keep_days) ).strftime("%Y-%m-%dT%H:%M:%S") try: with self.get_cursor() as cursor: cursor.execute(""" DELETE FROM device_status WHERE timestamp < ? """, (cutoff,)) deleted = cursor.rowcount logger.info(f"清理历史数据完成,删除 {deleted} 条记录") # 清理完顺手压缩一下数据库文件 conn = self._get_connection() conn.execute("VACUUM") except Exception as e: logger.error(f"清理数据失败: {e}")

这个清理任务建议每天凌晨跑一次,用schedule库或者Windows任务计划都行。VACUUM操作会重建数据库文件回收空间,但比较耗时,放在清理之后、非业务高峰期执行。


🎯 几个实战踩坑提醒

坑一:timestamp精度问题。 如果采集频率很高(比如每秒多次),ISO格式到秒的精度不够,改用%Y-%m-%dT%H:%M:%S.%f包含微秒。

坑二:设备离线判断别靠状态字段。 有时候采集程序挂了,数据库里最后一条记录还是"running",但设备其实已经断联。正确做法是查最新记录的时间戳,超过阈值(比如30秒没数据)就判定为"通信中断",这个逻辑放在应用层,不要放在数据库里。

坑三:批量写入用executemany 如果一次要写入多台设备的状态,别循环调execute,用executemany一次提交,性能差距在10倍以上。

python
# 低效写法 for record in records: cursor.execute("INSERT INTO device_status ...", record) # 高效写法 cursor.executemany("INSERT INTO device_status ...", records)

📝 写在最后

SQLite + CustomTkinter的组合,在工厂单机监控软件这个场景里,是个很务实的选择。不需要运维MySQL,不需要网络,程序打包成exe交付,现场工程师直接用。

本文涉及的完整代码已整理开源,包含数据库初始化脚本、完整的DeviceDatabase类,以及一个可运行的CustomTkinter演示界面,供学习参考。

设计思路比代码本身更重要。表结构为什么这样分、索引为什么建在这里、线程安全为什么这样处理——这些背后的"为什么",才是可以迁移到下一个项目的东西。


标签Python CustomTkinter SQLite 工控软件 设备监控

相关信息

我用夸克网盘给你分享了「device20260504.zip」,点击链接或复制整段内容,打开「夸克APP」即可获取。 /8a3a3YPde9:/ 链接:https://pan.quark.cn/s/ee96cb8df878 提取码:7V8j

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!