From 0d5dcad5ef2fc05629749ed3dc7d386f6d042c0c Mon Sep 17 00:00:00 2001
From: "li.zhenye"
Date: Fri, 26 Aug 2022 13:38:38 +0800
Subject: [PATCH] =?UTF-8?q?[doc]=20=E6=8F=90=E5=87=BA=E5=85=B1=E4=BA=AB?=
=?UTF-8?q?=E5=86=85=E5=AD=98=E7=9A=84=E8=BF=9B=E7=A8=8B=E9=97=B4=E9=80=9A?=
=?UTF-8?q?=E4=BF=A1=E6=96=B9=E6=B3=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
tests/test_transmit.py | 32 +++++++++++++++-----------
transmit.py | 51 ++++++++++++++++++++++++------------------
2 files changed, 48 insertions(+), 35 deletions(-)
diff --git a/tests/test_transmit.py b/tests/test_transmit.py
index c657f61..468fa31 100644
--- a/tests/test_transmit.py
+++ b/tests/test_transmit.py
@@ -15,7 +15,7 @@ class TransmitterTest(unittest.TestCase):
@unittest.skip("file receiver thread test pass")
def test_file_receiver(self):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logging.info('测试文件接收器')
+ logging.info('测试单线程文件接收器')
image_queue = ImgQueue()
file_receiver = FileReceiver(job_name='rgb img receive', input_dir='../data', output_queue=image_queue,
speed=0.5, name_pattern=None)
@@ -30,12 +30,12 @@ class TransmitterTest(unittest.TestCase):
self.assertEqual(virtual_data.shape, (1024, 4096, 3))
file_receiver.stop()
- # @unittest.skip('skip')
+ @unittest.skip('skip')
def test_file_receiver_subprocess(self):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.info('测试子进程文件接收器')
- # image_queue = multiprocessing.Queue()
- image_queue = QQueue()
+ image_queue = multiprocessing.Queue()
+ # image_queue = multiprocessing.Manager().Queue()
file_receiver = FileReceiver(job_name='rgb img receive', input_dir='../data', output_queue=image_queue,
speed=1, name_pattern=None, run_process=True)
virtual_data = np.random.randint(0, 255, (1024, 4096, 3), dtype=np.uint8)
@@ -50,23 +50,29 @@ class TransmitterTest(unittest.TestCase):
self.assertEqual(virtual_data.shape, (1024, 4096, 3))
file_receiver.stop()
- @unittest.skip('skip')
+ # @unittest.skip('skip')
def test_fifo_receiver_sender(self):
+ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
total_rgb = Config.nRgbRows * Config.nRgbCols * Config.nRgbBands * 1 # int型变量
image_queue, input_queue = ImgQueue(), ImgQueue()
- fifo_receiver = FifoReceiver(job_name='fifo img receive', fifo_path='/tmp/dkimg.fifo', output=image_queue,
- read_max_num=total_rgb)
+ fifo_receiver = FifoReceiver(job_name='fifo img receive', fifo_path='/tmp/dkimg.fifo',
+ output=image_queue, read_max_num=total_rgb)
fifo_sender = FifoSender(fifo_path='/tmp/dkimg.fifo', source=input_queue, job_name='fifo img send')
- virtual_data = np.random.randint(0, 255, (1024, 4096, 3), dtype=np.uint8)
+ virtual_data = np.random.randint(0, 255, (Config.nRgbRows, Config.nRgbCols, Config.nRgbBands), dtype=np.uint8)
fifo_sender.start(preprocess=transmit.BeforeAfterMethods.mask_preprocess)
fifo_receiver.start()
- logging.debug('Start to send virtual data')
+ logging.info('Start to send virtual data')
for i in range(5):
- logging.debug('put data to input queue')
+ logging.info(f'put data {i+1} to input queue')
input_queue.put(virtual_data)
- logging.debug('put data to input queue done')
- virtual_data = image_queue.get()
- self.assertEqual(virtual_data.shape, (1024, 4096, 3))
+ logging.info(f'put data {i+1} to input queue done')
+ virtual_data_rec = image_queue.get()
+ logging.info(f'get data {i+1} from the image queue')
+ virtual_data_rec = np.frombuffer(virtual_data_rec, dtype=np.uint8).reshape((1024, 4096, 3))
+ is_equal = np.all(virtual_data_rec == virtual_data, axis=(0, 1, 2))
+ self.assertTrue(is_equal)
+ fifo_sender.stop()
+ fifo_receiver.stop()
if __name__ == '__main__':
diff --git a/transmit.py b/transmit.py
index 40da713..43b9b17 100644
--- a/transmit.py
+++ b/transmit.py
@@ -1,5 +1,6 @@
import multiprocessing
import os
+import queue
import threading
import typing
from multiprocessing import Process, Queue
@@ -18,13 +19,8 @@ from models import SpecDetector, RgbDetector
from typing import Any, Union
import logging
-def test_func(*args, **kwargs):
- print('test_func')
- print(kwargs)
- return 'test_func'
class Transmitter(object):
-
def __init__(self, job_name: str, run_process: bool = False):
self.output = None
self.job_name = job_name
@@ -74,8 +70,12 @@ class Transmitter(object):
:return:
"""
if self._running_handler is not None:
+ logging.info(f"stopping {self.job_name}")
self._stop_event.set()
self._running_handler = None
+ logging.info(f"{self.job_name} stopped")
+ else:
+ logging.info("Stopping a not running object")
def __del__(self):
self.stop()
@@ -228,12 +228,17 @@ class FifoReceiver(Transmitter):
:param post_process_func:
:return:
"""
+ logging.info(f'Opening fifo {self._input_fifo_path}')
input_fifo = os.open(self._input_fifo_path, os.O_RDONLY)
data = os.read(input_fifo, self._max_len)
+ while len(data) < self._max_len:
+ data += os.read(input_fifo, self._max_len)
+ logging.info(f'Read from fifo {self._input_fifo_path} done')
+ os.close(input_fifo)
if post_process_func is not None:
data = post_process_func(data)
+ logging.info('putting data into fifo')
self._output_queue.put(data)
- os.close(input_fifo)
def __getstate__(self):
state = self.__dict__.copy()
@@ -252,16 +257,15 @@ class FifoSender(Transmitter):
def set_source(self, source: ImgQueue):
self.stop()
- with self._io_lock:
- self._input_source = source
+ self._input_source = source
def set_output(self, output_fifo_path: str):
self.stop()
- with self._io_lock:
- if not os.access(output_fifo_path, os.F_OK):
- os.mkfifo(output_fifo_path, 0o777)
- self._output_fifo_path = output_fifo_path
+ if not os.access(output_fifo_path, os.F_OK):
+ os.mkfifo(output_fifo_path, 0o777)
+ self._output_fifo_path = output_fifo_path
+ @Transmitter.job_decorator
def job_func(self, pre_process=None, *args, **kwargs):
"""
发送线程
@@ -269,16 +273,19 @@ class FifoSender(Transmitter):
:param pre_process:
:return:
"""
- if self._input_source.empty():
- return
- data = self._input_source.get()
- if pre_process is not None:
- data = pre_process(data)
- logging.debug(f'put data to fifo {self._output_fifo_path}')
- output_fifo = os.open(self._output_fifo_path, os.O_WRONLY)
- os.write(output_fifo, data)
- os.close(output_fifo)
- logging.debug(f'put data to fifo {self._output_fifo_path} done')
+ data = None
+ try:
+ data = self._input_source.get(timeout=0.1)
+ except queue.Empty:
+ pass
+ if data is not None:
+ if pre_process is not None:
+ data = pre_process(data)
+ logging.info(f'put data to fifo {self._output_fifo_path}')
+ output_fifo = os.open(self._output_fifo_path, os.O_WRONLY)
+ os.write(output_fifo, data)
+ os.close(output_fifo)
+ logging.info(f'put data to fifo {self._output_fifo_path} done')
class CmdImgSplitMidware(Transmitter):