用Python写一个MODBUS串口通信,我自己写个工具不就完了?用Python + Tkinter搞了个便携式抄表程序,装在笔记本上——读取速度提升5倍不说,还能一键导出Excel,老板看了都说"这小伙子有想法"。
今天就把这套方案分享给你们。如果你也在搞工业自动化、设备监控、或者任何需要通过串口读Modbus设备的活儿,这篇文章能让你少走半年弯路。
第一,MODBUS协议压根不是给人类设计的。
你以为的通信:发个"给我读数据",设备回你"123.45"。
实际的通信:发送十六进制字节流01 03 00 00 00 02 C4 0B,然后解析返回的01 03 04 00 7B 00 C8 XX XX——最后两字节还是CRC校验,错一位全废。
我见过太多新手直接用serial.write("read data"),然后疑惑为啥设备没反应。大哥,Modbus从站听不懂英语啊!
其次,串口通信的各种参数组合能把人绕晕。
波特率(9600? 19200? 115200?)、数据位(7? 8?)、停止位(1? 2?)、校验位(None? Even? Odd?)——排列组合一下,可能性几十种。设备说明书上写的参数不清楚,或者你拿到的二手设备压根没说明书,那就只能一个个试。
最后,界面和逻辑混在一起,程序很快就变成屎山。
很多人写串口程序,直接在按钮回调里写通信代码。短期看没问题,但等你要加日志、加自动重连、加多设备轮询——代码乱成一团,改都不敢改。
我用过的那些"专业设备":
所以自己动手,真不是为了省钱(好吧也有一点),主要是定制化需求满足不了。
在撸代码之前,咱得先搞清楚这玩意儿怎么工作的。别怕,我用人话讲。
想象一���旧时代的电报:
MODBUS RTU就是这个套路。唯一的区别是,"摩斯密码"换成了字节流。
从站地址 | 功能码 | 起始地址 | 数据数量 | CRC校验 01 | 03 | 00 00 | 00 02 | C4 0B
解释一下:
设备收到后,会回你类似这样的数据:
01 03 04 00 7B 00 C8 XX XX
其中00 7B 00 C8就是你要的数据(两个寄存器的值:123和200)。
这个是很多人的拦路虎。CRC(循环冗余校验)是一种错误检测机制——保证数据传输过程中没被干扰。
好消息:你不用手写算法。Python的minimalmodbus或者pymodbus库都自动帮你算好了。实在要自己算,也就十来行代码(我后面会给出来)。
先来个最快能跑起来的版本。这个方案适合:你就是想快速验证通信,或者做个一次性的抄表任务。
bashpip install pyserial minimalmodbus
minimalmodbus是个神器——把Modbus RTU通信封装得超简单,非常适合新手。
pythonimport tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import minimalmodbus
import serial.tools.list_ports
import datetime
class SimpleModbusReader:
def __init__(self, root):
self.root = root
self.root.title("串口抄表工具 - 快速版")
self.root.geometry("600x500")
# 连接配置区
config_frame = ttk.LabelFrame(root, text="连接配置", padding=10)
config_frame.pack(fill=tk.X, padx=10, pady=10)
# 串口选择
ttk.Label(config_frame, text="串口:").grid(row=0, column=0, sticky=tk.W)
self.port_combo = ttk.Combobox(config_frame, width=15, state='readonly')
self.port_combo.grid(row=0, column=1, padx=5)
self.refresh_ports()
ttk.Button(config_frame, text="🔄", width=3,
command=self.refresh_ports).grid(row=0, column=2)
# 波特率
ttk.Label(config_frame, text="波特率:").grid(row=0, column=3, padx=(20, 0))
self.baudrate_var = tk.StringVar(value="9600")
baudrate_combo = ttk.Combobox(config_frame, textvariable=self.baudrate_var,
values=["2400", "4800", "9600", "19200", "38400"],
width=10, state='readonly')
baudrate_combo.grid(row=0, column=4, padx=5)
# 从站地址
ttk.Label(config_frame, text="从站地址:").grid(row=1, column=0, sticky=tk.W, pady=5)
self.slave_addr = tk.IntVar(value=1)
ttk.Entry(config_frame, textvariable=self.slave_addr, width=10).grid(row=1, column=1)
# 读取参数区
read_frame = ttk.LabelFrame(root, text="读取参数", padding=10)
read_frame.pack(fill=tk.X, padx=10, pady=10)
ttk.Label(read_frame, text="寄存器地址:").grid(row=0, column=0)
self.reg_addr = tk.IntVar(value=0)
ttk.Entry(read_frame, textvariable=self.reg_addr, width=10).grid(row=0, column=1, padx=5)
ttk.Label(read_frame, text="数据数量:").grid(row=0, column=2, padx=(20, 0))
self.reg_count = tk.IntVar(value=2)
ttk.Entry(read_frame, textvariable=self.reg_count, width=10).grid(row=0, column=3, padx=5)
# 操作按钮
btn_frame = ttk.Frame(root)
btn_frame.pack(pady=10)
ttk.Button(btn_frame, text="📖 读取数据",
command=self.read_data).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="🗑️ 清空日志",
command=self.clear_log).pack(side=tk.LEFT, padx=5)
# 日志显示区
log_frame = ttk.LabelFrame(root, text="通信日志", padding=10)
log_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self.log_text = scrolledtext.ScrolledText(log_frame, height=15,
font=("Consolas", 9))
self.log_text.pack(fill=tk.BOTH, expand=True)
self.instrument = None
def refresh_ports(self):
"""刷新串口列表"""
ports = [port.device for port in serial.tools.list_ports.comports()]
self.port_combo['values'] = ports
if ports:
self.port_combo.current(0)
def log(self, message, level="INFO"):
"""写日志"""
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
log_line = f"[{timestamp}] {level}: {message}\n"
self.log_text.insert(tk.END, log_line)
self.log_text.see(tk.END)
self.root.update_idletasks() # 强制刷新界面
def read_data(self):
"""读取Modbus数据"""
try:
# 初始化串口
if self.instrument is None or self.instrument.serial.port != self.port_combo.get():
port = self.port_combo.get()
if not port:
messagebox.showerror("错误", "请选择串口")
return
self.instrument = minimalmodbus.Instrument(port, self.slave_addr.get())
self.instrument.serial.baudrate = int(self.baudrate_var.get())
self.instrument.serial.timeout = 1 # 1秒超时
self.log(f"已连接到 {port},波特率 {self.baudrate_var.get()}")
# 读取寄存器
reg_addr = self.reg_addr.get()
count = self.reg_count.get()
self.log(f"读取从站{self.slave_addr.get()},地址{reg_addr},数量{count}")
# 读取多个寄存器(功能码03)
values = self.instrument.read_registers(reg_addr, count)
# 显示结果
self.log(f"✅ 读取成功: {values}", "SUCCESS")
# 如果是2个寄存器,尝试组合成32位浮点数(很多电表这样存数据)
if count == 2:
# 将两个16位整数组合成32位
combined = (values[0] << 16) | values[1]
self.log(f" 组合值(32位): {combined}")
except Exception as e:
self.log(f"❌ 读取失败: {str(e)}", "ERROR")
def clear_log(self):
"""清空日志"""
self.log_text.delete(1.0, tk.END)
if __name__ == "__main__":
root = tk.Tk()
app = SimpleModbusReader(root)
root.mainloop()

