Python+QT开发的Modbus采集软件

介绍

ScopeModbus 是基于 Python 2.7 和 PyQt4 开发的 Modbus 通信采集软件,已在 Windows、Deepin Linux 和树莓派上测试!

目前已开源

PyQT UI 开发

  • QT 设计师

    使用 QT 设计师设计 layout,再使用 pyuic 进行转换

    • Python 转换代码 “~QT_UI_Conv.py”

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      import os
      import subprocess

      # Interpreter and PyQt4 tool locations. Raw strings keep every backslash
      # literal, so no sequence such as "\u" in "\uic" can be read as an escape.
      PYTHON_EXE = r"D:\Python\Python27\python.exe"
      PYUIC_SCRIPT = r"D:\Python\Python27\Lib\site-packages\PyQt4\uic\pyuic.py"
      PYRCC_EXE = r"D:\Python\Python27\Lib\site-packages\PyQt4\pyrcc4.exe"

      # Convert every Qt Designer layout (.ui) and resource file (.qrc) found
      # below the current directory. Generated modules are written to the
      # current directory as ui_<name>.py and <name>_rc.py.
      for root, dirs, files in os.walk('.'):
          for filename in files:
              # os.walk yields bare file names; join with root so that files
              # located in sub-directories are actually found by the tools.
              source = os.path.join(root, filename)
              stem = filename.rsplit('.', 1)[0]
              if filename.endswith('.ui'):
                  # Argument lists avoid shell quoting problems with paths.
                  subprocess.call(
                      [PYTHON_EXE, PYUIC_SCRIPT, '-o', 'ui_%s.py' % stem, source])
              elif filename.endswith('.qrc'):
                  subprocess.call([PYRCC_EXE, '-o', '%s_rc.py' % stem, source])
    • 使用批处理命令处理 ui 文件夹下的所有 layout 文件,并将生成的文件复制到源代码中的 layout,生成的文件统一命名为 ui_layout_(*).py

      1
      2
      3
      4
      5
      6
      7
      8
      9
      @echo off
      rem Build script: convert the Qt Designer files under ui\ and copy the
      rem generated Python modules into the application's layout package.
      echo "_______python build_______"
      rem pushd/popd instead of cd, so the caller's working directory is
      rem restored; stop immediately when the ui folder does not exist.
      pushd ui || exit /b 1
      echo "UI build..."
      "D:\Python\Python27\python.exe" ~QT_UI_Conv.py
      echo "Copy image resouce..."
      copy pyimg_rc.py ..\src\app\layout\pyimg_rc.py
      echo "Copy ui resouce..."
      copy ui*.py ..\src\app\layout
      popd
  • UI 调用

    在主程序体中,调用生成的 class

    导入即可:from layout.ui_layout_main import Ui_MainWindow

    1
    2
    3
    4
    5
    6
    7
    8
    class MyWindow(QtGui.QMainWindow):
        """Main window whose widgets are created by the generated Ui_MainWindow."""

        def __init__(self):
            super(MyWindow, self).__init__()
            # Main UI: self.ui holds an instance of the generated Ui_MainWindow
            # class (composition, not inheritance), so every widget it creates
            # is reached through the "self.ui." prefix from here on.
            self.ui = Ui_MainWindow()
            self.ui.setupUi(self)

    多线程

