[python] pipeline design

A pipeline framework is like an assembly line in a factory.

Imagine this:

  • A product takes several steps to produce
  • Each step has exactly one worker
    • A worker handles only one step
    • A worker cannot work in parallel; he handles products one by one
    • Once the worker finishes his step, he passes the product to the next worker (step)
  • There are a great number of products to manufacture
    • To maximize efficiency, every worker should stay fully loaded

Now let's build a pipeline framework for this design.

First, we come up with a Worker class to represent a worker:

  • He accepts one kind of task
  • We can chain him to the next step with Worker.set_next(next_worker)
  • Once this worker is started:
    • He sets up a working thread to fetch and handle products
    • Products come from the previous step via Worker.feed()
    • The worker sends the product to the next step once this step is done
    • The worker can discard a product if the result is not good
  • Use Worker.join() to wait for all tasks to finish, then stop
import queue
import threading


class Worker(object):
    def __init__(self, task):
        self._queue = queue.Queue()
        self._task = task
        self._next = None
        self._thread = None

    def feed(self, obj):
        '''
        feed an object to this worker
        '''
        self._queue.put(obj)

    def set_next(self, next_worker):
        '''
        set the next worker in the pipeline
        '''
        self._next = next_worker

    def _thread_func(self):
        while True:
            obj = self._queue.get()
            # the special object None tells the worker to exit
            if obj is None:
                self._queue.task_done()
                break
            ret = self._task(obj)
            # if ret is None, the product is discarded and not passed on
            if self._next is not None and ret is not None:
                self._next.feed(ret)
            self._queue.task_done()

    def start(self):
        '''
        start this worker
        '''
        th = threading.Thread(target=self._thread_func)
        th.start()
        self._thread = th

    def join(self):
        '''
        wait for all queued products to be handled, then stop
        '''
        self._queue.join()  # wait until every queued product is done
        self.feed(None)     # send the sentinel to exit the thread
        self._thread.join()
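
The shutdown logic above hinges on the sentinel pattern: `queue.join()` waits until `task_done()` has been called for every queued item, and a special `None` item tells the thread to exit. Here is a minimal, self-contained sketch of just that pattern with a bare queue and thread (the names and the `item * 2` task are illustrative, not part of the framework):

```python
import queue
import threading

q = queue.Queue()
results = []

def consume():
    while True:
        item = q.get()
        if item is None:          # sentinel: no more work
            q.task_done()
            break
        results.append(item * 2)  # stand-in for a real task
        q.task_done()

th = threading.Thread(target=consume)
th.start()

for n in [1, 2, 3]:
    q.put(n)

q.join()     # blocks until task_done() was called for every item
q.put(None)  # now send the sentinel
th.join()
print(results)  # [2, 4, 6]
```

Note the ordering: `q.join()` must come before the sentinel, otherwise it would also wait on the sentinel item.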

Then we define the pipeline.

  • We define a task for each step and add them sequentially with Pipeline.add_step(task)
  • A call to Pipeline.start() starts the pipeline and activates the workers
  • Feed your raw products to Pipeline.feed(obj), and they will be processed through the pipeline
class Pipeline(object):
    def __init__(self):
        self._workers = []
        self._started = False

    def feed(self, obj):
        assert self._started, 'Pipeline not started'
        self._workers[0].feed(obj)

    def add_step(self, task):
        assert not self._started, 'Pipeline already started'
        worker = Worker(task)
        if len(self._workers) > 0:
            # chain the next worker
            last = self._workers[-1]
            last.set_next(worker)
        self._workers.append(worker)

    def start(self):
        '''
        start the Pipeline
        '''
        assert len(self._workers) > 0, 'no steps found'
        for w in self._workers:
            w.start()
        self._started = True

    def join(self):
        '''
        wait for the pipeline to finish
        '''
        # Note: the order is important; the first worker added is joined first
        for w in self._workers:
            w.join()

OK, let's talk about tasks.

  • A task is a function that accepts a single parameter obj
  • A task can return an object, which will be passed to the next step in the pipeline
    • Returning None means the product is discarded and won't be passed on
  • Here is an example:
    
    def task1(obj):
        result, processed = handle(obj)
        if result == 'good':
            return processed
        elif result == 'bad':
            return None
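
(`handle` above is a placeholder for your own processing function; also note that strings must be compared with `==`, not `is`.) A self-contained version with a stand-in `handle` that accepts even numbers only:

```python
def handle(obj):
    # stand-in for real processing: even numbers are 'good'
    if obj % 2 == 0:
        return 'good', obj * 10
    return 'bad', None

def task1(obj):
    result, processed = handle(obj)
    if result == 'good':  # compare strings with ==, not `is`
        return processed
    return None           # discard

print(task1(4))  # 40
print(task1(3))  # None
```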
    

Finally, here is a complete example using this pipeline:

def download(obj):
    url = obj
    fpath = do_download(url)
    return (url, fpath)

def check(obj):
    url, fpath = obj
    valid = check_valid(fpath)
    if valid:
        return fpath
    else:  # discard
        return None

def parse(obj):
    fpath = obj
    result = parse_file(fpath)
    save_result(result)

pipeline = Pipeline()
pipeline.add_step(download)
pipeline.add_step(check)
pipeline.add_step(parse)

pipeline.start()  # start the pipeline
for url in urls:
    pipeline.feed(url)
pipeline.join()  # wait for all tasks to finish
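
The helpers above (do_download, check_valid, parse_file, save_result) are assumed to exist elsewhere. To make the example runnable end to end, here is a condensed, self-contained version of the same script with stand-in helpers; the Worker and Pipeline classes are repeated in trimmed form so the snippet runs on its own:

```python
import queue
import threading

class Worker:
    def __init__(self, task):
        self._queue = queue.Queue()
        self._task = task
        self._next = None
    def feed(self, obj):
        self._queue.put(obj)
    def set_next(self, next_worker):
        self._next = next_worker
    def _run(self):
        while True:
            obj = self._queue.get()
            if obj is None:  # sentinel: exit the thread
                self._queue.task_done()
                break
            ret = self._task(obj)
            if self._next is not None and ret is not None:
                self._next.feed(ret)
            self._queue.task_done()
    def start(self):
        self._thread = threading.Thread(target=self._run)
        self._thread.start()
    def join(self):
        self._queue.join()
        self.feed(None)
        self._thread.join()

class Pipeline:
    def __init__(self):
        self._workers = []
    def add_step(self, task):
        w = Worker(task)
        if self._workers:
            self._workers[-1].set_next(w)
        self._workers.append(w)
    def feed(self, obj):
        self._workers[0].feed(obj)
    def start(self):
        for w in self._workers:
            w.start()
    def join(self):
        for w in self._workers:
            w.join()

saved = []

def download(url):
    return (url, '/tmp/' + url.split('/')[-1])  # pretend to download

def check(obj):
    url, fpath = obj
    return fpath if fpath.endswith('.html') else None  # discard non-html

def parse(fpath):
    saved.append(fpath)  # pretend to parse and save the result

pipeline = Pipeline()
pipeline.add_step(download)
pipeline.add_step(check)
pipeline.add_step(parse)
pipeline.start()
for url in ['http://a/x.html', 'http://a/y.jpg', 'http://a/z.html']:
    pipeline.feed(url)
pipeline.join()
print(saved)  # ['/tmp/x.html', '/tmp/z.html']
```

The .jpg URL is discarded by the check step, so only the two .html paths reach parse.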

More thoughts on enhancements:

  • Collect the final result for each input product
  • What if one step has more than one worker?
  • Replace Python threads with multiprocessing or gevent
  • What if each step has only limited room for pending products?
  • Statistics about the pipeline, its steps, and workers
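
On the limited-room point: `queue.Queue` already accepts a `maxsize` argument, so bounding a step's pending products only requires constructing the worker's queue with `queue.Queue(maxsize=n)`; then `feed()` blocks whenever the step is full, which gives natural backpressure. A minimal sketch of that behavior (the sleep and sizes are illustrative):

```python
import queue
import threading
import time

q = queue.Queue(maxsize=2)  # at most 2 pending products

def slow_consumer():
    while True:
        item = q.get()
        if item is None:  # sentinel: exit
            break
        time.sleep(0.05)  # simulate a slow step
        q.task_done()

th = threading.Thread(target=slow_consumer)
th.start()

start = time.monotonic()
for n in range(5):
    q.put(n)  # blocks once 2 items are pending (backpressure)
elapsed = time.monotonic() - start

q.put(None)
th.join()
print(elapsed > 0.05)  # True: the producer was throttled by the slow step
```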
