diff --git a/main_test.py b/main_test.py index 8e1c912..156fa13 100644 --- a/main_test.py +++ b/main_test.py @@ -4,8 +4,11 @@ # @File: main_test.py # @Software:PyCharm import itertools +import logging import os import time +import socket +import typing import cv2 import matplotlib.pyplot as plt @@ -160,6 +163,130 @@ class TestMain: return cross_area +class ValveTest: + def __init__(self): + self.reminder = """ + 快,给我个指令: + a. 开始命令 st e. 设置 光谱(a)相机 的延时,格式 e,500 + b. 停止命令 sp f. 设置 彩色(b)相机 的延时, 格式 f,500 + c. 设置光谱相机分频, 得是4的倍数而且>=8, 格式: c,8 g. 发个da和db完全重叠的mask + d. 阀板的脉冲分频系数, >=2即可 h. 发个da和db呈现出X形的mask + 或者你给我个小于256的数字,我就测试对应的喷阀,按q键我就退出。\n + """ + self.s = socket.socket() # 创建 socket 对象 + host = socket.gethostname() # 获取本地主机名 + port = 13452 # 设置端口 + self.s.bind((host, port)) # 绑定端口 + self.s.listen(5) # 等待客户端连接 + + def run(self): + while True: + logging.info("我在等连接...") + c, addr = self.s.accept() # 建立客户端连接 + print('Connection Established:', addr) + value = input(self.reminder) + if value == 'q': + break + else: + self.process_cmd(value) + c.close() # 关闭连接 + + def pad_cmd(self, cmd): + return b'\xAA'+cmd+b'\xFF\xFF\xBB' + + def param_cmd_parser(self, cmd, default_value, checker=None): + try: + value = int(cmd.split(',')[-1]) + except: + print(f'你给的值不对啊,我先给你弄个{default_value}吧') + value = default_value + if checker is not None: + if not checker(value): + return None + return value + + def process_cmd(self, value): + if value == 'a': + # a.开始命令 + cmd = b'\x00\x03' + 'sa'.encode('ascii') + b'\xFF' + elif value == 'b': + # b.停止命令 + cmd = b'\x00\x03' + 'sb'.encode('ascii') + b'\xFF' + elif value.startswith('c'): + # c. 设置光谱相机分频,得是4的倍数而且>=8,格式:c,8 + checker = lambda x: (x // 4 == 0) and (x >= 8) + value = self.param_cmd_parser(value, default_value=8, checker=checker) + if value is None: + print("值需要是4的倍数且大于8") + return + cmd = b'\x00\x0a' + 'sc'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value.startswith('d'): + # d. 阀板的脉冲分频系数,>=2即可 + checker = lambda x: x >= 2 + value = self.param_cmd_parser(value, default_value=2, checker=checker) + if value is None: + print("你得大于等于2") + return + cmd = b'\x00\x0a' + 'sv'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value.startswith('e'): + # e. 设置 光谱(a)相机 的延时,格式 e,500 + checker = lambda x: (x >= 0) + value = self.param_cmd_parser(value, default_value=2, checker=checker) + if value is None: + print("你得大于等于0") + return + cmd = b'\x00\x0a' + 'sa'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value.startswith('f'): + # f. 设置 RGB(b)相机 的延时,格式 e,500 + checker = lambda x: (x >= 0) + value = self.param_cmd_parser(value, default_value=2, checker=checker) + if value is None: + print("你得大于等于0") + return + cmd = b'\x00\x0a' + 'sb'.encode('ascii') + f"{value:08d}".encode('ascii') + elif value == 'g': + # g.发个da和db完全重叠的mask + mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8) + len_a, data_a = self.format_data(mask_a) + len_b, data_b = self.format_data(mask_b) + cmd = len_a + 'da'.encode('ascii') + mask_a + self.send(cmd) + cmd = len_b + 'db'.encode('ascii') + mask_b + elif value == 'h': + # h.发个da和db呈现出X形的mask + mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8).T + len_a, data_a = self.format_data(mask_a) + len_b, data_b = self.format_data(mask_b) + cmd = len_a + 'da'.encode('ascii') + mask_a + self.send(cmd) + cmd = len_b + 'db'.encode('ascii') + mask_b + else: + try: + value = int(value) + except Exception as e: + print(e) + print(f"你给的指令: {value} 咋看都不对") + return + self.send(cmd) + + def send(self, cmd: bytes) -> None: + print("我要send 这个了:\n") + print(cmd) + cmd = self.pad_cmd(cmd) + try: + self.s.send(cmd) + except Exception as e: + print(f"发失败了, 这是我找到的错误信息\n{e}") + return + print("发好了") + + def format_data(self, array_to_send: np.ndarray) -> (bytes, bytes): + data = np.packbits(array_to_send, axis=-1) + data = data.tobytes() + data_len = (len(data)+2).to_bytes(2, 'big') + return data_len, data + + if __name__ == '__main__': testor = TestMain() testor.pony_run(test_path=r'/home/lzy/2022.7.30/tobacco_v1_0/saved_img/',