supermachine--tomato-passio.../20240627test4/pipe_utils.py
TG 242bb9a71b feat:新增20240627test4为现场部署版本的测试版(包含传统方法、实例分割、目标检测、图像分类多个模型)
fix:修复在20240627test4中的classifier.py的analyze_tomato函数中white_defect的函数忘记传递两个阈值量的错误;修复analyze_tomato函数中的叶片实例分割存在的问题,解决由于变量污染引起的分割错误;
2024-07-21 22:17:46 +08:00

265 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Time : 2024/4/20 18:24
# @Author : TG
# @File : pipe_utils.py
# @Software: PyCharm
import os
import win32file
import win32pipe
import time
import logging
import numpy as np
from config import Config as setting
from utils.augmentations import *
class Pipe:
def __init__(self, rgb_receive_name, rgb_send_name, spec_receive_name):
self.rgb_receive_name = rgb_receive_name
self.rgb_send_name = rgb_send_name
self.spec_receive_name = spec_receive_name
self.rgb_receive = None
self.rgb_send = None
self.spec_receive = None
def create_pipes(self, rgb_receive_name, rgb_send_name, spec_receive_name):
while True:
try:
# 打开或创建命名管道
self.rgb_receive = win32pipe.CreateNamedPipe(
rgb_receive_name,
win32pipe.PIPE_ACCESS_INBOUND,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 80000000, 80000000, 0, None
)
self.rgb_send = win32pipe.CreateNamedPipe(
rgb_send_name,
win32pipe.PIPE_ACCESS_OUTBOUND, # 修改为输出模式
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 80000000, 80000000, 0, None
)
self.spec_receive = win32pipe.CreateNamedPipe(
spec_receive_name,
win32pipe.PIPE_ACCESS_INBOUND,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 200000000, 200000000, 0, None
)
print("pipe管道创建成功等待连接...")
time.sleep(0.5)
print('Fruit正在启动中预计需要30秒')
time.sleep(0.5)
print('请勿关闭Fruit程序否则程序将无法正常运行')
time.sleep(0.5)
print('等待中..........')
# 等待发送端连接
win32pipe.ConnectNamedPipe(self.rgb_receive, None)
print("rgb_receive connected.")
# 等待发送端连接
win32pipe.ConnectNamedPipe(self.rgb_send, None)
print("rgb_send connected.")
win32pipe.ConnectNamedPipe(self.spec_receive, None)
print("spec_receive connected.")
print('Fruit程序启动成功')
return self.rgb_receive, self.rgb_send, self.spec_receive
except Exception as e:
print(f"管道创建连接失败,失败原因: {e}")
print("等待5秒后重试...")
time.sleep(5)
continue
def receive_rgb_data(self, rgb_receive):
try:
# 读取图片数据长度
len_img = win32file.ReadFile(rgb_receive, 4, None)
data_size = int.from_bytes(len_img[1], byteorder='big')
# 读取实际图片数据
result, data = win32file.ReadFile(rgb_receive, data_size, None)
# 检查读取操作是否成功
if result != 0:
logging.error(f"读取失败,错误代码: {result}")
return None
# 返回成功读取的数据
return data
except Exception as e:
logging.error(f"数据接收失败,错误原因: {e}")
return None
def receive_spec_data(self, spec_receive):
try:
# 读取光谱数据长度
len_spec = win32file.ReadFile(spec_receive, 4, None)
# print(f'len_data:{len_spec}')
data_size = int.from_bytes(len_spec[1], byteorder='big')
# print(f'所需读取数据长度:{data_size}')
# 读取光谱数据
result, spec_data = win32file.ReadFile(spec_receive, data_size, None)
# 检查读取操作是否成功
if result != 0:
logging.error(f"读取失败,错误代码: {result}")
return None
# 返回成功读取的数据
return spec_data
except Exception as e:
logging.error(f"数据接收失败,错误原因: {e}")
return None
def parse_img(self, data: bytes) -> (str, any):
"""
图像数据转换.
:param data:接收到的报文
:return: 指令类型和内容
"""
try:
assert len(data) > 1
except AssertionError:
logging.error('指令转换失败长度不足2')
return '', None
cmd, data = data[:2], data[2:]
cmd = cmd.decode('ascii').strip().upper()
# 如果收到的是预热指令'YR'直接返回命令和None不处理图像数据
if cmd == 'YR':
return cmd, None
n_rows, n_cols, img = data[:2], data[2:4], data[4:]
try:
n_rows, n_cols = [int.from_bytes(x, byteorder='big') for x in [n_rows, n_cols]]
except Exception as e:
logging.error(f'长宽转换失败, 错误代码{e}, 报文大小: n_rows:{n_rows}, n_cols:{n_cols}')
return '', None
try:
assert n_rows * n_cols * 3 == len(img)
# 因为是float32类型 所以长度要乘12 如果是uint8则乘3
except AssertionError:
logging.error('图像指令转换失败,数据长度错误')
return '', None
img = np.frombuffer(img, dtype=np.uint8).reshape(n_rows, n_cols, -1)
return cmd, img
def parse_spec(self, data: bytes) -> (str, any):
"""
光谱数据转换.
:param data:接收到的报文
:return: 指令类型和内容
"""
try:
assert len(data) > 2
except AssertionError:
logging.error('指令转换失败长度不足3')
return '', None
cmd, data = data[:2], data[2:]
cmd = cmd.decode('ascii').strip().upper()
n_rows, n_cols, n_bands, spec = data[:2], data[2:4], data[4:6], data[6:]
try:
n_rows, n_cols, n_bands = [int.from_bytes(x, byteorder='big') for x in [n_rows, n_cols, n_bands]]
# print(f'高度:{n_rows}, 宽度:{n_cols}, 谱段数:{n_bands}')
except Exception as e:
logging.error(f'长宽转换失败, 错误代码{e}, 报文大小: n_rows:{n_rows}, n_cols:{n_cols}, n_bands:{n_bands}')
return '', None
try:
assert n_rows * n_cols * n_bands * 2 == len(spec)
except AssertionError:
logging.error('图像指令转换失败,数据长度错误')
return '', None
spec = np.frombuffer(spec, dtype=np.uint16).reshape((n_rows, n_bands, -1)).transpose(0, 2, 1)
return cmd, spec
def send_data(self,cmd:str, brix, green_percentage, weight, diameter, defect_num, total_defect_area, rp):
'''
发送数据
:param cmd:
:param brix:
:param green_percentage:
:param weight:
:param diameter:
:param defect_num:
:param total_defect_area:
:param rp:
:return:
'''
cmd = cmd.strip().upper()
cmd_type = 'RE'
cmd_re = cmd_type.upper().encode('ascii')
img = np.asarray(rp, dtype=np.uint8) # 将图像转换为 NumPy 数组
height = img.shape[0] # 获取图像的高度
width = img.shape[1] # 获取图像的宽度
height = height.to_bytes(2, byteorder='big')
width = width.to_bytes(2, byteorder='big')
img_bytes = img.tobytes()
diameter = int(diameter * 100).to_bytes(2, byteorder='big')
defect_num = defect_num.to_bytes(2, byteorder='big')
total_defect_area = int(total_defect_area * 1000).to_bytes(4, byteorder='big')
length = len(img_bytes) + 18
length = length.to_bytes(4, byteorder='big')
if cmd == 'TO':
brix = 0
brix = brix.to_bytes(2, byteorder='big')
gp = int(green_percentage * 100).to_bytes(1, byteorder='big')
weight = 0
weight = weight.to_bytes(1, byteorder='big')
send_message = (length + cmd_re + brix + gp + diameter + weight +
defect_num + total_defect_area + height + width + img_bytes)
elif cmd == 'PF':
brix = int(brix * 1000).to_bytes(2, byteorder='big')
gp = int(green_percentage * 100).to_bytes(1, byteorder='big')
weight = weight.to_bytes(1, byteorder='big')
send_message = (length + cmd_re + brix + gp + diameter + weight +
defect_num + total_defect_area + height + width + img_bytes)
elif cmd == 'KO':
brix = 0
brix = brix.to_bytes(2, byteorder='big')
gp = 0
gp = gp.to_bytes(1, byteorder='big')
weight = 0
weight = weight.to_bytes(1, byteorder='big')
defect_num = 0
defect_num = defect_num.to_bytes(2, byteorder='big')
total_defect_area = 0
total_defect_area = total_defect_area.to_bytes(4, byteorder='big')
height = setting.n_rgb_rows
height = height.to_bytes(2, byteorder='big')
width = setting.n_rgb_cols
width = width.to_bytes(2, byteorder='big')
img_bytes = np.zeros((setting.n_rgb_rows, setting.n_rgb_cols, setting.n_rgb_bands),
dtype=np.uint8).tobytes()
length = (18).to_bytes(4, byteorder='big')
send_message = (length + cmd_re + brix + gp + diameter + weight +
defect_num + total_defect_area + height + width + img_bytes)
try:
win32file.WriteFile(self.rgb_send, send_message)
except Exception as e:
logging.error(f'发送指令失败,错误类型:{e}')
return False
return True
def create_file(file_name):
"""
创建文件
:param file_name: 文件名
:return: None
"""
if os.path.exists(file_name):
print("文件存在:%s" % file_name)
return False
# os.remove(file_name) # 删除已有文件
if not os.path.exists(file_name):
print("文件不存在,创建文件:%s" % file_name)
open(file_name, 'a').close()
return True
class Logger(object):
def __init__(self, is_to_file=False, path=None):
self.is_to_file = is_to_file
if path is None:
path = "tomato-passion_fruit.log"
self.path = path
create_file(path)
def log(self, content):
if self.is_to_file:
with open(self.path, "a") as f:
print(time.strftime("[%Y-%m-%d_%H-%M-%S]:"), file=f)
print(content, file=f)
else:
print(content)