fix:切换了resnet34模型文件,注释掉了有果无果判别模型部分,添加部分函数注释

This commit is contained in:
TG 2024-06-22 17:08:41 +08:00
parent 28f81a135a
commit b0cf31fd60
3 changed files with 214 additions and 253 deletions

View File

@ -12,12 +12,14 @@ import joblib
import logging
import numpy as np
from PIL import Image
from torchvision import transforms
from sklearn.ensemble import RandomForestRegressor
import torch
import torch.nn as nn
#图像分类网络所需库,实际并未使用分类网络
# import torch
# import torch.nn as nn
# from torchvision import transforms
#番茄RGB处理模型
class Tomato:
def __init__(self):
''' 初始化 Tomato 类。'''
@ -233,6 +235,7 @@ class Tomato:
img_filled = cv2.bitwise_or(new_bin_img, img_filled_inv)
return img_filled
#百香果RGB处理模型
class Passion_fruit:
def __init__(self, hue_value=37, hue_delta=10, value_target=25, value_delta=10):
# 初始化常用参数
@ -315,6 +318,7 @@ class Passion_fruit:
return np.zeros_like(rgb_img)
return result
#糖度预测模型
class Spec_predict(object):
def __init__(self, load_from=None, debug_mode=False):
self.debug_mode = debug_mode
@ -350,6 +354,7 @@ class Spec_predict(object):
data_y = self.model.predict(data_x)
return data_y[0]
#数据处理模型
class Data_processing:
def __init__(self):
pass
@ -520,222 +525,170 @@ class Data_processing:
return diameter, weigth, number_defects, total_pixels, rp
#下面封装的是ResNet18和ResNet34的网络模型构建
#原定用于构建RGB图像有果无果判断后续发现存在纰漏暂时搁置并未实际使用
class BasicBlock(nn.Module):
'''
BasicBlock for ResNet18 and ResNet34
'''
expansion = 1
def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channel)
self.relu = nn.ReLU()
self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channel)
self.downsample = downsample
def forward(self, x):
identity = x
if self.downsample is not None:
identity = self.downsample(x)
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out += identity
out = self.relu(out)
return out
class Bottleneck(nn.Module):
"""
注意原论文中在虚线残差结构的主分支上第一个1x1卷积层的步距是2第二个3x3卷积层步距是1
但在pytorch官方实现过程中是第一个1x1卷积层的步距是1第二个3x3卷积层步距是2
这么做的好处是能够在top1上提升大概0.5%的准确率
可参考Resnet v1.5 https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch
"""
expansion = 4
def __init__(self, in_channel, out_channel, stride=1, downsample=None,
groups=1, width_per_group=64):
super(Bottleneck, self).__init__()
width = int(out_channel * (width_per_group / 64.)) * groups
self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,
kernel_size=1, stride=1, bias=False) # squeeze channels
self.bn1 = nn.BatchNorm2d(width)
# -----------------------------------------
self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,
kernel_size=3, stride=stride, bias=False, padding=1)
self.bn2 = nn.BatchNorm2d(width)
# -----------------------------------------
self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel*self.expansion,
kernel_size=1, stride=1, bias=False) # unsqueeze channels
self.bn3 = nn.BatchNorm2d(out_channel*self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
def forward(self, x):
identity = x
if self.downsample is not None:
identity = self.downsample(x)
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Module):
'''
ResNet18 and ResNet34
'''
def __init__(self,
block,
blocks_num,
num_classes=1000,
include_top=True,
groups=1,
width_per_group=64):
super(ResNet, self).__init__()
self.include_top = include_top
self.in_channel = 64
self.groups = groups
self.width_per_group = width_per_group
self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
padding=3, bias=False)
self.bn1 = nn.BatchNorm2d(self.in_channel)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, blocks_num[0])
self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
if self.include_top:
self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) # output size = (1, 1)
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
def _make_layer(self, block, channel, block_num, stride=1):
downsample = None
if stride != 1 or self.in_channel != channel * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(channel * block.expansion))
layers = []
layers.append(block(self.in_channel,
channel,
downsample=downsample,
stride=stride,
groups=self.groups,
width_per_group=self.width_per_group))
self.in_channel = channel * block.expansion
for _ in range(1, block_num):
layers.append(block(self.in_channel,
channel,
groups=self.groups,
width_per_group=self.width_per_group))
return nn.Sequential(*layers)
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
if self.include_top:
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
def resnet18(num_classes=1000, include_top=True):
return ResNet(BasicBlock, [2, 2, 2, 2], num_classes=num_classes, include_top=include_top)
def resnetzy(num_classes=1000, include_top=True):
return ResNet(Bottleneck, [2, 2, 2, 2], num_classes=num_classes, include_top=include_top)
class ImageClassifier:
'''
图像分类器用于加载预训练的 ResNet 模型并进行图像分类
'''
def __init__(self, model_path, class_indices_path, device=None):
if device is None:
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
else:
self.device = device
# 加载类别索引
assert os.path.exists(class_indices_path), f"File: '{class_indices_path}' does not exist."
with open(class_indices_path, "r") as json_file:
self.class_indict = json.load(json_file)
# 创建模型并加载权重
self.model = resnetzy(num_classes=len(self.class_indict)).to(self.device)
assert os. path.exists(model_path), f"File: '{model_path}' does not exist."
self.model.load_state_dict(torch.load(model_path, map_location=self.device))
self.model.eval()
# 设置图像转换
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
def predict(self, image_np):
'''
对图像进行分类预测
:param image_np:
:return:
'''
# 将numpy数组转换为图像
image = Image.fromarray(image_np.astype('uint8'), 'RGB')
image = self.transform(image).unsqueeze(0).to(self.device)
with torch.no_grad():
output = self.model(image).cpu()
predict = torch.softmax(output, dim=1)
predict_cla = torch.argmax(predict, dim=1).numpy()
# return self.class_indict[str(predict_cla[0])]
return predict_cla[0]
# #下面封装的是ResNet18和ResNet34的网络模型构建
# #原定用于构建RGB图像有果无果判断后续发现存在纰漏暂时搁置并未实际使用
# class BasicBlock(nn.Module):
# '''
# BasicBlock for ResNet18 and ResNet34
#
# '''
# expansion = 1
#
# def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
# super(BasicBlock, self).__init__()
# self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
# kernel_size=3, stride=stride, padding=1, bias=False)
# self.bn1 = nn.BatchNorm2d(out_channel)
# self.relu = nn.ReLU()
# self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
# kernel_size=3, stride=1, padding=1, bias=False)
# self.bn2 = nn.BatchNorm2d(out_channel)
# self.downsample = downsample
#
# def forward(self, x):
# identity = x
# if self.downsample is not None:
# identity = self.downsample(x)
#
# out = self.conv1(x)
# out = self.bn1(out)
# out = self.relu(out)
#
# out = self.conv2(out)
# out = self.bn2(out)
#
# out += identity
# out = self.relu(out)
#
# return out
#
# class ResNet(nn.Module):
# '''
# ResNet18 and ResNet34
# '''
# def __init__(self,
# block,
# blocks_num,
# num_classes=1000,
# include_top=True,
# groups=1,
# width_per_group=64):
# super(ResNet, self).__init__()
# self.include_top = include_top
# self.in_channel = 64
#
# self.groups = groups
# self.width_per_group = width_per_group
#
# self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
# padding=3, bias=False)
# self.bn1 = nn.BatchNorm2d(self.in_channel)
# self.relu = nn.ReLU(inplace=True)
# self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
# self.layer1 = self._make_layer(block, 64, blocks_num[0])
# self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
# self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
# self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
# if self.include_top:
# self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) # output size = (1, 1)
# self.fc = nn.Linear(512 * block.expansion, num_classes)
#
# for m in self.modules():
# if isinstance(m, nn.Conv2d):
# nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
#
# def _make_layer(self, block, channel, block_num, stride=1):
# downsample = None
# if stride != 1 or self.in_channel != channel * block.expansion:
# downsample = nn.Sequential(
# nn.Conv2d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
# nn.BatchNorm2d(channel * block.expansion))
#
# layers = []
# layers.append(block(self.in_channel,
# channel,
# downsample=downsample,
# stride=stride,
# groups=self.groups,
# width_per_group=self.width_per_group))
# self.in_channel = channel * block.expansion
#
# for _ in range(1, block_num):
# layers.append(block(self.in_channel,
# channel,
# groups=self.groups,
# width_per_group=self.width_per_group))
#
# return nn.Sequential(*layers)
#
# def forward(self, x):
# x = self.conv1(x)
# x = self.bn1(x)
# x = self.relu(x)
# x = self.maxpool(x)
#
# x = self.layer1(x)
# x = self.layer2(x)
# x = self.layer3(x)
# x = self.layer4(x)
#
# if self.include_top:
# x = self.avgpool(x)
# x = torch.flatten(x, 1)
# x = self.fc(x)
#
# return x
#
# def resnet18(num_classes=1000, include_top=True):
# return ResNet(BasicBlock, [2, 2, 2, 2], num_classes=num_classes, include_top=include_top)
#
# def resnet34(num_classes=1000, include_top=True):
# return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)
#
# #图像有无果判别模型
# class ImageClassifier:
# '''
# 图像分类器,用于加载预训练的 ResNet 模型并进行图像分类。
# '''
# def __init__(self, model_path, class_indices_path, device=None):
# if device is None:
# self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# else:
# self.device = device
#
# # 加载类别索引
# assert os.path.exists(class_indices_path), f"File: '{class_indices_path}' does not exist."
# with open(class_indices_path, "r") as json_file:
# self.class_indict = json.load(json_file)
#
# # 创建模型并加载权重
# self.model = resnet34(num_classes=len(self.class_indict)).to(self.device)
# assert os. path.exists(model_path), f"File: '{model_path}' does not exist."
# self.model.load_state_dict(torch.load(model_path, map_location=self.device))
# self.model.eval()
#
# # 设置图像转换
# self.transform = transforms.Compose([
# transforms.Resize(256),
# transforms.CenterCrop(224),
# transforms.ToTensor(),
# transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
# ])
#
# def predict(self, image_np):
# '''
# 对图像进行分类预测。
# :param image_np:
# :return:
# '''
# # 将numpy数组转换为图像
# image = Image.fromarray(image_np.astype('uint8'), 'RGB')
# image = self.transform(image).unsqueeze(0).to(self.device)
#
# with torch.no_grad():
# output = self.model(image).cpu()
# predict = torch.softmax(output, dim=1)
# predict_cla = torch.argmax(predict, dim=1).numpy()
#
# # return self.class_indict[str(predict_cla[0])]
# return predict_cla[0]

