From 325410e2b20920a9903d0cafcb992b3452dcdb15 Mon Sep 17 00:00:00 2001 From: Christopher Larson Date: Tue, 12 Feb 2013 12:28:47 -0500 Subject: bitbake: cooker: parse using bb.compat.Pool (Bitbake rev: 8af519a49a3374bd9004864ef31ca8aa328e9f34) Signed-off-by: Christopher Larson Signed-off-by: Richard Purdie --- bitbake/lib/bb/cooker.py | 161 ++++++++--------------------------------------- 1 file changed, 27 insertions(+), 134 deletions(-) diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 9d051fa..9f7121f 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -34,7 +34,7 @@ from cStringIO import StringIO from contextlib import closing from functools import wraps from collections import defaultdict -import bb, bb.exceptions, bb.command +import bb, bb.exceptions, bb.command, bb.compat from bb import utils, data, parse, event, cache, providers, taskdata, runqueue import Queue import prserv.serv @@ -1556,87 +1556,19 @@ class ParsingFailure(Exception): self.recipe = recipe Exception.__init__(self, realexception, recipe) -class Feeder(multiprocessing.Process): - def __init__(self, jobs, to_parsers, quit): - self.quit = quit - self.jobs = jobs - self.to_parsers = to_parsers - multiprocessing.Process.__init__(self) - - def run(self): - while True: - try: - quit = self.quit.get_nowait() - except Queue.Empty: - pass - else: - if quit == 'cancel': - self.to_parsers.cancel_join_thread() - break - - try: - job = self.jobs.pop() - except IndexError: - break - - try: - self.to_parsers.put(job, timeout=0.5) - except Queue.Full: - self.jobs.insert(0, job) - continue - -class Parser(multiprocessing.Process): - def __init__(self, jobs, results, quit, init): - self.jobs = jobs - self.results = results - self.quit = quit - self.init = init - multiprocessing.Process.__init__(self) - - def run(self): - if self.init: - self.init() - - pending = [] - while True: - try: - self.quit.get_nowait() - except Queue.Empty: - pass - else: - self.results.cancel_join_thread() - break - - if pending: - result = pending.pop() - else: - try: - job = self.jobs.get(timeout=0.25) - except Queue.Empty: - continue - - if job is None: - break - result = self.parse(*job) - - try: - self.results.put(result, timeout=0.25) - except Queue.Full: - pending.append(result) - - def parse(self, filename, appends, caches_array): - try: - return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array) - except Exception as exc: - tb = sys.exc_info()[2] - exc.recipe = filename - exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) - return True, exc - # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown - # and for example a worker thread doesn't just exit on its own in response to - # a SystemExit event for example. - except BaseException as exc: - return True, ParsingFailure(exc, filename) +def parse_file((filename, appends, caches_array)): + try: + return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array) + except Exception as exc: + tb = sys.exc_info()[2] + exc.recipe = filename + exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) + return True, exc + # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown + # and for example a worker thread doesn't just exit on its own in response to + # a SystemExit event for example. + except BaseException as exc: + return True, ParsingFailure(exc, filename) class CookerParser(object): def __init__(self, cooker, filelist, masked): @@ -1670,32 +1602,25 @@ class CookerParser(object): self.fromcache.append((filename, appends)) self.toparse = self.total - len(self.fromcache) self.progress_chunk = max(self.toparse / 100, 1) + self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1) self.start() self.haveshutdown = False def start(self): self.results = self.load_cached() - self.processes = [] if self.toparse: + def process_init(): + parse_file.cfg = self.cfgdata + multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1) + multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1) + bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) - def init(): - Parser.cfg = self.cfgdata - multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1) - multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1) - - self.feeder_quit = multiprocessing.Queue(maxsize=1) - self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes) - self.jobs = multiprocessing.Queue(maxsize=self.num_processes) - self.result_queue = multiprocessing.Queue() - self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit) - self.feeder.start() - for i in range(0, self.num_processes): - parser = Parser(self.jobs, self.result_queue, self.parser_quit, init) - parser.start() - self.processes.append(parser) - - self.results = itertools.chain(self.results, self.parse_generator()) + + self.pool = bb.compat.Pool(self.num_processes, process_init) + parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk) + self.pool.close() + self.results = itertools.chain(self.results, parsed) def shutdown(self, clean=True, force=False): if not self.toparse: @@ -1711,25 +1636,9 @@ class CookerParser(object): self.total) bb.event.fire(event, self.cfgdata) - self.feeder_quit.put(None) - for process in self.processes: - self.jobs.put(None) else: - self.feeder_quit.put('cancel') - - self.parser_quit.cancel_join_thread() - for process in self.processes: - self.parser_quit.put(None) - - self.jobs.cancel_join_thread() - - for process in self.processes: - if force: - process.join(.1) - process.terminate() - else: - process.join() - self.feeder.join() + self.pool.terminate() + self.pool.join() sync = threading.Thread(target=self.bb_cache.sync) sync.start() @@ -1742,22 +1651,6 @@ class CookerParser(object): cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) yield not cached, infos - def parse_generator(self): - while True: - if self.parsed >= self.toparse: - break - - try: - result = self.result_queue.get(timeout=0.25) - except Queue.Empty: - pass - else: - value = result[1] - if isinstance(value, BaseException): - raise value - else: - yield result - def parse_next(self): result = [] parsed = None -- cgit v1.1