编辑
2026-03-04
Python
00

目录

🔌 为啥串口抄表这么让人头疼?
三个被低估的难点
传统手持抄表器的三宗罪
📡 MODBUS RTU协议快速入门
通信过程就像发电报
一个典型的读取请求长这样
CRC校验是啥?别怕,有现成库
🛠️ 方案一:极简版抄表工具(30分钟速成)
需要装的库
完整代码
这个方案的适用场景
必须知道的三个坑
🚀 方案二:生产级抄表工具(支持批量和日志)
核心架构设计
Modbus通信类(核心)
批量抄表配置
数据导出到Excel
这个方案的实战价值
💡 三个让工具更专业的技巧
1. 自动识别设备类型
2. 通信质量监控
3. 异常数据告警
🎯 三句话总结
🚀 接下来的进阶方向
💬 你的抄表场景是啥?

用Python写一个MODBUS串口通信,我自己写个工具不就完了?用Python + Tkinter搞了个便携式抄表程序,装在笔记本上——读取速度提升5倍不说,还能一键导出Excel,老板看了都说"这小伙子有想法"。

今天就把这套方案分享给你们。如果你也在搞工业自动化、设备监控、或者任何需要通过串口读Modbus设备的活儿,这篇文章能让你少走半年弯路。


🔌 为啥串口抄表这么让人头疼?

三个被低估的难点

第一,MODBUS协议压根不是给人类设计的。
你以为的通信:发个"给我读数据",设备回你"123.45"。
实际的通信:发送十六进制字节流01 03 00 00 00 02 C4 0B,然后解析返回的01 03 04 00 7B 00 C8 XX XX——最后两字节还是CRC校验,错一位全废。

我见过太多新手直接用serial.write("read data"),然后疑惑为啥设备没反应。大哥,Modbus从站听不懂英语啊!

其次,串口通信的各种参数组合能把人绕晕。
波特率(9600? 19200? 115200?)、数据位(7? 8?)、停止位(1? 2?)、校验位(None? Even? Odd?)——排列组合一下,可能性几十种。设备说明书上写的参数不清楚,或者你拿到的二手设备压根没说明书,那就只能一个个试。

最后,界面和逻辑混在一起,程序很快就变成屎山。
很多人写串口程序,直接在按钮回调里写通信代码。短期看没问题,但等你要加日志、加自动重连、加多设备轮询——代码乱成一团,改都不敢改。

传统手持抄表器的三宗罪

我用过的那些"专业设备":

  • :一台2000-5000块,功能还少得可怜
  • :每个表读取要3-5秒(我的程序1秒内搞定)
  • 封闭:数据导出还得装专用软件,格式死板

所以自己动手,真不是为了省钱(好吧也有一点),主要是定制化需求满足不了


📡 MODBUS RTU协议快速入门

在撸代码之前,咱得先搞清楚这玩意儿怎么工作的。别怕,我用人话讲。

通信过程就像发电报

想象一���旧时代的电报:

  1. 你敲摩斯密码(主站发送请求报文)
  2. 等对方回复(从站处理并响应)
  3. 解读回复内容(解析响应报文)

MODBUS RTU就是这个套路。唯一的区别是,"摩斯密码"换成了字节流。

一个典型的读取请求长这样

从站地址 | 功能码 | 起始地址 | 数据数量 | CRC校验 01 | 03 | 00 00 | 00 02 | C4 0B

解释一下:

  • 01:跟1号从站说话(如果你有多个设备,每个有不同地址)
  • 03:功能码03表示"读保持寄存器"(最常用的读操作)
  • 00 00:从寄存器地��0开始读
  • 00 02:读2个寄存器(每个寄存器2字节,共4字节数据)
  • C4 0B:CRC校验码(防止传输出错)

设备收到后,会回你类似这样的数据:

01 03 04 00 7B 00 C8 XX XX

其中00 7B 00 C8就是你要的数据(两个寄存器的值:123和200)。

CRC校验是啥?别怕,有现成库

这个是很多人的拦路虎。CRC(循环冗余校验)是一种错误检测机制——保证数据传输过程中没被干扰。

好消息:你不用手写算法。Python的minimalmodbus或者pymodbus库都自动帮你算好了。实在要自己算,也就十来行代码(我后面会给出来)。


🛠️ 方案一:极简版抄表工具(30分钟速成)

先来个最快能跑起来的版本。这个方案适合:你就是想快速验证通信,或者做个一次性的抄表任务。

需要装的库

bash
pip install pyserial minimalmodbus

minimalmodbus是个神器——把Modbus RTU通信封装得超简单,非常适合新手。

完整代码

