From 631b46d99a075c676bc709b3bcee471dda160e09 Mon Sep 17 00:00:00 2001
From: "li.zhenye"
Date: Thu, 11 Aug 2022 18:21:43 +0800
Subject: [PATCH] =?UTF-8?q?[ext]=20transmit.py=20file=20transmitter=20?=
=?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=AE=8C=E6=88=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
transmit.py | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/transmit.py b/transmit.py
index 52971bf..d7b7e89 100644
--- a/transmit.py
+++ b/transmit.py
@@ -74,6 +74,11 @@ class Transmitter(object):
self._thread_stop.set()
self._running_handler = None
+ def __del__(self):
+ self.stop()
+ if self._running_handler is not None:
+ self._running_handler.join()
+
@staticmethod
def job_decorator(func):
functools.wraps(func)
@@ -128,10 +133,11 @@ class FileReceiver(Transmitter):
self.name_pattern = name_pattern
self.file_idx = 0
self.output_queue = None
+ self.preprocess_method = None
self.set_source(input_dir, name_pattern)
self.set_output(output_queue)
- def set_source(self, input_dir, name_pattern=None):
+ def set_source(self, input_dir:str, name_pattern:str=None, preprocess_method:callable=None):
self.name_pattern = name_pattern if name_pattern is not None else self.name_pattern
file_names = os.listdir(input_dir)
if len(file_names) == 0:
@@ -153,8 +159,16 @@ class FileReceiver(Transmitter):
def job_func(self, *args, **kwargs):
with self._io_lock:
self.file_idx += 1
- if self.file_idx == len()
+ if self.file_idx >= len(self.file_names):
+ self.file_idx = 0
file_name = self.file_names[self.file_idx]
+ file_name = os.path.join(self.input_dir, file_name)
+ with open(file_name, 'rb') as f:
+ data = f.read()
+ if self.preprocess_method is not None:
+ data = self.preprocess_method(data)
+ self.output_queue.put(data)
+
class FifoReceiver(Transmitter):