Beautiful is better than ugly.
前言
继续上一篇使用 python 制作串口工具(一) ,完成要实现的串口工具代码逻辑!
实现
最终效果:
首先,我们先实现获取电脑当前所接入的串口,实现效果为:每次点击 Combo Box,就把当前电脑接入的串口号信息读取出来 。所以我们需要对 Combo Box 这个控件进行重写,这里新建个 my_combobox.py,代码如下:
my_combobox.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import serial import serial.tools.list_ports from PyQt5.QtWidgets import QComboBox class My_ComBoBox(QComboBox): def __init__(self, parent = None): super(My_ComBoBox,self).__init__(parent) # 重写showPopup函数 def showPopup(self): # 先清空原有的选项 self.clear() index = 1 # 获取接入的所有串口信息,插入combobox的选项中 port_list = self.get_port_list(self) if port_list is not None: for i in port_list: self.insertItem(index, i) index += 1 QComboBox.showPopup(self)# 弹出选项框 @staticmethod # 获取接入的COM def get_port_list(self): try: port_list = list(serial.tools.list_ports.comports()) for port in port_list: yield str(port) except Exception as err: print("获取接入的串口设备出错!错误信息为:" + str(err))
上面实现了每次点击 Combo Box 后获取所有接入串口的信息。文件创建好并且写入代码后,我们需要引用它,回到 UI 文件 Ui_uart_tool_ui.py,引入刚才所写的内容,
搞定后,我们重新运行下代码,看看效果吧,这里我调试的时候刚好旁边没有串口,所以直接拿了两个 Jlink 演示一下。(我现在是有空的时候写一点,所以偶尔所处的地方,装备有可能不齐全,请谅解! ),效果如下:
可以看到,每次点击,都会刷新获取一次当前接入的串口,无论是新接入还是刚断开的串口,在每次点击后都会刷新。
实现串口底层接口
上面完成后,我们先来实现串口底层的接口,后面在应用逻辑交互上,需要对串口底层操作,串口底层单独新建个文件来写, 这里我命名为 uart.py ,部分代码如下:
uart.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 class Uart_Recv_Data_Thread(threading.Thread): def __init__(self, cur_self, main_self): super(Uart_Recv_Data_Thread, self).__init__() self.cur_self = cur_self self.thread = threading.Event() self.main_self = main_self def stop(self): self.thread.set() def stopped(self): return self.thread.is_set() def run(self): while True: time = '' if self.stopped(): break try: if False == self.cur_self.recv_queue.empty(): show_data = '' data = self.cur_self.recv_queue.get() data_num = len(data) if self.cur_self.uart_time_stamp_flag == 1:# 时间戳开关打开 time = datetime.datetime.now().strftime('[%Y-%m-%d %H:%M:%S:%f]\r\n') if self.cur_self.uart_rec_hex_lock == 1: data_list = [] data_bytes = bytes(data, encoding='utf-8') for i in range(len(data_bytes)): data_list.append(hex(data_bytes[i])[2:].zfill(2)) send_text_to_hex = ' '.join(data_list) show_data += send_text_to_hex else: show_data = data self.main_self.uart_recv_updata_show_data_signal.emit(time + show_data + '\r\n') # 统计接收字符的数量 self.main_self.uart_updata_recv_num_signal.emit(data_num) nums = self.cur_self.serial.inWaiting() if (nums > 0): recv_msg = self.cur_self.serial.read(nums) else: continue if self.cur_self.recv_queue.full(): self.cur_self.recv_queue.get() self.cur_self.recv_queue.put(recv_msg.decode()) except Exception as e: print(e) continue class Uart_Send_Data_Thread(threading.Thread): def __init__(self, cur_self, main_self): super(Uart_Send_Data_Thread, self).__init__() self.cur_self = cur_self self.main_self = main_self self.thread = threading.Event() def stop(self): self.thread.set() def stopped(self): return self.thread.is_set() def run(self): while True: if self.stopped(): break try: if not self.cur_self.send_queue.empty(): send_data = self.cur_self.send_queue.get(False) data_num = len(send_data) # 统计发送字符的数量 self.main_self.uart_updata_send_num_signal.emit(data_num) #ascii 发送 self.cur_self.serial.write(send_data) else: continue except queue.Empty: continue class Uart(object): def __init__(self, parent): self.err = 0 self.parent = parent self.recv_queue = queue.Queue(1000) self.send_queue = queue.Queue(1000) self.uart_time_stamp_flag = 0 self.uart_rec_hex_lock = 0 def uart_init(self, port, baud, stopbit, databit, checkbit): try: checkbitlist = {'None': 'N', 'Odd': 'O', 'Even': 'E'} stopbitlist = {'1': 'serial.STOPBITS_ONE', '1.5': 'serial.STOPBITS_ONE', '2': 'serial.STOPBITS_ONE'} self.serial = serial.Serial(port.split()[0], baud, int(databit), checkbitlist[checkbit], serial.STOPBITS_ONE) # 创建线程 self.recv_thread = Uart_Recv_Data_Thread(self, self.parent) self.send_thread = Uart_Send_Data_Thread(self, self.parent) self.err = 0 except Exception as e: print(e) self.err = -1 def open_uart_thread(self): self.recv_thread.start() self.send_thread.start() def close_uart_thread(self): self.recv_thread.stop() self.send_thread.stop() self.serial.close() def uart_send_func(self, data): self.send_queue.put(data)
上面实现了对串口的配置初始化,并且设置了两个线程,一个线程用于接收,一个线程用于发送,为了保证数据不会丢失,这里还用到了队列,保证即使快速发送时,也不会丢失数据。
界面逻辑
底层接口完成后,接下来要做界面的逻辑,先从基础的发送和接收的显示开始,要实现这两点,首先要完成的就有串口的打开和关闭
、发送处理
和接收显示
。
串口的打开和关闭
回到 main.py ,先来实现串口打开和关闭,设置一个标志,作为判断串口是否运行的标志,这里我将标志命名为 self.uart_com_run_statu
,然后我们要绑定该按纽的触发回调事件, self.uart_en_push_button.clicked.connect(self.uart_en_push_button_cb)
,在每次按下打开串口按钮后,我们获取 combo_box 当前内容的波特率、停止位、数据位等串口相关的信息,调用底层串口初始化 self.uart.uart_init(port, baud, stopbit, databit, checkbit)
传入这些内容,判断初始化是否成功,成功的话,将上面的运行标志置为 1,然后将打开串口按钮的文本设置为关闭串口,并且开启串口的线程,错误的话,说明有可能是串口已经被其他软件或应该打开,这里我们只要提示警告,不做任何处理就可以了。部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def uart_en_push_button_cb(self): if self.uart_com_run_status == 0: port = self.com_combo_box.currentText() if port == '': win32api.MessageBox(0, "请选择串口", "警告",win32con.MB_ICONWARNING) return baud = self.baud_combo_box.currentText() stopbit = self.stopbit_combo_box.currentText() databit = self.databit_combo_box.currentText() checkbit = self.checkbit_combo_box.currentText() self.uart.uart_init(port, baud, stopbit, databit, checkbit) if self.uart.err == -1: self.uart_com_run_status = 0 win32api.MessageBox(0, port+"已被使用", "警告",win32con.MB_ICONWARNING) else: self.uart_com_run_status = 1 self.uart.open_uart_thread() self.uart_en_push_button.setText('关闭串口') else: self.uart_com_run_status = 0 self.uart.close_uart_thread() self.uart_en_push_button.setText('打开串口')
串口开启成功,在此点击时关闭串口,这个时候将标志置为 0,并且关闭串口和串口线程,设置按钮文本为打开串口即可。
附上成功开启串口和失败开启串口的演示:
串口的发送
接下来实现串口的发送,发送这里,我们要知道发送的是 ascii 还是 hex 格式,所以我们要先判断当前勾选的发送格式,将之前 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.send_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_send_radio_button_cb)
和 self.send_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_send_radio_button_cb)
,这两个回调事件实现了将发送编辑框的两种格式的相互转换,并且用 self.uart_send_hex_lock
作为当前格式的标志,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def uart_ascii_to_hex_send_radio_button_cb(self): if self.send_ascii_radio_button.isChecked() == True: self.uart_send_hex_lock = 0 send_text = self.uart_send_show.toPlainText().replace(' ', '') self.uart_send_show.clear() hex_send_text = self.hex2bin(send_text) self.uart_send_show.setText(hex_send_text) else: return def uart_hex_to_ascii_send_radio_button_cb(self): if self.send_hex_radio_button.isChecked() == True: self.uart_send_hex_lock = 1 text_list = [] send_text = bytes(self.uart_send_show.toPlainText(), encoding='utf-8') for i in range(len(send_text)): text_list.append(hex(send_text[i])[2:]) send_text_to_hex = ' '.join(text_list) self.uart_send_show.clear() self.uart_send_show.setText(send_text_to_hex) else: return
随后,设置一下发送按键的回调事件 self.uart_send_push_button.clicked.connect(self.uart_send_push_button_cb)
,这里判断当前的格式,将编辑框的内容获取后,相应的处理后,调用底层接口发送出去,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 def uart_send_push_button_cb(self): if self.uart_com_run_status == 0: return send_data = '' send_text = self.uart_send_show.toPlainText() if send_text == '': return if self.send_hex_radio_button.isChecked() == True: # 十六进制发送 hex_send_text = self.hex2bin(send_text.replace(' ', '')) send_data = bytes(hex_send_text,encoding='utf-8') else: send_data = send_text.encode() self.uart.uart_send_func(send_data)
附上,具体效果:
串口的接收
发送也成功后,开始实现接收,接收的话,同样需要将接收分为 ascii 和 hex 格式,所以同样要将接收 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.rec_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_rec_radio_button_cb)
和 self.rec_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_rec_radio_button_cb)
,这两个回调事件实现了将接收编辑框接收的的两种格式的相互转换。具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 def uart_ascii_to_hex_rec_radio_button_cb(self): if self.rec_ascii_radio_button.isChecked() == True: self.uart.uart_set_rec_hex_lock(0) else: return def uart_hex_to_ascii_rec_radio_button_cb(self): if self.rec_hex_radio_button.isChecked() == True: self.uart.uart_set_rec_hex_lock(1) else: return
上面代码可以看到,这里单纯只是设定标志,因为处理是在底层串口接收线程中对这个标志做了处理,具体可以看上面串口底层的代码,这里就不再赘述了,显示接口通过信号的方式进行更新,设置回调事件 self.uart_recv_updata_show_data_signal.connect(self.update_uart_recv_show_cb)
,在里面获取接收的数据,数据是由接收线程处理好后发送过来的,所以这边的回调处理同样很简单,具体如下:
1 2 3 4 def update_uart_recv_show_cb(self, data): self.uart_rec_show.insertPlainText(data) cursor = self.uart_rec_show.textCursor() self.uart_rec_show.moveCursor(cursor.End)
具体效果:
定时发送
接收搞定后,我们增加定时发送功能,定时发送功能需要用到定时器,同时要获取定时功能的开启和定时时间,相关的代码如下:
1 2 3 4 5 6 7 8 # 定时器 self.uart_timer_num = 1000 self.uart_timer_line_edit.setText('1000') self.uart_timer_send = QTimer() self.uart_timer_send.timeout.connect(self.uart_timer_send_cb) def uart_timer_send_cb(self): self.uart_send_push_button_cb()
上面代码可以看出,创建了个定时器,并且将事件回调绑定在了 uart_timer_send_cb
中,回调里实现的就是不断的调用发送事件的回调,定时器的打开则通过 check box 控件,具体代码如下:
1 2 3 4 5 6 7 8 9 def uart_time_en_check_box_cb(self): if self.uart_com_run_status == 0: self.uart_timer_check_box.setChecked(False) return None if self.uart_timer_check_box.isChecked() == True: self.uart_timer_send.start(int(self.uart_timer_num)) else: self.uart_timer_send.stop()
上述代码实现了,如果串口未运行,则不使用定时器,判断用户是否勾选该功能来开启和停止定时器。
串口关闭后也需要相应的关闭定时器,防止无用的调用发送。
具体效果如下:
其他
定时功能做好后,还有个时间戳的功能,时间戳则是获取当前时间,根据标志判断时间戳是否开启,开启的话,则添加到显示数据中,相关的代码在串口底层的接收线程中,并且功能也比较简单,就不再详细的说了。同样的,还有发送清除和接收清除等简单的小功能,也不浪费太多篇幅。具体实现可以访问我的 github 中的 python_uart_tool 库中,详细的代码都会提交在上边。
结语
python 串口逻辑代码的编写,就到这里结束了,这是一个简单的 demo,很多例如:限制输入内容、回显、快速发送时显示问题等功能我没有添加上去。
发现自己对写这种代码加解释的博文不是很熟悉,总感觉会有些地方不清楚该怎样描述,而且我本身不是主学 python 的,所以难免会有出错或者你认为代码不合理的地方,这些也希望多多谅解和指教,我会及时修改的!
关于本篇的相关代码我也上传到 github 上去了,有兴趣的可以访问 QtComMate 和 python_uart_tool 查看,浏览的同时也可以在该 github 仓库中点个 Star 支持我一下😄。
与本文相关的内容还有:
python 开发环境搭建
python 获取已连接wifi密码工具
python 制作串口工具(一)