还记得上次项目验收的时候吗?客户盯着监控界面,突然问了句:"这设备到底是在线还是离线?"——数据停在5分钟前,界面毫无反应。尴尬。
工业现场不比办公室。PLC断电、网线松动、RS485干扰...这些都是家常便饭。我见过最离谱的情况:生产线运行了半天,监控系统早就断线了,结果数据全丢。老板那个火啊!
今天咱们聊点实在的——用Tkinter做个工业级的断线重连提示灯。不仅要能显示状态,还得自动重连、记录日志。说白了,就是让设备掉线这事儿"看得见、摸得着、能追溯"。
这篇文章会给你:
第一痛:掉线悄无声息
设备断了半小时,界面还显示"连接中"。用户根本不知道出了问题,等发现时数据早凉透了。
第二痛:重连机制不靠谱
有些程序断线后就彻底死了。必须手动重启软件,甚至重启电脑。这在无人值守的场景简直是灾难。
第三痛:故障无法追溯
客户投诉说"昨天下���3点设备掉线了",你翻遍日志也找不到记录。谁信你的辩解?
不就是个灯吗?哪有那么复杂。
——如果你这么想,那就大错特错了。
工业级的提示灯至少要包含:
先来个最简单的。适合快速验证想法,或者给老板演示用。
pythonimport tkinter as tk
import random
import threading
import time
class SimpleLED:
def __init__(self, root):
self.root = root
self.root.title("断线提示灯-简化版")
# 创建画布显示LED灯
self.canvas = tk.Canvas(root, width=100, height=100, bg='white')
self.canvas.pack(pady=20)
self.led = self.canvas.create_oval(20, 20, 80, 80, fill='green')
# 状态标签
self.status_label = tk.Label(root, text="设备在线", font=("微软雅黑", 14))
self.status_label.pack()
self.is_connected = True
self.start_monitor()
def start_monitor(self):
"""启动监控线程"""
def monitor():
while True:
# 模拟检测连接状态(实际项目中替换为真实检测逻辑)
self.is_connected = random.choice([True, True, True, False]) # 75%在线概率
if self.is_connected:
self.canvas.itemconfig(self.led, fill='green')
self.status_label.config(text="设备在线", fg='green')
else:
self.canvas.itemconfig(self.led, fill='red')
self.status_label.config(text="设备离线", fg='red')
time.sleep(2) # 每2秒检测一次
thread = threading.Thread(target=monitor, daemon=True)
thread.start()
if __name__ == "__main__":
root = tk.Tk()
app = SimpleLED(root)
root.mainloop()