python
import tkinter as tk from tkinter import ttk, messagebox, scrolledtext import minimalmodbus import serial.tools.list_ports import datetime class SimpleModbusReader: def __init__(self, root): self.root = root self.root.title("串口抄表工具 - 快速版") self.root.geometry("600x500") # 连接配置区 config_frame = ttk.LabelFrame(root, text="连接配置", padding=10) config_frame.pack(fill=tk.X, padx=10, pady=10) # 串口选择 ttk.Label(config_frame, text="串口:").grid(row=0, column=0, sticky=tk.W) self.port_combo = ttk.Combobox(config_frame, width=15, state='readonly') self.port_combo.grid(row=0, column=1, padx=5) self.refresh_ports() ttk.Button(config_frame, text="🔄", width=3, command=self.refresh_ports).grid(row=0, column=2) # 波特率 ttk.Label(config_frame, text="波特率:").grid(row=0, column=3, padx=(20, 0)) self.baudrate_var = tk.StringVar(value="9600") baudrate_combo = ttk.Combobox(config_frame, textvariable=self.baudrate_var, values=["2400", "4800", "9600", "19200", "38400"], width=10, state='readonly') baudrate_combo.grid(row=0, column=4, padx=5) # 从站地址 ttk.Label(config_frame, text="从站地址:").grid(row=1, column=0, sticky=tk.W, pady=5) self.slave_addr = tk.IntVar(value=1) ttk.Entry(config_frame, textvariable=self.slave_addr, width=10).grid(row=1, column=1) # 读取参数区 read_frame = ttk.LabelFrame(root, text="读取参数", padding=10) read_frame.pack(fill=tk.X, padx=10, pady=10) ttk.Label(read_frame, text="寄存器地址:").grid(row=0, column=0) self.reg_addr = tk.IntVar(value=0) ttk.Entry(read_frame, textvariable=self.reg_addr, width=10).grid(row=0, column=1, padx=5) ttk.Label(read_frame, text="数据数量:").grid(row=0, column=2, padx=(20, 0)) self.reg_count = tk.IntVar(value=2) ttk.Entry(read_frame, textvariable=self.reg_count, width=10).grid(row=0, column=3, padx=5) # 操作按钮 btn_frame = ttk.Frame(root) btn_frame.pack(pady=10) ttk.Button(btn_frame, text="📖 读取数据", command=self.read_data).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="🗑️ 清空日志", command=self.clear_log).pack(side=tk.LEFT, padx=5) # 日志显示区 log_frame = ttk.LabelFrame(root, text="通信日志", padding=10) log_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) self.log_text = scrolledtext.ScrolledText(log_frame, height=15, font=("Consolas", 9)) self.log_text.pack(fill=tk.BOTH, expand=True) self.instrument = None def refresh_ports(self): """刷新串口列表""" ports = [port.device for port in serial.tools.list_ports.comports()] self.port_combo['values'] = ports if ports: self.port_combo.current(0) def log(self, message, level="INFO"): """写日志""" timestamp = datetime.datetime.now().strftime("%H:%M:%S") log_line = f"[{timestamp}] {level}: {message}\n" self.log_text.insert(tk.END, log_line) self.log_text.see(tk.END) self.root.update_idletasks() # 强制刷新界面 def read_data(self): """读取Modbus数据""" try: # 初始化串口 if self.instrument is None or self.instrument.serial.port != self.port_combo.get(): port = self.port_combo.get() if not port: messagebox.showerror("错误", "请选择串口") return self.instrument = minimalmodbus.Instrument(port, self.slave_addr.get()) self.instrument.serial.baudrate = int(self.baudrate_var.get()) self.instrument.serial.timeout = 1 # 1秒超时 self.log(f"已连接到 {port},波特率 {self.baudrate_var.get()}") # 读取寄存器 reg_addr = self.reg_addr.get() count = self.reg_count.get() self.log(f"读取从站{self.slave_addr.get()},地址{reg_addr},数量{count}") # 读取多个寄存器(功能码03) values = self.instrument.read_registers(reg_addr, count) # 显示结果 self.log(f"✅ 读取成功: {values}", "SUCCESS") # 如果是2个寄存器,尝试组合成32位浮点数(很多电表这样存数据) if count == 2: # 将两个16位整数组合成32位 combined = (values[0] << 16) | values[1] self.log(f" 组合值(32位): {combined}") except Exception as e: self.log(f"❌ 读取失败: {str(e)}", "ERROR") def clear_log(self): """清空日志""" self.log_text.delete(1.0, tk.END) if __name__ == "__main__": root = tk.Tk() app = SimpleModbusReader(root) root.mainloop()

image.png

这个方案的适用场景

我拿这个改版在三个地方用过:

  1. 配电房巡检:每周去抄10台电表的电压电流数据
  2. 实验室设备调试:快速测试新买的温湿度传感器通不通
  3. 给客户演示:30分钟现场写代码,直接读出他们设备的数据(当场签单)

