diff --git a/tests/test_transmit.py b/tests/test_transmit.py index c657f61..468fa31 100644 --- a/tests/test_transmit.py +++ b/tests/test_transmit.py @@ -15,7 +15,7 @@ class TransmitterTest(unittest.TestCase): @unittest.skip("file receiver thread test pass") def test_file_receiver(self): logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - logging.info('测试文件接收器') + logging.info('测试单线程文件接收器') image_queue = ImgQueue() file_receiver = FileReceiver(job_name='rgb img receive', input_dir='../data', output_queue=image_queue, speed=0.5, name_pattern=None) @@ -30,12 +30,12 @@ class TransmitterTest(unittest.TestCase): self.assertEqual(virtual_data.shape, (1024, 4096, 3)) file_receiver.stop() - # @unittest.skip('skip') + @unittest.skip('skip') def test_file_receiver_subprocess(self): logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.info('测试子进程文件接收器') - # image_queue = multiprocessing.Queue() - image_queue = QQueue() + image_queue = multiprocessing.Queue() + # image_queue = multiprocessing.Manager().Queue() file_receiver = FileReceiver(job_name='rgb img receive', input_dir='../data', output_queue=image_queue, speed=1, name_pattern=None, run_process=True) virtual_data = np.random.randint(0, 255, (1024, 4096, 3), dtype=np.uint8) @@ -50,23 +50,29 @@ class TransmitterTest(unittest.TestCase): self.assertEqual(virtual_data.shape, (1024, 4096, 3)) file_receiver.stop() - @unittest.skip('skip') + # @unittest.skip('skip') def test_fifo_receiver_sender(self): + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') total_rgb = Config.nRgbRows * Config.nRgbCols * Config.nRgbBands * 1 # int型变量 image_queue, input_queue = ImgQueue(), ImgQueue() - fifo_receiver = FifoReceiver(job_name='fifo img receive', fifo_path='/tmp/dkimg.fifo', output=image_queue, - read_max_num=total_rgb) + fifo_receiver = FifoReceiver(job_name='fifo img receive', fifo_path='/tmp/dkimg.fifo', + output=image_queue, read_max_num=total_rgb) fifo_sender = FifoSender(fifo_path='/tmp/dkimg.fifo', source=input_queue, job_name='fifo img send') - virtual_data = np.random.randint(0, 255, (1024, 4096, 3), dtype=np.uint8) + virtual_data = np.random.randint(0, 255, (Config.nRgbRows, Config.nRgbCols, Config.nRgbBands), dtype=np.uint8) fifo_sender.start(preprocess=transmit.BeforeAfterMethods.mask_preprocess) fifo_receiver.start() - logging.debug('Start to send virtual data') + logging.info('Start to send virtual data') for i in range(5): - logging.debug('put data to input queue') + logging.info(f'put data {i+1} to input queue') input_queue.put(virtual_data) - logging.debug('put data to input queue done') - virtual_data = image_queue.get() - self.assertEqual(virtual_data.shape, (1024, 4096, 3)) + logging.info(f'put data {i+1} to input queue done') + virtual_data_rec = image_queue.get() + logging.info(f'get data {i+1} from the image queue') + virtual_data_rec = np.frombuffer(virtual_data_rec, dtype=np.uint8).reshape((1024, 4096, 3)) + is_equal = np.all(virtual_data_rec == virtual_data, axis=(0, 1, 2)) + self.assertTrue(is_equal) + fifo_sender.stop() + fifo_receiver.stop() if __name__ == '__main__': diff --git a/transmit.py b/transmit.py index 40da713..43b9b17 100644 --- a/transmit.py +++ b/transmit.py @@ -1,5 +1,6 @@ import multiprocessing import os +import queue import threading import typing from multiprocessing import Process, Queue @@ -18,13 +19,8 @@ from models import SpecDetector, RgbDetector from typing import Any, Union import logging -def test_func(*args, **kwargs): - print('test_func') - print(kwargs) - return 'test_func' class Transmitter(object): - def __init__(self, job_name: str, run_process: bool = False): self.output = None self.job_name = job_name @@ -74,8 +70,12 @@ class Transmitter(object): :return: """ if self._running_handler is not None: + logging.info(f"stopping {self.job_name}") self._stop_event.set() self._running_handler = None + logging.info(f"{self.job_name} stopped") + else: + logging.info("Stopping a not running object") def __del__(self): self.stop() @@ -228,12 +228,17 @@ class FifoReceiver(Transmitter): :param post_process_func: :return: """ + logging.info(f'Opening fifo {self._input_fifo_path}') input_fifo = os.open(self._input_fifo_path, os.O_RDONLY) data = os.read(input_fifo, self._max_len) + while len(data) < self._max_len: + data += os.read(input_fifo, self._max_len) + logging.info(f'Read from fifo {self._input_fifo_path} done') + os.close(input_fifo) if post_process_func is not None: data = post_process_func(data) + logging.info('putting data into fifo') self._output_queue.put(data) - os.close(input_fifo) def __getstate__(self): state = self.__dict__.copy() @@ -252,16 +257,15 @@ class FifoSender(Transmitter): def set_source(self, source: ImgQueue): self.stop() - with self._io_lock: - self._input_source = source + self._input_source = source def set_output(self, output_fifo_path: str): self.stop() - with self._io_lock: - if not os.access(output_fifo_path, os.F_OK): - os.mkfifo(output_fifo_path, 0o777) - self._output_fifo_path = output_fifo_path + if not os.access(output_fifo_path, os.F_OK): + os.mkfifo(output_fifo_path, 0o777) + self._output_fifo_path = output_fifo_path + @Transmitter.job_decorator def job_func(self, pre_process=None, *args, **kwargs): """ 发送线程 @@ -269,16 +273,19 @@ class FifoSender(Transmitter): :param pre_process: :return: """ - if self._input_source.empty(): - return - data = self._input_source.get() - if pre_process is not None: - data = pre_process(data) - logging.debug(f'put data to fifo {self._output_fifo_path}') - output_fifo = os.open(self._output_fifo_path, os.O_WRONLY) - os.write(output_fifo, data) - os.close(output_fifo) - logging.debug(f'put data to fifo {self._output_fifo_path} done') + data = None + try: + data = self._input_source.get(timeout=0.1) + except queue.Empty: + pass + if data is not None: + if pre_process is not None: + data = pre_process(data) + logging.info(f'put data to fifo {self._output_fifo_path}') + output_fifo = os.open(self._output_fifo_path, os.O_WRONLY) + os.write(output_fifo, data) + os.close(output_fifo) + logging.info(f'put data to fifo {self._output_fifo_path} done') class CmdImgSplitMidware(Transmitter):