A pipeline framework works like an assembly line in a factory.
- A product takes several steps to produce.
- Each step has exactly one worker.
- A worker handles only one step.
- A worker can't work in parallel; he must handle products one by one.
- Once a worker finishes his step, he passes the product to the next worker (step).
- There are a great number of products to manufacture.
- To improve efficiency, every worker must be kept fully loaded.
First, we come up with a class Worker to represent a worker.
- He accepts one kind of task
- We can chain him to the next step with Worker.set_next()
- Once this worker is started with Worker.start():
  - He sets up a working thread to fetch and handle products
  - Products come from the previous step via Worker.feed()
  - The worker sends the product to the next step once his own step is done.
  - The worker can discard a product if the result is not good.
- Call Worker.join() to wait for all tasks to finish and stop the worker.
```python
import queue  # Python 3 name; the module was called Queue in Python 2
import threading


class Worker(object):
    def __init__(self, task):
        self._queue = queue.Queue()
        self._task = task
        self._next = None
        self._stop = threading.Event()
        self._stop.clear()

    def feed(self, obj):
        ''' feed object to this worker '''
        self._queue.put(obj)

    def set_next(self, next_worker):
        ''' set next worker for the pipeline '''
        self._next = next_worker

    def _thread_func(self):
        while not self._stop.is_set():
            obj = self._queue.get()
            # here, we use special object None to notify queue done
            if obj is None:
                break
            ret = self._task(obj)
            # if ret is None, we won't go to next chain
            if self._next and ret is not None:
                self._next.feed(ret)
            self._queue.task_done()

    def start(self):
        ''' start this worker '''
        th = threading.Thread(target=self._thread_func)
        th.start()
        self._thread = th

    def join(self):
        ''' wait for job done '''
        self._queue.join()   # wait for queue done
        self._stop.set()     # set stop flag
        self.feed(None)      # send None to exit thread
        self._thread.join()
```
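The shutdown order in join() is the subtle part, so here is a minimal sketch of the same idea with a bare queue and one thread: wait for the queue to drain first, and only then send None to stop the thread. The helper name run_single_step and the sample task are made up for illustration and are not part of the Worker class.

```python
import queue
import threading


def run_single_step(task, items):
    """Hypothetical helper: run task over items on one thread, keep non-None results."""
    q = queue.Queue()
    results = []

    def loop():
        while True:
            obj = q.get()
            if obj is None:        # sentinel: time to stop
                q.task_done()
                break
            ret = task(obj)
            if ret is not None:    # None means "discard"
                results.append(ret)
            q.task_done()

    th = threading.Thread(target=loop)
    th.start()
    for item in items:
        q.put(item)
    q.join()       # blocks until every item fed so far is marked done
    q.put(None)    # only now ask the thread to exit
    th.join()
    return results


# Double the odd numbers, discard the even ones.
doubled = run_single_step(lambda x: x * 2 if x % 2 else None, [1, 2, 3, 4, 5])
```

Because None doubles as the stop signal, a real product can never be None with this design.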
Then we define the pipeline.
- We can define a task for each step and add the steps in order with Pipeline.add_step(task)
- A call to Pipeline.start() starts the pipeline and activates the workers
- Send your raw product to Pipeline.feed(obj); it will be processed by the pipeline
```python
class Pipeline(object):
    def __init__(self):
        self._workers = []
        self._started = False

    def feed(self, obj):
        assert self._started, 'Pipeline not started'
        # raw products always enter at the first step
        self._workers[0].feed(obj)

    def add_step(self, task):
        assert not self._started, 'Pipeline already started'
        worker = Worker(task)
        if len(self._workers) > 0:
            # chain the next worker
            last = self._workers[-1]
            last.set_next(worker)
        self._workers.append(worker)

    def start(self):
        ''' start the Pipeline '''
        assert len(self._workers) > 0, 'no steps found'
        for w in self._workers:
            w.start()
        self._started = True

    def join(self):
        ''' wait the Pipeline done '''
        # Notice: the sequence is important, first comes, first join
        for w in self._workers:
            w.join()
```
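Why does the join order matter? A step hands its result to the next step before marking the item done, so once the first step is drained, everything it produced is already waiting in the second step's queue. The sketch below shows this with two bare stages; start_stage and close_stage are hypothetical helpers written only for this illustration.

```python
import queue
import threading


def start_stage(task, out_queue=None):
    """Hypothetical helper: start one stage thread, return (in_queue, thread)."""
    in_queue = queue.Queue()

    def loop():
        while True:
            obj = in_queue.get()
            if obj is None:
                break
            ret = task(obj)
            if out_queue is not None and ret is not None:
                out_queue.put(ret)   # hand over before marking this item done
            in_queue.task_done()

    th = threading.Thread(target=loop)
    th.start()
    return in_queue, th


def close_stage(in_queue, th):
    in_queue.join()      # every item fed so far was handled and handed over
    in_queue.put(None)   # then stop the thread
    th.join()


collected = []
# Build back to front so the first stage knows its downstream queue.
q2, t2 = start_stage(collected.append)             # last stage stores the result
q1, t1 = start_stage(lambda x: x + 1, out_queue=q2)

for n in range(5):
    q1.put(n)

close_stage(q1, t1)   # first comes, first join
close_stage(q2, t2)
```

If the second stage were closed first, it could drain and stop while the first stage was still producing, and the late products would sit in its queue forever.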
OK, let's talk about task.
- task is a function that accepts only one parameter
- task can return:
  - obj, which will be passed to the next step in the pipeline
  - None, which means discard; nothing is passed to the next step
- Here is an example:
```python
def task1(obj):
    result, processed = handle(obj)
    # compare strings with ==, not "is" (identity is not guaranteed for equal strings)
    if result == 'good':
        return processed
    elif result == 'bad':
        return None
```
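To make the return convention concrete, here is a small sketch with two toy tasks and a helper that applies them in order without any threads, stopping at the first None. The names parse_int, keep_even and run_tasks are invented for this example.

```python
def parse_int(obj):
    """Task: turn a string into an int, or discard it if it is not a number."""
    try:
        return int(obj)
    except ValueError:
        return None


def keep_even(obj):
    """Task: pass even numbers on, discard odd ones."""
    return obj if obj % 2 == 0 else None


def run_tasks(tasks, obj):
    """Apply tasks in order, as the pipeline would, stopping at the first None."""
    for task in tasks:
        obj = task(obj)
        if obj is None:
            return None
    return obj


steps = [parse_int, keep_even]
kept = [run_tasks(steps, s) for s in ["4", "3", "abc", "0"]]
```

Note that 0 survives: only None means discard, so other falsy values such as 0 or an empty string are still passed to the next step.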
Finally, here is a full example that uses this pipeline:
```python
def download(obj):
    url = obj
    fpath = do_download(url)
    return (url, fpath)


def check(obj):
    url, fpath = obj
    valid = check_valid(fpath)
    if valid:
        return fpath
    else:
        # discard
        return None


def parse(obj):
    fpath = obj
    result = parse_file(fpath)
    save_result(result)


pipeline = Pipeline()
pipeline.add_step(download)
pipeline.add_step(check)
pipeline.add_step(parse)

pipeline.start()  # start pipeline
for url in urls:
    pipeline.feed(url)
pipeline.join()  # wait for all task done
```
More thoughts for enhancement:
- Get the final results for each input product
- What if one step has more than one worker?
- Replace python
- What if each step has only limited resources for pending products?
- Statistics about the pipeline, steps, and workers
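Several of these ideas can be tried on a single step. The sketch below is one possible direction, not a finished design: a hypothetical PooledStep class where several threads share one bounded queue (so feed() blocks when too many products are pending), results are collected together with their inputs, and a counter tracks how many products were handled. All names and parameters here are assumptions made for the illustration.

```python
import queue
import threading


class PooledStep(object):
    """Hypothetical step: several threads share one bounded input queue."""

    def __init__(self, task, num_workers=2, max_pending=4):
        self._queue = queue.Queue(maxsize=max_pending)  # feed() blocks when full
        self._task = task
        self._results = queue.Queue()    # thread-safe store for final results
        self._handled = 0                # simple statistic
        self._lock = threading.Lock()
        self._threads = [threading.Thread(target=self._loop)
                         for _ in range(num_workers)]

    def _loop(self):
        while True:
            obj = self._queue.get()
            if obj is None:
                break
            ret = self._task(obj)
            if ret is not None:
                self._results.put((obj, ret))   # remember which input produced it
            with self._lock:
                self._handled += 1
            self._queue.task_done()

    def start(self):
        for th in self._threads:
            th.start()

    def feed(self, obj):
        self._queue.put(obj)

    def join(self):
        self._queue.join()
        for _ in self._threads:
            self._queue.put(None)    # one stop signal per thread
        for th in self._threads:
            th.join()
        results = []
        while not self._results.empty():
            results.append(self._results.get())
        return results, self._handled


step = PooledStep(lambda x: x * x if x != 3 else None, num_workers=3, max_pending=2)
step.start()
for n in range(6):
    step.feed(n)
results, handled = step.join()
```

With more than one thread per step, the results no longer come out in input order, which is why each result is stored next to its input.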