车间里一台老式PLC,厂家早倒闭了,驱动没了,文档没了,就剩一根RS-232线插在那儿。老板拍桌子:"数据必须采!"——你坐在电脑前,盯着那根线发呆。
工业现场就是这么残酷。不像互联网项目,你可以用RESTful API优雅地调数据;这里的设备说话用的是串口,波特率9600,8位数据位,1位停止位,没有握手。你要么懂,要么认输。
我在一个水处理自动化项目里第一次碰这个问题,当时用C#写了一堆COM口操作代码,跑起来倒是跑起来了,但维护起来——说实话,那代码我自己三个月后都看不懂。后来改用Python + Tkinter,加上pyserial,整个界面从零到能用,两天搞定。
这篇文章就带你从头走一遍:搭界面、连串口、收发数据、实时显示,每一步都有完整代码,每一个坑都给你标出来。
bashpip install pyserial
装完验证一下:
pythonimport serial
print(serial.__version__)
能打出版本号就行。没有报错就继续。
这是新手最头疼的问题。手边没有串口设备,代码根本没法测。
解决方案是用虚拟串口对。推荐 com0com,免费,在Windows下创建一对虚拟COM口(比如COM3和COM4),一端发数据另一端就能收到,完美模拟真实硬件。
装好之后,COM3发的数据COM4能收,反过来也一样。我们的测试就用这对虚拟口。
很多人一上来就抄代码,结果数据乱码、程序卡死,根本不知道为什么。这里必须讲清楚几个基本概念——不难,但不懂就会踩坑。
**波特率(Baud Rate)**就是每秒传输的符号数。9600是工业设备最常见的默认值,意思是每秒传9600个bit。你和设备两端必须设置一模一样,差一个数字,收到的就是乱码。
数据帧格式通常写成8N1:8个数据位,无校验位(None),1个停止位。这是默认格式,大多数设备都用这个,除非文档里特别说明。
阻塞 vs 非阻塞——这是最容易让Tkinter程序卡死的元凶。serial.read()默认是阻塞的,等不到数据就一直等。把这个调用放在Tkinter主线程里,界面立刻冻结,鼠标点什么都没反应。
解决方案只有一个:把串口读写放进独立线程。 这不是建议,这是必须。
在写Tkinter界面之前,先用最简单的方式把串口通信跑通。这一步别跳过——很多人界面没写好,其实是串口本身就没通。
python# 最简单的串口发送与接收
import serial
import time
def test_serial():
# 打开COM1,波特率9600,超时1秒
ser = serial.Serial(
port='COM1',
baudrate=9600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1 # 关键!设置超时,避免永久阻塞
)
if ser.isOpen():
print(f"串口已打开:{ser.name}")
# 发送一条测试指令(模拟Modbus读寄存器命令)
command = bytes([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
ser.write(command)
print(f"已发送:{command.hex(' ').upper()}")
time.sleep(0.1) # 等设备响应
# 读取返回数据
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f"收到响应:{response.hex(' ').upper()}")
else:
print("无响应(超时)")
ser.close()
print("串口已关闭")
if __name__ == '__main__':
test_serial()
![[Pasted image 20260305144546.png]]
用com0com的话,在COM4那端开个串口调试工具(比如SSCOM),把它设置成接收模式,运行这段代码,就能看到数据过去了。
踩坑预警:serial.Serial()里的port参数,Windows下写'COM3',注意是字符串,不是数字。有些老教程写port=3,那是Python 2时代的写法,现在会报错。
热身完毕,开始正题。咱们要做一个界面,功能包括:选择COM口、设置波特率、发送十六进制指令、实时显示接收数据。
工业现场用得上的那种,不是玩具。
python# Tkinter串口调试助手
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
class SerialDebugTool:
def __init__(self, root):
self.root = root
self.root.title("串口调试助手 v1.0")
self.root.geometry("720x560")
self.root.resizable(False, False)
self.ser = None # 串口对象
self.running = False # 接收线程控制标志
self.receive_thread = None # 接收线程
self._build_ui()
# ── 构建界面 ────────────────────────────────────────── def _build_ui(self):
# 顶部:串口配置区
config_frame = tk.LabelFrame(self.root, text="串口配置", padx=8, pady=6)
config_frame.pack(fill='x', padx=10, pady=(10, 4))
tk.Label(config_frame, text="端口:").grid(row=0, column=0, sticky='e')
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(config_frame, textvariable=self.port_var, width=10, state='readonly')
self.port_combo.grid(row=0, column=1, padx=4)
tk.Label(config_frame, text="波特率:").grid(row=0, column=2, sticky='e')
self.baud_var = tk.StringVar(value='9600')
baud_combo = ttk.Combobox(config_frame, textvariable=self.baud_var, width=9,
values=['1200','2400','4800','9600','19200','38400','57600','115200'],
state='readonly')
baud_combo.grid(row=0, column=3, padx=4)
tk.Label(config_frame, text="数据位:").grid(row=0, column=4, sticky='e')
self.data_var = tk.StringVar(value='8')
ttk.Combobox(config_frame, textvariable=self.data_var, width=4,
values=['5','6','7','8'], state='readonly').grid(row=0, column=5, padx=4)
tk.Label(config_frame, text="校验:").grid(row=0, column=6, sticky='e')
self.parity_var = tk.StringVar(value='None')
ttk.Combobox(config_frame, textvariable=self.parity_var, width=6,
values=['None','Even','Odd'], state='readonly').grid(row=0, column=7, padx=4)
self.refresh_btn = tk.Button(config_frame, text="刷新", width=5, command=self._refresh_ports)
self.refresh_btn.grid(row=0, column=8, padx=(8, 2))
self.connect_btn = tk.Button(config_frame, text="打开", width=6,
bg='#4CAF50', fg='white', command=self._toggle_connect)
self.connect_btn.grid(row=0, column=9, padx=2)
# 中部:接收区
recv_frame = tk.LabelFrame(self.root, text="接收区", padx=6, pady=4)
recv_frame.pack(fill='both', expand=True, padx=10, pady=4)
self.recv_text = scrolledtext.ScrolledText(recv_frame, height=16, font=('Consolas', 10),
bg='#1e1e1e', fg='#d4d4d4', insertbackground='white')
self.recv_text.pack(fill='both', expand=True)
btn_row = tk.Frame(recv_frame)
btn_row.pack(fill='x', pady=(4, 0))
tk.Button(btn_row, text="清空接收区", command=self._clear_recv).pack(side='left')
self.hex_display_var = tk.BooleanVar(value=True)
tk.Checkbutton(btn_row, text="HEX显示", variable=self.hex_display_var).pack(side='left', padx=8)
self.auto_scroll_var = tk.BooleanVar(value=True)
tk.Checkbutton(btn_row, text="自动滚动", variable=self.auto_scroll_var).pack(side='left')
# 底部:发送区
send_frame = tk.LabelFrame(self.root, text="发送区", padx=6, pady=4)
send_frame.pack(fill='x', padx=10, pady=(4, 10))
self.send_entry = tk.Entry(send_frame, font=('Consolas', 11), width=52)
self.send_entry.pack(side='left', padx=(0, 8))
self.send_entry.insert(0, "01 03 00 00 00 01 84 0A")
self.send_entry.bind('<Return>', lambda e: self._send_data())
self.hex_send_var = tk.BooleanVar(value=True)
tk.Checkbutton(send_frame, text="HEX发送", variable=self.hex_send_var).pack(side='left', padx=4)
tk.Button(send_frame, text="发 送", bg='#2196F3', fg='white',
width=8, command=self._send_data).pack(side='left', padx=4)
self._refresh_ports()
# ── 刷新可用COM口 ───────────────────────────────────── def _refresh_ports(self):
ports = [p.device for p in serial.tools.list_ports.comports()]
self.port_combo['values'] = ports
if ports:
self.port_combo.current(0)
# ── 打开/关闭串口 ───────────────────────────────────── def _toggle_connect(self):
if self.ser and self.ser.isOpen():
self._disconnect()
else:
self._connect()
def _connect(self):
port = self.port_var.get()
if not port:
messagebox.showwarning("提示", "请先选择COM口")
return
parity_map = {'None': serial.PARITY_NONE, 'Even': serial.PARITY_EVEN, 'Odd': serial.PARITY_ODD}
try:
self.ser = serial.Serial(
port=port,
baudrate=int(self.baud_var.get()),
bytesize=int(self.data_var.get()),
parity=parity_map[self.parity_var.get()],
stopbits=serial.STOPBITS_ONE,
timeout=0.1 # 短超时,让接收线程可以定期检查退出标志
)
self.running = True
# 启动接收线程——这是关键,绝对不能在主线程里读串口
self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
self.receive_thread.start()
self.connect_btn.config(text="关闭", bg='#f44336')
self._log(f"[系统] 已连接 {port},波特率 {self.baud_var.get()}\n", tag='sys')
except serial.SerialException as e:
messagebox.showerror("连接失败", str(e))
def _disconnect(self):
self.running = False
if self.receive_thread:
self.receive_thread.join(timeout=1.0)
if self.ser:
self.ser.close()
self.ser = None
self.connect_btn.config(text="打开", bg='#4CAF50')
self._log("[系统] 串口已关闭\n", tag='sys')
# ── 接收线程(独立线程,不阻塞UI)────────────────────
def _receive_loop(self):
while self.running:
try:
if self.ser and self.ser.in_waiting > 0:
data = self.ser.read(self.ser.in_waiting)
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
if self.hex_display_var.get():
display = data.hex(' ').upper()
else:
display = data.decode('gbk', errors='replace')
# 注意:不能直接在子线程里操作Tkinter控件!
# 必须用after()或event_generate()把更新调度回主线程
self.root.after(0, self._append_recv, f"[{timestamp}] RX: {display}\n")
else:
time.sleep(0.01) # 没数据时稍微等一下,别让CPU跑满
except Exception as e:
if self.running:
self.root.after(0, self._log, f"[错误] {e}\n", 'err')
break
# ── 在主线程中更新接收区文本 ────────────────────────── def _append_recv(self, text):
self.recv_text.insert('end', text)
if self.auto_scroll_var.get():
self.recv_text.see('end')
# ── 发送数据 ────────────────────────────────────────── def _send_data(self):
if not (self.ser and self.ser.isOpen()):
messagebox.showwarning("提示", "请先打开串口")
return
raw = self.send_entry.get().strip()
if not raw:
return
try:
if self.hex_send_var.get():
# 把"01 03 00 00"这种字符串转成字节
data = bytes.fromhex(raw.replace(' ', ''))
else:
data = raw.encode('gbk')
self.ser.write(data)
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
self._log(f"[{timestamp}] TX: {raw.upper()}\n", tag='tx')
except ValueError:
messagebox.showerror("格式错误", "HEX格式不正确,请检查输入(如:01 03 00 00)")
# ── 工具方法 ────────────────────────────────────────── def _log(self, text, tag=None):
self.recv_text.insert('end', text)
if self.auto_scroll_var.get():
self.recv_text.see('end')
def _clear_recv(self):
self.recv_text.delete('1.0', 'end')
if __name__ == '__main__':
root = tk.Tk()
app = SerialDebugTool(root)
root.mainloop()

坑一:子线程里直接操作Tkinter控件
这是最常见的崩溃原因。Tkinter不是线程安全的,子线程里调用text.insert(),轻则乱码,重则直接崩。正确做法是用root.after(0, callback),把UI更新调度回主线程执行。代码里已经这么写了,别改它。
坑二:timeout设置为None或者太长
接收线程里ser.read()如果没有超时,running = False之后线程根本退不出来,程序关不掉。设成0.1秒,每100ms检查一次退出标志,刚刚好。
坑三:中文乱码
工业设备返回的字符串,有时候用GBK编码。decode('utf-8')铁定乱码。碰到乱码先试decode('gbk', errors='replace'),errors='replace'能防止解码失败直接抛异常。
这个程序是起点,不是终点。
有了这个基础,下一步可以往三个方向走:
方向一:Modbus协议解析。工业设备80%用Modbus RTU协议,pymodbus库封装得很好,在这个串口基础上加几层解析逻辑,就能读PLC寄存器、写线圈状态。
方向二:数据持久化。接收到的数据写进SQLite或者CSV,加个matplotlib实时曲线图,一个简单的数据采集系统就成型了。
方向三:多串口并发。车间里往往不止一台设备,用threading管理多个串口实例,每台设备一个线程,统一汇总到主界面——这就是实际项目里的架构了。
问题一:你们现场用的是什么协议?Modbus RTU、Modbus TCP,还是私有协议?私有协议的逆向分析有没有好的思路,欢迎留言聊聊。
问题二:小挑战——在现有代码基础上,加一个"定时发送"功能,每隔N秒自动发送一次指令,用threading.Timer实现,看看你能不能搞定。
核心收获三点:串口参数必须两端一致;Tkinter里串口读写必须放子线程;子线程更新UI必须用after()调度回主线程。
这三条记住了,你就不会在这个方向上踩大坑。
工业控制这个领域,Python能做的事情比很多人想象的多得多。老旧设备、私有协议、恶劣环境——这些都是工程师的战场,也是Python大展身手的地方。代码能跑起来只是第一步,让它稳定跑一年、两年、五年,才是真本事。
代码模板已经给到你了,收藏备用,下次遇到串口需求直接拿来改。
#Python工业控制 #串口通信 #Tkinter开发 #pyserial #工业自动化
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!