性能表现

  • 单次读取耗时:200-500ms(取决于波特率和设备响应速度)
  • 成功率:95%以上(前提是参数配对)
  • 代码行数:不到100行

必须知道的三个坑

坑1:某些设备的寄存器地址要减1
Modbus协议里地址从0开始,但有些设备说明书写的是从1开始。比如说明书说"电压存在地址1",你实际要读地址0。这个我被坑过好几次。

坑2:timeout设太短会频繁报错
有些老设备反应慢,你给它1秒timeout可能不够。建议先设2秒,稳定后再优化。

坑3:USB转串口线质量影响巨大
我用过10块钱的淘宝线,通信失败率30%;换了个40块的品牌线,直接降到5%以下。别在线上省钱。


🚀 方案二:生产级抄表工具(支持批量和日志)

前面那个方案够简单,但功能太基础了。真要在项目里用,你需要:

  • 批量读取多个设备
  • 自动重连机制
  • 详细日志记录(出问题好排查)
  • 数据导出功能

这个方案我在一个光伏电站项目用了快两年,稳得一批。

核心架构设计

MVC模式分离关注点:

  • Model:Modbus通信逻辑
  • View:Tkinter界面
  • Controller:协调两者,处理业务逻辑

代码有点长,我挑关键部分讲。

Modbus通信类(核心)

python
import minimalmodbus import serial import time from typing import List, Tuple, Optional import logging class ModbusRTUClient: """MODBUS RTU客户端(支持重连和错误处理)""" def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0): self.port = port self.baudrate = baudrate self.timeout = timeout self.instrument = None self.logger = logging.getLogger(__name__) def connect(self, slave_address: int) -> bool: """连接到指定从站""" try: self.instrument = minimalmodbus.Instrument(self.port, slave_address) self.instrument.serial.baudrate = self.baudrate self.instrument.serial.timeout = self.timeout self.instrument.serial.bytesize = 8 self.instrument.serial.parity = serial.PARITY_NONE self.instrument.serial.stopbits = 1 # 测试连接(读一个寄存器试试) self.instrument.read_register(0, functioncode=3) self.logger.info(f"成功连接从站 {slave_address}") return True except Exception as e: self.logger.error(f"连接失败: {e}") return False def read_registers_with_retry(self, address: int, count: int, max_retries: int = 3) -> Optional[List[int]]: """带重试机制的读取""" for attempt in range(max_retries): try: values = self.instrument.read_registers(address, count) return values except Exception as e: self.logger.warning(f"读取失败 (尝试 {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(0.5) # 重试前等一下 else: raise return None def read_float32(self, address: int, byte_order: str = 'big') -> Optional[float]: """读取32位浮点数(占用2个寄存器)""" try: values = self.read_registers_with_retry(address, 2) if values: # 根据字节序组合 if byte_order == 'big': combined = (values[0] << 16) | values[1] else: combined = (values[1] << 16) | values[0] # 转换为浮点数(这里简化了,实际要用struct模块) import struct bytes_data = combined.to_bytes(4, byteorder='big') return struct.unpack('>f', bytes_data)[0] except Exception as e: self.logger.error(f"读取浮点数失败: {e}") return None def close(self): """关闭连接""" if self.instrument and self.instrument.serial.is_open: self.instrument.serial.close() self.logger.info("连接已关闭")

批量抄表配置

python
# 设备配置示例 DEVICE_CONFIG = [ { 'name': '1号配电柜-电表', 'slave_addr': 1, 'registers': [ {'addr': 0, 'count': 2, 'name': '电压', 'unit': 'V', 'type': 'float32'}, {'addr': 2, 'count': 2, 'name': '电流', 'unit': 'A', 'type': 'float32'}, {'addr': 4, 'count': 2, 'name': '功率', 'unit': 'kW', 'type': 'float32'}, ] }, { 'name': '2号配电柜-电表', 'slave_addr': 2, 'registers': [ {'addr': 0, 'count': 2, 'name': '电压', 'unit': 'V', 'type': 'float32'}, # ... 同上 ] } ] def batch_read_devices(client: ModbusRTUClient, config: list) -> dict: """批量读取所有设备""" results = {} for device in config: device_name = device['name'] slave_addr = device['slave_addr'] print(f"\n正在读取: {device_name} (从站{slave_addr})") if not client.connect(slave_addr): results[device_name] = {'error': '连接失败'} continue device_data = {} for reg in device['registers']: try: if reg['type'] == 'float32': value = client.read_float32(reg['addr']) else: values = client.read_registers_with_retry(reg['addr'], reg['count']) value = values[0] if values else None device_data[reg['name']] = { 'value': value, 'unit': reg['unit'] } print(f" ✓ {reg['name']}: {value} {reg['unit']}") except Exception as e: device_data[reg['name']] = {'error': str(e)} print(f" ✗ {reg['name']}: 读取失败") results[device_name] = device_data time.sleep(0.1) # 设备之间间隔一下 return results