我拿这个改版在三个地方用过:
性能表现:
坑1:某些设备的寄存器地址要减1
Modbus协议里地址从0开始,但有些设备说明书写的是从1开始。比如说明书说"电压存在地址1",你实际要读地址0。这个我被坑过好几次。
坑2:timeout设太短会频繁报错
有些老设备反应慢,你给它1秒timeout可能不够。建议先设2秒,稳定后再优化。
坑3:USB转串口线质量影响巨大
我用过10块钱的淘宝线,通信失败率30%;换了个40块的品牌线,直接降到5%以下。别在线上省钱。
前面那个方案够简单,但功能太基础了。真要在项目里用,你需要:
这个方案我在一个光伏电站项目用了快两年,稳得一批。
用MVC模式分离关注点:
代码有点长,我挑关键部分讲。
pythonimport minimalmodbus
import serial
import time
from typing import List, Tuple, Optional
import logging
class ModbusRTUClient:
"""MODBUS RTU客户端(支持重连和错误处理)"""
def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.instrument = None
self.logger = logging.getLogger(__name__)
def connect(self, slave_address: int) -> bool:
"""连接到指定从站"""
try:
self.instrument = minimalmodbus.Instrument(self.port, slave_address)
self.instrument.serial.baudrate = self.baudrate
self.instrument.serial.timeout = self.timeout
self.instrument.serial.bytesize = 8
self.instrument.serial.parity = serial.PARITY_NONE
self.instrument.serial.stopbits = 1
# 测试连接(读一个寄存器试试)
self.instrument.read_register(0, functioncode=3)
self.logger.info(f"成功连接从站 {slave_address}")
return True
except Exception as e:
self.logger.error(f"连接失败: {e}")
return False
def read_registers_with_retry(self, address: int, count: int,
max_retries: int = 3) -> Optional[List[int]]:
"""带重试机制的读取"""
for attempt in range(max_retries):
try:
values = self.instrument.read_registers(address, count)
return values
except Exception as e:
self.logger.warning(f"读取失败 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(0.5) # 重试前等一下
else:
raise
return None
def read_float32(self, address: int, byte_order: str = 'big') -> Optional[float]:
"""读取32位浮点数(占用2个寄存器)"""
try:
values = self.read_registers_with_retry(address, 2)
if values:
# 根据字节序组合
if byte_order == 'big':
combined = (values[0] << 16) | values[1]
else:
combined = (values[1] << 16) | values[0]
# 转换为浮点数(这里简化了,实际要用struct模块)
import struct
bytes_data = combined.to_bytes(4, byteorder='big')
return struct.unpack('>f', bytes_data)[0]
except Exception as e:
self.logger.error(f"读取浮点数失败: {e}")
return None
def close(self):
"""关闭连接"""
if self.instrument and self.instrument.serial.is_open:
self.instrument.serial.close()
self.logger.info("连接已关闭")
python# 设备配置示例
DEVICE_CONFIG = [
{
'name': '1号配电柜-电表',
'slave_addr': 1,
'registers': [
{'addr': 0, 'count': 2, 'name': '电压', 'unit': 'V', 'type': 'float32'},
{'addr': 2, 'count': 2, 'name': '电流', 'unit': 'A', 'type': 'float32'},
{'addr': 4, 'count': 2, 'name': '功率', 'unit': 'kW', 'type': 'float32'},
]
},
{
'name': '2号配电柜-电表',
'slave_addr': 2,
'registers': [
{'addr': 0, 'count': 2, 'name': '电压', 'unit': 'V', 'type': 'float32'},
# ... 同上
]
}
]
def batch_read_devices(client: ModbusRTUClient, config: list) -> dict:
"""批量读取所有设备"""
results = {}
for device in config:
device_name = device['name']
slave_addr = device['slave_addr']
print(f"\n正在读取: {device_name} (从站{slave_addr})")
if not client.connect(slave_addr):
results[device_name] = {'error': '连接失败'}
continue
device_data = {}
for reg in device['registers']:
try:
if reg['type'] == 'float32':
value = client.read_float32(reg['addr'])
else:
values = client.read_registers_with_retry(reg['addr'], reg['count'])
value = values[0] if values else None
device_data[reg['name']] = {
'value': value,
'unit': reg['unit']
}
print(f" ✓ {reg['name']}: {value} {reg['unit']}")
except Exception as e:
device_data[reg['name']] = {'error': str(e)}
print(f" ✗ {reg['name']}: 读取失败")
results[device_name] = device_data
time.sleep(0.1) # 设备之间间隔一下
return results
pythondef export_to_excel(data: dict, filename: str = None):
"""导出抄表数据到Excel"""
try:
import pandas as pd
from datetime import datetime
if filename is None:
filename = f"抄表数据_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
# 整理数据
rows = []
for device_name, device_data in data.items():
if 'error' in device_data:
rows.append({
'设备名称': device_name,
'参数': 'ERROR',
'数值': device_data['error'],
'单位': '-'
})
else:
for param_name, param_data in device_data.items():
rows.append({
'设备名称': device_name,
'参数': param_name,
'数值': param_data.get('value', 'N/A'),
'单位': param_data.get('unit', '')
})
df = pd.DataFrame(rows)
df.to_excel(filename, index=False)
print(f"\n✅ 数据已导出: {filename}")
return True
except ImportError:
print("❌ 请先安装pandas和openpyxl: pip install pandas openpyxl")
return False

我在某光伏电站项目用这套代码:
性能对比:
| 指标 | 手工抄表 | 方案一 | 方案二 |
|---|---|---|---|
| 100台设备耗时 | 120分钟 | 30分钟 | 5分钟 |
| 错误率 | 10% | 8% | <2% |
| 数据可追溯性 | 无 | 差 | 完整日志 |
不同厂家的电表、传感器,寄存器地址都不一样。你可以做个"设备指纹识别":
pythonDEVICE_FINGERPRINTS = {
'DTSD1352': { # 某品牌电表
'test_register': 0x0000,
'expected_range': (220, 240), # 电压范围
'registers': {...}
},
'AM2301': { # 温湿度传感器
'test_register': 0x0001,
'expected_range': (0, 100),
'registers': {...}
}
}
def auto_detect_device(client, slave_addr):
"""自动检测设备型号"""
for device_type, config in DEVICE_FINGERPRINTS.items():
try:
value = client.read_register(config['test_register'])
if config['expected_range'][0] <= value <= config['expected_range'][1]:
return device_type, config
except:
continue
return None, None
做个"健康度指标",实时显示通信质量:
pythonclass CommunicationMonitor:
def __init__(self):
self.total_requests = 0
self.failed_requests = 0
self.response_times = []
def record_request(self, success: bool, response_time: float):
self.total_requests += 1
if not success:
self.failed_requests += 1
self.response_times.append(response_time)
# 只保留最近100次
if len(self.response_times) > 100:
self.response_times.pop(0)
def get_health_score(self) -> float:
"""计算健康度(0-100分)"""
if self.total_requests == 0:
return 100.0
success_rate = 1 - (self.failed_requests / self.total_requests)
avg_response = sum(self.response_times) / len(self.response_times)
# 成功率占70%,响应速度占30%
score = success_rate * 70 + (1 - min(avg_response, 2) / 2) * 30
return round(score, 1)
很多时候设备通信成功了,但数据不对(比如电压读出来是999999)。加个合理性检查:
pythondef validate_data(param_name, value, expected_range):
"""数据合理性校验"""
if value is None:
return False, "数据为空"
min_val, max_val = expected_range
if not (min_val <= value <= max_val):
return False, f"数据超出合理范围({min_val}-{max_val})"
return True, "正常"
# 使用示例
is_valid, msg = validate_data('电压', voltage, (180, 260))
if not is_valid:
logging.warning(f"异常数据告警: {msg}")
schedule库)pymodbus的TCP模式)我特别想知道:你是在抄什么设备的表?电表?水表?还是工业传感器?遇到过什么奇葩问题?评论区唠唠,说不定我能帮你解决。
如果这套方案帮你省了时间(或者让你在老板面前涨脸了),记得点赞让更多人看到。收藏的同时别忘了真的去试一试——我这代码都是实战验证过的,拿来就能用!
相关技术标签:#Python工业自动化 #Modbus通信 #串口编程 #Tkinter桌面应用 #设备数据采集
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!