From 39d3aa4b59b14830625bead781674495351e263b Mon Sep 17 00:00:00 2001
From: "li.zhenye"
Date: Mon, 1 Aug 2022 22:46:34 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E4=BA=86=E5=96=B7=E9=98=80?=
=?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=A8=A1=E5=9D=97?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
main_test.py | 135 ++------------------------------------
valve_test.py | 178 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 185 insertions(+), 128 deletions(-)
create mode 100644 valve_test.py
diff --git a/main_test.py b/main_test.py
index 156fa13..fc3a084 100644
--- a/main_test.py
+++ b/main_test.py
@@ -125,7 +125,7 @@ class TestMain:
response_altitude = np.sum(np.sum(rgb_cross_area & spce_cross_area))
search_area[x, y] = response_altitude
delta = np.unravel_index(np.argmax(search_area), search_area.shape)
- delta = (delta[0] - search_area_size[1]//2, delta[1] - search_area_size[1]//2)
+ delta = (delta[0] - search_area_size[1] // 2, delta[1] - search_area_size[1] // 2)
delta_x, delta_y = delta
rgb_cross_area = self.get_cross_area(rgb_bin, delta_x, delta_y)
@@ -163,131 +163,10 @@ class TestMain:
return cross_area
-class ValveTest:
- def __init__(self):
- self.reminder = """
- 快,给我个指令:
- a. 开始命令 st e. 设置 光谱(a)相机 的延时,格式 e,500
- b. 停止命令 sp f. 设置 彩色(b)相机 的延时, 格式 f,500
- c. 设置光谱相机分频, 得是4的倍数而且>=8, 格式: c,8 g. 发个da和db完全重叠的mask
- d. 阀板的脉冲分频系数, >=2即可 h. 发个da和db呈现出X形的mask
- 或者你给我个小于256的数字,我就测试对应的喷阀,按q键我就退出。\n
- """
- self.s = socket.socket() # 创建 socket 对象
- host = socket.gethostname() # 获取本地主机名
- port = 13452 # 设置端口
- self.s.bind((host, port)) # 绑定端口
- self.s.listen(5) # 等待客户端连接
-
- def run(self):
- while True:
- logging.info("我在等连接...")
- c, addr = self.s.accept() # 建立客户端连接
- print('Connection Established:', addr)
- value = input(self.reminder)
- if value == 'q':
- break
- else:
- self.process_cmd(value)
- c.close() # 关闭连接
-
- def pad_cmd(self, cmd):
- return b'\xAA'+cmd+b'\xFF\xFF\xBB'
-
- def param_cmd_parser(self, cmd, default_value, checker=None):
- try:
- value = int(cmd.split(',')[-1])
- except:
- print(f'你给的值不对啊,我先给你弄个{default_value}吧')
- value = default_value
- if checker is not None:
- if not checker(value):
- return None
- return value
-
- def process_cmd(self, value):
- if value == 'a':
- # a.开始命令
- cmd = b'\x00\x03' + 'sa'.encode('ascii') + b'\xFF'
- elif value == 'b':
- # b.停止命令
- cmd = b'\x00\x03' + 'sb'.encode('ascii') + b'\xFF'
- elif value.startswith('c'):
- # c. 设置光谱相机分频,得是4的倍数而且>=8,格式:c,8
- checker = lambda x: (x // 4 == 0) and (x >= 8)
- value = self.param_cmd_parser(value, default_value=8, checker=checker)
- if value is None:
- print("值需要是4的倍数且大于8")
- return
- cmd = b'\x00\x0a' + 'sc'.encode('ascii') + f"{value:08d}".encode('ascii')
- elif value.startswith('d'):
- # d. 阀板的脉冲分频系数,>=2即可
- checker = lambda x: x >= 2
- value = self.param_cmd_parser(value, default_value=2, checker=checker)
- if value is None:
- print("你得大于等于2")
- return
- cmd = b'\x00\x0a' + 'sv'.encode('ascii') + f"{value:08d}".encode('ascii')
- elif value.startswith('e'):
- # e. 设置 光谱(a)相机 的延时,格式 e,500
- checker = lambda x: (x >= 0)
- value = self.param_cmd_parser(value, default_value=2, checker=checker)
- if value is None:
- print("你得大于等于0")
- return
- cmd = b'\x00\x0a' + 'sa'.encode('ascii') + f"{value:08d}".encode('ascii')
- elif value.startswith('f'):
- # f. 设置 RGB(b)相机 的延时,格式 e,500
- checker = lambda x: (x >= 0)
- value = self.param_cmd_parser(value, default_value=2, checker=checker)
- if value is None:
- print("你得大于等于0")
- return
- cmd = b'\x00\x0a' + 'sb'.encode('ascii') + f"{value:08d}".encode('ascii')
- elif value == 'g':
- # g.发个da和db完全重叠的mask
- mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8)
- len_a, data_a = self.format_data(mask_a)
- len_b, data_b = self.format_data(mask_b)
- cmd = len_a + 'da'.encode('ascii') + mask_a
- self.send(cmd)
- cmd = len_b + 'db'.encode('ascii') + mask_b
- elif value == 'h':
- # h.发个da和db呈现出X形的mask
- mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8).T
- len_a, data_a = self.format_data(mask_a)
- len_b, data_b = self.format_data(mask_b)
- cmd = len_a + 'da'.encode('ascii') + mask_a
- self.send(cmd)
- cmd = len_b + 'db'.encode('ascii') + mask_b
- else:
- try:
- value = int(value)
- except Exception as e:
- print(e)
- print(f"你给的指令: {value} 咋看都不对")
- return
- self.send(cmd)
-
- def send(self, cmd: bytes) -> None:
- print("我要send 这个了:\n")
- print(cmd)
- cmd = self.pad_cmd(cmd)
- try:
- self.s.send(cmd)
- except Exception as e:
- print(f"发失败了, 这是我找到的错误信息\n{e}")
- return
- print("发好了")
-
- def format_data(self, array_to_send: np.ndarray) -> (bytes, bytes):
- data = np.packbits(array_to_send, axis=-1)
- data = data.tobytes()
- data_len = (len(data)+2).to_bytes(2, 'big')
- return data_len, data
-
-
if __name__ == '__main__':
- testor = TestMain()
- testor.pony_run(test_path=r'/home/lzy/2022.7.30/tobacco_v1_0/saved_img/',
- test_rgb=True, test_spectra=True, get_delta=False)
+ import argparse
+ parser = argparse.ArgumentParser(description='Run image test or ')
+ tester = TestMain()
+ tester.pony_run(test_path=r'/home/lzy/2022.7.30/tobacco_v1_0/saved_img/',
+ test_rgb=False, test_spectra=False, get_delta=False)
+
diff --git a/valve_test.py b/valve_test.py
new file mode 100644
index 0000000..43e4bd8
--- /dev/null
+++ b/valve_test.py
@@ -0,0 +1,178 @@
+import logging
+import socket
+
+import numpy as np
+
+
+class ValveTest:
+ def __init__(self):
+ self.last_cmd = None
+ self.reminder = """======================================================================================
+快,给我个指令😉😉😉︎:
+a. 开始命令 st. e. 设置 光谱(a)相机 的延时,格式 e,500
+b. 停止命令 sp. f. 设置 彩色(b)相机 的延时, 格式 f,500
+c. 设置光谱相机分频系数,4的倍数且>=8, 格式 c,8 g. 发个da和db完全重叠的mask
+d. 阀板的脉冲分频系数,>=2即可 h. 发个da和db呈现出X形的mask
+
+你给我个小于256的数字,我就测试对应的喷阀。如果已经测试过一个,可以直接回车测下一个。
+给q指令我就退出。
+======================================================================================\n"""
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 socket 对象
+ host = socket.gethostname() # 获取本地主机名
+ port = 13452 # 设置端口
+ self.s.bind((host, port)) # 绑定端口
+ self.s.listen(1) # 等待客户端连接
+ self.c = None
+
+ def run(self):
+ print("我在等连接...")
+ self.c, addr = self.s.accept() # 建立客户端连接
+ print('和它的链接建立成功了:', addr)
+ while True:
+ value = input(self.reminder)
+ if value == 'q':
+ print("好的,我退出啦")
+ break
+ else:
+ self.process_cmd(value)
+ self.c.close() # 关闭连接
+
+ @staticmethod
+ def cmd_padding(cmd):
+ return b'\xAA' + cmd + b'\xFF\xFF\xBB'
+
+ @staticmethod
+ def param_cmd_parser(cmd, default_value, checker=None):
+ try:
+ value = int(cmd.split(',')[-1])
+ except:
+ print(f'你给的值不对啊,我先给你弄个{default_value}吧')
+ value = default_value
+ if checker is not None:
+ if not checker(value):
+ return None
+ return value
+
+ def process_cmd(self, value):
+ if value == 'a':
+ # a.开始命令
+ cmd = b'\x00\x03' + 'sa'.encode('ascii') + b'\xFF'
+ elif value == 'b':
+ # b.停止命令
+ cmd = b'\x00\x03' + 'sb'.encode('ascii') + b'\xFF'
+ elif value.startswith('c'):
+ # c. 设置光谱相机分频,得是4的倍数而且>=8,格式:c,8
+ checker = lambda x: (x % 4 == 0) and (x >= 8)
+ value = self.param_cmd_parser(value, default_value=8, checker=checker)
+ if value is None:
+ print("值需要是4的倍数且大于8")
+ return
+ cmd = b'\x00\x0a' + 'sc'.encode('ascii') + f"{value:08d}".encode('ascii')
+ elif value.startswith('d'):
+ # d. 阀板的脉冲分频系数,>=2即可
+ checker = lambda x: x >= 2
+ value = self.param_cmd_parser(value, default_value=2, checker=checker)
+ if value is None:
+ print("你得大于等于2")
+ return
+ cmd = b'\x00\x0a' + 'sv'.encode('ascii') + f"{value:08d}".encode('ascii')
+ elif value.startswith('e'):
+ # e. 设置 光谱(a)相机 的延时,格式 e,500
+ checker = lambda x: (x >= 0)
+ value = self.param_cmd_parser(value, default_value=2, checker=checker)
+ if value is None:
+ print("你得大于等于0")
+ return
+ cmd = b'\x00\x0a' + 'sa'.encode('ascii') + f"{value:08d}".encode('ascii')
+ elif value.startswith('f'):
+ # f. 设置 RGB(b)相机 的延时,格式 e,500
+ checker = lambda x: (x >= 0)
+ value = self.param_cmd_parser(value, default_value=2, checker=checker)
+ if value is None:
+ print("你得大于等于0")
+ return
+ cmd = b'\x00\x0a' + 'sb'.encode('ascii') + f"{value:08d}".encode('ascii')
+ elif value == 'g':
+ # g.发个da和db完全重叠的mask
+ mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8)
+ len_a, data_a = self.format_data(mask_a)
+ len_b, data_b = self.format_data(mask_b)
+ cmd = len_a + 'da'.encode('ascii') + data_a
+ self.send(cmd)
+ cmd = len_b + 'db'.encode('ascii') + data_b
+ elif value == 'h':
+ # h.发个da和db呈现出X形的mask
+ mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8).T
+ len_a, data_a = self.format_data(mask_a)
+ len_b, data_b = self.format_data(mask_b)
+ cmd = len_a + 'da'.encode('ascii') + data_a
+ self.send(cmd)
+ cmd = len_b + 'db'.encode('ascii') + data_b
+ elif value == '' and self.last_cmd is not None:
+ self.last_cmd += 1
+ if self.last_cmd > 256:
+ self.last_cmd = 1
+ print(f'自动变化到 喷阀测试 {self.last_cmd}')
+ value = self.last_cmd
+ cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value:08d}".encode('ascii')
+ else:
+ try:
+ value = int(value)
+ except Exception as e:
+ print(e)
+ print(f"你给的指令: {value} 咋看都不对")
+ return
+ if (value <= 256) and (value >= 1):
+ cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value:08d}".encode('ascii')
+ self.last_cmd = value
+ else:
+ print(f'你给的指令: {value} 值不对,我们有256个阀门, 范围是 [1, 256]')
+ return
+ self.send(cmd)
+
+ def send(self, cmd: bytes) -> None:
+ cmd = self.cmd_padding(cmd)
+ print("我要 send 这个了:")
+ print(cmd.hex())
+ try:
+ self.c.send(cmd)
+ except Exception as e:
+ print(f"发失败了, 这是我找到的错误信息:\n{e}")
+ return
+ print("发好了")
+
+ @staticmethod
+ def format_data(array_to_send: np.ndarray) -> (bytes, bytes):
+ data = np.packbits(array_to_send, axis=-1)
+ data = data.tobytes()
+ data_len = (len(data) + 2).to_bytes(2, 'big')
+ return data_len, data
+
+
+class VirtualValve:
+ def __init__(self):
+ self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 声明socket类型,同时生成链接对象
+ self.client.connect(('localhost', 13452)) # 建立一个链接,连接到本地的6969端口
+
+ def run(self):
+ while True:
+ # addr = client.accept()
+ # print '连接地址:', addr
+ data = self.client.recv(4096) # 接收一个信息,并指定接收的大小 为1024字节
+ print(data.hex()) # 输出我接收的信息
+
+
+if __name__ == '__main__':
+ import argparse
+
+ parser = argparse.ArgumentParser(description='阀门测程序')
+ parser.add_argument('-c', default=False, action='store_true', help='是否是开个客户端', required=False)
+ args = parser.parse_args()
+ if args.c:
+ print("运行客户机")
+ virtual_valve = VirtualValve()
+ virtual_valve.run()
+ else:
+ print("运行主机")
+ valve_tester = ValveTest()
+ valve_tester.run()