数据导出到Excel

python
def export_to_excel(data: dict, filename: str = None): """导出抄表数据到Excel""" try: import pandas as pd from datetime import datetime if filename is None: filename = f"抄表数据_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" # 整理数据 rows = [] for device_name, device_data in data.items(): if 'error' in device_data: rows.append({ '设备名称': device_name, '参数': 'ERROR', '数值': device_data['error'], '单位': '-' }) else: for param_name, param_data in device_data.items(): rows.append({ '设备名称': device_name, '参数': param_name, '数值': param_data.get('value', 'N/A'), '单位': param_data.get('unit', '') }) df = pd.DataFrame(rows) df.to_excel(filename, index=False) print(f"\n✅ 数据已导出: {filename}") return True except ImportError: print("❌ 请先安装pandas和openpyxl: pip install pandas openpyxl") return False

image.png

这个方案的实战价值

我在某光伏电站项目用这套代码:

  • 每天自动抄100+个逆变器的数据
  • 读取时间从手工的2小时缩短到5分钟
  • 数据直接入库,自动生成日报

性能对比

指标手工抄表方案一方案二
100台设备耗时120分钟30分钟5分钟
错误率10%8%<2%
数据可追溯性完整日志

💡 三个让工具更专业的技巧

1. 自动识别设备类型

不同厂家的电表、传感器,寄存器地址都不一样。你可以做个"设备指纹识别":

python
DEVICE_FINGERPRINTS = { 'DTSD1352': { # 某品牌电表 'test_register': 0x0000, 'expected_range': (220, 240), # 电压范围 'registers': {...} }, 'AM2301': { # 温湿度传感器 'test_register': 0x0001, 'expected_range': (0, 100), 'registers': {...} } } def auto_detect_device(client, slave_addr): """自动检测设备型号""" for device_type, config in DEVICE_FINGERPRINTS.items(): try: value = client.read_register(config['test_register']) if config['expected_range'][0] <= value <= config['expected_range'][1]: return device_type, config except: continue return None, None

2. 通信质量监控

做个"健康度指标",实时显示通信质量:

python
class CommunicationMonitor: def __init__(self): self.total_requests = 0 self.failed_requests = 0 self.response_times = [] def record_request(self, success: bool, response_time: float): self.total_requests += 1 if not success: self.failed_requests += 1 self.response_times.append(response_time) # 只保留最近100次 if len(self.response_times) > 100: self.response_times.pop(0) def get_health_score(self) -> float: """计算健康度(0-100分)""" if self.total_requests == 0: return 100.0 success_rate = 1 - (self.failed_requests / self.total_requests) avg_response = sum(self.response_times) / len(self.response_times) # 成功率占70%,响应速度占30% score = success_rate * 70 + (1 - min(avg_response, 2) / 2) * 30 return round(score, 1)

3. 异常数据告警

很多时候设备通信成功了,但数据不对(比如电压读出来是999999)。加个合理性检查:

python
def validate_data(param_name, value, expected_range): """数据合理性校验""" if value is None: return False, "数据为空" min_val, max_val = expected_range if not (min_val <= value <= max_val): return False, f"数据超出合理范围({min_val}-{max_val})" return True, "正常" # 使用示例 is_valid, msg = validate_data('电压', voltage, (180, 260)) if not is_valid: logging.warning(f"异常数据告警: {msg}")

🎯 三句话总结

  1. minimalmodbus库是快速上手的最佳选择——别自己手写CRC和报文解析,浪费时间。
  2. 一定要加重试机制和日志——工业环境干扰多,通信失败是常态,不是异常。
  3. 把配置和代码分离——用JSON或YAML存设备配置,换设备不用改代码。

🚀 接下来的进阶方向

  • 初级挑战:加个定时自动抄表功能(用schedule库)
  • 中级任务:把数据存到SQLite数据库,做个历史曲线图
  • 高级项目:改成TCP Modbus版本,支持远程抄表(用pymodbus的TCP模式)

💬 你的抄表场景是啥?

我特别想知道:你是在抄什么设备的表?电表?水表?还是工业传感器?遇到过什么奇葩问题?评论区唠唠,说不定我能帮你解决。

如果这套方案帮你省了时间(或者让你在老板面前涨脸了),记得点赞让更多人看到。收藏的同时别忘了真的去试一试——我这代码都是实战验证过的,拿来就能用!


相关技术标签:#Python工业自动化 #Modbus通信 #串口编程 #Tkinter桌面应用 #设备数据采集

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!