feat:新增百香果spec预测模型、重量预测模型、spec预测权重;修改通信协议;重构部分代码;

This commit is contained in:
TG 2024-06-16 17:01:17 +08:00
parent 4420dd9f18
commit d9d0152c5f
14 changed files with 1049 additions and 438 deletions

View File

@ -7,6 +7,12 @@
import cv2
import numpy as np
import logging
import os
import utils
from root_dir import ROOT_DIR
from sklearn.ensemble import RandomForestRegressor
import joblib
class Tomato:
def __init__(self):
@ -223,7 +229,6 @@ class Tomato:
img_filled = cv2.bitwise_or(new_bin_img, img_filled_inv)
return img_filled
class Passion_fruit:
def __init__(self, hue_value=37, hue_delta=10, value_target=25, value_delta=10):
# 初始化常用参数
@ -293,4 +298,253 @@ class Passion_fruit:
result = cv2.bitwise_and(rgb_img, bin_img_3channel)
return result
class Spec_predict(object):
def __init__(self, load_from=None, debug_mode=False, class_weight=None):
if load_from is None:
self.model = RandomForestRegressor(n_estimators=100)
else:
self.load(load_from)
self.log = utils.Logger(is_to_file=debug_mode)
self.debug_mode = debug_mode
def load(self, path=None):
if path is None:
path = os.path.join(ROOT_DIR, 'models')
model_files = os.listdir(path)
if len(model_files) == 0:
self.log.log("No model found!")
return 1
self.log.log("./ Models Found:")
_ = [self.log.log("├--" + str(model_file)) for model_file in model_files]
file_times = [model_file[6:-2] for model_file in model_files]
latest_model = model_files[int(np.argmax(file_times))]
self.log.log("└--Using the latest model: " + str(latest_model))
path = os.path.join(ROOT_DIR, "models", str(latest_model))
if not os.path.isabs(path):
logging.warning('给的是相对路径')
return -1
if not os.path.exists(path):
logging.warning('文件不存在')
return -1
with open(path, 'rb') as f:
model_dic = joblib.load(f)
self.model = model_dic['model']
return 0
def predict(self, data_x):
'''
对数据进行预测
:param data_x: 波段选择后的数据
:return: 预测结果二值化后的数据0为背景1为黄芪,2为杂质23为杂质14为甘草片5为红芪
'''
data_y = self.model.predict(data_x)
return data_y
# def get_tomato_dimensions(edge_img):
# """
# 根据边缘二值化轮廓图,计算果子的长径、短径和长短径比值。
# 使用最小外接矩形和最小外接圆两种方法。
#
# 参数:
# edge_img (numpy.ndarray): 边缘二值化轮廓图,背景为黑色,番茄区域为白色。
#
# 返回:
# tuple: (长径, 短径, 长短径比值)
# """
# if edge_img is None or edge_img.any() == 0:
# return (0, 0)
# # 最小外接矩形
# rect = cv2.minAreaRect(cv2.findContours(edge_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0][0])
# major_axis, minor_axis = rect[1]
# # aspect_ratio = max(major_axis, minor_axis) / min(major_axis, minor_axis)
#
# # # 最小外接圆
# # (x, y), radius = cv2.minEnclosingCircle(
# # cv2.findContours(edge_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0][0])
# # diameter = 2 * radius
# # aspect_ratio_circle = 1.0
#
# return (max(major_axis, minor_axis), min(major_axis, minor_axis))
# def get_defect_info(defect_img):
# """
# 根据区域缺陷二值化轮廓图,计算缺陷区域的个数和总面积。
#
# 参数:
# defect_img (numpy.ndarray): 番茄区域缺陷二值化轮廓图,背景为黑色,番茄区域为白色,缺陷区域为黑色连通域。
#
# 返回:
# tuple: (缺陷区域个数, 缺陷区域像素面积,缺陷像素总面积)
# """
# # 检查输入是否为空
# if defect_img is None or defect_img.any() == 0:
# return (0, 0)
#
# nb_components, output, stats, centroids = cv2.connectedComponentsWithStats(defect_img, connectivity=4)
# max_area = max(stats[i, cv2.CC_STAT_AREA] for i in range(1, nb_components))
# areas = []
# for i in range(1, nb_components):
# area = stats[i, cv2.CC_STAT_AREA]
# if area != max_area:
# areas.append(area)
# number_defects = len(areas)
# total_pixels = sum(areas)
# return number_defects, total_pixels
class Data_processing:
def __init__(self):
pass
def contour_process(self, image_array):
# 应用中值滤波
image_filtered = cv2.medianBlur(image_array, 5)
# 形态学闭操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
image_closed = cv2.morphologyEx(image_filtered, cv2.MORPH_CLOSE, kernel)
# 查找轮廓
contours, _ = cv2.findContours(image_closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 创建空白图像以绘制轮廓
image_contours = np.zeros_like(image_array)
# 进行多边形拟合并填充轮廓
for contour in contours:
epsilon = 0.001 * cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, epsilon, True)
if cv2.contourArea(approx) > 100: # 仅处理较大的轮廓
cv2.drawContours(image_contours, [approx], -1, (255, 255, 255), -1)
return image_contours
def analyze_ellipse(self, image_array):
# 查找白色区域的轮廓
_, binary_image = cv2.threshold(image_array, 127, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 初始化变量用于存储最大轮廓的长径和短径
major_axis = 0
minor_axis = 0
# 对每个找到的轮廓,找出可以包围它的最小椭圆,并计算长径和短径
for contour in contours:
if len(contour) >= 5: # 至少需要5个点来拟合椭圆
ellipse = cv2.fitEllipse(contour)
(center, axes, orientation) = ellipse
major_axis0 = max(axes)
minor_axis0 = min(axes)
# 更新最大的长径和短径
if major_axis0 > major_axis:
major_axis = major_axis0
minor_axis = minor_axis0
return major_axis, minor_axis
def analyze_defect(self, image_array):
# 查找白色区域的轮廓
_, binary_image = cv2.threshold(image_array, 127, 255, cv2.THRESH_BINARY)
contours_white, _ = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 初始化统计数据
count_black_areas = 0
total_pixels_black_areas = 0
# 对于每个白色区域,查找内部的黑色小区域
for contour in contours_white:
# 创建一个mask以查找内部的黑色区域
mask = np.zeros_like(image_array)
cv2.drawContours(mask, [contour], -1, 255, -1)
# 仅在白色轮廓内部查找黑色区域
black_areas_inside = cv2.bitwise_and(cv2.bitwise_not(image_array), mask)
# 查找黑色区域的轮廓
contours_black, _ = cv2.findContours(black_areas_inside, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
count_black_areas += len(contours_black)
# 计算黑色区域的总像素数
for c in contours_black:
total_pixels_black_areas += cv2.contourArea(c)
number_defects = count_black_areas
total_pixels = total_pixels_black_areas
return number_defects, total_pixels
def weight_estimates(self, long_axis, short_axis):
"""
根据西红柿的长径短径和直径估算其体积
使用椭圆体积公式计算体积
参数:
diameter (float): 西红柿的直径
long_axis (float): 西红柿的长径
short_axis (float): 西红柿的短径
返回:
float: 估算的西红柿体积
"""
density = 0.652228972
a = long_axis / 2
b = short_axis /2
volume = 4 / 3 * np.pi * a * b * b
weigth = volume * density
return weigth
def analyze_tomato(self, img):
"""
分析给定图像提取和返回西红柿的长径短径缺陷数量和缺陷总面积并返回处理后的图像
使用 Tomoto 类的图像处理方法以及自定义的尺寸和缺陷信息获取函数
参数:
img (numpy.ndarray): 输入的 BGR 图像
返回:
tuple: (长径, 短径, 缺陷区域个数, 缺陷区域总像素, 处理后的图像)
"""
tomato = Tomato() # 创建 Tomato 类的实例
# 设置 S-L 通道阈值并处理图像
threshold_s_l = 180
threshold_fore_g_r_t = 20
s_l = tomato.extract_s_l(img)
thresholded_s_l = tomato.threshold_segmentation(s_l, threshold_s_l)
new_bin_img = tomato.largest_connected_component(thresholded_s_l)
# 绘制西红柿边缘并获取缺陷信息
edge, mask = tomato.draw_tomato_edge(img, new_bin_img)
org_defect = tomato.bitwise_and_rgb_with_binary(edge, new_bin_img)
fore = tomato.bitwise_and_rgb_with_binary(img, mask)
fore_g_r_t = tomato.threshold_segmentation(tomato.extract_g_r(fore), threshold=threshold_fore_g_r_t)
# 统计白色像素点个数
# print(np.sum(fore_g_r_t == 255))
# print(np.sum(mask == 255))
# print(np.sum(fore_g_r_t == 255) / np.sum(mask == 255))
green_percentage = np.sum(fore_g_r_t == 255) / np.sum(mask == 255)
green_percentage = round(green_percentage, 2) * 100
# 获取西红柿的尺寸信息
long_axis, short_axis = self.analyze_ellipse(mask)
# 获取缺陷信息
number_defects, total_pixels = self.analyze_defect(new_bin_img)
# 将处理后的图像转换为 RGB 格式
rp = cv2.cvtColor(org_defect, cv2.COLOR_BGR2RGB)
diameter = (long_axis + short_axis) / 2
return diameter, green_percentage, number_defects, total_pixels, rp
def analyze_passion_fruit(self, img, hue_value=37, hue_delta=10, value_target=25, value_delta=10):
if img is None:
print("Error: 无图像数据.")
return None
# 创建PassionFruit类的实例
pf = Passion_fruit(hue_value=hue_value, hue_delta=hue_delta, value_target=value_target, value_delta=value_delta)
hsv_image = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
combined_mask = pf.create_mask(hsv_image)
combined_mask = pf.apply_morphology(combined_mask)
max_mask = pf.find_largest_component(combined_mask)
contour_mask = self.contour_process(max_mask)
long_axis, short_axis = self.analyze_ellipse(contour_mask)
weigth = self.weight_estimates(long_axis, short_axis)
number_defects, total_pixels = self.analyze_defect(max_mask)
edge = pf.draw_contours_on_image(img, contour_mask)
org_defect = pf.bitwise_and_rgb_with_binary(edge, max_mask)
rp = cv2.cvtColor(org_defect, cv2.COLOR_BGR2RGB)
diameter = (long_axis + short_axis) / 2
return diameter, weigth, number_defects, total_pixels, rp

View File

@ -3,126 +3,107 @@
# @Author : TG
# @File : main.py
# @Software: PyCharm
# -*- coding: utf-8 -*-
# @Time : 2024/4/12 15:04
# @Author : TG
# @File : main.py
# @Software: PyCharm
import socket
import sys
import numpy as np
import cv2
import root_dir
import time
import os
from root_dir import ROOT_DIR
from classifer import Spec_predict, Data_processing
import logging
from utils import parse_protocol, create_pipes, receive_rgb_data, send_data, receive_spec_data, analyze_tomato, analyze_passion_fruit
from collections import deque
import time
import io
from PIL import Image
import threading
import queue
from utils import Pipe
import numpy as np
def process_data(cmd: str, img: any) -> tuple:
pipe = Pipe()
dp = Data_processing()
rgb_receive_name = r'\\.\pipe\rgb_receive'
rgb_send_name = r'\\.\pipe\rgb_send'
spec_receive_name = r'\\.\pipe\spec_receive'
rgb_receive, rgb_send, spec_receive = pipe.create_pipes(rgb_receive_name, rgb_send_name, spec_receive_name)
def process_data(cmd: str, images: list, spec: any, detector: Spec_predict) -> bool:
"""
处理指令
:param cmd: 指令类型
:param data: 指令内容
:param connected_sock: socket
:param images: 图像数据列表
:param spec: 光谱数据
:param detector: 模型
:return: 是否处理成功
"""
diameter_axis_list = []
max_defect_num = 0 # 初始化最大缺陷数量为0
max_total_defect_area = 0 # 初始化最大总像素数为0
for i, img in enumerate(images):
if cmd == 'TO':
# 番茄
diameter, green_percentage, number_defects, total_pixels, rp = dp.analyze_tomato(img)
if i <= 2:
diameter_axis_list.append(diameter)
max_defect_num = max(max_defect_num, number_defects)
max_total_defect_area = max(max_total_defect_area, total_pixels)
if i == 1:
rp_result = rp
gp = green_percentage
elif cmd == 'PF':
# 百香果
diameter, weigth, number_defects, total_pixels, rp = dp.analyze_passion_fruit(img)
if i <= 2:
diameter_axis_list.append(diameter)
max_defect_num = max(max_defect_num, number_defects)
max_total_defect_area = max(max_total_defect_area, total_pixels)
if i == 1:
rp_result = rp
weigth = weigth
else:
logging.error(f'错误指令,指令为{cmd}')
return False
diameter = round(sum(diameter_axis_list) / 3)
if cmd == 'TO':
# 番茄
long_axis, short_axis, number_defects, total_pixels, rp = analyze_tomato(img)
response = pipe.send_data(cmd=cmd, diameter=diameter, green_percentage=gp,
defect_num=max_defect_num, total_defect_area=max_total_defect_area, rp=rp_result)
elif cmd == 'PF':
# 百香果
long_axis, short_axis, number_defects, total_pixels, rp = analyze_passion_fruit(img)
brix = detector.predict(spec)
response = pipe.send_data(cmd=cmd, brix=brix, diameter=diameter, weigth=weigth,
defect_num=max_defect_num, total_defect_area=max_total_defect_area, rp=rp_result)
return response
return long_axis, short_axis, number_defects, total_pixels, rp
## 20240423代码
def main(is_debug=False):
file_handler = logging.FileHandler(os.path.join(ROOT_DIR, 'report.log'))
file_handler.setLevel(logging.DEBUG if is_debug else logging.WARNING)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG if is_debug else logging.WARNING)
logging.basicConfig(format='%(asctime)s %(filename)s[line:%(lineno)d] - %(levelname)s - %(message)s',
logging.basicConfig(format='%(asctime)s %(filename)s[line:%(lineno)d] - %(levellevel)s - %(message)s',
handlers=[file_handler, console_handler],
level=logging.DEBUG)
rgb_receive_name = r'\\.\pipe\rgb_receive'
rgb_send_name = r'\\.\pipe\rgb_send'
spec_receive_name = r'\\.\pipe\spec_receive'
rgb_receive, rgb_send, spec_receive = create_pipes(rgb_receive_name, rgb_send_name, spec_receive_name)
# data_size = 15040566
detector = Spec_predict(ROOT_DIR/'20240529RGBtest3'/'models'/'passion_fruit.joblib')
while True:
long_axis_list = []
short_axis_list = []
max_defect_num = 0 # 初始化最大缺陷数量为0
max_total_defect_area = 0 # 初始化最大总像素数为0
rp = None
images = []
cmd = None
# start_time = time.time()
for _ in range(5):
data = pipe.receive_rgb_data(rgb_receive)
cmd, img = pipe.parse_img(data)
images.append(img)
for i in range(5):
# start_time = time.time()
data = receive_rgb_data(rgb_receive)
cmd, img = parse_protocol(data)
# print(img.shape)
# end_time = time.time()
# elapsed_time = end_time - start_time
# print(f'接收时间:{elapsed_time}秒')
long_axis, short_axis, number_defects, total_pixels, rp = process_data(cmd=cmd, img=img)
# print(long_axis, short_axis, number_defects, type(total_pixels), rp.shape)
if i <= 2:
long_axis_list.append(long_axis)
short_axis_list.append(short_axis)
# 更新最大缺陷数量和最大总像素数
max_defect_num = max(max_defect_num, number_defects)
max_total_defect_area = max(max_total_defect_area, total_pixels)
if i == 1:
rp_result = rp
long_axis = round(sum(long_axis_list) / 3)
short_axis = round(sum(short_axis_list) / 3)
# print(type(long_axis), type(short_axis), type(defect_num_sum), type(total_defect_area_sum), type(rp_result))
spec_data = receive_spec_data(spec_receive)
cmd, spec_data = parse_protocol(spec_data)
# print(f'光谱数据接收长度:', len(spec_data))
response = send_data(pipe_send=rgb_send, long_axis=long_axis, short_axis=short_axis,
defect_num=max_defect_num, total_defect_area=max_total_defect_area, rp=rp_result)
# end_time = time.time()
# elapsed_time = (end_time - start_time) * 1000
# print(f'总时间:{elapsed_time}毫秒')
#
# print(long_axis, short_axis, defect_num_sum, total_defect_area_sum, rp_result.shape)
if cmd not in ['TO', 'PF']:
logging.error(f'错误指令,指令为{cmd}')
continue
spec = None
if cmd == 'PF':
spec_data = pipe.receive_spec_data(spec_receive)
_, spec = pipe.parse_spec(spec_data)
response = process_data(cmd, images, spec, detector)
if response:
logging.info(f'处理成功,响应为: {response}')
else:
logging.error('处理失败')
if __name__ == '__main__':
# 2个pipe管道
# 接收到图片 n_rows * n_cols * 3 uint8
# 发送long_axis, short_axis, defect_num_sum, total_defect_area_sum, rp_result
main(is_debug=False)

Binary file not shown.

Binary file not shown.

View File

@ -20,60 +20,116 @@ import msvcrt
from classifer import Tomato, Passion_fruit
def receive_rgb_data(pipe):
try:
# 读取图片数据长度
len_img = win32file.ReadFile(pipe, 4, None)
data_size = int.from_bytes(len_img[1], byteorder='big')
# 读取实际图片数据
result, data = win32file.ReadFile(pipe, data_size, None)
# 检查读取操作是否成功
if result != 0:
print(f"读取失败,错误代码: {result}")
import win32file
import win32pipe
import time
import logging
import numpy as np
from PIL import Image
import io
class Pipe:
def __init__(self, rgb_receive_name, rgb_send_name, spec_receive_name):
self.rgb_receive_name = rgb_receive_name
self.rgb_send_name = rgb_send_name
self.spec_receive_name = spec_receive_name
self.rgb_receive = None
self.rgb_send = None
self.spec_receive = None
def create_pipes(self):
while True:
try:
# 打开或创建命名管道
self.rgb_receive = win32pipe.CreateNamedPipe(
self.rgb_receive_name,
win32pipe.PIPE_ACCESS_INBOUND,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 80000000, 80000000, 0, None
)
self.rgb_send = win32pipe.CreateNamedPipe(
self.rgb_send_name,
win32pipe.PIPE_ACCESS_OUTBOUND, # 修改为输出模式
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 80000000, 80000000, 0, None
)
self.spec_receive = win32pipe.CreateNamedPipe(
self.spec_receive_name,
win32pipe.PIPE_ACCESS_INBOUND,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 200000000, 200000000, 0, None
)
print("pipe管道创建成功等待连接...")
# 等待发送端连接
win32pipe.ConnectNamedPipe(self.rgb_receive, None)
print("rgb_receive connected.")
# 等待发送端连接
win32pipe.ConnectNamedPipe(self.rgb_send, None)
print("rgb_send connected.")
win32pipe.ConnectNamedPipe(self.spec_receive, None)
print("spec_receive connected.")
return self.rgb_receive, self.rgb_send, self.spec_receive
except Exception as e:
print(f"管道创建连接失败,失败原因: {e}")
print("等待5秒后重试...")
time.sleep(5)
continue
def receive_rgb_data(self):
try:
# 读取图片数据长度
len_img = win32file.ReadFile(self.rgb_receive, 4, None)
data_size = int.from_bytes(len_img[1], byteorder='big')
# 读取实际图片数据
result, data = win32file.ReadFile(self.rgb_receive, data_size, None)
# 检查读取操作是否成功
if result != 0:
print(f"读取失败,错误代码: {result}")
return None
# 返回成功读取的数据
return data
except Exception as e:
print(f"数据接收失败,错误原因: {e}")
return None
# 返回成功读取的数据
return data
except Exception as e:
print(f"数据接收失败,错误原因: {e}")
return None
def receive_spec_data(pipe):
try:
# 读取光谱数据长度
len_spec = win32file.ReadFile(pipe, 4, None)
data_size = int.from_bytes(len_spec[1], byteorder='big')
# 读取光谱数据
result, spec_data = win32file.ReadFile(pipe, data_size, None)
# 检查读取操作是否成功
if result != 0:
print(f"读取失败,错误代码: {result}")
def receive_spec_data(self):
try:
# 读取光谱数据长度
len_spec = win32file.ReadFile(self.spec_receive, 4, None)
data_size = int.from_bytes(len_spec[1], byteorder='big')
# 读取光谱数据
result, spec_data = win32file.ReadFile(self.spec_receive, data_size, None)
# 检查读取操作是否成功
if result != 0:
print(f"读取失败,错误代码: {result}")
return None
# 返回成功读取的数据
return spec_data
except Exception as e:
print(f"数据接收失败,错误原因: {e}")
return None
# 返回成功读取的数据
return spec_data
except Exception as e:
print(f"数据接收失败,错误原因: {e}")
return None
def parse_protocol(data: bytes) -> (str, any):
"""
指令转换.
def parse_img(self, data: bytes) -> (str, any):
"""
图像数据转换.
:param data:接收到的报文
:return: 指令类型和内容
"""
try:
assert len(data) > 2
except AssertionError:
logging.error('指令转换失败长度不足3')
return '', None
cmd, data = data[:2], data[2:]
cmd = cmd.decode('ascii').strip().upper()
:param data:接收到的报文
:return: 指令类型和内容
"""
try:
assert len(data) > 2
except AssertionError:
logging.error('指令转换失败长度不足3')
return '', None
cmd, data = data[:2], data[2:]
cmd = cmd.decode('ascii').strip().upper()
if cmd == 'TO':
n_rows, n_cols, img = data[:2], data[2:4], data[4:]
try:
n_rows, n_cols = [int.from_bytes(x, byteorder='big') for x in [n_rows, n_cols]]
except Exception as e:
logging.error(f'长宽转换失败, 错误代码{e}, 报文大小: n_rows:{n_rows}, n_cols: {n_cols}')
logging.error(f'长宽转换失败, 错误代码{e}, 报文大小: n_rows:{n_rows}, n_cols:{n_cols}')
return '', None
try:
assert n_rows * n_cols * 3 == len(img)
@ -83,12 +139,27 @@ def parse_protocol(data: bytes) -> (str, any):
return '', None
img = np.frombuffer(img, dtype=np.uint8).reshape((n_rows, n_cols, -1))
return cmd, img
elif cmd == 'PF':
def parse_spec(self, data: bytes) -> (str, any):
"""
光谱数据转换.
:param data:接收到的报文
:return: 指令类型和内容
"""
try:
assert len(data) > 2
except AssertionError:
logging.error('指令转换失败长度不足3')
return '', None
cmd, data = data[:2], data[2:]
cmd = cmd.decode('ascii').strip().upper()
n_rows, n_cols, n_bands, spec = data[:2], data[2:4], data[4:6], data[6:]
try:
n_rows, n_cols, n_bands = [int.from_bytes(x, byteorder='big') for x in [n_rows, n_cols, n_bands]]
except Exception as e:
logging.error(f'长宽转换失败, 错误代码{e}, 报文大小: n_rows:{n_rows}, n_cols: {n_cols}, n_bands: {n_bands}')
logging.error(f'长宽转换失败, 错误代码{e}, 报文大小: n_rows:{n_rows}, n_cols:{n_cols}, n_bands:{n_bands}')
return '', None
try:
assert n_rows * n_cols * n_bands * 4 == len(spec)
@ -96,102 +167,70 @@ def parse_protocol(data: bytes) -> (str, any):
except AssertionError:
logging.error('图像指令转换失败,数据长度错误')
return '', None
spec = np.frombuffer(spec, dtype=np.uint16).reshape(n_cols, n_rows, -1)
spec = spec.reshape((n_rows, n_bands, -1)).transpose(0, 2, 1)
return cmd, spec
def create_pipes(rgb_receive_name, rgb_send_name, spec_receive_name):
while True:
def send_data(self,cmd:str, brix, green_percentage, weigth, diameter, defect_num, total_defect_area, rp):
# start_time = time.time()
#
# rp1 = Image.fromarray(rp.astype(np.uint8))
# # cv2.imwrite('rp1.bmp', rp1)
#
# # 将 Image 对象保存到 BytesIO 流中
# img_bytes = io.BytesIO()
# rp1.save(img_bytes, format='BMP')
# img_bytes = img_bytes.getvalue()
# width = rp.shape[0]
# height = rp.shape[1]
# print(width, height)
# img_bytes = rp.tobytes()
# length = len(img_bytes) + 18
# print(length)
# length = length.to_bytes(4, byteorder='big')
# width = width.to_bytes(2, byteorder='big')
# height = height.to_bytes(2, byteorder='big')
cmd = cmd.strip().upper()
cmd_type = 'RE'
cmd_re = cmd_type.upper().encode('ascii')
img = np.asarray(rp, dtype=np.uint8) # 将图像转换为 NumPy 数组
height = img.shape[0] # 获取图像的高度
width = img.shape[1] # 获取图像的宽度
height = height.to_bytes(2, byteorder='big')
width = width.to_bytes(2, byteorder='big')
img_bytes = img.tobytes()
diameter = diameter.to_bytes(2, byteorder='big')
defect_num = defect_num.to_bytes(2, byteorder='big')
total_defect_area = int(total_defect_area).to_bytes(4, byteorder='big')
length = len(img_bytes) + 15
length = length.to_bytes(4, byteorder='big')
if cmd == 'TO':
brix = 0
brix = brix.to_bytes(1, byteorder='big')
gp = green_percentage.to_bytes(1, byteorder='big')
weigth = 0
weigth = weigth.to_bytes(1, byteorder='big')
send_message = length + cmd_re + brix + gp + diameter + weigth + defect_num + total_defect_area + height + width + img_bytes
elif cmd == 'PF':
brix = brix.to_bytes(1, byteorder='big')
gp = 0
gp = gp.to_bytes(1, byteorder='big')
weigth = weigth.to_bytes(1, byteorder='big')
send_message = length + cmd_re + brix + gp + diameter + weigth + defect_num + total_defect_area + height + width + img_bytes
try:
# 打开或创建命名管道
rgb_receive = win32pipe.CreateNamedPipe(
rgb_receive_name,
win32pipe.PIPE_ACCESS_INBOUND,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 80000000, 80000000, 0, None
)
rgb_send = win32pipe.CreateNamedPipe(
rgb_send_name,
win32pipe.PIPE_ACCESS_OUTBOUND, # 修改为输出模式
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 80000000, 80000000, 0, None
)
spec_receive = win32pipe.CreateNamedPipe(
spec_receive_name,
win32pipe.PIPE_ACCESS_INBOUND,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_WAIT,
1, 200000000, 200000000, 0, None
)
print("pipe管道创建成功等待连接...")
# 等待发送端连接
win32pipe.ConnectNamedPipe(rgb_receive, None)
print("rgb_receive connected.")
# 等待发送端连接
win32pipe.ConnectNamedPipe(rgb_send, None)
print("rgb_send connected.")
win32pipe.ConnectNamedPipe(rgb_receive, None)
print("spec_receive connected.")
return rgb_receive, rgb_send, spec_receive
win32file.WriteFile(self.rgb_send, send_message)
time.sleep(0.01)
print('发送成功')
# print(len(send_message))
except Exception as e:
print(f"管道创建连接失败,失败原因: {e}")
print("等待5秒后重试...")
time.sleep(5)
continue
logging.error(f'发送完成指令失败,错误类型:{e}')
return False
def send_data(pipe_send, long_axis, short_axis, defect_num, total_defect_area, rp):
# start_time = time.time()
#
rp1 = Image.fromarray(rp.astype(np.uint8))
# cv2.imwrite('rp1.bmp', rp1)
# end_time = time.time()
# print(f'发送时间:{end_time - start_time}秒')
# 将 Image 对象保存到 BytesIO 流中
img_bytes = io.BytesIO()
rp1.save(img_bytes, format='BMP')
img_bytes = img_bytes.getvalue()
return True
# width = rp.shape[0]
# height = rp.shape[1]
# print(width, height)
# img_bytes = rp.tobytes()
# length = len(img_bytes) + 18
# print(length)
# length = length.to_bytes(4, byteorder='big')
# width = width.to_bytes(2, byteorder='big')
# height = height.to_bytes(2, byteorder='big')
print(f'原始长度:', len(rp.tobytes()))
print(f'发送长度:', len(img_bytes))
long_axis = long_axis.to_bytes(2, byteorder='big')
short_axis = short_axis.to_bytes(2, byteorder='big')
defect_num = defect_num.to_bytes(2, byteorder='big')
total_defect_area = int(total_defect_area).to_bytes(4, byteorder='big')
length = (len(img_bytes) + 4).to_bytes(4, byteorder='big')
# cmd_type = 'RIM'
# result = result.encode('ascii')
# send_message = b'\xaa' + length + (' ' + cmd_type).upper().encode('ascii') + long_axis + short_axis + defect_num + total_defect_area + width + height + img_bytes + b'\xff\xff\xbb'
# send_message = long_axis + short_axis + defect_num + total_defect_area + img_bytes
send_message = long_axis + short_axis + defect_num + total_defect_area + length + img_bytes
# print(long_axis)
# print(short_axis)
# print(defect_num)
# print(total_defect_area)
# print(width)
# print(height)
try:
win32file.WriteFile(pipe_send, send_message)
time.sleep(0.01)
print('发送成功')
# print(len(send_message))
except Exception as e:
logging.error(f'发送完成指令失败,错误类型:{e}')
return False
# end_time = time.time()
# print(f'发送时间:{end_time - start_time}秒')
return True
@ -247,203 +286,3 @@ class Logger(object):
print(content, file=f)
else:
print(content)
def contour_process(image_array):
# 应用中值滤波
image_filtered = cv2.medianBlur(image_array, 5)
# 形态学闭操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
image_closed = cv2.morphologyEx(image_filtered, cv2.MORPH_CLOSE, kernel)
# 查找轮廓
contours, _ = cv2.findContours(image_closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 创建空白图像以绘制轮廓
image_contours = np.zeros_like(image_array)
# 进行多边形拟合并填充轮廓
for contour in contours:
epsilon = 0.001 * cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, epsilon, True)
if cv2.contourArea(approx) > 100: # 仅处理较大的轮廓
cv2.drawContours(image_contours, [approx], -1, (255, 255, 255), -1)
return image_contours
# def get_tomato_dimensions(edge_img):
# """
# 根据边缘二值化轮廓图,计算果子的长径、短径和长短径比值。
# 使用最小外接矩形和最小外接圆两种方法。
#
# 参数:
# edge_img (numpy.ndarray): 边缘二值化轮廓图,背景为黑色,番茄区域为白色。
#
# 返回:
# tuple: (长径, 短径, 长短径比值)
# """
# if edge_img is None or edge_img.any() == 0:
# return (0, 0)
# # 最小外接矩形
# rect = cv2.minAreaRect(cv2.findContours(edge_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0][0])
# major_axis, minor_axis = rect[1]
# # aspect_ratio = max(major_axis, minor_axis) / min(major_axis, minor_axis)
#
# # # 最小外接圆
# # (x, y), radius = cv2.minEnclosingCircle(
# # cv2.findContours(edge_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0][0])
# # diameter = 2 * radius
# # aspect_ratio_circle = 1.0
#
# return (max(major_axis, minor_axis), min(major_axis, minor_axis))
# def get_defect_info(defect_img):
# """
# 根据区域缺陷二值化轮廓图,计算缺陷区域的个数和总面积。
#
# 参数:
# defect_img (numpy.ndarray): 番茄区域缺陷二值化轮廓图,背景为黑色,番茄区域为白色,缺陷区域为黑色连通域。
#
# 返回:
# tuple: (缺陷区域个数, 缺陷区域像素面积,缺陷像素总面积)
# """
# # 检查输入是否为空
# if defect_img is None or defect_img.any() == 0:
# return (0, 0)
#
# nb_components, output, stats, centroids = cv2.connectedComponentsWithStats(defect_img, connectivity=4)
# max_area = max(stats[i, cv2.CC_STAT_AREA] for i in range(1, nb_components))
# areas = []
# for i in range(1, nb_components):
# area = stats[i, cv2.CC_STAT_AREA]
# if area != max_area:
# areas.append(area)
# number_defects = len(areas)
# total_pixels = sum(areas)
# return number_defects, total_pixels
def analyze_ellipse(image_array):
# 查找白色区域的轮廓
_, binary_image = cv2.threshold(image_array, 127, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 初始化变量用于存储最大轮廓的长径和短径
major_axis = 0
minor_axis = 0
# 对每个找到的轮廓,找出可以包围它的最小椭圆,并计算长径和短径
for contour in contours:
if len(contour) >= 5: # 至少需要5个点来拟合椭圆
ellipse = cv2.fitEllipse(contour)
(center, axes, orientation) = ellipse
major_axis0 = max(axes)
minor_axis0 = min(axes)
# 更新最大的长径和短径
if major_axis0 > major_axis:
major_axis = major_axis0
minor_axis = minor_axis0
return major_axis, minor_axis
# 示例用法
# image_array = cv2.imread('path_to_your_image.bmp', cv2.IMREAD_GRAYSCALE)
# major_axis, minor_axis = analyze_ellipse(image_array)
# print(f"Major Axis: {major_axis}, Minor Axis: {minor_axis}")
# 加载新上传的图像进行分析
new_ellipse_image_path = '/mnt/data/未标题-2.png'
new_ellipse_image = cv2.imread(new_ellipse_image_path, cv2.IMREAD_GRAYSCALE)
# 使用上述函数进行分析
analyze_ellipse(new_ellipse_image)
def analyze_defect(image_array):
# 查找白色区域的轮廓
_, binary_image = cv2.threshold(image_array, 127, 255, cv2.THRESH_BINARY)
contours_white, _ = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 初始化统计数据
count_black_areas = 0
total_pixels_black_areas = 0
# 对于每个白色区域,查找内部的黑色小区域
for contour in contours_white:
# 创建一个mask以查找内部的黑色区域
mask = np.zeros_like(image_array)
cv2.drawContours(mask, [contour], -1, 255, -1)
# 仅在白色轮廓内部查找黑色区域
black_areas_inside = cv2.bitwise_and(cv2.bitwise_not(image_array), mask)
# 查找黑色区域的轮廓
contours_black, _ = cv2.findContours(black_areas_inside, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
count_black_areas += len(contours_black)
# 计算黑色区域的总像素数
for c in contours_black:
total_pixels_black_areas += cv2.contourArea(c)
number_defects = count_black_areas
total_pixels = total_pixels_black_areas
return number_defects, total_pixels
# 示例用法
# image_array = cv2.imread('path_to_your_image.bmp', cv2.IMREAD_GRAYSCALE)
# black_areas_count, total_pixels = analyze_black_areas_in_white(image_array)
# print(f"Number of black areas: {black_areas_count}, Total pixels in black areas: {
def analyze_tomato(img):
"""
分析给定图像提取和返回西红柿的长径短径缺陷数量和缺陷总面积并返回处理后的图像
使用 Tomoto 类的图像处理方法以及自定义的尺寸和缺陷信息获取函数
参数:
img (numpy.ndarray): 输入的 BGR 图像
返回:
tuple: (长径, 短径, 缺陷区域个数, 缺陷区域总像素, 处理后的图像)
"""
tomato = Tomato() # 创建 Tomato 类的实例
# 设置 S-L 通道阈值并处理图像
threshold_s_l = 180
s_l = tomato.extract_s_l(img)
thresholded_s_l = tomato.threshold_segmentation(s_l, threshold_s_l)
new_bin_img = tomato.largest_connected_component(thresholded_s_l)
# 绘制西红柿边缘并获取缺陷信息
edge, mask = tomato.draw_tomato_edge(img, new_bin_img)
org_defect = tomato.bitwise_and_rgb_with_binary(edge, new_bin_img)
# 获取西红柿的尺寸信息
long_axis, short_axis = analyze_ellipse(mask)
# 获取缺陷信息
number_defects, total_pixels = analyze_defect(new_bin_img)
# 将处理后的图像转换为 RGB 格式
rp = cv2.cvtColor(org_defect, cv2.COLOR_BGR2RGB)
return long_axis, short_axis, number_defects, total_pixels, rp
def analyze_passion_fruit(img, hue_value=37, hue_delta=10, value_target=25, value_delta=10):
if img is None:
print("Error: 无图像数据.")
return None
# 创建PassionFruit类的实例
pf = Passion_fruit(hue_value=hue_value, hue_delta=hue_delta, value_target=value_target, value_delta=value_delta)
hsv_image = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
combined_mask = pf.create_mask(hsv_image)
combined_mask = pf.apply_morphology(combined_mask)
max_mask = pf.find_largest_component(combined_mask)
# if max_mask is None:
# print("No significant components found.")
# return None
contour_mask = contour_process(max_mask)
long_axis, short_axis = analyze_ellipse(contour_mask)
number_defects, total_pixels = analyze_defect(max_mask)
edge = pf.draw_contours_on_image(img, contour_mask)
org_defect = pf.bitwise_and_rgb_with_binary(edge, max_mask)
rp = cv2.cvtColor(org_defect, cv2.COLOR_BGR2RGB)
return long_axis, short_axis, number_defects, total_pixels, rp

197
20240529RGBtest3/xs/01.py Normal file
View File

@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
# @Time : 2024/6/15 15:40
# @Author : TG
# @File : 01.py
# @Software: PyCharm
import joblib
import numpy as np
import os
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
def prepare_data(data):
"""Reshape data and select specified spectral bands."""
reshaped_data = data.reshape(data.shape[0], -1) # 使用动态批量大小
selected_bands = [8, 9, 10, 48, 49, 50, 77, 80, 103, 108, 115, 143, 145]
return reshaped_data[:, selected_bands]
class SpectralModelingAndPrediction:
def __init__(self, model_paths=None):
self.models = {
"RandomForest": RandomForestRegressor(n_estimators=100),
"GradientBoosting": GradientBoostingRegressor(n_estimators=100),
"SVR": SVR(kernel='rbf', C=100, gamma=0.1, epsilon=.1),
}
self.model_paths = model_paths or {}
def split_data(self, X, y, test_size=0.20, random_state=12):
"""Split data into training and test sets."""
return train_test_split(X, y, test_size=test_size, random_state=random_state)
def evaluate_model(self, model, X_test, y_test):
"""Evaluate the model and return MSE and predictions."""
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
return mse, y_pred
def print_predictions(self, y_test, y_pred, model_name):
"""Print actual and predicted values."""
print(f"Test Set Predictions for {model_name}:")
for i, (real, pred) in enumerate(zip(y_test, y_pred)):
print(f"Sample {i + 1}: True Value = {real:.2f}, Predicted Value = {pred:.2f}")
def fit_and_evaluate(self, X_train, y_train, X_test, y_test):
for model_name, model in self.models.items():
model.fit(X_train, y_train)
if model_name in self.model_paths:
joblib.dump(model, self.model_paths[model_name])
mse, y_pred = self.evaluate_model(model, X_test, y_test)
print(f"Model: {model_name}")
print(f"Mean Squared Error on the test set: {mse}")
self.print_predictions(y_test, y_pred, model_name)
print("\n" + "-" * 50 + "\n")
def load_model(self, model_path):
"""加载模型"""
return joblib.load(model_path)
def read_spectral_data(self, hdr_path, raw_path):
"""读取光谱数据"""
with open(hdr_path, 'r', encoding='latin1') as hdr_file:
lines = hdr_file.readlines()
height = width = bands = 0
for line in lines:
if line.startswith('lines'):
height = int(line.split()[-1])
elif line.startswith('samples'):
width = int(line.split()[-1])
elif line.startswith('bands'):
bands = int(line.split()[-1])
raw_image = np.fromfile(raw_path, dtype='uint16')
formatImage = np.zeros((height, width, bands))
for row in range(height):
for dim in range(bands):
formatImage[row, :, dim] = raw_image[(dim + row * bands) * width:(dim + 1 + row * bands) * width]
target_height, target_width, target_bands = 30, 30, 224
formatImage = self._crop_or_pad(formatImage, height, width, bands, target_height, target_width, target_bands)
return formatImage
def _crop_or_pad(self, formatImage, height, width, bands, target_height, target_width, target_bands):
"""裁剪或填充图像"""
if height > target_height:
formatImage = formatImage[:target_height, :, :]
elif height < target_height:
pad_height = target_height - height
formatImage = np.pad(formatImage, ((0, pad_height), (0, 0), (0, 0)), mode='constant', constant_values=0)
if width > target_width:
formatImage = formatImage[:, :target_width, :]
elif width < target_width:
pad_width = target_width - width
formatImage = np.pad(formatImage, ((0, 0), (0, pad_width), (0, 0)), mode='constant', constant_values=0)
if bands > target_bands:
formatImage = formatImage[:, :, :target_bands]
elif bands < target_bands:
pad_bands = target_bands - bands
formatImage = np.pad(formatImage, ((0, 0), (0, 0), (0, pad_bands)), mode='constant', constant_values=0)
return formatImage
def predict(self, data, model_name):
"""预测数据"""
model = self.load_model(self.model_paths[model_name])
return model.predict(data)
def run_training_and_prediction(self, training_data, training_target, prediction_directory):
"""运行训练和预测流程"""
# 将数据重塑为2维
training_data = training_data.reshape(training_data.shape[0], -1)
# 训练阶段
X_train, X_test, y_train, y_test = self.split_data(training_data, training_target)
self.fit_and_evaluate(X_train, y_train, X_test, y_test)
# 预测阶段
all_spectral_data = []
for i in range(1, 101):
hdr_path = os.path.join(prediction_directory, f'{i}.HDR')
raw_path = os.path.join(prediction_directory, f'{i}')
if not os.path.exists(hdr_path) or not os.path.exists(raw_path):
print(f"File {hdr_path} or {raw_path} does not exist.")
continue
spectral_data = self.read_spectral_data(hdr_path, raw_path)
all_spectral_data.append(spectral_data)
if not all_spectral_data:
print("No spectral data was read. Please check the file paths and try again.")
return
all_spectral_data = np.stack(all_spectral_data)
print(all_spectral_data.shape) # This should print (100, 30, 30, 224) or fewer if some files are missing
data_prepared = prepare_data(all_spectral_data)
for model_name in self.models.keys():
predictions = self.predict(data_prepared, model_name)
print(f"Predictions for {model_name}:")
print(predictions)
print("\n" + "-" * 50 + "\n")
if __name__ == "__main__":
model_paths = {
"RandomForest": '../20240529RGBtest3/models/random_forest_model_3.joblib',
"GradientBoosting": '../20240529RGBtest3/models/gradient_boosting_model_3.joblib',
"SVR": '../20240529RGBtest3/models/svr_model_3.joblib',
}
sweetness_acidity = np.array([
16.2, 16.1, 17, 16.9, 16.8, 17.8, 18.1, 17.2, 17, 17.2, 17.1, 17.2,
17.2, 17.2, 18.1, 17, 17.6, 17.4, 17.1, 17.1, 16.9, 17.6, 17.3, 16.3,
16.5, 18.7, 17.6, 16.2, 16.8, 17.2, 16.8, 17.3, 16, 16.6, 16.7, 16.7,
17.3, 16.3, 16.8, 17.4, 17.3, 16.3, 16.1, 17.2, 18.6, 16.8, 16.1, 17.2,
18.3, 16.5, 16.6, 17, 17, 17.8, 16.4, 18, 17.7, 17, 18.3, 16.8, 17.5,
17.7, 18.5, 18, 17.7, 17, 18.3, 18.1, 17.4, 17.7, 17.8, 16.3, 17.1, 16.8,
17.2, 17.5, 16.6, 17.7, 17.1, 17.7, 19.4, 20.3, 17.3, 15.8, 18, 17.7,
17.2, 15.2, 18, 18.4, 18.3, 15.7, 17.2, 18.6, 15.6, 17, 16.9, 17.4, 17.8,
16.5
])
# Specify the directory containing the HDR and RAW files
directory = r'D:\project\supermachine--tomato-passion_fruit\20240529RGBtest3\xs\光谱数据3030'
modeling = SpectralModelingAndPrediction(model_paths)
# Initialize a list to hold all the spectral data arrays
all_spectral_data = []
# Loop through each data set (assuming there are 100 datasets)
for i in range(1, 101):
hdr_path = os.path.join(directory, f'{i}.HDR')
raw_path = os.path.join(directory, f'{i}')
# Check if files exist
if not os.path.exists(hdr_path) or not os.path.exists(raw_path):
print(f"File {hdr_path} or {raw_path} does not exist.")
continue
# Read data
spectral_data = modeling.read_spectral_data(hdr_path, raw_path)
all_spectral_data.append(spectral_data)
# Stack all data into a single numpy array if not empty
if all_spectral_data:
all_spectral_data = np.stack(all_spectral_data)
print(all_spectral_data.shape) # This should print (100, 30, 30, 224) or fewer if some files are missing
# Run training and prediction
modeling.run_training_and_prediction(all_spectral_data, sweetness_acidity, directory)
else:
print("No spectral data was read. Please check the file paths and try again.")

View File

@ -6,14 +6,15 @@ from sklearn.neighbors import KNeighborsRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from spec_read import all_spectral_data
import joblib
def prepare_data(data):
"""Reshape data and select specified spectral bands."""
reshaped_data = data.reshape(100, -1)
selected_bands = [1, 2, 3, 58, 59, 60, 106, 107, 108, 112, 113, 114, 142, 146, 200, 201, 202]
selected_bands = [8, 9, 10, 48, 49, 50, 77, 80, 103, 108, 115, 143, 145]
return reshaped_data[:, selected_bands]
def split_data(X, y, test_size=0.20, random_state=1):
def split_data(X, y, test_size=0.20, random_state=12):
"""Split data into training and test sets."""
return train_test_split(X, y, test_size=test_size, random_state=random_state)
@ -49,11 +50,13 @@ def main():
"RandomForest": RandomForestRegressor(n_estimators=100),
"GradientBoosting": GradientBoostingRegressor(n_estimators=100),
"SVR": SVR(kernel='rbf', C=100, gamma=0.1, epsilon=.1),
"KNeighbors": KNeighborsRegressor(n_neighbors=5)
}
for model_name, model in models.items():
model.fit(X_train, y_train)
if model_name == "RandomForest":
joblib.dump(model, '../models/random_forest_model_2.joblib')
mse, y_pred = evaluate_model(model, X_test, y_test)
print(f"Model: {model_name}")
print(f"Mean Squared Error on the test set: {mse}")

View File

@ -0,0 +1,117 @@
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from spec_read import all_spectral_data
def prepare_data(data):
"""Calculate the average spectral values and their gradients for each fruit across all pixels."""
avg_spectra = np.mean(data, axis=(1, 2))
gradients = np.gradient(avg_spectra, axis=1)
second_gradients = np.gradient(gradients, axis=1)
return avg_spectra, gradients, second_gradients
def train_model(X, y):
"""Train a RandomForest model."""
rf = RandomForestRegressor(n_estimators=100)
rf.fit(X, y)
return rf
def split_data(X, y, test_size=0.20, random_state=2):
"""Split data into training and test sets."""
return train_test_split(X, y, test_size=test_size, random_state=random_state)
def evaluate_model(model, X_test, y_test):
"""Evaluate the model and return MSE and predictions."""
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
return mse, y_pred
def print_predictions(y_test, y_pred):
"""Print actual and predicted values."""
print("Test Set Predictions:")
for i, (real, pred) in enumerate(zip(y_test, y_pred)):
print(f"Sample {i + 1}: True Value = {real:.2f}, Predicted Value = {pred:.2f}")
def plot_spectra(X, y):
"""Plot the average spectra for all samples and annotate with sweetness_acidity values."""
plt.figure(figsize=(10, 6))
for i in range(X.shape[0]):
plt.plot(X[i], label=f'Sample {i+1}')
plt.annotate(f'{y[i]:.1f}', xy=(len(X[i])-1, X[i][-1]), xytext=(5, 0),
textcoords='offset points', ha='left', va='center')
plt.xlabel('Wavelength Index')
plt.ylabel('Average Spectral Value')
plt.title('Average Spectral Curves for All Samples')
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.05))
plt.show()
def plot_gradients(gradients):
"""Plot the gradient of the average spectra for all samples."""
plt.figure(figsize=(10, 6))
for i in range(gradients.shape[0]):
plt.plot(gradients[i], label=f'Sample {i+1}')
plt.xlabel('Wavelength Index')
plt.ylabel('Gradient Value')
plt.title('Gradient of Average Spectral Curves for All Samples')
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.05))
plt.show()
def plot_second_gradients(second_gradients):
"""Plot the second gradient of the average spectra for all samples."""
plt.figure(figsize=(10, 6))
for i in range(second_gradients.shape[0]):
plt.plot(second_gradients[i], label=f'Sample {i+1}')
plt.xlabel('Wavelength Index')
plt.ylabel('Second Gradient Value')
plt.title('Second Gradient of Average Spectral Curves for All Samples')
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.05))
plt.show()
def main():
sweetness_acidity = np.array([
16.2, 16.1, 17, 16.9, 16.8, 17.8, 18.1, 17.2, 17, 17.2, 17.1, 17.2,
17.2, 17.2, 18.1, 17, 17.6, 17.4, 17.1, 17.1, 16.9, 17.6, 17.3, 16.3,
16.5, 18.7, 17.6, 16.2, 16.8, 17.2, 16.8, 17.3, 16, 16.6, 16.7, 16.7,
17.3, 16.3, 16.8, 17.4, 17.3, 16.3, 16.1, 17.2, 18.6, 16.8, 16.1, 17.2,
18.3, 16.5, 16.6, 17, 17, 17.8, 16.4, 18, 17.7, 17, 18.3, 16.8, 17.5,
17.7, 18.5, 18, 17.7, 17, 18.3, 18.1, 17.4, 17.7, 17.8, 16.3, 17.1, 16.8,
17.2, 17.5, 16.6, 17.7, 17.1, 17.7, 19.4, 20.3, 17.3, 15.8, 18, 17.7,
17.2, 15.2, 18, 18.4, 18.3, 15.7, 17.2, 18.6, 15.6, 17, 16.9, 17.4, 17.8,
16.5
])
X_avg, X_grad, X_second_grad = prepare_data(all_spectral_data)
plot_spectra(X_avg, sweetness_acidity) # Plot average spectral curves
plot_gradients(X_grad) # Plot gradient curves
plot_second_gradients(X_second_grad) # Plot second gradient curves
# Train and evaluate using average spectral values
X_train_avg, X_test_avg, y_train_avg, y_test_avg = split_data(X_avg, sweetness_acidity)
rf_model_avg = train_model(X_train_avg, y_train_avg)
mse_avg, y_pred_avg = evaluate_model(rf_model_avg, X_test_avg, y_test_avg)
print("Mean Squared Error using average spectral values:", mse_avg)
# Train and evaluate using first gradients
X_train_grad, X_test_grad, y_train_grad, y_test_grad = split_data(X_grad, sweetness_acidity)
rf_model_grad = train_model(X_train_grad, y_train_grad)
mse_grad, y_pred_grad = evaluate_model(rf_model_grad, X_test_grad, y_test_grad)
print("Mean Squared Error using first gradients:", mse_grad)
# Train and evaluate using second gradients
X_train_second_grad, X_test_second_grad, y_train_second_grad, y_test_second_grad = split_data(X_second_grad, sweetness_acidity)
rf_model_second_grad = train_model(X_train_second_grad, y_train_second_grad)
mse_second_grad, y_pred_second_grad = evaluate_model(rf_model_second_grad, X_test_second_grad, y_test_second_grad)
print("Mean Squared Error using second gradients:", mse_second_grad)
print("Predictions using average spectral values:")
print_predictions(y_test_avg, y_pred_avg)
print("Predictions using first gradients:")
print_predictions(y_test_grad, y_pred_grad)
print("Predictions using second gradients:")
print_predictions(y_test_second_grad, y_pred_second_grad)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,135 @@
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from spec_read import all_spectral_data
def prepare_data(data):
"""Calculate the average spectral values and their gradients for each fruit across all pixels, and normalize them."""
avg_spectra = np.mean(data, axis=(1, 2))
gradients = np.gradient(avg_spectra, axis=1)
second_gradients = np.gradient(gradients, axis=1)
scaler = MinMaxScaler()
avg_spectra = scaler.fit_transform(avg_spectra)
gradients = scaler.fit_transform(gradients)
second_gradients = scaler.fit_transform(second_gradients)
return avg_spectra, gradients, second_gradients
def train_model(X, y):
"""Train a RandomForest model."""
rf = RandomForestRegressor(n_estimators=100)
rf.fit(X, y)
return rf
def split_data(X, y, test_size=0.20, random_state=2):
"""Split data into training and test sets."""
return train_test_split(X, y, test_size=test_size, random_state=random_state)
def evaluate_model(model, X_test, y_test):
"""Evaluate the model and return MSE and predictions."""
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
return mse, y_pred
def print_predictions(y_test, y_pred):
"""Print actual and predicted values."""
print("Test Set Predictions:")
for i, (real, pred) in enumerate(zip(y_test, y_pred)):
print(f"Sample {i + 1}: True Value = {real:.2f}, Predicted Value = {pred:.2f}")
def plot_spectra(X, y):
"""Plot the average spectra for all samples and annotate with sweetness_acidity values."""
plt.figure(figsize=(10, 6))
for i in range(X.shape[0]):
plt.plot(X[i], label=f'Sample {i + 1}')
plt.annotate(f'{y[i]:.1f}', xy=(len(X[i]) - 1, X[i][-1]), xytext=(5, 0),
textcoords='offset points', ha='left', va='center')
plt.xlabel('Wavelength Index')
plt.ylabel('Average Spectral Value')
plt.title('Average Spectral Curves for All Samples')
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.05))
plt.show()
def plot_gradients(gradients):
"""Plot the gradient of the average spectra for all samples."""
plt.figure(figsize=(10, 6))
for i in range(gradients.shape[0]):
plt.plot(gradients[i], label=f'Sample {i + 1}')
plt.xlabel('Wavelength Index')
plt.ylabel('Gradient Value')
plt.title('Gradient of Average Spectral Curves for All Samples')
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.05))
plt.show()
def plot_second_gradients(second_gradients):
"""Plot the second gradient of the average spectra for all samples."""
plt.figure(figsize=(10, 6))
for i in range(second_gradients.shape[0]):
plt.plot(second_gradients[i], label=f'Sample {i + 1}')
plt.xlabel('Wavelength Index')
plt.ylabel('Second Gradient Value')
plt.title('Second Gradient of Average Spectral Curves for All Samples')
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.05))
plt.show()
def main():
sweetness_acidity = np.array([
16.2, 16.1, 17, 16.9, 16.8, 17.8, 18.1, 17.2, 17, 17.2, 17.1, 17.2,
17.2, 17.2, 18.1, 17, 17.6, 17.4, 17.1, 17.1, 16.9, 17.6, 17.3, 16.3,
16.5, 18.7, 17.6, 16.2, 16.8, 17.2, 16.8, 17.3, 16, 16.6, 16.7, 16.7,
17.3, 16.3, 16.8, 17.4, 17.3, 16.3, 16.1, 17.2, 18.6, 16.8, 16.1, 17.2,
18.3, 16.5, 16.6, 17, 17, 17.8, 16.4, 18, 17.7, 17, 18.3, 16.8, 17.5,
17.7, 18.5, 18, 17.7, 17, 18.3, 18.1, 17.4, 17.7, 17.8, 16.3, 17.1, 16.8,
17.2, 17.5, 16.6, 17.7, 17.1, 17.7, 19.4, 20.3, 17.3, 15.8, 18, 17.7,
17.2, 15.2, 18, 18.4, 18.3, 15.7, 17.2, 18.6, 15.6, 17, 16.9, 17.4, 17.8,
16.5
])
X_avg, X_grad, X_second_grad = prepare_data(all_spectral_data)
plot_spectra(X_avg, sweetness_acidity) # Plot average spectral curves
plot_gradients(X_grad) # Plot gradient curves
plot_second_gradients(X_second_grad) # Plot second gradient curves
# Train and evaluate using average spectral values
X_train_avg, X_test_avg, y_train_avg, y_test_avg = split_data(X_avg, sweetness_acidity)
rf_model_avg = train_model(X_train_avg, y_train_avg)
mse_avg, y_pred_avg = evaluate_model(rf_model_avg, X_test_avg, y_test_avg)
print("Mean Squared Error using average spectral values:", mse_avg)
# Train and evaluate using first gradients
X_train_grad, X_test_grad, y_train_grad, y_test_grad = split_data(X_grad, sweetness_acidity)
rf_model_grad = train_model(X_train_grad, y_train_grad)
mse_grad, y_pred_grad = evaluate_model(rf_model_grad, X_test_grad, y_test_grad)
print("Mean Squared Error using first gradients:", mse_grad)
# Train and evaluate using second gradients
X_train_second_grad, X_test_second_grad, y_train_second_grad, y_test_second_grad = split_data(X_second_grad,
sweetness_acidity)
rf_model_second_grad = train_model(X_train_second_grad, y_train_second_grad)
mse_second_grad, y_pred_second_grad = evaluate_model(rf_model_second_grad, X_test_second_grad, y_test_second_grad)
print("Mean Squared Error using second gradients:", mse_second_grad)
print("Predictions using average spectral values:")
print_predictions(y_test_avg, y_pred_avg)
print("Predictions using first gradients:")
print_predictions(y_test_grad, y_pred_grad)
print("Predictions using second gradients:")
print_predictions(y_test_second_grad, y_pred_second_grad)
if __name__ == "__main__":
main()

