From b9c7892fb60cf3e4c814d5a05c46a3b46ba3a288 Mon Sep 17 00:00:00 2001 From: lyz55 <1942503466@qq.com> Date: Thu, 19 Dec 2024 13:32:55 +0800 Subject: [PATCH] First version --- .gitignore | 0 ThresholdSimulation.py | 238 +++++++++++++++++++++++++++++++++++++++++ valve_test.py | 229 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 467 insertions(+) create mode 100644 .gitignore create mode 100644 ThresholdSimulation.py create mode 100644 valve_test.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/ThresholdSimulation.py b/ThresholdSimulation.py new file mode 100644 index 0000000..782e63a --- /dev/null +++ b/ThresholdSimulation.py @@ -0,0 +1,238 @@ +import sys +import os +import cv2 +import numpy as np +from PyQt5 import QtCore, QtGui, QtWidgets + + +class ImageViewer(QtWidgets.QWidget): + def __init__(self, folder_path): + super().__init__() + self.folder_path = folder_path + self.image_files = [f for f in os.listdir(folder_path) if f.lower().endswith('.bmp')] + self.image_files.sort() + self.current_index = 0 + self.threshold_g = 50 # 初始 T_g 值,根据需要调整 + self.threshold_diff = 10 # 初始 T 值,根据需要调整 + + # 设置UI + self.initUI() + if self.image_files: + self.loadImage(self.current_index) + + def initUI(self): + # 主布局 + main_layout = QtWidgets.QHBoxLayout(self) + + # 侧边栏 + self.sidebar = QtWidgets.QListWidget() + self.sidebar.addItems(self.image_files) + self.sidebar.currentRowChanged.connect(self.sidebarSelectionChanged) + main_layout.addWidget(self.sidebar, 1) + + # 右侧区域 + right_layout = QtWidgets.QVBoxLayout() + + # 图片显示区域 + self.original_label = QtWidgets.QLabel("Original Image") + self.masked_label = QtWidgets.QLabel("Masked Image") + + # 使用垂直布局上下显示两张图片 + images_layout = QtWidgets.QVBoxLayout() + images_layout.addWidget(self.original_label) + images_layout.addWidget(self.masked_label) + right_layout.addLayout(images_layout, 8) + + # 阈值调整区域 + threshold_layout = QtWidgets.QGridLayout() + + # T_g 调整 + self.slider_g = QtWidgets.QSlider(QtCore.Qt.Horizontal) + self.slider_g.setMinimum(0) + self.slider_g.setMaximum(255) + self.slider_g.setValue(self.threshold_g) + self.slider_g.valueChanged.connect(self.sliderGChanged) + self.input_g = QtWidgets.QLineEdit(str(self.threshold_g)) + self.input_g.setFixedWidth(50) + self.input_g.returnPressed.connect(self.inputGChanged) + + threshold_layout.addWidget(QtWidgets.QLabel("T_g (G > T_g):"), 0, 0) + threshold_layout.addWidget(self.slider_g, 0, 1) + threshold_layout.addWidget(self.input_g, 0, 2) + + # T 调整 + self.slider_diff = QtWidgets.QSlider(QtCore.Qt.Horizontal) + self.slider_diff.setMinimum(-255) + self.slider_diff.setMaximum(255) + self.slider_diff.setValue(self.threshold_diff) + self.slider_diff.valueChanged.connect(self.sliderDiffChanged) + self.input_diff = QtWidgets.QLineEdit(str(self.threshold_diff)) + self.input_diff.setFixedWidth(50) + self.input_diff.returnPressed.connect(self.inputDiffChanged) + + threshold_layout.addWidget(QtWidgets.QLabel("T (G - R > T):"), 1, 0) + threshold_layout.addWidget(self.slider_diff, 1, 1) + threshold_layout.addWidget(self.input_diff, 1, 2) + + right_layout.addLayout(threshold_layout, 2) + + # 按钮 + button_layout = QtWidgets.QHBoxLayout() + self.prev_button = QtWidgets.QPushButton("Previous (P)") + self.next_button = QtWidgets.QPushButton("Next (N)") + self.prev_button.clicked.connect(self.showPreviousImage) + self.next_button.clicked.connect(self.showNextImage) + button_layout.addWidget(self.prev_button) + button_layout.addWidget(self.next_button) + right_layout.addLayout(button_layout, 1) + + main_layout.addLayout(right_layout, 4) + + self.setLayout(main_layout) + self.setWindowTitle("BMP Image Viewer with Mask") + self.resize(1600, 900) + + def loadImage(self, index): + if index < 0 or index >= len(self.image_files): + return + + image_path = os.path.join(self.folder_path, self.image_files[index]) + image = cv2.imread(image_path) + if image is None: + QtWidgets.QMessageBox.warning(self, "Error", f"无法加载图片: {image_path}") + return + + # 调整大小以适应窗口 + image = self.resizeImage(image, max_width=800, max_height=800) + self.original_image = image.copy() + self.displayImage(self.original_label, self.original_image) + + # 生成掩膜并显示 + mask = self.generateMask(image, self.threshold_g, self.threshold_diff) + masked_image = self.applyMask(image, mask) + self.masked_image = masked_image + self.displayImage(self.masked_label, self.masked_image) + + # 更新侧边栏选择 + self.sidebar.blockSignals(True) + self.sidebar.setCurrentRow(index) + self.sidebar.blockSignals(False) + + def resizeImage(self, image, max_width=800, max_height=800): + height, width = image.shape[:2] + scaling_factor = min(max_width / width, max_height / height, 1) + new_size = (int(width * scaling_factor), int(height * scaling_factor)) + resized_image = cv2.resize(image, new_size, interpolation=cv2.INTER_AREA) + return resized_image + + def displayImage(self, label, image): + # 转换颜色格式从 BGR 到 RGB + rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + height, width, channel = rgb_image.shape + bytes_per_line = 3 * width + q_image = QtGui.QImage(rgb_image.data, width, height, bytes_per_line, QtGui.QImage.Format_RGB888) + pixmap = QtGui.QPixmap.fromImage(q_image) + label.setPixmap(pixmap) + label.setAlignment(QtCore.Qt.AlignCenter) + + def generateMask(self, image, threshold_g, threshold_diff): + # 分离 G 和 R 通道 + B, G, R = cv2.split(image) + + # 生成掩膜:G > T_g 且 (G - R) > T + mask = (G > threshold_g) & ((G.astype(int) - R.astype(int)) > threshold_diff) + return mask + + def applyMask(self, image, mask): + # 创建一个红色的掩膜 + overlay = image.copy() + overlay[mask] = [0, 0, 255] # BGR格式,红色 + + # 透明度混合 + alpha = 0.5 + cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image) + + return image + + def showNextImage(self): + if self.current_index < len(self.image_files) - 1: + self.current_index += 1 + self.loadImage(self.current_index) + + def showPreviousImage(self): + if self.current_index > 0: + self.current_index -= 1 + self.loadImage(self.current_index) + + def sliderGChanged(self, value): + self.threshold_g = value + self.input_g.setText(str(self.threshold_g)) + self.updateMask() + + def sliderDiffChanged(self, value): + self.threshold_diff = value + self.input_diff.setText(str(self.threshold_diff)) + self.updateMask() + + def inputGChanged(self): + try: + value = int(self.input_g.text()) + if 0 <= value <= 255: + self.threshold_g = value + self.slider_g.setValue(value) + self.updateMask() + else: + raise ValueError + except ValueError: + QtWidgets.QMessageBox.warning(self, "Invalid Input", "T_g 必须是0到255之间的整数。") + self.input_g.setText(str(self.threshold_g)) + + def inputDiffChanged(self): + try: + value = int(self.input_diff.text()) + if -255 <= value <= 255: + self.threshold_diff = value + self.slider_diff.setValue(value) + self.updateMask() + else: + raise ValueError + except ValueError: + QtWidgets.QMessageBox.warning(self, "Invalid Input", "T 必须是-255到255之间的整数。") + self.input_diff.setText(str(self.threshold_diff)) + + def updateMask(self): + if hasattr(self, 'original_image'): + mask = self.generateMask(self.original_image, self.threshold_g, self.threshold_diff) + masked_image = self.applyMask(self.original_image.copy(), mask) + self.masked_image = masked_image + self.displayImage(self.masked_label, self.masked_image) + + def sidebarSelectionChanged(self, index): + if 0 <= index < len(self.image_files): + self.current_index = index + self.loadImage(self.current_index) + + def keyPressEvent(self, event): + if event.key() == QtCore.Qt.Key_N: + self.showNextImage() + elif event.key() == QtCore.Qt.Key_P: + self.showPreviousImage() + else: + super().keyPressEvent(event) + + +def main(): + app = QtWidgets.QApplication(sys.argv) + + # 这里你可以修改为你想要读取的文件夹路径 + folder_path = QtWidgets.QFileDialog.getExistingDirectory(None, "选择包含 BMP 图片的文件夹") + if not folder_path: + sys.exit() + + viewer = ImageViewer(folder_path) + viewer.show() + sys.exit(app.exec_()) + + +if __name__ == '__main__': + main() diff --git a/valve_test.py b/valve_test.py new file mode 100644 index 0000000..90a317c --- /dev/null +++ b/valve_test.py @@ -0,0 +1,229 @@ +import socket +import time + +import cv2 +import numpy as np + + +class ValveTest: + def __init__(self, host=None, port=13452, column_num=64): + self.column_num = column_num + self.increase_modes = ['测下一个', '重复测试'] + self.last_cmd = None + self.increase_mode = 0 + self.reminder = None + self.update_reminder() + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 socket 对象 + self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + host = socket.gethostname() if host is None else host # 获取本地主机名 + print(f"Service Address {host}, {port}.") + self.s.bind((host, port)) # 绑定端口 + self.s.listen(5) # 等待客户端连接 + self.c = None + + + def update_reminder(self): + self.reminder = f"""====================================================================================== +快,给我个指令😉😉😉︎: +a. 开始命令 st. e. 设置 光谱(a)相机 的延时,格式 e,500 +b. 停止命令 sp. f. 设置 彩色(b)相机 的延时, 格式 f,500 +c. 设置光谱相机分频系数,4的倍数且>=8, 格式 c,8 g. 发个da和db完全重叠的mask +d. 阀板的脉冲分频系数,>=2即可 h. 发个da和db呈现出X形的mask +m. 模式切换:测下一个喷阀还是重发? +你给我个小于{self.column_num}的数字,我就测试对应的喷阀。如果已经测试过一个,可以直接回车{self.increase_modes[self.increase_mode]}。 +给q指令我就退出。 +======================================================================================\n""" + + def run(self): + print("我在等连接...") + self.c, addr = self.s.accept() # 建立客户端连接 + print('和它的链接建立成功了:', addr) + self.c.settimeout(0.1) + while True: + data = '' + try: + data = self.c.recv(1024) + except Exception as e: + print(f"===================================================================") + if len(data) > 0: + print("receive data!!!") + print(data) + value = input(self.reminder) + if value == 'q': + print("好的,我退出啦") + self.s.close() + break + else: + self.process_cmd(value) + self.c.close() # 关闭连接 + + @staticmethod + def cmd_padding(cmd): + return b'\xAA' + cmd + b'\xFF\xFF\xBB' + + @staticmethod + def param_cmd_parser(cmd, default_value, checker=None): + try: + value = int(cmd.split(',')[-1]) + except: + print(f'你给的值不对啊,我先给你弄个{default_value}吧') + value = default_value + if checker is not None: + if not checker(value): + return None + return value + + def process_cmd(self, value): + if value == 'a': + # a.开始命令 + cmd = b'\x00\x03' + 'st'.encode('ascii') + b'\xFF' + elif value == 'b': + # b.停止命令 + cmd = b'\x00\x03' + 'sp'.encode('ascii') + b'\xFF' + elif value.startswith('c'): + # c. 设置光谱相机分频,得是4的倍数而且>=8,格式:c,8 + checker = lambda x: (x % 4 == 0) and (x >= 8) + value = self.param_cmd_parser(value, default_value=8, checker=checker) + if value is None: + print("值需要是4的倍数且大于8") + return + cmd = b'\x00\x0a' + 'sc'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value.startswith('d'): + # d. 阀板的脉冲分频系数,>=2即可 + checker = lambda x: x >= 2 + value = self.param_cmd_parser(value, default_value=2, checker=checker) + if value is None: + print("你得大于等于2") + return + cmd = b'\x00\x0a' + 'sv'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value.startswith('e'): + # e. 设置 光谱(a)相机 的延时,格式 e,500 + checker = lambda x: (x >= 0) + value = self.param_cmd_parser(value, default_value=2, checker=checker) + if value is None: + print("你得大于等于0") + return + cmd = b'\x00\x0a' + 'sd'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value.startswith('f'): + # f. 设置 RGB(b)相机 的延时,格式 e,500 + checker = lambda x: (x >= 0) + value = self.param_cmd_parser(value, default_value=2, checker=checker) + if value is None: + print("你得大于等于0") + return + cmd = b'\x00\x0a' + 'sb'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value == 'g': + # g.发个da和db完全重叠的mask + mask_a, mask_b = np.eye(self.column_num, dtype=np.uint8), np.eye(self.column_num, dtype=np.uint8) + mask_a, mask_b = [cv2.resize(mask, mask_size) for mask in [mask_a, mask_b]] + len_a, data_a = self.format_data(mask_a) + len_b, data_b = self.format_data(mask_b) + cmd = len_a + 'da'.encode('ascii') + data_a + self.send(cmd) + cmd = len_b + 'db'.encode('ascii') + data_b + elif value == 'h': + # h.发个da和db呈现出X形的mask + mask_a, mask_b = np.eye(self.column_num, dtype=np.uint8), np.eye(self.column_num, dtype=np.uint8).T + mask_a, mask_b = [cv2.resize(mask, mask_size) for mask in [mask_a, mask_b]] + len_a, data_a = self.format_data(mask_a) + len_b, data_b = self.format_data(mask_b) + cmd = len_a + 'da'.encode('ascii') + data_a + self.send(cmd) + cmd = len_b + 'db'.encode('ascii') + data_b + elif value == 'i': + # i. 完全模拟全流程 + # a.开始命令 + cmd = b'\x00\x03' + 'st'.encode('ascii') + b'\xFF' + self.send(cmd) + time.sleep(0.05) + mask_a, mask_b = np.eye(self.column_num, dtype=np.uint8), np.eye(self.column_num, dtype=np.uint8) + mask_a, mask_b = [cv2.resize(mask, mask_size) for mask in [mask_a, mask_b]] + len_a, data_a = self.format_data(mask_a) + len_b, data_b = self.format_data(mask_b) + cmd = len_a + 'da'.encode('ascii') + data_a + for i in range(1000): + for i in range(1): + self.send(cmd) + time.sleep(0.045) + # cmd = len_b + 'db'.encode('ascii') + data_b + elif value == 'm': + self.increase_mode = int(1 - self.increase_mode) + self.update_reminder() + print("模式切换") + return + elif value == '' and self.last_cmd is not None: + if self.increase_mode == 0: + self.last_cmd += 1 + if self.last_cmd > 256: + self.last_cmd = 1 + print(f'自动变化到 喷阀测试 {self.last_cmd}') + value = self.last_cmd + cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value - 1:08d}".encode('ascii') + else: + try: + value = int(value) + except Exception as e: + print(e) + print(f"你给的指令: {value} 咋看都不对") + return + if (value <= self.column_num) and (value >= 1): + cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value - 1:08d}".encode('ascii') + self.last_cmd = value + elif value == (self.column_num+1): + cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value:08d}".encode('ascii') + print(f"恭喜你发现了这个隐藏的{self.column_num+1}号流水灯指令😝😝😝,好厉害。") + else: + print(f'你给的指令: {value} 值不对,我们有 {self.column_num} 个阀门, 范围是 [1, {self.column_num}],略大一个好像也可以') + return + self.send(cmd) + + def send(self, cmd: bytes) -> None: + cmd = self.cmd_padding(cmd) + print("我要 send 这个了:") + print(cmd.hex()) + try: + self.c.send(cmd) + except Exception as e: + print(f"发失败了, 这是我找到的错误信息:\n{e}") + return + print("发好了") + + @staticmethod + def format_data(array_to_send: np.ndarray) -> (bytes, bytes): + data = np.packbits(array_to_send, axis=-1) + data = data.tobytes() + data_len = (len(data) + 2).to_bytes(2, 'big') + return data_len, data + + +class VirtualValve: + def __init__(self, host, port): + self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 声明socket类型,同时生成链接对象 + self.client.connect((host, port)) # 建立一个链接,连接到本地的13452端口 + + def run(self): + while True: + # addr = client.accept() + # print '连接地址:', addr + data = self.client.recv(4096) # 接收一个信息,并指定接收的大小 为1024字节 + print("原始报文:") + print(data.hex()) # 输出我接收的信息 + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description='阀门测程序') + parser.add_argument('-c', default=False, action='store_true', help='是否是开个客户端', required=False) + parser.add_argument('-m', default='192.168.10.8', help='指定master主机名') + parser.add_argument('-p', default=13452, help='指定端口') + parser.add_argument('-cnt', default=64, help='指定端口') + args = parser.parse_args() + mask_size = (512, args.cnt) # size of cv (Width, Height) + if args.c: + print("运行客户机") + virtual_valve = VirtualValve(host=args.m, port=args.p) + virtual_valve.run() + else: + print("运行主机") + valve_tester = ValveTest(host=args.m, port=args.p, column_num=args.cnt) + valve_tester.run()