python 制作串口工具(二)

Beautiful is better than ugly.

前言

继续上一篇使用 python 制作串口工具(一),完成要实现的串口工具代码逻辑!


实现

最终效果:

获取接入的 com 口

首先,我们先实现获取电脑当前所接入的串口,实现效果为:每次点击 Combo Box,就把当前电脑接入的串口号信息读取出来。所以我们需要对 Combo Box 这个控件进行重写,这里新建个 my_combobox.py,代码如下:

my_combobox.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import serial
import serial.tools.list_ports

from PyQt5.QtWidgets import QComboBox

class My_ComBoBox(QComboBox):

def __init__(self, parent = None):
super(My_ComBoBox,self).__init__(parent)

# 重写showPopup函数
def showPopup(self):
# 先清空原有的选项
self.clear()
index = 1

# 获取接入的所有串口信息,插入combobox的选项中
port_list = self.get_port_list(self)
if port_list is not None:
for i in port_list:
self.insertItem(index, i)
index += 1
QComboBox.showPopup(self)# 弹出选项框

@staticmethod
# 获取接入的COM
def get_port_list(self):
try:
port_list = list(serial.tools.list_ports.comports())
for port in port_list:
yield str(port)
except Exception as err:
print("获取接入的串口设备出错!错误信息为:" + str(err))

上面实现了每次点击 Combo Box 后获取所有接入串口的信息。文件创建好并且写入代码后,我们需要引用它,回到 UI 文件 Ui_uart_tool_ui.py,引入刚才所写的内容,

搞定后,我们重新运行下代码,看看效果吧,这里我调试的时候刚好旁边没有串口,所以直接拿了两个 Jlink 演示一下。(我现在是有空的时候写一点,所以偶尔所处的地方,装备有可能不齐全,请谅解!),效果如下:

可以看到,每次点击,都会刷新获取一次当前接入的串口,无论是新接入还是刚断开的串口,在每次点击后都会刷新。

实现串口底层接口

上面完成后,我们先来实现串口底层的接口,后面在应用逻辑交互上,需要对串口底层操作,串口底层单独新建个文件来写, 这里我命名为 uart.py,部分代码如下:

uart.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class Uart_Recv_Data_Thread(threading.Thread):
def __init__(self, cur_self, main_self):
super(Uart_Recv_Data_Thread, self).__init__()
self.cur_self = cur_self
self.thread = threading.Event()
self.main_self = main_self

def stop(self):
self.thread.set()

def stopped(self):
return self.thread.is_set()

def run(self):
while True:
time = ''
if self.stopped():
break
try:
if False == self.cur_self.recv_queue.empty():
show_data = ''
data = self.cur_self.recv_queue.get()
data_num = len(data)
if self.cur_self.uart_time_stamp_flag == 1:# 时间戳开关打开
time = datetime.datetime.now().strftime('[%Y-%m-%d %H:%M:%S:%f]\r\n')

if self.cur_self.uart_rec_hex_lock == 1:
data_list = []
data_bytes = bytes(data, encoding='utf-8')
for i in range(len(data_bytes)):
data_list.append(hex(data_bytes[i])[2:].zfill(2))
send_text_to_hex = ' '.join(data_list)
show_data += send_text_to_hex
else:
show_data = data

self.main_self.uart_recv_updata_show_data_signal.emit(time + show_data + '\r\n')

# 统计接收字符的数量
self.main_self.uart_updata_recv_num_signal.emit(data_num)


nums = self.cur_self.serial.inWaiting()
if (nums > 0):
recv_msg = self.cur_self.serial.read(nums)
else:
continue
if self.cur_self.recv_queue.full():
self.cur_self.recv_queue.get()
self.cur_self.recv_queue.put(recv_msg.decode())


except Exception as e:
print(e)
continue

class Uart_Send_Data_Thread(threading.Thread):
def __init__(self, cur_self, main_self):
super(Uart_Send_Data_Thread, self).__init__()
self.cur_self = cur_self
self.main_self = main_self
self.thread = threading.Event()

def stop(self):
self.thread.set()

def stopped(self):
return self.thread.is_set()

def run(self):
while True:
if self.stopped():
break
try:
if not self.cur_self.send_queue.empty():
send_data = self.cur_self.send_queue.get(False)
data_num = len(send_data)
# 统计发送字符的数量
self.main_self.uart_updata_send_num_signal.emit(data_num)
#ascii 发送
self.cur_self.serial.write(send_data)
else:
continue
except queue.Empty:
continue


class Uart(object):
def __init__(self, parent):
self.err = 0
self.parent = parent

self.recv_queue = queue.Queue(1000)
self.send_queue = queue.Queue(1000)
self.uart_time_stamp_flag = 0
self.uart_rec_hex_lock = 0


