在Windows系统下的Python开发中,串口通信是连接物理设备与软件系统的重要桥梁。无论你是在开发工业自动化系统、物联网项目,还是需要与单片机、传感器进行数据交换,掌握Python串口通信技术都是必不可少的技能。
本文将通过一个完整的串口通信工具案例,带你深入理解从基础连接到高级数据处理的全套解决方案。我们不仅要实现基本的数据收发功能,还要处理异步通信、协议解析、日志记录等实战中的关键问题,让你的Python上位机开发能力更上一层楼。
传统的串口读取是阻塞式的,会导致界面卡顿,用户体验极差。特别是在需要持续接收数据的场景下,如何保证界面响应性是首要问题。
实际应用中,串口数据往往不是完整的数据包,需要处理数据粘包、分包等情况,如何正确解析协议数据是技术难点。
设备断开、端口占用、读写异常等错误情况频发,需要完善的异常处理机制和资源清理策略。
我们采用分层架构来解决上述问题:
Python# 配置层:统一管理连接参数
class SerialConfig:
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
设计亮点:将配置独立成类,便于扩展更多参数(如数据位、停止位等),也方便配置的序列化保存。
Pythonclass AsyncSerialHandler:
def __init__(self, config: SerialConfig):
self.config = config
self.serial = None
self.is_connected = False
# 关键:使用回调机制解耦UI和通信逻辑
self.on_data_received = None
self.on_error = None
self.on_connection_changed = None
self.tx_bytes = 0
self.rx_bytes = 0
self.read_task = None
核心技巧:
on_data_received 等回调函数,实现通信层与UI层的完全解耦read_task 管理异步读取任务的生命周期Pythonasync def read_loop(self):
while self.is_connected and self.serial and self.serial.is_open:
await asyncio.sleep(0.05) # 关键:适当的延迟避免CPU过载
try:
data = self.serial.read(self.serial.in_waiting or 1)
if data and self.on_data_received:
self.rx_bytes += len(data)
self.on_data_received(data)
except Exception as ex:
if self.on_error:
self.on_error(str(ex))
break
性能优化要点:
await asyncio.sleep(0.05) 既保证了响应速度,又避免了CPU资源浪费self.serial.in_waiting 先检测是否有数据,提高读取效率Pythonclass SerialDataLogger:
def __init__(self):
self.current_log_file = None
self.log_cache = []
def start_session(self, stats):
filename = f"serial_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.current_log_file = Path(filename)
self.log_cache = []
def log_data(self, direction, data):
record = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {direction}: {data}\n"
self.log_cache.append(record)
# 实时写入文件,确保数据不丢失
if self.current_log_file:
with open(self.current_log_file, "a", encoding="utf-8") as f:
f.write(record)
专业特性:
Pythonclass SerialProtocolParser:
def __init__(self):
self.parsers = {}
def register_parser(self, name, func):
self.parsers[name] = func
def parse(self, name, data):
if name in self.parsers:
return self.parsers[name](data)
return None
# 实际协议解析示例
def parse_modbus_rtu(data: bytes):
return "MODBUS数据:%s" % data.hex()
def parse_custom_json(data: bytes):
try:
import json
return json.loads(data)
except Exception:
return None
架构优势:
Pythondef _setup_ui(self):
"""设置用户界面"""
# 创建主框架
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 连接配置区域
config_frame = ttk.LabelFrame(main_frame, text="连接配置")
config_frame.pack(fill=tk.X, pady=(0, 10))
UI设计精髓:
LabelFrame 进行功能分区,界面清晰有序fill=tk.BOTH, expand=True 确保界面自适应窗口大小padx, pady 参数保证界面美观Pythondef _on_data_received(self, data: bytes):
"""数据接收回调"""
# 记录接收数据
self.logger.log_data('RX', data)
# 协议解析
protocol = self.protocol_var.get()
if protocol != "none":
parsed = self.parser.parse(protocol, data)
display_text = f"解析结果: {parsed}" if parsed else f"原始数据: {data.decode('utf-8', errors='ignore').strip()}"
else:
display_text = data.decode('utf-8', errors='ignore').strip()
# 关键:在主线程中更新UI
self.root.after(0, lambda: self._append_receive_text(
f"[{datetime.now().strftime('%H:%M:%S')}] 接收: {display_text}", "green"
))
线程安全要点:
self.root.after(0, callback) 是Tkinter中线程安全更新UI的标准方法errors='ignore' 避免编码错误导致程序崩溃Pythondef _setup_event_loop(self):
"""设置事件循环"""
self.loop = asyncio.new_event_loop()
self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self.loop_thread.start()
def _run_event_loop(self):
"""在后台线程运行事件循环"""
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
关键技术点:
Pythondef _on_connect_click(self):
"""连接按钮点击处理"""
# 在事件循环中执行连接
future = asyncio.run_coroutine_threadsafe(
self.serial_handler.connect(), self.loop
)
最佳实践:使用 run_coroutine_threadsafe 方法在不同线程间安全地执行异步任务。
Pythonasync def connect(self):
try:
self.serial = serial.Serial(self.config.port, self.config.baudrate, timeout=0.1)
self.is_connected = True
if self.on_connection_changed:
self.on_connection_changed(self.is_connected)
self.read_task = asyncio.create_task(self.read_loop())
except Exception as ex:
self.is_connected = False
if self.on_error:
self.on_error(str(ex))
if self.on_connection_changed:
self.on_connection_changed(self.is_connected)
错误处理策略:
Pythonimport tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import asyncio
import threading
from datetime import datetime
import serial
import serial.tools.list_ports
from pathlib import Path
# 串口配置数据结构
class SerialConfig:
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
# 简单串口异步处理器(pyserial实现)
class AsyncSerialHandler:
def __init__(self, config: SerialConfig):
self.config = config
self.serial = None
self.is_connected = False
self.on_data_received = None
self.on_error = None
self.on_connection_changed = None
self.tx_bytes = 0
self.rx_bytes = 0
self.read_task = None
@staticmethod
def get_available_ports():
return [port.device for port in serial.tools.list_ports.comports()]
def get_statistics(self):
return {"bytes_sent": self.tx_bytes, "bytes_received": self.rx_bytes}
async def connect(self):
try:
self.serial = serial.Serial(self.config.port, self.config.baudrate, timeout=0.1)
self.is_connected = True
if self.on_connection_changed:
self.on_connection_changed(self.is_connected)
self.read_task = asyncio.create_task(self.read_loop())
except Exception as ex:
self.is_connected = False
if self.on_error:
self.on_error(str(ex))
if self.on_connection_changed:
self.on_connection_changed(self.is_connected)
def disconnect(self):
self.is_connected = False
if self.read_task:
self.read_task.cancel()
if self.serial and self.serial.is_open:
self.serial.close()
if self.on_connection_changed:
self.on_connection_changed(self.is_connected)
async def send_data(self, data):
if self.serial and self.serial.is_open:
self.tx_bytes += len(data)
self.serial.write(data.encode() if isinstance(data, str) else data)
async def read_loop(self):
while self.is_connected and self.serial and self.serial.is_open:
await asyncio.sleep(0.05)
try:
data = self.serial.read(self.serial.in_waiting or 1)
if data and self.on_data_received:
self.rx_bytes += len(data)
self.on_data_received(data)
except Exception as ex:
if self.on_error:
self.on_error(str(ex))
break
# 简单串口数据日志器
class SerialDataLogger:
def __init__(self):
self.current_log_file = None
self.log_cache = []
def start_session(self, stats):
filename = f"serial_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.current_log_file = Path(filename)
self.log_cache = []
def log_data(self, direction, data):
record = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {direction}: {data}\n"
self.log_cache.append(record)
# 写入文件
if self.current_log_file:
with open(self.current_log_file, "a", encoding="utf-8") as f:
f.write(record)
# 协议解析器及演示协议解析
class SerialProtocolParser:
def __init__(self):
self.parsers = {}
def register_parser(self, name, func):
self.parsers[name] = func
def parse(self, name, data):
if name in self.parsers:
return self.parsers[name](data)
return None
# 演示 Modbus 解析器(实际应根据协议解析)
def parse_modbus_rtu(data: bytes):
return "MODBUS数据:%s" % data.hex()
# 演示自定义 JSON 协议解析
def parse_custom_json(data: bytes):
try:
import json
return json.loads(data)
except Exception:
return None
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import asyncio
import threading
from datetime import datetime
class SerialApp:
"""串口通信应用程序"""
def __init__(self):
self.root = tk.Tk()
self.root.title("Python串口通信工具")
self.root.geometry("800x600")
self.serial_handler = None
self.logger = SerialDataLogger()
self.parser = SerialProtocolParser()
# 注册协议解析器
self.parser.register_parser("modbus", parse_modbus_rtu)
self.parser.register_parser("json", parse_custom_json)
self._setup_ui()
self._setup_event_loop()
def _setup_ui(self):
"""设置用户界面"""
# 创建主框架
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 连接配置区域
config_frame = ttk.LabelFrame(main_frame, text="连接配置")
config_frame.pack(fill=tk.X, pady=(0, 10))
# 端口选择
ttk.Label(config_frame, text="端口:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(config_frame, textvariable=self.port_var, width=15)
self.port_combo.grid(row=0, column=1, padx=5, pady=5)
self._refresh_ports()
# 波特率选择
ttk.Label(config_frame, text="波特率:").grid(row=0, column=2, sticky=tk.W, padx=5, pady=5)
self.baudrate_var = tk.StringVar(value="115200")
self.baudrate_combo = ttk.Combobox(config_frame, textvariable=self.baudrate_var,
values=["9600", "19200", "38400", "57600", "115200", "230400"],
width=10)
self.baudrate_combo.grid(row=0, column=3, padx=5, pady=5)
# 连接按钮
self.connect_btn = ttk.Button(config_frame, text="连接", command=self._on_connect_click)
self.connect_btn.grid(row=0, column=4, padx=5, pady=5)
self.disconnect_btn = ttk.Button(config_frame, text="断开", command=self._on_disconnect_click,
state=tk.DISABLED)
self.disconnect_btn.grid(row=0, column=5, padx=5, pady=5)
# 刷新端口按钮
ttk.Button(config_frame, text="刷新", command=self._refresh_ports).grid(row=0, column=6, padx=5, pady=5)
# 状态显示区域
status_frame = ttk.LabelFrame(main_frame, text="连接状态")
status_frame.pack(fill=tk.X, pady=(0, 10))
self.status_label = ttk.Label(status_frame, text="状态: 未连接", foreground="red")
self.status_label.pack(side=tk.LEFT, padx=5, pady=5)
self.stats_label = ttk.Label(status_frame, text="发送: 0 字节 | 接收: 0 字节")
self.stats_label.pack(side=tk.RIGHT, padx=5, pady=5)
# 数据区域
data_frame = ttk.LabelFrame(main_frame, text="数据交互")
data_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
# 接收数据显示
ttk.Label(data_frame, text="接收数据:").pack(anchor=tk.W, padx=5, pady=(5, 0))
self.receive_text = scrolledtext.ScrolledText(data_frame, height=15, wrap=tk.WORD)
self.receive_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 发送数据区域
send_frame = ttk.Frame(data_frame)
send_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Label(send_frame, text="发送数据:").pack(anchor=tk.W)
send_input_frame = ttk.Frame(send_frame)
send_input_frame.pack(fill=tk.X, pady=(2, 0))
self.send_entry = ttk.Entry(send_input_frame)
self.send_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.send_entry.bind("<Return>", lambda e: self._on_send_click())
ttk.Button(send_input_frame, text="发送", command=self._on_send_click).pack(side=tk.RIGHT, padx=(5, 0))
# 控制按钮区域
control_frame = ttk.Frame(send_frame)
control_frame.pack(fill=tk.X, pady=(5, 0))
ttk.Button(control_frame, text="清空接收", command=self._clear_receive).pack(side=tk.LEFT)
ttk.Button(control_frame, text="保存日志", command=self._save_log).pack(side=tk.LEFT, padx=(5, 0))
# 协议解析选择
self.protocol_var = tk.StringVar(value="none")
ttk.Label(control_frame, text="协议:").pack(side=tk.LEFT, padx=(20, 5))
protocol_combo = ttk.Combobox(control_frame, textvariable=self.protocol_var,
values=["none", "modbus", "json"], width=10)
protocol_combo.pack(side=tk.LEFT)
def _setup_event_loop(self):
"""设置事件循环"""
self.loop = asyncio.new_event_loop()
self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self.loop_thread.start()
def _run_event_loop(self):
"""在后台线程运行事件循环"""
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def _refresh_ports(self):
"""刷新可用端口列表"""
ports = AsyncSerialHandler.get_available_ports()
self.port_combo['values'] = ports
if ports and not self.port_var.get():
self.port_combo.set(ports[0])
def _on_connect_click(self):
"""连接按钮点击处理"""
if not self.port_var.get():
messagebox.showerror("错误", "请选择端口")
return
config = SerialConfig(
port=self.port_var.get(),
baudrate=int(self.baudrate_var.get())
)
self.serial_handler = AsyncSerialHandler(config)
self.serial_handler.on_data_received = self._on_data_received
self.serial_handler.on_error = self._on_error
self.serial_handler.on_connection_changed = self._on_connection_changed
# 在事件循环中执行连接
future = asyncio.run_coroutine_threadsafe(
self.serial_handler.connect(), self.loop
)
# 启动日志会话
self.logger.start_session(self.serial_handler.get_statistics())
def _on_disconnect_click(self):
"""断开按钮点击处理"""
if self.serial_handler:
self.serial_handler.disconnect()
def _on_send_click(self):
"""发送按钮点击处理"""
if not self.serial_handler or not self.serial_handler.is_connected:
messagebox.showwarning("警告", "请先连接设备")
return
data = self.send_entry.get()
if not data:
return
# 在事件循环中执行发送
future = asyncio.run_coroutine_threadsafe(
self.serial_handler.send_data(data + '\n'), self.loop
)
self.send_entry.delete(0, tk.END)
# 记录发送数据
self.logger.log_data('TX', (data + '\n').encode())
# 显示发送数据
self._append_receive_text(f"[{datetime.now().strftime('%H:%M:%S')}] 发送: {data}", "blue")
def _on_data_received(self, data: bytes):
"""数据接收回调"""
# 记录接收数据
self.logger.log_data('RX', data)
# 协议解析
protocol = self.protocol_var.get()
if protocol != "none":
parsed = self.parser.parse(protocol, data)
if parsed:
display_text = f"解析结果: {parsed}"
else:
display_text = f"原始数据: {data.decode('utf-8', errors='ignore').strip()}"
else:
display_text = data.decode('utf-8', errors='ignore').strip()
# 在主线程中更新UI
self.root.after(0, lambda: self._append_receive_text(
f"[{datetime.now().strftime('%H:%M:%S')}] 接收: {display_text}", "green"
))
# 更新统计信息
self.root.after(0, self._update_stats)
def _on_error(self, error: str):
"""错误处理回调"""
self.root.after(0, lambda: self._append_receive_text(
f"[{datetime.now().strftime('%H:%M:%S')}] 错误: {error}", "red"
))
def _on_connection_changed(self, is_connected: bool):
"""连接状态变化回调"""
def update_ui():
if is_connected:
self.status_label.config(text="状态: 已连接", foreground="green")
self.connect_btn.config(state=tk.DISABLED)
self.disconnect_btn.config(state=tk.NORMAL)
else:
self.status_label.config(text="状态: 未连接", foreground="red")
self.connect_btn.config(state=tk.NORMAL)
self.disconnect_btn.config(state=tk.DISABLED)
self.root.after(0, update_ui)
def _append_receive_text(self, text: str, color: str = "black"):
"""添加接收文本"""
self.receive_text.config(state=tk.NORMAL)
self.receive_text.tag_config(color, foreground=color)
self.receive_text.insert(tk.END, text + '\n', color)
self.receive_text.see(tk.END)
self.receive_text.config(state=tk.DISABLED)
def _update_stats(self):
"""更新统计信息"""
if self.serial_handler:
stats = self.serial_handler.get_statistics()
self.stats_label.config(
text=f"发送: {stats['bytes_sent']} 字节 | 接收: {stats['bytes_received']} 字节"
)
def _clear_receive(self):
"""清空接收区域"""
self.receive_text.config(state=tk.NORMAL)
self.receive_text.delete(1.0, tk.END)
self.receive_text.config(state=tk.DISABLED)
def _save_log(self):
"""保存日志"""
if self.logger.current_log_file and self.logger.current_log_file.exists():
messagebox.showinfo("信息", f"日志已保存到: {self.logger.current_log_file}")
else:
messagebox.showwarning("警告", "没有日志数据可保存")
def run(self):
"""运行应用程序"""
self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
self.root.mainloop()
def _on_closing(self):
"""关闭程序时清理资源"""
if self.serial_handler and self.serial_handler.is_connected:
self.serial_handler.disconnect()
self.loop.call_soon_threadsafe(self.loop.stop)
self.root.destroy()
# 程序入口
if __name__ == "__main__":
app = SerialApp()
app.run()
# 程序入口
if __name__ == "__main__":
app = SerialApp()
app.run()

合理设置超时时间
Pythonself.serial = serial.Serial(self.config.port, self.config.baudrate, timeout=0.1)
超时时间设置为0.1秒,在响应速度和资源占用间找到平衡。
智能缓冲区管理
Pythondata = self.serial.read(self.serial.in_waiting or 1)
优先读取缓冲区中的所有数据,没有数据时读取1字节避免阻塞。
内存使用优化
Pythonself.receive_text.see(tk.END) # 自动滚动到最新数据
长时间运行时,定期清理显示缓冲区避免内存溢出。
通过这个完整的串口通信工具案例,我们掌握了三个核心要点:
🏗️ 架构设计:采用分层架构和回调机制,实现了高内聚低耦合的代码结构,为后续功能扩展奠定了坚实基础。
⚡ 异步处理:通过独立的事件循环和线程安全的UI更新机制,解决了传统串口通信中的阻塞问题,大幅提升了用户体验。
🛡️ 工程实践:从错误处理到日志记录,从协议解析到资源管理,每个细节都考虑了生产环境的实际需求。
这套解决方案不仅适用于串口通信,其设计思想同样适用于其他物理设备的通信开发。在Python上位机开发的道路上,掌握这些最佳实践将让你的代码更加专业和可靠。
现在就动手实践吧,让你的Python开发技能再上新台阶!
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!