介绍
ScopeModbus 是基于 Python 2.7 和 PyQt4 开发的 Modbus 通信采集软件,已在 Windows、Deepin Linux 和树莓派上测试通过。
目前已开源
PyQT UI 开发
QT 设计师
使用 QT 设计师设计 layout,再使用 pyuic 进行转换
Python 转换代码“~QT_UI_Conv.py”:
1
2
3
4
5
6
7
8
9
10
11import os
for root, dirs, files in os.walk('.'):
for file in files:
if file.endswith('.ui'):
os.system("D:\Python\Python27\python.exe" ' "D:\Python\Python27\Lib\site-packages\PyQt4\uic\pyuic.py" -o ui_%s.py %s' %
(file.rsplit('.', 1)[0], file))
# print "D:\Python\Python27\python.exe" ' "D:\Python\Python27\Lib\site-packages\PyQt4\uic\pyuic.py" -o ui_%s.py %s' % (
# file.rsplit('.', 1)[0], file)
elif file.endswith('.qrc'):
os.system('"D:\Python\Python27\Lib\site-packages\PyQt4\pyrcc4.exe" -o %s_rc.py %s' %
(file.rsplit('.', 1)[0], file))使用批处理命令处理 ui 文件夹下的所有 layout 文件,并将生成的文件复制到源代码中的 layout,生成的文件统一命名为
ui_layout_(*).py
1
2
3
4
5
6
7
8
@echo off
rem Build step: convert the Qt Designer files in .\ui and copy the
rem generated Python modules into the application's layout package.
echo "_______python build_______"
cd ui
echo "UI build..."
"D:\Python\Python27\python.exe" ~QT_UI_Conv.py
echo "Copy image resource..."
copy pyimg_rc.py ..\src\app\layout\pyimg_rc.py
echo "Copy ui resource..."
copy ui*.py ..\src\app\layout
UI 调用
在主程序体中,调用生成的 class
导入即可
from layout.ui_layout_main import Ui_MainWindow
1
2
3
4
5
6
7
class MyWindow(QtGui.QMainWindow):
    """Main window that hosts the pyuic-generated ``Ui_MainWindow`` layout."""

    def __init__(self):
        super(MyWindow, self).__init__()
        # self.ui holds an instance of the generated Ui_MainWindow class, so
        # every widget and property defined in the layout is reached through
        # the "self.ui." prefix from here on.
        generated_ui = Ui_MainWindow()
        self.ui = generated_ui
        generated_ui.setupUi(self)

多线程
采集程序既要采集数据,也要显示数据,需使用到多线程
采集线程
采集线程与主线程通信,进行数据交互
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45# 采集线程
class AcqThread(QtCore.QThread):
    """Worker thread that periodically reads the configured Modbus items.

    While ``bol_emit`` is true, every cycle reads each entry of
    ``self.productpar`` through ``self.mb`` and publishes the decoded values
    with ``signal_acqdata``.

    ``self.cmdtxt``, ``self.productpar`` and ``self.mb`` are not assigned in
    this class excerpt; they must be set before ``bol_emit`` is switched on.
    """

    # Emitted after each successful cycle: ("Running:<count>", decoded values).
    signal_acqdata = QtCore.pyqtSignal(str, list)

    def __init__(self, parent=None):
        super(AcqThread, self).__init__(parent)
        self.bol_working = True   # run() keeps looping while this is true
        self.bol_emit = False     # data is acquired only while this is true
        self.int_cyc = 1          # cycle period in seconds
        self.num = 0              # number of successful acquisition cycles

    def run(self):
        """Acquire once per cycle until ``bol_working`` becomes false."""
        while self.bol_working:
            if self.bol_emit:
                self._acquire_once()
            self.sleep(self.int_cyc)

    def _acquire_once(self):
        """Read all items once and emit them; failures are reported, not raised."""
        import traceback

        try:
            # NOTE(review): cmdtxt is presumably a GUI widget; Qt widgets are
            # meant to be used from the GUI thread only - confirm and move
            # this toggle to the main thread if so.
            self.cmdtxt.setUpdatesEnabled(False)
            try:
                values = [self._read_item(item) for item in self.productpar]
            finally:
                # Always re-enable updates, even when a read fails.
                self.cmdtxt.setUpdatesEnabled(True)
            self.num += 1
            self.signal_acqdata.emit("Running:" + str(self.num), values)
        except Exception:
            # Keep the acquisition loop alive, but make the failure visible
            # instead of discarding it silently.
            traceback.print_exc()

    def _read_item(self, item):
        """Read one configured item and decode it according to item['type'].

        Raises:
            ValueError: if item['type'] is not 'int', 'float', 'list' or 'bool'.
        """
        raw = self.mb.read(item['func'], item['startaddr'], item['length'])
        kind = item['type']
        if kind in ('int', 'list'):
            return mbf.ReadInt(raw)
        if kind == 'float':
            return mbf.ReadFloat(raw)
        if kind == 'bool':
            return mbf.ReadInt(raw) == 1
        # Fail loudly rather than re-using the value of the previous item.
        raise ValueError("unsupported item type: %r" % (kind,))