def uart_init(self, port, baud, stopbit, databit, checkbit):
try:
checkbitlist = {'None': 'N', 'Odd': 'O', 'Even': 'E'}
stopbitlist = {'1': 'serial.STOPBITS_ONE', '1.5': 'serial.STOPBITS_ONE', '2': 'serial.STOPBITS_ONE'}
self.serial = serial.Serial(port.split()[0], baud, int(databit), checkbitlist[checkbit], serial.STOPBITS_ONE)
# 创建线程
self.recv_thread = Uart_Recv_Data_Thread(self, self.parent)
self.send_thread = Uart_Send_Data_Thread(self, self.parent)
self.err = 0
except Exception as e:
print(e)
self.err = -1


def open_uart_thread(self):
self.recv_thread.start()
self.send_thread.start()


def close_uart_thread(self):
self.recv_thread.stop()
self.send_thread.stop()
self.serial.close()

def uart_send_func(self, data):
self.send_queue.put(data)

上面实现了对串口的配置初始化,并且设置了两个线程,一个线程用于接收,一个线程用于发送,为了保证数据不会丢失,这里还用到了队列,保证即使快速发送时,也不会丢失数据。

界面逻辑

底层接口完成后,接下来要做界面的逻辑,先从基础的发送和接收的显示开始,要实现这两点,首先要完成的就有串口的打开和关闭发送处理接收显示

串口的打开和关闭

回到 main.py,先来实现串口打开和关闭,设置一个标志,作为判断串口是否运行的标志,这里我将标志命名为 self.uart_com_run_statu,然后我们要绑定该按纽的触发回调事件, self.uart_en_push_button.clicked.connect(self.uart_en_push_button_cb),在每次按下打开串口按钮后,我们获取 combo_box 当前内容的波特率、停止位、数据位等串口相关的信息,调用底层串口初始化 self.uart.uart_init(port, baud, stopbit, databit, checkbit) 传入这些内容,判断初始化是否成功,成功的话,将上面的运行标志置为 1,然后将打开串口按钮的文本设置为关闭串口,并且开启串口的线程,错误的话,说明有可能是串口已经被其他软件或应该打开,这里我们只要提示警告,不做任何处理就可以了。部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def uart_en_push_button_cb(self):
if self.uart_com_run_status == 0:
port = self.com_combo_box.currentText()
if port == '':
win32api.MessageBox(0, "请选择串口", "警告",win32con.MB_ICONWARNING)
return
baud = self.baud_combo_box.currentText()
stopbit = self.stopbit_combo_box.currentText()
databit = self.databit_combo_box.currentText()
checkbit = self.checkbit_combo_box.currentText()
self.uart.uart_init(port, baud, stopbit, databit, checkbit)
if self.uart.err == -1:
self.uart_com_run_status = 0
win32api.MessageBox(0, port+"已被使用", "警告",win32con.MB_ICONWARNING)
else:
self.uart_com_run_status = 1
self.uart.open_uart_thread()
self.uart_en_push_button.setText('关闭串口')
else:
self.uart_com_run_status = 0
self.uart.close_uart_thread()
self.uart_en_push_button.setText('打开串口')

串口开启成功,在此点击时关闭串口,这个时候将标志置为 0,并且关闭串口和串口线程,设置按钮文本为打开串口即可。

附上成功开启串口和失败开启串口的演示:

串口的发送

接下来实现串口的发送,发送这里,我们要知道发送的是 ascii 还是 hex 格式,所以我们要先判断当前勾选的发送格式,将之前 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.send_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_send_radio_button_cb)self.send_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_send_radio_button_cb),这两个回调事件实现了将发送编辑框的两种格式的相互转换,并且用 self.uart_send_hex_lock 作为当前格式的标志,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def uart_ascii_to_hex_send_radio_button_cb(self):
if self.send_ascii_radio_button.isChecked() == True:
self.uart_send_hex_lock = 0
send_text = self.uart_send_show.toPlainText().replace(' ', '')
self.uart_send_show.clear()
hex_send_text = self.hex2bin(send_text)
self.uart_send_show.setText(hex_send_text)
else:
return

def uart_hex_to_ascii_send_radio_button_cb(self):
if self.send_hex_radio_button.isChecked() == True:
self.uart_send_hex_lock = 1
text_list = []
send_text = bytes(self.uart_send_show.toPlainText(), encoding='utf-8')
for i in range(len(send_text)):
text_list.append(hex(send_text[i])[2:])
send_text_to_hex = ' '.join(text_list)
self.uart_send_show.clear()
self.uart_send_show.setText(send_text_to_hex)
else:
return