运行起来你会看到一个圆形LED灯,绿色表示在线,红色表示离线。每2秒自动刷新一次状态。
适用场景:实验室测试、功能演示、原型验证。
致命缺陷:
但是!这代码够简洁,5分钟就能看懂改好。有时候,简单就是最大的优势。
简单版只能看,不能用。现在咱们加点硬货——断线自动重连。
pythonimport tkinter as tk
from tkinter import ttk
import socket
import threading
import time
from datetime import datetime
class IndustrialLED:
def __init__(self, root, host='192.168.1.100', port=502):
self.root = root
self.root.title("工业断线重连提示灯")
self.root.geometry("400x300")
# 连接参数
self.host = host
self.port = port
self.socket = None
self.is_connected = False
self.reconnect_count = 0
self.max_reconnect = 10
# UI布局
self.setup_ui()
# 启动连接
self.start_connection()
def setup_ui(self):
"""搭建界面"""
# LED指示灯区域
led_frame = tk.Frame(self.root)
led_frame.pack(pady=20)
self.canvas = tk.Canvas(led_frame, width=80, height=80, bg='#f0f0f0', highlightthickness=0)
self.canvas.pack(side=tk.LEFT, padx=20)
self.led = self.canvas.create_oval(10, 10, 70, 70, fill='gray', outline='#333', width=2)
# 状态文字
status_frame = tk.Frame(led_frame)
status_frame.pack(side=tk.LEFT)
self.status_label = tk.Label(status_frame, text="未连接", font=("微软雅黑", 16, "bold"), fg='gray')
self.status_label.pack(anchor='w')
self.detail_label = tk.Label(status_frame, text=f"{self.host}:{self.port}", font=("Consolas", 10), fg='#666')
self.detail_label.pack(anchor='w')
# 操作按钮
btn_frame = tk.Frame(self.root)
btn_frame.pack(pady=10)
self.reconnect_btn = tk.Button(btn_frame, text="手动重连", command=self.manual_reconnect,
bg='#4CAF50', fg='white', font=("微软雅黑", 10), padx=20)
self.reconnect_btn.pack(side=tk.LEFT, padx=5)
self.disconnect_btn = tk.Button(btn_frame, text="断开连接", command=self.manual_disconnect,
bg='#f44336', fg='white', font=("微软雅黑", 10), padx=20)
self.disconnect_btn.pack(side=tk.LEFT, padx=5)
# 日志显示区
log_frame = tk.Frame(self.root)
log_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=10)
tk.Label(log_frame, text="连接日志:", font=("微软雅黑", 9)).pack(anchor='w')
self.log_text = tk.Text(log_frame, height=8, font=("Consolas", 9), bg='#1e1e1e', fg='#d4d4d4', insertbackground='white')
self.log_text.pack(fill=tk.BOTH, expand=True)
# 滚动条
scrollbar = tk.Scrollbar(self.log_text)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text.config(yscrollcommand=scrollbar.set)
scrollbar.config(command=self.log_text.yview)
def log(self, message, level='INFO'):
"""写日志(带颜色)"""
timestamp = datetime.now().strftime("%H:%M:%S")
color_map = {'INFO': '#4CAF50', 'ERROR': '#f44336', 'WARN': '#FF9800'}
self.log_text.insert(tk.END, f"[{timestamp}] ", 'time')
self.log_text.insert(tk.END, f"{message}\n", level)
self.log_text.tag_config('time', foreground='#888')
self.log_text.tag_config(level, foreground=color_map.get(level, '#d4d4d4'))
self.log_text.see(tk.END) # 自动滚到最新
def connect_device(self):
"""尝试连接设备"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(3) # 3秒超时
self.socket.connect((self.host, self.port))
self.is_connected = True
self.reconnect_count = 0
# 更新UI
self.canvas.itemconfig(self.led, fill='#4CAF50') # 绿色
self.status_label.config(text="设备在线", fg='#4CAF50')
self.log(f"连接成功 -> {self.host}:{self.port}", 'INFO')
return True
except Exception as e:
self.is_connected = False
self.canvas.itemconfig(self.led, fill='#f44336') # 红色
self.status_label.config(text="连接失败", fg='#f44336')
self.log(f"连接失败: {str(e)}", 'ERROR')
return False
def start_connection(self):
"""启动连接线程"""
def connection_loop():
while True:
if not self.is_connected:
# 显示重连状态
self.canvas.itemconfig(self.led, fill='#FF9800') # 黄色
self.status_label.config(text="重连中...", fg='#FF9800')
if self.reconnect_count < self.max_reconnect:
self.reconnect_count += 1
self.log(f"第 {self.reconnect_count} 次重连尝试...", 'WARN')
self.connect_device()
else:
self.log("重连次数超限,停止尝试", 'ERROR')
time.sleep(30) # 等30秒后重置计数
self.reconnect_count = 0
time.sleep(5) # 每5秒检测一次
thread = threading.Thread(target=connection_loop, daemon=True)
thread.start()
def manual_reconnect(self):
"""手动重连按钮"""
self.reconnect_count = 0
self.log("用户手动触发重连", 'INFO')
threading.Thread(target=self.connect_device, daemon=True).start()
def manual_disconnect(self):
"""手动断开"""
if self.socket:
self.socket.close()
self.is_connected = False
self.canvas.itemconfig(self.led, fill='gray')
self.status_label.config(text="已断开", fg='gray')
self.log("用户手动断开连接", 'WARN')
if __name__ == "__main__":
root = tk.Tk()
# 实际使用时替换为真实的PLC或设备IP和端口
app = IndustrialLED(root, host='127.0.0.1', port=8080)
root.mainloop()

看到了吗?画风完全变了。
三色LED灯:
自动重连机制:
断线后每5秒尝试重连,最多10次。超限后暂停30秒再试,避免无限循环把设备打爆。
实时日志:
每次连接/断开都有记录,带时间戳。而且日志是彩色的!绿色INFO、红色ERROR、橙色WARN——一眼就能看出问题。
手动干预:
两个按钮——"手动重连"和"断开连接"。测试的时候特别好用,不用改代码就能模拟各种场景。
我在一个水处理项目中用过类似方案。现场有12台PLC,每台都要监控连接状态。用这个提示灯,运维人员打开界面就知道哪台设备掉线了。配合日志功能,还能回溯故障时间,写维修报告特别方便。
踩过的坑:
最初没加socket.settimeout(3),结果设备响应慢的时候程序直接卡死。加了超时后,3秒���响应直接判定为离线,用户体验提升明显。
还有个坑——daemon=True一定要加!不然关闭窗口时线程不会停止,程序成了僵尸进程。
前面两个方案解决了"能用"的问题。但要上生产环境,还得再加点料。
核心升级点:
pythonimport tkinter as tk
from tkinter import messagebox, ttk
import sqlite3
import json
import threading
import time
from datetime import datetime, timedelta
from pymodbus.client.tcp import ModbusTcpClient
from pymodbus.exceptions import ModbusException
import logging
import os
class ProductionLED:
def __init__(self, root, config_file='led_config.json'):
self.root = root
self.root.title("生产线LED监控系统")
self.root.geometry("800x600")
# 加载配置
self.load_config(config_file)
# 初始化日志系统
self.setup_logging()
# 初始化数据库
self.init_database()
# 连接状态
self.is_connected = False
self.modbus_client = None
self.last_heartbeat = None
self.offline_duration = 0
self.reconnect_attempts = 0
# LED状态
self.led_status = {}
self.setup_ui()
self.start_heartbeat()
self.connect_to_device()
def load_config(self, config_file):
"""加载配置文件"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
except FileNotFoundError:
# 创建默认配置
self.config = {
"host": "192.168.1.100",
"port": 502,
"heartbeat_interval": 5,
"reconnect_interval": 10,
"max_reconnect_attempts": 10,
"alert_threshold": 30,
"led_registers": {
"red": 0,
"yellow": 1,
"green": 2,
"status": 10
}
}
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=4, ensure_ascii=False)
def setup_logging(self):
"""配置日志系统"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('led_monitor.log', encoding='utf-8'),
logging.StreamHandler()
]
) self.logger = logging.getLogger(__name__)
def init_database(self):
"""初始化SQLite数据库"""
self.conn = sqlite3.connect('connection_log.db', check_same_thread=False)
cursor = self.conn.cursor()
# 检查表是否存在以及结构
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='connection_events'")
table_exists = cursor.fetchone() is not None
if table_exists:
# 检查是否有 duration 字段
cursor.execute("PRAGMA table_info(connection_events)")
columns = [column[1] for column in cursor.fetchall()]
if 'duration' not in columns:
cursor.execute("ALTER TABLE connection_events ADD COLUMN duration INTEGER DEFAULT 0")
else:
# 创建连接事件表
cursor.execute('''
CREATE TABLE connection_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, event_type TEXT NOT NULL, device_ip TEXT, message TEXT, duration INTEGER DEFAULT 0 ) ''')
# LED状态表处理类似
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='led_status'")
if not cursor.fetchone():
cursor.execute('''
CREATE TABLE led_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, red_status INTEGER, yellow_status INTEGER, green_status INTEGER, device_status INTEGER ) ''')
self.conn.commit()
def setup_ui(self):
"""设置用户界面"""
# 主框架
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 连接状态框架
status_frame = ttk.LabelFrame(main_frame, text="连接状态")
status_frame.pack(fill=tk.X, pady=(0, 10))
self.status_label = ttk.Label(status_frame, text="未连接", font=('Arial', 12, 'bold'))
self.status_label.pack(pady=10)
self.ip_label = ttk.Label(status_frame, text=f"设备IP: {self.config['host']}:{self.config['port']}")
self.ip_label.pack()
self.heartbeat_label = ttk.Label(status_frame, text="心跳: --")
self.heartbeat_label.pack()
# LED状态框架
led_frame = ttk.LabelFrame(main_frame, text="LED状态监控")
led_frame.pack(fill=tk.X, pady=(0, 10))
# LED指示器
led_indicators = tk.Frame(led_frame)
led_indicators.pack(pady=10)
self.led_canvas = {}
colors = ['red', 'yellow', 'green']
for i, color in enumerate(colors):
frame = tk.Frame(led_indicators)
frame.grid(row=0, column=i, padx=20)
ttk.Label(frame, text=color.upper()).pack()
canvas = tk.Canvas(frame, width=60, height=60, bg='white')
canvas.pack(pady=5)
self.led_canvas[color] = canvas
self.draw_led(canvas, False)
# 控制按钮
control_frame = ttk.LabelFrame(main_frame, text="设备控制")
control_frame.pack(fill=tk.X, pady=(0, 10))
button_frame = tk.Frame(control_frame)
button_frame.pack(pady=10)
ttk.Button(button_frame, text="连接", command=self.manual_connect).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="断开", command=self.disconnect).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="测试红灯", command=lambda: self.control_led('red', True)).pack(side=tk.LEFT,
padx=5)
ttk.Button(button_frame, text="关闭所有", command=self.turn_off_all).pack(side=tk.LEFT, padx=5)
# 日志框架
log_frame = ttk.LabelFrame(main_frame, text="运行日志")
log_frame.pack(fill=tk.BOTH, expand=True)
# 日志文本框
log_text_frame = tk.Frame(log_frame)
log_text_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.log_text = tk.Text(log_text_frame, height=10)
scrollbar = ttk.Scrollbar(log_text_frame, orient=tk.VERTICAL, command=self.log_text.yview)
self.log_text.configure(yscrollcommand=scrollbar.set)
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 日志按钮
log_button_frame = tk.Frame(log_frame)
log_button_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(log_button_frame, text="清空日志", command=self.clear_log).pack(side=tk.LEFT, padx=5)
ttk.Button(log_button_frame, text="导出日志", command=self.export_log).pack(side=tk.LEFT, padx=5)
ttk.Button(log_button_frame, text="查看统计", command=self.show_statistics).pack(side=tk.LEFT, padx=5)
def draw_led(self, canvas, is_on, color='red'):
"""绘制LED指示器"""
canvas.delete("all")
fill_color = color if is_on else 'gray'
canvas.create_oval(10, 10, 50, 50, fill=fill_color, outline='black', width=2)
if is_on:
canvas.create_oval(20, 20, 40, 40, fill='white', outline='')
def log_message(self, message, level='INFO'):
"""记录日志消息"""
timestamp = datetime.now().strftime('%H:%M:%S')
log_entry = f"[{timestamp}] {level}: {message}\n"
# 更新UI日志
self.log_text.insert(tk.END, log_entry)
self.log_text.see(tk.END)
# 系统日志
if level == 'ERROR':
self.logger.error(message)
elif level == 'WARNING':
self.logger.warning(message)
else:
self.logger.info(message)
def log_to_db(self, event_type, message, duration=0):
"""写入数据库日志"""
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO connection_events (timestamp, event_type, device_ip, message, duration) VALUES (?, ?, ?, ?, ?) ''', (datetime.now().isoformat(), event_type, self.config['host'], message, duration))
self.conn.commit()
except Exception as e:
self.logger.error(f"数据库写入失败: {e}")
def connect_to_device(self):
"""连接到设备"""
def connect():
try:
self.modbus_client = ModbusTcpClient(self.config['host'], port=self.config['port'])
if self.modbus_client.connect():
self.is_connected = True
self.reconnect_attempts = 0
self.last_heartbeat = datetime.now()
self.root.after(0, lambda: self.status_label.config(text="已连接", foreground='green'))
self.log_message(f"成功连接到设备 {self.config['host']}:{self.config['port']}")
self.log_to_db('CONNECT', '设备连接成功')
else:
raise Exception("连接失败")
except Exception as e:
self.is_connected = False
self.root.after(0, lambda: self.status_label.config(text="连接失败", foreground='red'))
self.log_message(f"连接失败: {e}", 'ERROR')
self.log_to_db('CONNECT_FAIL', f'连接失败: {e}')
# 自动重连
self.schedule_reconnect()
threading.Thread(target=connect, daemon=True).start()
def schedule_reconnect(self):
"""调度重连"""
if self.reconnect_attempts < self.config['max_reconnect_attempts']:
self.reconnect_attempts += 1
self.log_message(f"将在 {self.config['reconnect_interval']} 秒后尝试第 {self.reconnect_attempts} 次重连")
def delayed_reconnect():
time.sleep(self.config['reconnect_interval'])
if not self.is_connected:
self.connect_to_device()
threading.Thread(target=delayed_reconnect, daemon=True).start()
else:
self.log_message("已达到最大重连次数,停止重连", 'ERROR')
self.log_to_db('RECONNECT_FAILED', '达到最大重连次数')
def manual_connect(self):
"""手动连接"""
self.reconnect_attempts = 0
self.connect_to_device()
def disconnect(self):
"""断开连接"""
if self.modbus_client and self.is_connected:
self.modbus_client.close()
self.is_connected = False
self.status_label.config(text="已断开", foreground='orange')
self.log_message("手动断开连接")
self.log_to_db('DISCONNECT', '手动断开连接')
def start_heartbeat(self):
"""启动心跳检测"""
def heartbeat_loop():
while True:
if self.is_connected and self.modbus_client:
success = self.send_heartbeat()
if success:
self.last_heartbeat = datetime.now()
self.offline_duration = 0
# 更新UI
self.root.after(0, lambda: self.heartbeat_label.config(
text=f"心跳: {self.last_heartbeat.strftime('%H:%M:%S')}"
))
# 读取LED状态
self.read_led_status()
else:
self.offline_duration += self.config['heartbeat_interval']
# 超过阈值告警
if self.offline_duration >= self.config['alert_threshold']:
self.show_alert()
self.is_connected = False
self.root.after(0, lambda: self.status_label.config(text="连接超时", foreground='red'))
self.schedule_reconnect()
time.sleep(self.config['heartbeat_interval'])
threading.Thread(target=heartbeat_loop, daemon=True).start()
def send_heartbeat(self):
"""发送心跳包"""
try:
# 读取状态寄存器作为心跳
result = self.modbus_client.read_holding_registers(
address=self.config['led_registers']['status'],
count=1,
device_id=1
)
return not result.isError()
except Exception as e:
self.log_message(f"心跳检测失败: {e}", 'ERROR')
return False
def read_led_status(self):
"""读取LED状态"""
try:
for color in ['red', 'yellow', 'green']:
register = self.config['led_registers'][color]
result = self.modbus_client.read_coils(address=register, count=1, device_id=1)
if not result.isError():
status = result.bits[0]
self.led_status[color] = status
# 更新UI
self.root.after(0, lambda c=color, s=status: self.draw_led(self.led_canvas[c], s, c))
# 记录状态到数据库
self.save_led_status_to_db()
except Exception as e:
self.log_message(f"读取LED状态失败: {e}", 'ERROR')
def save_led_status_to_db(self):
"""保存LED状态到数据库"""
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO led_status (timestamp, red_status, yellow_status, green_status, device_status) VALUES (?, ?, ?, ?, ?) ''', (
datetime.now().isoformat(),
int(self.led_status.get('red', False)),
int(self.led_status.get('yellow', False)),
int(self.led_status.get('green', False)),
1 if self.is_connected else 0
))
self.conn.commit()
except Exception as e:
self.logger.error(f"保存LED状态失败: {e}")
def control_led(self, color, state):
"""控制LED"""
if not self.is_connected:
messagebox.showwarning("警告", "设备未连接")
return
try:
register = self.config['led_registers'][color]
result = self.modbus_client.write_coil(address= register,value= state, device_id=1)
if not result.isError():
action = "开启" if state else "关闭"
self.log_message(f"{action} {color.upper()} LED成功")
# 立即更新状态显示
self.read_led_status()
else:
self.log_message(f"控制LED失败: {result}", 'ERROR')
except Exception as e:
self.log_message(f"LED控制异常: {e}", 'ERROR')
def turn_off_all(self):
"""关闭所有LED"""
for color in ['red', 'yellow', 'green']:
self.control_led(color, False)
def show_alert(self):
"""弹窗告警"""
def show_alert_dialog():
messagebox.showerror(
"连接异常",
f"设备 {self.config['host']} 已离线超过 {self.offline_duration} 秒!\n请检查网络连接或设备状态。"
)
self.root.after(0, show_alert_dialog)
self.log_to_db('ALERT', f'离线超过{self.offline_duration}秒', self.offline_duration)
self.log_message(f"告警: 设备离线超过 {self.offline_duration} 秒", 'WARNING')
def clear_log(self):
"""清空日志"""
self.log_text.delete(1.0, tk.END)
def export_log(self):
"""导出日志"""
try:
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM connection_events ORDER BY timestamp DESC LIMIT 1000')
events = cursor.fetchall()
filename = f"connection_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(filename, 'w', encoding='utf-8') as f:
f.write("ID,时间戳,事件类型,设备IP,消息,持续时间\n")
for event in events:
f.write(f"{event[0]},{event[1]},{event[2]},{event[3]},{event[4]},{event[5]}\n")
messagebox.showinfo("导出成功", f"日志已导出到 {filename}")
self.log_message(f"日志导出成功: {filename}")
except Exception as e:
messagebox.showerror("导出失败", f"导出日志失败: {e}")
self.log_message(f"导出日志失败: {e}", 'ERROR')
def show_statistics(self):
"""显示统计信息"""
try:
cursor = self.conn.cursor()
# 连接统计
cursor.execute('SELECT COUNT(*) FROM connection_events WHERE event_type = "CONNECT"')
connect_count = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM connection_events WHERE event_type = "ALERT"')
alert_count = cursor.fetchone()[0]
cursor.execute('SELECT AVG(duration) FROM connection_events WHERE event_type = "ALERT" AND duration > 0')
avg_downtime = cursor.fetchone()[0] or 0
# 今日统计
today = datetime.now().date()
cursor.execute('SELECT COUNT(*) FROM connection_events WHERE DATE(timestamp) = ? AND event_type = "ALERT"',
(today,))
today_alerts = cursor.fetchone()[0]
stats_message = f"""设备连接统计:
━━━━━━━━━━━━━━━━━━━━━━
总连接次数: {connect_count}
总告警次数: {alert_count}
平均故障时长: {avg_downtime:.1f} 秒
今日告警次数: {today_alerts}
━━━━━━━━━━━━━━━━━━━━━━"""
messagebox.showinfo("连接统计", stats_message)
except Exception as e:
messagebox.showerror("统计失败", f"获取统计信息失败: {e}")
def __del__(self):
"""析构函数"""
if hasattr(self, 'conn'):
self.conn.close()
if hasattr(self, 'modbus_client') and self.modbus_client:
self.modbus_client.close()
def main():
root = tk.Tk()
app = ProductionLED(root)
def on_closing():
if messagebox.askokcancel("退出", "确定要退出程序吗?"):
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == "__main__":
main()

配置文件 led_config.json 示例:
json{
"host": "127.0.0.1",
"port": 502,
"heartbeat_interval": 5,
"reconnect_interval": 10,
"max_reconnect_attempts": 10,
"alert_threshold": 30,
"led_registers": {
"red": 0,
"yellow": 1,
"green": 2,
"status": 10
},
"device_info": {
"name": "生产线LED控制器",
"model": "LED-CTRL-001",
"location": "产线A区"
}
}
你可能会说:不就是个提示灯吗,至于吗?
至于。
真实的工业环境,网络可能抖动、设备可能假死(连接在但不响应)、客户可能要求出具故障报告。这时候:
我见过一个项目,因为没做心跳检测,设备死机了半天都没发现。等客户投诉时,已经损失了几十万。
工业软件不等于丑。几个简单技巧让你的界面看起来专业10倍:
1. LED灯加动画效果
pythondef led_blink(self, color):
"""LED闪烁效果"""
for _ in range(3):
self.canvas.itemconfig(self.led, fill=color)
self.root.update()
time.sleep(0.1)
self.canvas.itemconfig(self.led, fill='gray')
self.root.update()
time.sleep(0.1)
2. 使用ttk主题
pythonfrom tkinter import ttk
style = ttk.Style()
style.theme_use('clam') # 或 'alt', 'default', 'classic'
3. 给LED加光晕效果
python# 在Canvas上叠加两个圆,外圆半透明
self.canvas.create_oval(5, 5, 75, 75, fill='', outline=color, width=5, stipple='gray50')
self.canvas.create_oval(20, 20, 60, 60, fill=color)
错误示范:
python# 直接在子线程修改UI — 会崩溃!
def monitor():
self.status_label.config(text="在线") # ❌
正确做法:
pythondef monitor():
self.root.after(0, lambda: self.status_label.config(text="在线")) # ✅
Tkinter不是线程安全的。子线程要修改UI,必须通过after()方法。
每次重连前,一定要先关闭旧的socket:
pythonif self.socket:
try:
self.socket.close()
except:
pass
self.socket = socket.socket(...)
不然会出现"Address already in use"错误,特别是在Windows上。
如果用文件记录日志,记得做日志轮转:
pythonfrom logging.handlers import RotatingFileHandler
handler = RotatingFileHandler('app.log', maxBytes=10*1024*1024, backupCount=5)
我见过一个程序跑了一年,日志文件20GB,直接把硬盘撑爆。
掌握了断线重连,可以继续深挖:
进阶方向1: 通信协议集成
把示例中的socket替换为实际协议 —— Modbus TCP、OPC UA、Ethernet/IP。推荐库:pymodbus、opcua。
进阶方向2: 多设备集群监控
用ttk.Treeview做设备列表,每个设备一个LED灯。适合管理几十台PLC的场景。
进阶方向3: Web化部署
用Flask把状态暴露成REST API,前端用Vue/React做仪表盘。这样手机也能看设备状态了。
评论区聊聊:
实战挑战: 试试给代码加个"连接速度测试"功能 —— 显示ping值和丢包率。提示:用subprocess调用系统ping命令。
最后,收藏前记得点个赞~ 下次遇到设备掉线,直接翻出这篇文章,10分钟搞定提示灯功能。
把这三个方案的代码存到你的工具库,改改参数就能用。工业软件开发,就是要这种"拿来即用"的实在货。
#Python开发 #Tkinter #工业自动化 #通信协议 #故障诊断
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!