mirror of
https://github.com/NanjingForestryUniversity/supermachine-tobacco.git
synced 2025-11-08 14:23:55 +00:00
完善了喷阀测试模块
This commit is contained in:
parent
65c988bb05
commit
39d3aa4b59
135
main_test.py
135
main_test.py
@ -125,7 +125,7 @@ class TestMain:
|
|||||||
response_altitude = np.sum(np.sum(rgb_cross_area & spce_cross_area))
|
response_altitude = np.sum(np.sum(rgb_cross_area & spce_cross_area))
|
||||||
search_area[x, y] = response_altitude
|
search_area[x, y] = response_altitude
|
||||||
delta = np.unravel_index(np.argmax(search_area), search_area.shape)
|
delta = np.unravel_index(np.argmax(search_area), search_area.shape)
|
||||||
delta = (delta[0] - search_area_size[1]//2, delta[1] - search_area_size[1]//2)
|
delta = (delta[0] - search_area_size[1] // 2, delta[1] - search_area_size[1] // 2)
|
||||||
delta_x, delta_y = delta
|
delta_x, delta_y = delta
|
||||||
|
|
||||||
rgb_cross_area = self.get_cross_area(rgb_bin, delta_x, delta_y)
|
rgb_cross_area = self.get_cross_area(rgb_bin, delta_x, delta_y)
|
||||||
@ -163,131 +163,10 @@ class TestMain:
|
|||||||
return cross_area
|
return cross_area
|
||||||
|
|
||||||
|
|
||||||
class ValveTest:
|
|
||||||
def __init__(self):
|
|
||||||
self.reminder = """
|
|
||||||
快,给我个指令:
|
|
||||||
a. 开始命令 st e. 设置 光谱(a)相机 的延时,格式 e,500
|
|
||||||
b. 停止命令 sp f. 设置 彩色(b)相机 的延时, 格式 f,500
|
|
||||||
c. 设置光谱相机分频, 得是4的倍数而且>=8, 格式: c,8 g. 发个da和db完全重叠的mask
|
|
||||||
d. 阀板的脉冲分频系数, >=2即可 h. 发个da和db呈现出X形的mask
|
|
||||||
或者你给我个小于256的数字,我就测试对应的喷阀,按q键我就退出。\n
|
|
||||||
"""
|
|
||||||
self.s = socket.socket() # 创建 socket 对象
|
|
||||||
host = socket.gethostname() # 获取本地主机名
|
|
||||||
port = 13452 # 设置端口
|
|
||||||
self.s.bind((host, port)) # 绑定端口
|
|
||||||
self.s.listen(5) # 等待客户端连接
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
while True:
|
|
||||||
logging.info("我在等连接...")
|
|
||||||
c, addr = self.s.accept() # 建立客户端连接
|
|
||||||
print('Connection Established:', addr)
|
|
||||||
value = input(self.reminder)
|
|
||||||
if value == 'q':
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.process_cmd(value)
|
|
||||||
c.close() # 关闭连接
|
|
||||||
|
|
||||||
def pad_cmd(self, cmd):
|
|
||||||
return b'\xAA'+cmd+b'\xFF\xFF\xBB'
|
|
||||||
|
|
||||||
def param_cmd_parser(self, cmd, default_value, checker=None):
|
|
||||||
try:
|
|
||||||
value = int(cmd.split(',')[-1])
|
|
||||||
except:
|
|
||||||
print(f'你给的值不对啊,我先给你弄个{default_value}吧')
|
|
||||||
value = default_value
|
|
||||||
if checker is not None:
|
|
||||||
if not checker(value):
|
|
||||||
return None
|
|
||||||
return value
|
|
||||||
|
|
||||||
def process_cmd(self, value):
|
|
||||||
if value == 'a':
|
|
||||||
# a.开始命令
|
|
||||||
cmd = b'\x00\x03' + 'sa'.encode('ascii') + b'\xFF'
|
|
||||||
elif value == 'b':
|
|
||||||
# b.停止命令
|
|
||||||
cmd = b'\x00\x03' + 'sb'.encode('ascii') + b'\xFF'
|
|
||||||
elif value.startswith('c'):
|
|
||||||
# c. 设置光谱相机分频,得是4的倍数而且>=8,格式:c,8
|
|
||||||
checker = lambda x: (x // 4 == 0) and (x >= 8)
|
|
||||||
value = self.param_cmd_parser(value, default_value=8, checker=checker)
|
|
||||||
if value is None:
|
|
||||||
print("值需要是4的倍数且大于8")
|
|
||||||
return
|
|
||||||
cmd = b'\x00\x0a' + 'sc'.encode('ascii') + f"{value:08d}".encode('ascii')
|
|
||||||
elif value.startswith('d'):
|
|
||||||
# d. 阀板的脉冲分频系数,>=2即可
|
|
||||||
checker = lambda x: x >= 2
|
|
||||||
value = self.param_cmd_parser(value, default_value=2, checker=checker)
|
|
||||||
if value is None:
|
|
||||||
print("你得大于等于2")
|
|
||||||
return
|
|
||||||
cmd = b'\x00\x0a' + 'sv'.encode('ascii') + f"{value:08d}".encode('ascii')
|
|
||||||
elif value.startswith('e'):
|
|
||||||
# e. 设置 光谱(a)相机 的延时,格式 e,500
|
|
||||||
checker = lambda x: (x >= 0)
|
|
||||||
value = self.param_cmd_parser(value, default_value=2, checker=checker)
|
|
||||||
if value is None:
|
|
||||||
print("你得大于等于0")
|
|
||||||
return
|
|
||||||
cmd = b'\x00\x0a' + 'sa'.encode('ascii') + f"{value:08d}".encode('ascii')
|
|
||||||
elif value.startswith('f'):
|
|
||||||
# f. 设置 RGB(b)相机 的延时,格式 e,500
|
|
||||||
checker = lambda x: (x >= 0)
|
|
||||||
value = self.param_cmd_parser(value, default_value=2, checker=checker)
|
|
||||||
if value is None:
|
|
||||||
print("你得大于等于0")
|
|
||||||
return
|
|
||||||
cmd = b'\x00\x0a' + 'sb'.encode('ascii') + f"{value:08d}".encode('ascii')
|
|
||||||
elif value == 'g':
|
|
||||||
# g.发个da和db完全重叠的mask
|
|
||||||
mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8)
|
|
||||||
len_a, data_a = self.format_data(mask_a)
|
|
||||||
len_b, data_b = self.format_data(mask_b)
|
|
||||||
cmd = len_a + 'da'.encode('ascii') + mask_a
|
|
||||||
self.send(cmd)
|
|
||||||
cmd = len_b + 'db'.encode('ascii') + mask_b
|
|
||||||
elif value == 'h':
|
|
||||||
# h.发个da和db呈现出X形的mask
|
|
||||||
mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8).T
|
|
||||||
len_a, data_a = self.format_data(mask_a)
|
|
||||||
len_b, data_b = self.format_data(mask_b)
|
|
||||||
cmd = len_a + 'da'.encode('ascii') + mask_a
|
|
||||||
self.send(cmd)
|
|
||||||
cmd = len_b + 'db'.encode('ascii') + mask_b
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
value = int(value)
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
print(f"你给的指令: {value} 咋看都不对")
|
|
||||||
return
|
|
||||||
self.send(cmd)
|
|
||||||
|
|
||||||
def send(self, cmd: bytes) -> None:
|
|
||||||
print("我要send 这个了:\n")
|
|
||||||
print(cmd)
|
|
||||||
cmd = self.pad_cmd(cmd)
|
|
||||||
try:
|
|
||||||
self.s.send(cmd)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"发失败了, 这是我找到的错误信息\n{e}")
|
|
||||||
return
|
|
||||||
print("发好了")
|
|
||||||
|
|
||||||
def format_data(self, array_to_send: np.ndarray) -> (bytes, bytes):
|
|
||||||
data = np.packbits(array_to_send, axis=-1)
|
|
||||||
data = data.tobytes()
|
|
||||||
data_len = (len(data)+2).to_bytes(2, 'big')
|
|
||||||
return data_len, data
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
testor = TestMain()
|
import argparse
|
||||||
testor.pony_run(test_path=r'/home/lzy/2022.7.30/tobacco_v1_0/saved_img/',
|
parser = argparse.ArgumentParser(description='Run image test or ')
|
||||||
test_rgb=True, test_spectra=True, get_delta=False)
|
tester = TestMain()
|
||||||
|
tester.pony_run(test_path=r'/home/lzy/2022.7.30/tobacco_v1_0/saved_img/',
|
||||||
|
test_rgb=False, test_spectra=False, get_delta=False)
|
||||||
|
|
||||||
|
|||||||
178
valve_test.py
Normal file
178
valve_test.py
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class ValveTest:
|
||||||
|
def __init__(self):
|
||||||
|
self.last_cmd = None
|
||||||
|
self.reminder = """======================================================================================
|
||||||
|
快,给我个指令😉😉😉︎:
|
||||||
|
a. 开始命令 st. e. 设置 光谱(a)相机 的延时,格式 e,500
|
||||||
|
b. 停止命令 sp. f. 设置 彩色(b)相机 的延时, 格式 f,500
|
||||||
|
c. 设置光谱相机分频系数,4的倍数且>=8, 格式 c,8 g. 发个da和db完全重叠的mask
|
||||||
|
d. 阀板的脉冲分频系数,>=2即可 h. 发个da和db呈现出X形的mask
|
||||||
|
|
||||||
|
你给我个小于256的数字,我就测试对应的喷阀。如果已经测试过一个,可以直接回车测下一个。
|
||||||
|
给q指令我就退出。
|
||||||
|
======================================================================================\n"""
|
||||||
|
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建 socket 对象
|
||||||
|
host = socket.gethostname() # 获取本地主机名
|
||||||
|
port = 13452 # 设置端口
|
||||||
|
self.s.bind((host, port)) # 绑定端口
|
||||||
|
self.s.listen(1) # 等待客户端连接
|
||||||
|
self.c = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
print("我在等连接...")
|
||||||
|
self.c, addr = self.s.accept() # 建立客户端连接
|
||||||
|
print('和它的链接建立成功了:', addr)
|
||||||
|
while True:
|
||||||
|
value = input(self.reminder)
|
||||||
|
if value == 'q':
|
||||||
|
print("好的,我退出啦")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.process_cmd(value)
|
||||||
|
self.c.close() # 关闭连接
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def cmd_padding(cmd):
|
||||||
|
return b'\xAA' + cmd + b'\xFF\xFF\xBB'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def param_cmd_parser(cmd, default_value, checker=None):
|
||||||
|
try:
|
||||||
|
value = int(cmd.split(',')[-1])
|
||||||
|
except:
|
||||||
|
print(f'你给的值不对啊,我先给你弄个{default_value}吧')
|
||||||
|
value = default_value
|
||||||
|
if checker is not None:
|
||||||
|
if not checker(value):
|
||||||
|
return None
|
||||||
|
return value
|
||||||
|
|
||||||
|
def process_cmd(self, value):
|
||||||
|
if value == 'a':
|
||||||
|
# a.开始命令
|
||||||
|
cmd = b'\x00\x03' + 'sa'.encode('ascii') + b'\xFF'
|
||||||
|
elif value == 'b':
|
||||||
|
# b.停止命令
|
||||||
|
cmd = b'\x00\x03' + 'sb'.encode('ascii') + b'\xFF'
|
||||||
|
elif value.startswith('c'):
|
||||||
|
# c. 设置光谱相机分频,得是4的倍数而且>=8,格式:c,8
|
||||||
|
checker = lambda x: (x % 4 == 0) and (x >= 8)
|
||||||
|
value = self.param_cmd_parser(value, default_value=8, checker=checker)
|
||||||
|
if value is None:
|
||||||
|
print("值需要是4的倍数且大于8")
|
||||||
|
return
|
||||||
|
cmd = b'\x00\x0a' + 'sc'.encode('ascii') + f"{value:08d}".encode('ascii')
|
||||||
|
elif value.startswith('d'):
|
||||||
|
# d. 阀板的脉冲分频系数,>=2即可
|
||||||
|
checker = lambda x: x >= 2
|
||||||
|
value = self.param_cmd_parser(value, default_value=2, checker=checker)
|
||||||
|
if value is None:
|
||||||
|
print("你得大于等于2")
|
||||||
|
return
|
||||||
|
cmd = b'\x00\x0a' + 'sv'.encode('ascii') + f"{value:08d}".encode('ascii')
|
||||||
|
elif value.startswith('e'):
|
||||||
|
# e. 设置 光谱(a)相机 的延时,格式 e,500
|
||||||
|
checker = lambda x: (x >= 0)
|
||||||
|
value = self.param_cmd_parser(value, default_value=2, checker=checker)
|
||||||
|
if value is None:
|
||||||
|
print("你得大于等于0")
|
||||||
|
return
|
||||||
|
cmd = b'\x00\x0a' + 'sa'.encode('ascii') + f"{value:08d}".encode('ascii')
|
||||||
|
elif value.startswith('f'):
|
||||||
|
# f. 设置 RGB(b)相机 的延时,格式 e,500
|
||||||
|
checker = lambda x: (x >= 0)
|
||||||
|
value = self.param_cmd_parser(value, default_value=2, checker=checker)
|
||||||
|
if value is None:
|
||||||
|
print("你得大于等于0")
|
||||||
|
return
|
||||||
|
cmd = b'\x00\x0a' + 'sb'.encode('ascii') + f"{value:08d}".encode('ascii')
|
||||||
|
elif value == 'g':
|
||||||
|
# g.发个da和db完全重叠的mask
|
||||||
|
mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8)
|
||||||
|
len_a, data_a = self.format_data(mask_a)
|
||||||
|
len_b, data_b = self.format_data(mask_b)
|
||||||
|
cmd = len_a + 'da'.encode('ascii') + data_a
|
||||||
|
self.send(cmd)
|
||||||
|
cmd = len_b + 'db'.encode('ascii') + data_b
|
||||||
|
elif value == 'h':
|
||||||
|
# h.发个da和db呈现出X形的mask
|
||||||
|
mask_a, mask_b = np.eye(256, dtype=np.uint8), np.eye(256, dtype=np.uint8).T
|
||||||
|
len_a, data_a = self.format_data(mask_a)
|
||||||
|
len_b, data_b = self.format_data(mask_b)
|
||||||
|
cmd = len_a + 'da'.encode('ascii') + data_a
|
||||||
|
self.send(cmd)
|
||||||
|
cmd = len_b + 'db'.encode('ascii') + data_b
|
||||||
|
elif value == '' and self.last_cmd is not None:
|
||||||
|
self.last_cmd += 1
|
||||||
|
if self.last_cmd > 256:
|
||||||
|
self.last_cmd = 1
|
||||||
|
print(f'自动变化到 喷阀测试 {self.last_cmd}')
|
||||||
|
value = self.last_cmd
|
||||||
|
cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value:08d}".encode('ascii')
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
value = int(value)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
print(f"你给的指令: {value} 咋看都不对")
|
||||||
|
return
|
||||||
|
if (value <= 256) and (value >= 1):
|
||||||
|
cmd = b'\x00\x0A' + 'te'.encode('ascii') + f"{value:08d}".encode('ascii')
|
||||||
|
self.last_cmd = value
|
||||||
|
else:
|
||||||
|
print(f'你给的指令: {value} 值不对,我们有256个阀门, 范围是 [1, 256]')
|
||||||
|
return
|
||||||
|
self.send(cmd)
|
||||||
|
|
||||||
|
def send(self, cmd: bytes) -> None:
|
||||||
|
cmd = self.cmd_padding(cmd)
|
||||||
|
print("我要 send 这个了:")
|
||||||
|
print(cmd.hex())
|
||||||
|
try:
|
||||||
|
self.c.send(cmd)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"发失败了, 这是我找到的错误信息:\n{e}")
|
||||||
|
return
|
||||||
|
print("发好了")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def format_data(array_to_send: np.ndarray) -> (bytes, bytes):
|
||||||
|
data = np.packbits(array_to_send, axis=-1)
|
||||||
|
data = data.tobytes()
|
||||||
|
data_len = (len(data) + 2).to_bytes(2, 'big')
|
||||||
|
return data_len, data
|
||||||
|
|
||||||
|
|
||||||
|
class VirtualValve:
|
||||||
|
def __init__(self):
|
||||||
|
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 声明socket类型,同时生成链接对象
|
||||||
|
self.client.connect(('localhost', 13452)) # 建立一个链接,连接到本地的6969端口
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
# addr = client.accept()
|
||||||
|
# print '连接地址:', addr
|
||||||
|
data = self.client.recv(4096) # 接收一个信息,并指定接收的大小 为1024字节
|
||||||
|
print(data.hex()) # 输出我接收的信息
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='阀门测程序')
|
||||||
|
parser.add_argument('-c', default=False, action='store_true', help='是否是开个客户端', required=False)
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.c:
|
||||||
|
print("运行客户机")
|
||||||
|
virtual_valve = VirtualValve()
|
||||||
|
virtual_valve.run()
|
||||||
|
else:
|
||||||
|
print("运行主机")
|
||||||
|
valve_tester = ValveTest()
|
||||||
|
valve_tester.run()
|
||||||
Loading…
Reference in New Issue
Block a user