时间线程
系统时间显示放入单独的线程内
1
2
3
4
5
6
7
8
9
10
11
12
13# 时间线程
class TimeThread(QtCore.QThread):
    """Thread that publishes the current local time once per second.

    The formatted time is delivered through ``signal_time`` and written to
    the widget registered with ``getTimeObj`` by a slot of this object, so
    the worker thread itself never calls into the widget.
    """

    # Carries the formatted timestamp ("YYYY-mm-dd HH:MM:SS").
    signal_time = QtCore.pyqtSignal(str)

    def __init__(self, parent=None):
        super(TimeThread, self).__init__(parent)
        self.txt_time = None  # display widget, set via getTimeObj()
        self.signal_time.connect(self._show_time)

    def run(self):
        """Emit the current time, then wait one second, forever."""
        while True:
            # Emit before sleeping so the displayed time is not a second old.
            self.signal_time.emit(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            time.sleep(1)

    def _show_time(self, text):
        """Write *text* to the registered widget, if one has been set."""
        if self.txt_time is not None:
            self.txt_time.setText(text)

    def getTimeObj(self, obj):
        """Register *obj* (anything with ``setText``) as the time display."""
        self.txt_time = obj

主线程
加载线程
1
2
3
4
5
6
7
8
9
10
11
12
13# Load thread
# 创建新线程
self.threadAcq = AcqThread()
# thread2 = myThread(2, "Thread-2", 2)
# 连接信号
self.threadAcq.signal_acqdata.connect(self.Graph_Update)
# 开启线程
self.threadAcq.start()
self.threadAcq.CommandTxt(self.ui.txt_CMD)
self.threadTime = TimeThread()
self.threadTime.getTimeObj(self.Txt_time)
self.threadTime.start()
Modbus
Modbus RTU
这里采用的是 Modbus RTU,代码修改自第三方库 modbus_tk,只取了其中的 RTU 部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79# -*- coding: utf-8 -*-
#
# @author: Toso
# @created: 2019-05-08T13:40:09.624Z+08:00
# @comment: ______________
#
import serial
import serial.tools.list_ports
import socket
import MBdefines as defines
import struct
import sys
import MBFormat
from PyQt4 import QtCore
PY2 = sys.version_info[0] == 2
class MB(QtCore.QObject):
MBrecord = QtCore.pyqtSignal(str)
def ComAutoFind(self):
self.comboBox_Port = []
# 先获取所以有USB串口挂载的设备
self.scomList = list(serial.tools.list_ports.comports())
if(len(self.scomList) <= 0):
return self.tr(u"No Port Find")
else:
for item in self.scomList:
self.comboBox_Port.append(item.device)
return self.comboBox_Port
def Open(self, DevAddr, Port, Baudrate, Parity, Bytesize, Stopbits, Timeout):
try:
self.slaveAddr = DevAddr
self.serial = serial.Serial(port=Port,
baudrate=Baudrate,
bytesize=Bytesize,
parity=Parity,
stopbits=Stopbits,
xonxoff=0)
self.serial.timeout = Timeout
self.ConnType = 'Serial'
return 0
except Exception as err:
raise err
def Log(self, msg):
self.MBrecord.emit(msg)
# 3 0 1 (23,)
# 3 2 2 (34, 63)
def read(self, Func, Start, Length):
try:
return self.execute(
self.slaveAddr, Func, Start, Length)
except Exception as err:
raise err
def write(self, Func, Start, val):
try:
return self.execute(self.slaveAddr, Func, Start, output_value=val)
except Exception as err:
raise err
def close(self):
try:
self.ConnType
except:
self.ConnType = ''
try:
if self.ConnType == 'Serial':
self.serial.close()
elif self.ConnType == 'TCP':
self.sock.close()
except:
pass主窗体调用 modbus
1
2
3
4
5# modbus通信class
self.mb = MB()
self.ports = self.mb.ComAutoFind()
# self.mb.Log(self.ui.txt_CMD)
self.mb.MBrecord.connect(self.LogRecord)