View File

@ -6,11 +6,10 @@
import sys
import os
import cv2
from root_dir import ROOT_DIR
from classifer import Spec_predict, Data_processing, ImageClassifier
from classifer import Spec_predict, Data_processing
# from classifer import ImageClassifier
import logging
from utils import Pipe
import numpy as np
@ -83,12 +82,14 @@ def main(is_debug=False):
logging.basicConfig(format='%(asctime)s %(filename)s[line:%(lineno)d] - %(levelname)s - %(message)s',
handlers=[file_handler, console_handler],
level=logging.DEBUG)
#模型加载
detector = Spec_predict(ROOT_DIR/'models'/'passion_fruit_2.joblib')
classifier = ImageClassifier(ROOT_DIR/'models'/'resnet18_0616.pth', ROOT_DIR/'models'/'class_indices.json')
# classifier = ImageClassifier(ROOT_DIR/'models'/'resnet34_0619.pth', ROOT_DIR/'models'/'class_indices.json')
dp = Data_processing()
print('系统初始化中...')
#模型预热
_ = detector.predict(np.ones((30, 30, 224), dtype=np.uint16))
_ = classifier.predict(np.ones((224, 224, 3), dtype=np.uint8))
# _ = classifier.predict(np.ones((224, 224, 3), dtype=np.uint8))
# _, _, _, _, _ =dp.analyze_tomato(cv2.imread(r'D:\project\supermachine--tomato-passion_fruit\20240529RGBtest3\data\tomato_img\bad\71.bmp'))
# _, _, _, _, _ = dp.analyze_passion_fruit(cv2.imread(r'D:\project\supermachine--tomato-passion_fruit\20240529RGBtest3\data\passion_fruit_img\38.bmp'))
print('系统初始化完成')
@ -99,30 +100,33 @@ def main(is_debug=False):
pipe = Pipe(rgb_receive_name, rgb_send_name, spec_receive_name)
rgb_receive, rgb_send, spec_receive = pipe.create_pipes(rgb_receive_name, rgb_send_name, spec_receive_name)
# 预热循环只处理cmd为'YR'的数据
# 当接收到的第一个指令预热命令时,结束预热循环
while True:
start_time00 = time.time()
# start_time00 = time.time()
data = pipe.receive_rgb_data(rgb_receive)
cmd, _ = pipe.parse_img(data)
end_time00 = time.time()
print(f'接收预热数据时间:{(end_time00 - start_time00) * 1000}毫秒')
# end_time00 = time.time()
# print(f'接收预热数据时间:{(end_time00 - start_time00) * 1000}毫秒')
if cmd == 'YR':
break # 当接收到的不是预热命令时,结束预热循环
q = 1
break
#主循环
# q = 1
while True:
start_time = time.time()
#RGB图像部分
# start_time = time.time()
images = []
cmd = None
for i in range(5):
start_time1 = time.time()
for _ in range(5):
# start_time1 = time.time()
data = pipe.receive_rgb_data(rgb_receive)
end_time10 = time.time()
# end_time10 = time.time()
# print(f'接收第{q}组第{i}份RGB数据时间{(end_time10 - start_time1) * 1000}毫秒')
start_time11 = time.time()
# start_time11 = time.time()
cmd, img = pipe.parse_img(data)
end_time1 = time.time()
# end_time1 = time.time()
# print(f'解析第{q}组第{i}份RGB数据时间{(end_time1 - start_time11) * 1000}毫秒')
print(f'接收第{q}组第{i}张RGB图时间{(end_time1 - start_time1) * 1000}毫秒')
# print(f'接收第{q}组第{i}张RGB图时间{(end_time1 - start_time1) * 1000}毫秒')
# 使用分类器进行预测
# prediction = classifier.predict(img)
@ -140,24 +144,24 @@ def main(is_debug=False):
if cmd not in ['TO', 'PF', 'YR', 'KO']:
logging.error(f'错误指令,指令为{cmd}')
continue
#Spec数据部分
spec = None
if cmd == 'PF':
start_time2 = time.time()
# start_time2 = time.time()
spec_data = pipe.receive_spec_data(spec_receive)
print(f'接收第{q}组光谱数据长度:{len(spec_data)}')
# print(f'接收第{q}组光谱数据长度:{len(spec_data)}')
_, spec = pipe.parse_spec(spec_data)
print(f'处理第{q}组光谱数据长度:{len(spec)}')
print(spec.shape)
print(f'解析第{q}组光谱数据时间:{(time.time() - start_time2) * 1000}毫秒')
end_time2 = time.time()
print(f'接收第{q}组光谱数据时间:{(end_time2 - start_time2) * 1000}毫秒')
start_time3 = time.time()
# print(f'处理第{q}组光谱数据长度:{len(spec)}')
# print(spec.shape)
# print(f'解析第{q}组光谱数据时间:{(time.time() - start_time2) * 1000}毫秒')
# end_time2 = time.time()
# print(f'接收第{q}组光谱数据时间:{(end_time2 - start_time2) * 1000}毫秒')
#数据处理部分
# start_time3 = time.time()
if images: # 确保images不为空
response = process_data(cmd, images, spec, dp, pipe, detector)
end_time3 = time.time()
print(f'{q}组处理时间:{(end_time3 - start_time3) * 1000}毫秒')
# print(f'第{q}组处理时间:{(end_time3 - start_time3) * 1000}毫秒')
if response:
logging.info(f'处理成功,响应为: {response}')
else:
@ -165,10 +169,14 @@ def main(is_debug=False):
else:
logging.error("没有有效的图像进行处理")
end_time = time.time()
print(f'{q}组全流程时间:{(end_time - start_time) * 1000}毫秒')
q += 1
# end_time = time.time()
# print(f'第{q}组全流程时间:{(end_time - start_time) * 1000}毫秒')
# q += 1
if __name__ == '__main__':
'''
python与qt采用windows下的命名管道进行通信数据流按照约定的通信协议进行
数据处理逻辑为连续接收5张RGB图然后根据解析出的指令部分决定是否接收一张光谱图然后进行处理最后将处理得到的指标结果进行编码回传
'''
main(is_debug=False)

Binary file not shown.