采集程序既要采集数据,也要显示数据,需使用到多线程

  • 采集线程

    采集线程与主线程通信,进行数据交互

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    # Acquisition thread
    class AcqThread(QtCore.QThread):
        """Worker thread that polls the configured Modbus items and emits them.

        While ``bol_working`` is true the thread loops once every ``int_cyc``
        seconds. When ``bol_emit`` is also true, each pass reads every entry of
        ``self.productpar`` through ``self.mb`` and emits ``signal_acqdata``.

        ``cmdtxt``, ``productpar`` and ``mb`` are not created in ``__init__``;
        they have to be assigned before ``bol_emit`` is switched on.
        """

        # Emitted after each successful poll: ("Running:<count>", [values...]).
        signal_acqdata = QtCore.pyqtSignal(str, list)

        def __init__(self, parent=None):
            super(AcqThread, self).__init__(parent)
            self.bol_working = True  # run() keeps looping while this is True
            self.bol_emit = False    # polling and emitting happen only when True
            self.int_cyc = 1         # pause between passes, in seconds
            self.num = 0             # number of successful polls so far

        def _decode(self, kind, raw):
            """Convert one raw read result according to the item's 'type'."""
            if kind in ('int', 'list'):
                return mbf.ReadInt(raw)
            if kind == 'float':
                return mbf.ReadFloat(raw)
            if kind == 'bool':
                return mbf.ReadInt(raw) == 1
            # Unknown type: yield None so the value list stays aligned with
            # productpar instead of repeating the previous item's value.
            return None

        def run(self):
            """Poll loop executed in the worker thread."""
            while self.bol_working:
                if self.bol_emit:
                    try:
                        # NOTE(review): cmdtxt appears to be a GUI widget that
                        # is accessed from this worker thread - confirm that
                        # this is safe for the widget in use.
                        self.cmdtxt.setUpdatesEnabled(False)
                        try:
                            values = []
                            for item in self.productpar:
                                raw = self.mb.read(
                                    item['func'], item['startaddr'], item['length'])
                                values.append(self._decode(item['type'], raw))
                        finally:
                            # Re-enable updates whether or not the read failed.
                            self.cmdtxt.setUpdatesEnabled(True)
                        self.num += 1
                        # Send the signal.
                        self.signal_acqdata.emit(
                            "Running:" + str(self.num), values)
                    except Exception:
                        # Best effort: a failed poll is skipped and the next
                        # cycle simply tries again.
                        pass
                self.sleep(self.int_cyc)
  • 时间线程

    系统时间显示放入单独的线程内

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # Clock thread
    class TimeThread(QtCore.QThread):
        """Worker thread that refreshes a clock display once per second."""

        # Carries the formatted timestamp to the display object. Delivering the
        # text through a signal lets Qt invoke setText in the receiver's own
        # thread instead of calling into the GUI from this worker thread.
        signal_time = QtCore.pyqtSignal(str)

        def __init__(self, parent=None):
            super(TimeThread, self).__init__(parent)
            self.bol_working = True  # clear this flag to let run() return
            self.txt_time = None     # display object, set by getTimeObj()

        def run(self):
            """Emit the current local time, then wait one second, repeatedly."""
            while self.bol_working:
                t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                # Publish first and sleep afterwards, so the text shown is not
                # already a second old when it reaches the display.
                self.signal_time.emit(t)
                time.sleep(1)

        def getTimeObj(self, obj):
            """Register *obj* (anything with a ``setText`` method) as the display."""
            self.txt_time = obj
            self.signal_time.connect(obj.setText)
  • 主线程

    加载线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # Load the worker threads.
    # Acquisition thread: connect and configure it completely before start(),
    # so run() can never execute ahead of the CommandTxt() call.
    self.threadAcq = AcqThread()
    self.threadAcq.signal_acqdata.connect(self.Graph_Update)
    self.threadAcq.CommandTxt(self.ui.txt_CMD)
    self.threadAcq.start()

    # Clock thread: same order, configure first and start last.
    self.threadTime = TimeThread()
    self.threadTime.getTimeObj(self.Txt_time)
    self.threadTime.start()

    Modbus

  • Modbus RTU

    这里采用的是 Modbus RTU,代码修改自 modbus_tk 这个第三方库,只取了其中的 RTU 部分

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    # -*- coding: utf-8 -*-
    #
    # @author: Toso
    # @created: 2019-05-08T13:40:09.624Z+08:00
    # @comment: ______________
    #

    import serial
    import serial.tools.list_ports
    import socket
    import MBdefines as defines
    import struct
    import sys
    import MBFormat
    from PyQt4 import QtCore

    # True when the interpreter's major version is 2.
    PY2 = sys.version_info[0] == 2

    class MB(QtCore.QObject):
        """Modbus connection helper: port discovery, open/close, read/write.

        ``read`` and ``write`` delegate to ``self.execute``, which is not part
        of this excerpt.
        """

        # Emitted by Log() with the message text.
        MBrecord = QtCore.pyqtSignal(str)

        def ComAutoFind(self):
            """Collect the device names of the serial ports pyserial reports.

            Returns:
                The list of device names, or the translated string
                "No Port Find" when no port is reported.
            """
            self.comboBox_Port = []
            # Snapshot of all ports currently known to pyserial.
            self.scomList = list(serial.tools.list_ports.comports())
            if not self.scomList:
                return self.tr(u"No Port Find")
            self.comboBox_Port.extend(item.device for item in self.scomList)
            return self.comboBox_Port

        def Open(self, DevAddr, Port, Baudrate, Parity, Bytesize, Stopbits, Timeout):
            """Open the serial port and remember the slave address.

            Returns:
                0 on success.

            Raises:
                Whatever ``serial.Serial`` raises; the exception propagates
                unchanged, with its original traceback.
            """
            self.slaveAddr = DevAddr
            self.serial = serial.Serial(port=Port,
                                        baudrate=Baudrate,
                                        bytesize=Bytesize,
                                        parity=Parity,
                                        stopbits=Stopbits,
                                        xonxoff=0)
            self.serial.timeout = Timeout
            self.ConnType = 'Serial'
            return 0

        def Log(self, msg):
            """Publish *msg* through the MBrecord signal."""
            self.MBrecord.emit(msg)

        # Sample values kept from the original source, presumably
        # "Func Start Length -> result" - TODO confirm:
        # 3 0 1 (23,)
        # 3 2 2 (34, 63)

        def read(self, Func, Start, Length):
            """Run a read request against the stored slave address.

            Returns whatever ``self.execute`` returns; its exceptions propagate.
            """
            return self.execute(self.slaveAddr, Func, Start, Length)

        def write(self, Func, Start, val):
            """Run a write request, passing *val* as ``output_value``.

            Returns whatever ``self.execute`` returns; its exceptions propagate.
            """
            return self.execute(self.slaveAddr, Func, Start, output_value=val)

        def close(self):
            """Close the open connection, ignoring errors raised while closing."""
            # ConnType only exists after a successful Open(); default to ''.
            self.ConnType = getattr(self, 'ConnType', '')
            try:
                if self.ConnType == 'Serial':
                    self.serial.close()
                elif self.ConnType == 'TCP':
                    self.sock.close()
            except Exception:
                # Best effort: there is nothing useful to do if closing fails.
                pass
  • 主窗体调用 modbus

    1
    2
    3
    4
    5
    # Modbus communication object
    self.mb = MB()
    self.ports = self.mb.ComAutoFind()
    # self.mb.Log(self.ui.txt_CMD)
    # Route every MBrecord emission to this window's LogRecord.
    self.mb.MBrecord.connect(self.LogRecord)
坚持原创技术分享,您的支持将鼓励我继续创作!