随后,设置一下发送按键的回调事件 self.uart_send_push_button.clicked.connect(self.uart_send_push_button_cb),这里判断当前的格式,将编辑框的内容获取后,相应的处理后,调用底层接口发送出去,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def uart_send_push_button_cb(self):
if self.uart_com_run_status == 0:
return
send_data = ''
send_text = self.uart_send_show.toPlainText()
if send_text == '':
return
if self.send_hex_radio_button.isChecked() == True: # 十六进制发送
hex_send_text = self.hex2bin(send_text.replace(' ', ''))
send_data = bytes(hex_send_text,encoding='utf-8')
else:
send_data = send_text.encode()
self.uart.uart_send_func(send_data)

附上,具体效果:

串口的接收

发送也成功后,开始实现接收,接收的话,同样需要将接收分为 ascii 和 hex 格式,所以同样要将接收 UI 的两个 radio button 分别分配一个点击回调事件,分别为 self.rec_ascii_radio_button.toggled.connect(self.uart_ascii_to_hex_rec_radio_button_cb)self.rec_hex_radio_button.toggled.connect(self.uart_hex_to_ascii_rec_radio_button_cb),这两个回调事件实现了将接收编辑框接收的的两种格式的相互转换。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
def uart_ascii_to_hex_rec_radio_button_cb(self):
if self.rec_ascii_radio_button.isChecked() == True:
self.uart.uart_set_rec_hex_lock(0)
else:
return

def uart_hex_to_ascii_rec_radio_button_cb(self):
if self.rec_hex_radio_button.isChecked() == True:
self.uart.uart_set_rec_hex_lock(1)
else:
return

上面代码可以看到,这里单纯只是设定标志,因为处理是在底层串口接收线程中对这个标志做了处理,具体可以看上面串口底层的代码,这里就不再赘述了,显示接口通过信号的方式进行更新,设置回调事件 self.uart_recv_updata_show_data_signal.connect(self.update_uart_recv_show_cb),在里面获取接收的数据,数据是由接收线程处理好后发送过来的,所以这边的回调处理同样很简单,具体如下:

1
2
3
4
def update_uart_recv_show_cb(self, data):
self.uart_rec_show.insertPlainText(data)
cursor = self.uart_rec_show.textCursor()
self.uart_rec_show.moveCursor(cursor.End)

具体效果:

定时发送

接收搞定后,我们增加定时发送功能,定时发送功能需要用到定时器,同时要获取定时功能的开启和定时时间,相关的代码如下:

1
2
3
4
5
6
7
8
# 定时器
self.uart_timer_num = 1000
self.uart_timer_line_edit.setText('1000')
self.uart_timer_send = QTimer()
self.uart_timer_send.timeout.connect(self.uart_timer_send_cb)

def uart_timer_send_cb(self):
self.uart_send_push_button_cb()

上面代码可以看出,创建了个定时器,并且将事件回调绑定在了 uart_timer_send_cb 中,回调里实现的就是不断的调用发送事件的回调,定时器的打开则通过 check box 控件,具体代码如下:

1
2
3
4
5
6
7
8
9
def uart_time_en_check_box_cb(self):
if self.uart_com_run_status == 0:
self.uart_timer_check_box.setChecked(False)
return None

if self.uart_timer_check_box.isChecked() == True:
self.uart_timer_send.start(int(self.uart_timer_num))
else:
self.uart_timer_send.stop()

上述代码实现了,如果串口未运行,则不使用定时器,判断用户是否勾选该功能来开启和停止定时器。

串口关闭后也需要相应的关闭定时器,防止无用的调用发送。

具体效果如下:

其他

定时功能做好后,还有个时间戳的功能,时间戳则是获取当前时间,根据标志判断时间戳是否开启,开启的话,则添加到显示数据中,相关的代码在串口底层的接收线程中,并且功能也比较简单,就不再详细的说了。同样的,还有发送清除和接收清除等简单的小功能,也不浪费太多篇幅。具体实现可以访问我的 github 中的 python_uart_tool 库中,详细的代码都会提交在上边。

结语

python 串口逻辑代码的编写,就到这里结束了,这是一个简单的 demo,很多例如:限制输入内容、回显、快速发送时显示问题等功能我没有添加上去。

发现自己对写这种代码加解释的博文不是很熟悉,总感觉会有些地方不清楚该怎样描述,而且我本身不是主学 python 的,所以难免会有出错或者你认为代码不合理的地方,这些也希望多多谅解和指教,我会及时修改的!

关于本篇的相关代码我也上传到 github 上去了,有兴趣的可以访问 QtComMatepython_uart_tool 查看,浏览的同时也可以在该 github 仓库中点个 Star 支持我一下😄。

与本文相关的内容还有:
python 开发环境搭建
python 获取已连接wifi密码工具
python 制作串口工具(一)