View File

@ -84,6 +84,6 @@ def process_images_in_folder(input_folder, output_folder):
# 主函数调用
input_folder = r'D:\project\supermachine--tomato-passion_fruit\20240529RGBtest3\data\passion_fruit_img' # 替换为你的输入文件夹路径
output_folder = r'D:\project\supermachine--tomato-passion_fruit\20240529RGBtest3\data\01' # 替换为你的输出文件夹路径
input_folder = '/Users/xs/PycharmProjects/super-tomato/baixiangguo/rgb效果/test' # 替换为你的输入文件夹路径
output_folder = '/Users/xs/PycharmProjects/super-tomato/baixiangguo/rgb效果/testfore' # 替换为你的输出文件夹路径
process_images_in_folder(input_folder, output_folder)

View File

@ -15,7 +15,7 @@ def train_model(X, y):
rf.fit(X, y)
return rf
def split_data(X, y, test_size=0.20, random_state=42):
def split_data(X, y, test_size=0.20, random_state=4):
"""Split data into training and test sets."""
return train_test_split(X, y, test_size=test_size, random_state=random_state)

View File

@ -0,0 +1,85 @@
import joblib
import numpy as np
import os
from dimensionality_reduction import prepare_data
def read_spectral_data(hdr_path, raw_path):
# Read HDR file for image dimensions information
with open(hdr_path, 'r', encoding='latin1') as hdr_file:
lines = hdr_file.readlines()
height = width = bands = 0
for line in lines:
if line.startswith('lines'):
height = int(line.split()[-1])
elif line.startswith('samples'):
width = int(line.split()[-1])
elif line.startswith('bands'):
bands = int(line.split()[-1])
# Read spectral data from RAW file
raw_image = np.fromfile(raw_path, dtype='uint16')
# Initialize the image with the actual read dimensions
formatImage = np.zeros((height, width, bands))
for row in range(height):
for dim in range(bands):
formatImage[row, :, dim] = raw_image[(dim + row * bands) * width:(dim + 1 + row * bands) * width]
# Ensure the image is 30x30x224 by cropping or padding
target_height, target_width, target_bands = 30, 30, 224
# Crop or pad height
if height > target_height:
formatImage = formatImage[:target_height, :, :]
elif height < target_height:
pad_height = target_height - height
formatImage = np.pad(formatImage, ((0, pad_height), (0, 0), (0, 0)), mode='constant', constant_values=0)
# Crop or pad width
if width > target_width:
formatImage = formatImage[:, :target_width, :]
elif width < target_width:
pad_width = target_width - width
formatImage = np.pad(formatImage, ((0, 0), (0, pad_width), (0, 0)), mode='constant', constant_values=0)
# Crop or pad bands if necessary (usually bands should not change)
if bands > target_bands:
formatImage = formatImage[:, :, :target_bands]
elif bands < target_bands:
pad_bands = target_bands - bands
formatImage = np.pad(formatImage, ((0, 0), (0, 0), (0, pad_bands)), mode='constant', constant_values=0)
return formatImage
def load_model(model_path):
"""加载模型"""
return joblib.load(model_path)
def predict(model, data):
"""预测数据"""
return model.predict(data)
def main():
# 加载模型
model = load_model('../models/random_forest_model_2.joblib')
# 读取数据
directory = '/Users/xs/PycharmProjects/super-tomato/baixiangguo/光谱数据3030/'
all_spectral_data = []
for i in range(1, 101):
hdr_path = os.path.join(directory, f'{i}.HDR')
raw_path = os.path.join(directory, f'{i}')
spectral_data = read_spectral_data(hdr_path, raw_path)
all_spectral_data.append(spectral_data)
all_spectral_data = np.stack(all_spectral_data)
# 预处理数据
data_prepared = prepare_data(all_spectral_data)
# 预测数据
predictions = predict(model, data_prepared)
# 打印预测结果
print(predictions)
if __name__ == "__main__":
main()

