supermachine-tobacco/tests/test_transmit.py

80 lines
4.0 KiB
Python

import logging
import multiprocessing
import time
import unittest
import numpy as np
import transmit
from config import Config
from transmit import FileReceiver, FifoReceiver, FifoSender
from utils import ImgQueue
from quick_queue.quick_queue import QQueue
class TransmitterTest(unittest.TestCase):
@unittest.skip("file receiver thread test pass")
def test_file_receiver(self):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.info('测试单线程文件接收器')
image_queue = ImgQueue()
file_receiver = FileReceiver(job_name='rgb img receive', input_dir='../data', output_queue=image_queue,
speed=0.5, name_pattern=None)
virtual_data = np.random.randint(0, 255, (1024, 4096, 3), dtype=np.uint8)
file_receiver.start(need_time=True, virtual_data=virtual_data)
for i in range(5):
time_record, read_data, virtual_data_rec = image_queue.get()
current_time = time.time()
logging.info(f'Spent {(current_time - time_record) * 1000:.2f}ms to get image with shape {virtual_data.shape}')
is_equal = np.all(virtual_data_rec == virtual_data, axis=(0, 1, 2))
self.assertTrue(is_equal)
self.assertEqual(virtual_data.shape, (1024, 4096, 3))
file_receiver.stop()
@unittest.skip('skip')
def test_file_receiver_subprocess(self):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.info('测试子进程文件接收器')
image_queue = multiprocessing.Queue()
# image_queue = multiprocessing.Manager().Queue()
file_receiver = FileReceiver(job_name='rgb img receive', input_dir='../data', output_queue=image_queue,
speed=1, name_pattern=None, run_process=True)
virtual_data = np.random.randint(0, 255, (1024, 4096, 3), dtype=np.uint8)
file_receiver.start(need_time=True, virtual_data=virtual_data)
for i in range(5):
time_record, read_data, virtual_data_rec = image_queue.get()
current_time = time.time()
logging.info(
f'Spent {(current_time - time_record) * 1000:.2f}ms to get image with shape {virtual_data.shape}')
is_equal = np.all(virtual_data_rec == virtual_data, axis=(0, 1, 2))
self.assertTrue(is_equal)
self.assertEqual(virtual_data.shape, (1024, 4096, 3))
file_receiver.stop()
# @unittest.skip('skip')
def test_fifo_receiver_sender(self):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
total_rgb = Config.nRgbRows * Config.nRgbCols * Config.nRgbBands * 1 # int型变量
image_queue, input_queue = ImgQueue(), ImgQueue()
fifo_receiver = FifoReceiver(job_name='fifo img receive', fifo_path='/tmp/dkimg.fifo',
output=image_queue, read_max_num=total_rgb)
fifo_sender = FifoSender(fifo_path='/tmp/dkimg.fifo', source=input_queue, job_name='fifo img send')
virtual_data = np.random.randint(0, 255, (Config.nRgbRows, Config.nRgbCols, Config.nRgbBands), dtype=np.uint8)
fifo_sender.start(preprocess=transmit.BeforeAfterMethods.mask_preprocess)
fifo_receiver.start()
logging.info('Start to send virtual data')
for i in range(5):
logging.info(f'put data {i+1} to input queue')
input_queue.put(virtual_data)
logging.info(f'put data {i+1} to input queue done')
virtual_data_rec = image_queue.get()
logging.info(f'get data {i+1} from the image queue')
virtual_data_rec = np.frombuffer(virtual_data_rec, dtype=np.uint8).reshape((1024, 4096, 3))
is_equal = np.all(virtual_data_rec == virtual_data, axis=(0, 1, 2))
self.assertTrue(is_equal)
fifo_sender.stop()
fifo_receiver.stop()
if __name__ == '__main__':
unittest.main()