Source code for pkgcore.util.thread_pool

import queue
import threading
from collections import deque
from multiprocessing import cpu_count
from types import GeneratorType

from snakeoil import klass
from snakeoil.compatibility import IGNORED_EXCEPTIONS


[docs] def reclaim_threads(threads): for x in threads: try: x.join() except IGNORED_EXCEPTIONS: raise except Exception as e: # should do something better here pass
[docs] def map_async(iterable, functor, *args, **kwds): per_thread_args = kwds.pop("per_thread_args", lambda: ()) per_thread_kwds = kwds.pop("per_thread_kwds", lambda: {}) parallelism = kwds.pop("threads", None) if parallelism is None: parallelism = cpu_count() if hasattr(iterable, "__len__"): # if there are less items than parallelism, don't # spawn pointless threads. parallelism = max(min(len(iterable), parallelism), 0) # note we allow an infinite queue since .put below is blocking, and won't # return till it succeeds (regardless of signal) as such, we do it this way # to ensure the put succeeds, then the keyboardinterrupt can be seen. q = queue.Queue() results = deque() kill = threading.Event() kill.clear() def iter_queue(kill, qlist, empty_signal): while not kill.isSet(): item = qlist.get() if item is empty_signal: return yield item def worker(*args): result = functor(*args) if result is not None: # avoid appending chars from a string into results if isinstance(result, GeneratorType): results.extend(result) else: results.append(result) threads = [] for x in range(parallelism): tkwds = kwds.copy() tkwds.update(per_thread_kwds()) targs = (iter_queue(kill, q, klass.sentinel),) + args + per_thread_args() threads.append(threading.Thread(target=worker, args=targs, kwargs=tkwds)) try: try: for x in threads: x.start() # now we feed the queue. for data in iterable: q.put(data) except Exception: kill.set() raise finally: for x in range(parallelism): q.put(klass.sentinel) reclaim_threads(threads) return results