View File

@ -66,5 +66,5 @@ def dual_threshold_and_max_component(image_path, hue_value=37, hue_delta=10, val
plt.show()
# 使用函数
image_path = r'D:\project\supermachine--tomato-passion_fruit\20240529RGBtest3\data\passion_fruit_img\50.bmp' # 替换为你的图片路径
image_path = '/Users/xs/PycharmProjects/super-tomato/baixiangguo/middle/52.bmp' # 替换为你的图片路径
dual_threshold_and_max_component(image_path)

View File

@ -8,7 +8,7 @@
## 长度
一个32位无符号数length长度 = 数据字节数i + 6<br>`长度1`指length[31:24]`长度2`指length[23:16]`长度3`指length[15:8]`长度4`指length[7:0]
一个32位无符号数length长度 = 数据字节数i + 2<br>`长度1`指length[31:24]`长度2`指length[23:16]`长度3`指length[15:8]`长度4`指length[7:0]
## 指令
@ -38,7 +38,7 @@ $$
**光谱数据包:' 指令1''指令2 '**`数据1`~`数据i`包含了光谱数据的行数rows(高度)、列数cols(宽度)、谱段数bands、以及光谱数据组合方式为**高度+宽度+谱段数+光谱数据**
$$
i-6=rows \times cols \times bands \times 4
i-6=rows \times cols \times bands \times 2
$$
`数据1`~`数据i`的分布具体如下: