diff options
author | kris <kris@FreeBSD.org> | 2008-05-10 13:22:51 +0000 |
---|---|---|
committer | kris <kris@FreeBSD.org> | 2008-05-10 13:22:51 +0000 |
commit | 1ea90be952dba9e2afd147c3d9e6af7a75e3d1d0 (patch) | |
tree | 693d6389f6baa48154f864aee5aec252121018a3 /Tools/portbuild | |
parent | b952a3b1e192cf225826b2286122d5e161c7afaa (diff) | |
download | FreeBSD-ports-1ea90be952dba9e2afd147c3d9e6af7a75e3d1d0.zip FreeBSD-ports-1ea90be952dba9e2afd147c3d9e6af7a75e3d1d0.tar.gz |
NNew build scheduler written in python to replace the make+sh job
ordering, which had become too limited.
We now build packages ordered by those that are part of the longest
dependency chains first. This has the effect of building the deepest
parts of the tree first and levelling out the tree height, hopefully
avoiding the situation we currently face where there appear
bottlenecks late in the build where the cluster becomes mostly idle
while waiting for a few long dependency chains to finish building
before the cluster can become fully loaded again.
The algorithm is that we sort the list of remaining packages according
to height (longest dependency chain), then add leaf packages from each
in order until we have filled a queue of length between 100 and 200,
to amortise the cost of this queue rebalancing while not losing the
height averaging property. Jobs are dispatched from this queue into
worker threads as machine slots become available.
Unlike the make-based solution that required a fixed -j concurrency
value and could not respond to addition/removal of build resources, we
now can dynamically add new machines as they become available to the
queue.
The other advantage of using python is that we have more
customisability and visibility into the build status, e.g. we
periodically report the number of remaining packages, as well as the
list of deepest packages that we are working on.
TODO:
* Implement mtime checking for parent package staleness, so that
parents are rebuilt if the dependencies are touched more recently.
Currently packages will not be rebuild if they exist, whether or not
they are "stale" wrt their dependencies.
* Offload the machine selection into an external queue manager.
Currently the queue manager used here doesn't interoperate with the
old one (getmachine/releasemachine) because it's not possible to use
the lockf()-based mutual exclusion within a multithreaded client.
Doing that will also allow for a more flexible job placement
algorithm as well as finer queue customization.
Diffstat (limited to 'Tools/portbuild')
-rwxr-xr-x | Tools/portbuild/scripts/build | 520 | ||||
-rwxr-xr-x | Tools/portbuild/scripts/packagebuild | 520 |
2 files changed, 1040 insertions, 0 deletions
diff --git a/Tools/portbuild/scripts/build b/Tools/portbuild/scripts/build new file mode 100755 index 0000000..9beeeb1 --- /dev/null +++ b/Tools/portbuild/scripts/build @@ -0,0 +1,520 @@ +#!/usr/bin/env python + +# Improved build scheduler. We try to build leaf packages (those +# which can be built immediately without requiring additional +# dependencies to be built) in the order such that the ones required +# by the longest dependency chains are built first. +# +# This has the effect of favouring deep parts of the package tree and +# evening out the depth over time, hopefully avoiding the situation +# where the entire cluster waits for a deep part of the tree to +# build on a small number of machines +# +# Other advantages are that this system is easily customizable and +# will let us customize things like the matching policy of jobs to +# machines. +# +# TODO: +# * External queue manager +# * Mark completed packages instead of deleting them +# * check mtime for package staleness (cf make) +# * Check for parent mtimes after finishing child + +import os, sys, threading, time, subprocess, fcntl, operator +#from itertools import ifilter, imap +from random import choice + +def parseindex(indexfile): + + tmp={} + pkghash={} + for i in file(indexfile): + line=i.rstrip().split("|") + pkg = line[0] + tmp[pkg] = line[1:] + + # XXX hash category names too + + # Trick python into storing package names by reference instead of copying strings and wasting 60MB + pkghash[pkg] = pkg + + index=dict.fromkeys(tmp.keys()) + for pkg in tmp.iterkeys(): + line = tmp[pkg] + data={'name': pkg, 'path':line[0], + #'prefix':line[1], + #'comment':line[2], + #'descr':line[3], + #'maintainer':line[4], + 'categories':line[5], # XXX duplicates strings + 'bdep':[pkghash[i] for i in line[6].split(None)], + 'rdep':[pkghash[i] for i in line[7].split(None)], + #'www':line[8], + 'edep':[pkghash[i] for i in line[9].split(None)], + 'pdep':[pkghash[i] for i in line[10].split(None)], + 'fdep':[pkghash[i] for i in line[11].split(None)], + 'height':None} + if index[pkg] is None: + index[pkg] = data + else: + index[pkg].update(data) + if not index[pkg].has_key('parents'): + index[pkg]['parents'] = [] + + # XXX iter? + deps=set() + for j in ['bdep','rdep','edep','fdep','pdep']: + deps.update(set(index[pkg][j])) + index[pkg]['deps'] = [pkghash[i] for i in deps] + + for j in deps: + # This grossness is needed to avoid a second pass through + # the index, because we might be about to refer to + # packages that have not yet been processed + if index[j] is not None: + if index[j].has_key('parents'): + index[j]['parents'].append(pkghash[pkg]) + else: + index[j]['parents'] = [pkghash[pkg]] + else: + index[j] = {'parents':[pkghash[pkg]]} + + return index + +def gettargets(index, targets): + """ split command line arguments into list of packages to build. Returns set or iterable """ + # XXX make this return the full recursive list and use this later for processing wqueue + + plist = set() + if len(targets) == 0: + targets = ["all"] + for i in targets: + if i == "all": + plist = index.iterkeys() + break + if i.endswith("-all"): + cat = i.rpartition("-")[0] + plist.update(j for j in index.iterkeys() if cat in index[j]['categories']) + elif i.rstrip(".tbz") in index.iterkeys(): + plist.update([i.rstrip(".tbz")]) + + return plist + +def heightindex(index, targets): + """ Initial population of height tree """ + + for i in targets: + heightdown(index, i) + +def heightdown(index, pkgname): + """ + Recursively populate the height tree down from a given package, + assuming empty values on entries not yet visited + """ + + pkg=index[pkgname] + if pkg['height'] is None: + if len(pkg['deps']) > 0: + max = 0 + for i in pkg['deps']: + w = heightdown(index, i) + if w > max: + max = w + pkg['height'] = max + 1 + else: + pkg['height'] = 1 + return pkg['height'] + +def heightup(index, pkgname): + """ Recalculate the height tree going upwards from a package """ + + if not index.has_key(pkgname): + raise KeyError + + parents=set(index[pkgname]['parents']) + + while len(parents) > 0: + # XXX use a deque? + pkgname = parents.pop() + if not index.has_key(pkgname): + # XXX can this happen? + continue + pkg=index[pkgname] + oldheight=pkg['height'] + if oldheight is None: + # Parent is in our build target list + continue + if len(pkg['deps']) == 0: + newheight = 1 + else: + newheight=max(index[j]['height'] for j in pkg['deps']) + 1 + if newheight > oldheight: + print "%s height increasing: %d -> %d", pkg, oldheight, newheight + assert(False) + if newheight != oldheight: + pkg['height'] = newheight + parents.update(pkg['parents']) + +def deleteup(index, pkgname): + if not index.has_key(pkgname): + raise KeyError + + parents=set([pkgname]) + + children=[] + removed=[] + while len(parents) > 0: + pkgname = parents.pop() + if not index.has_key(pkgname): + # Parent was already deleted via another path + # XXX can happen? + print "YYYYYYYYYYYYYYYYYYYYYY %s deleted" % pkgname + continue + if index[pkgname]['height'] is None: + # parent is not in our list of build targets + continue + pkg=index[pkgname] + + children.extend(pkg['deps']) + parents.update(pkg['parents']) + removed.append(pkgname) + del index[pkgname] + + removed = set(removed) + children = set(children) +# print "Removed %d packages, touching %d children" % (len(removed), len(children)) + + for i in children.difference(removed): + par=index[i]['parents'] + index[i]['parents'] = list(set(par).difference(removed)) + +# XXX return an iter +def selectheights(index, level): + return [i for i in index.iterkeys() if index[i]['height'] == level] + +def rank(index, ready, sortd, max = None): + """ rank the list of ready packages according to those listed as + dependencies in successive entries of the sorted list """ + + input=set(ready) + output = [] + count = 0 + print "Working on depth ", + for i in sortd: + deps = set(index[i]['deps']) + both = deps.intersection(input) + if len(both) > 0: + print "%d " % index[i]['height'], + input.difference_update(both) + output.extend(list(both)) + if len(input) == 0: + break + if max: + count+=len(both) + if count > max: + return output + print + output.extend(list(input)) + + return output + +def jobsuccess(index, job): + + pkg = index[job] + # Build succeeded + for i in pkg['parents']: + index[i]['deps'].remove(job) + + # deps/parents tree now partially inconsistent but this is + # what we need to avoid counting the height of the entry + # we are about to remove (which would make it a NOP) + heightup(index, job) + + del index[job] + +def jobfailure(index, job): + + # Build failed + deleteup(index, job) + +class worker(threading.Thread): + + lock = threading.Lock() + + # List of running threads + tlist = [] + + # List of running jobs + running = [] + + # Used to signal dispatcher when we finish a job + event = threading.Event() + + def __init__(self, mach, job, queue, arch, branch): + threading.Thread.__init__(self) + self.job = job + self.mach = mach + self.queue = queue + self.arch = arch + self.branch = branch + + def run(self): + global index + + pkg = index[self.job] + + if len(pkg['deps']) != 0: + print "Running job with non-empty deps: %s" % pkg + assert(False) + + print "Running job %s" % (self.job) + while True: + retcode = subprocess.call(["/usr/bin/env", "FD=%s" % " ".join(["%s.tbz" % i for i in pkg['fdep']]), "ED=%s" % " ".join(["%s.tbz" % i for i in pkg['edep']]), "PD=%s" % " ".join(["%s.tbz" % i for i in pkg['pdep']]), "BD=%s" % " ".join(["%s.tbz" % i for i in pkg['bdep']]), "RD=%s" % " ".join(["%s.tbz" % i for i in pkg['rdep']]), "/var/portbuild/scripts/pdispatch2", self.mach, self.arch, self.branch, "/var/portbuild/scripts/portbuild", "%s.tbz" % self.job, pkg['path']]) + self.queue.release(self.mach) + if retcode != 254: + break + + # Failed to obtain job slot + time.sleep(15) + (self.mach, dummy) = self.queue.pick() + print "Retrying on %s" % self.mach + + print "Finished job %s" % self.job, + + if retcode == 0: + status = True + print + else: + status = False + print " with status %d" % retcode + + worker.lock.acquire() + worker.running.remove(self.job) + worker.tlist.remove(self) + if status == True: + jobsuccess(index, self.job) + else: + jobfailure(index, self.job) + + # Wake up dispatcher in case it was blocked + worker.event.set() + worker.event.clear() + + worker.lock.release() + + @staticmethod + def dispatch(mach, job, queue, arch, branch): + worker.lock.acquire() + wrk = worker(mach, job, queue, arch, branch) + worker.tlist.append(wrk) + worker.lock.release() + wrk.start() + +class machqueue(object): + path = ''; + fd = -1; + + # fcntl locks are per-process, so the fcntl lock acquisition will + # succeed if another thread already holds it. We need the fcntl + # lock for external visibility between processes but also need an + # internal lock for protecting against out own threads. + ilock = threading.Lock() + + def __init__(self, path): + super(machqueue, self).__init__() + self.path = path + self.fd = os.open("%s.lock" % self.path, os.O_RDWR|os.O_CREAT) + +# print "Initializing with %s %d" % (self.path, self.fd) + + def lock(self): + print "Locking...", +# ret = fcntl.lockf(self.fd, fcntl.LOCK_EX) + self.ilock.acquire() + print "success" + + def unlock(self): + print "Unlocking fd" + self.ilock.release() +# ret = fcntl.lockf(self.fd, fcntl.LOCK_UN) + + def poll(self): + """ Return currently available machines """ + + mfile = file(self.path + "../mlist", "r") + mlist = mfile.readlines() + mfile.close() + mlist = [i.rstrip() for i in mlist] # Chop \n + + list = os.listdir(self.path) + special = [] + machines = [] + for i in list: + if i.startswith('.'): + special.append(i) + else: + if i in mlist: + machines.append(i) + else: + os.unlink(self.path + i) + + print "Found machines %s" % machines + return (machines, special) + + def pick(self): + """ Choose a random machine from the queue """ + + min = 999 + while min == 999: + while True: + self.lock() + (machines, special) = self.poll() + if len(machines): + break + else: + self.unlock() + time.sleep(15) + # XXX Use kqueue to monitor for changes + + list = [] + # XXX Choose as fraction of capacity + for i in machines: + f = file(self.path + i, "r") + out = f.readline().rstrip() + try: + load = int(out) + except ValueError: + print "Bad value for %s: %s" % (i, out) + load = 999 + f.close() + if load < min: + min = load + list=[] + if load == min: + list.append(i) + print "(%s, %d)" % (list, load) + + if min == 999: + print "Bad queue length for %s" % list + self.unlock() + + machine = choice(list) + # XXX hook up config files + if min == 2: + # Queue full + os.unlink(self.path + machine) + else: + f = file(self.path + machine, "w") + f.write("%d\n" % (min + 1)) + f.flush() + f.close() + + self.unlock() + return (machine, special) + + def release(self, mach): + self.lock() + print "Releasing %s" % mach, + if os.path.exists(self.path + mach): + f = file(self.path + mach, "r+") + out = f.readline().rstrip() + try: + load = int(out) + except ValueError: + print "Queue error on release of %s: %s" % (mach, out) + load = 3 #XXX + else: + f = file(self.path + mach, "w") + load = 3 #XXX + +# f.truncate(0) + f.write("%d\n" % (load - 1)) + print "...now %d" % (load - 1) + f.flush() + f.close() + self.unlock() + +def main(arch, branch, args): + global index + + basedir="/var/portbuild/"+arch+"/"+branch + portsdir=basedir+"/ports" + indexfile=portsdir+"/INDEX-"+branch + indexfile="/var/portbuild/i386/7-exp/ports/INDEX-7" + + qlen = 100 + + q = machqueue("/var/portbuild/%s/queue/" % arch) + + print "parseindex..." + index=parseindex(indexfile) + print "length = %s" % len(index) + + targets = gettargets(index, args) + + print "heightindex..." + heightindex(index, targets) + + sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True) + wqueue = rank(index, selectheights(index, 1), (i[0] for i in sortd), qlen) + + # Main work loop + while len(sortd) > 0: + worker.lock.acquire() + print "Remaining %s" % len(sortd) + while len(wqueue) > 0: + job = wqueue.pop(0) + + if os.path.exists("/var/portbuild/%s/%s/packages/All/%s.tbz" % (arch, branch, job)): + print "Skipping %s since it already exists" % job + jobsuccess(index, job) + else: + worker.running.append(job) # Protect against a queue + # rebalance adding this + # back during build + worker.lock.release() + (machine, specials) = q.pick() + worker.dispatch(machine, job, q, arch, branch) + worker.lock.acquire() + + if len(wqueue) == 0: + if len(sortd) == 0: + # All jobs in progress, wait for children to exit + break + print "Rebalancing queue...", + sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True) + if len(sortd) == 0: + break + + print sortd[0:3] + if sortd[0][0] == 1: + # Everything left is depth 1, no need to waste time rebalancing further + qlen = len(index) + + # Don't add too many deps at once (e.g. after we build a + # package like gmake), or we will switch to buildinglots + # of shallow packages + ready = [i for i in selectheights(index, 1) if i not in worker.running] + wqueue = rank(index, ready, (i[0] for i in sortd), qlen)[:2*qlen] + print "now %s (%s ready)" % (wqueue, len(ready)) + + worker.lock.release() + + if len(wqueue) == 0: + # Ran out of work, wait for workers to free up some more + print "No work to do, sleeping on workers" + worker.event.wait() + + for i in worker.tlist: + i.join() + + print "Finished" + +if __name__ == "__main__": +# from guppy import hpy; h = hpy() + + main(sys.argv[1], sys.argv[2], sys.argv[3:]) + +# index = parseindex("/var/portbuild/i386/7-exp/ports/INDEX-7") +# print index['gmake-3.81_2'] + diff --git a/Tools/portbuild/scripts/packagebuild b/Tools/portbuild/scripts/packagebuild new file mode 100755 index 0000000..9beeeb1 --- /dev/null +++ b/Tools/portbuild/scripts/packagebuild @@ -0,0 +1,520 @@ +#!/usr/bin/env python + +# Improved build scheduler. We try to build leaf packages (those +# which can be built immediately without requiring additional +# dependencies to be built) in the order such that the ones required +# by the longest dependency chains are built first. +# +# This has the effect of favouring deep parts of the package tree and +# evening out the depth over time, hopefully avoiding the situation +# where the entire cluster waits for a deep part of the tree to +# build on a small number of machines +# +# Other advantages are that this system is easily customizable and +# will let us customize things like the matching policy of jobs to +# machines. +# +# TODO: +# * External queue manager +# * Mark completed packages instead of deleting them +# * check mtime for package staleness (cf make) +# * Check for parent mtimes after finishing child + +import os, sys, threading, time, subprocess, fcntl, operator +#from itertools import ifilter, imap +from random import choice + +def parseindex(indexfile): + + tmp={} + pkghash={} + for i in file(indexfile): + line=i.rstrip().split("|") + pkg = line[0] + tmp[pkg] = line[1:] + + # XXX hash category names too + + # Trick python into storing package names by reference instead of copying strings and wasting 60MB + pkghash[pkg] = pkg + + index=dict.fromkeys(tmp.keys()) + for pkg in tmp.iterkeys(): + line = tmp[pkg] + data={'name': pkg, 'path':line[0], + #'prefix':line[1], + #'comment':line[2], + #'descr':line[3], + #'maintainer':line[4], + 'categories':line[5], # XXX duplicates strings + 'bdep':[pkghash[i] for i in line[6].split(None)], + 'rdep':[pkghash[i] for i in line[7].split(None)], + #'www':line[8], + 'edep':[pkghash[i] for i in line[9].split(None)], + 'pdep':[pkghash[i] for i in line[10].split(None)], + 'fdep':[pkghash[i] for i in line[11].split(None)], + 'height':None} + if index[pkg] is None: + index[pkg] = data + else: + index[pkg].update(data) + if not index[pkg].has_key('parents'): + index[pkg]['parents'] = [] + + # XXX iter? + deps=set() + for j in ['bdep','rdep','edep','fdep','pdep']: + deps.update(set(index[pkg][j])) + index[pkg]['deps'] = [pkghash[i] for i in deps] + + for j in deps: + # This grossness is needed to avoid a second pass through + # the index, because we might be about to refer to + # packages that have not yet been processed + if index[j] is not None: + if index[j].has_key('parents'): + index[j]['parents'].append(pkghash[pkg]) + else: + index[j]['parents'] = [pkghash[pkg]] + else: + index[j] = {'parents':[pkghash[pkg]]} + + return index + +def gettargets(index, targets): + """ split command line arguments into list of packages to build. Returns set or iterable """ + # XXX make this return the full recursive list and use this later for processing wqueue + + plist = set() + if len(targets) == 0: + targets = ["all"] + for i in targets: + if i == "all": + plist = index.iterkeys() + break + if i.endswith("-all"): + cat = i.rpartition("-")[0] + plist.update(j for j in index.iterkeys() if cat in index[j]['categories']) + elif i.rstrip(".tbz") in index.iterkeys(): + plist.update([i.rstrip(".tbz")]) + + return plist + +def heightindex(index, targets): + """ Initial population of height tree """ + + for i in targets: + heightdown(index, i) + +def heightdown(index, pkgname): + """ + Recursively populate the height tree down from a given package, + assuming empty values on entries not yet visited + """ + + pkg=index[pkgname] + if pkg['height'] is None: + if len(pkg['deps']) > 0: + max = 0 + for i in pkg['deps']: + w = heightdown(index, i) + if w > max: + max = w + pkg['height'] = max + 1 + else: + pkg['height'] = 1 + return pkg['height'] + +def heightup(index, pkgname): + """ Recalculate the height tree going upwards from a package """ + + if not index.has_key(pkgname): + raise KeyError + + parents=set(index[pkgname]['parents']) + + while len(parents) > 0: + # XXX use a deque? + pkgname = parents.pop() + if not index.has_key(pkgname): + # XXX can this happen? + continue + pkg=index[pkgname] + oldheight=pkg['height'] + if oldheight is None: + # Parent is in our build target list + continue + if len(pkg['deps']) == 0: + newheight = 1 + else: + newheight=max(index[j]['height'] for j in pkg['deps']) + 1 + if newheight > oldheight: + print "%s height increasing: %d -> %d", pkg, oldheight, newheight + assert(False) + if newheight != oldheight: + pkg['height'] = newheight + parents.update(pkg['parents']) + +def deleteup(index, pkgname): + if not index.has_key(pkgname): + raise KeyError + + parents=set([pkgname]) + + children=[] + removed=[] + while len(parents) > 0: + pkgname = parents.pop() + if not index.has_key(pkgname): + # Parent was already deleted via another path + # XXX can happen? + print "YYYYYYYYYYYYYYYYYYYYYY %s deleted" % pkgname + continue + if index[pkgname]['height'] is None: + # parent is not in our list of build targets + continue + pkg=index[pkgname] + + children.extend(pkg['deps']) + parents.update(pkg['parents']) + removed.append(pkgname) + del index[pkgname] + + removed = set(removed) + children = set(children) +# print "Removed %d packages, touching %d children" % (len(removed), len(children)) + + for i in children.difference(removed): + par=index[i]['parents'] + index[i]['parents'] = list(set(par).difference(removed)) + +# XXX return an iter +def selectheights(index, level): + return [i for i in index.iterkeys() if index[i]['height'] == level] + +def rank(index, ready, sortd, max = None): + """ rank the list of ready packages according to those listed as + dependencies in successive entries of the sorted list """ + + input=set(ready) + output = [] + count = 0 + print "Working on depth ", + for i in sortd: + deps = set(index[i]['deps']) + both = deps.intersection(input) + if len(both) > 0: + print "%d " % index[i]['height'], + input.difference_update(both) + output.extend(list(both)) + if len(input) == 0: + break + if max: + count+=len(both) + if count > max: + return output + print + output.extend(list(input)) + + return output + +def jobsuccess(index, job): + + pkg = index[job] + # Build succeeded + for i in pkg['parents']: + index[i]['deps'].remove(job) + + # deps/parents tree now partially inconsistent but this is + # what we need to avoid counting the height of the entry + # we are about to remove (which would make it a NOP) + heightup(index, job) + + del index[job] + +def jobfailure(index, job): + + # Build failed + deleteup(index, job) + +class worker(threading.Thread): + + lock = threading.Lock() + + # List of running threads + tlist = [] + + # List of running jobs + running = [] + + # Used to signal dispatcher when we finish a job + event = threading.Event() + + def __init__(self, mach, job, queue, arch, branch): + threading.Thread.__init__(self) + self.job = job + self.mach = mach + self.queue = queue + self.arch = arch + self.branch = branch + + def run(self): + global index + + pkg = index[self.job] + + if len(pkg['deps']) != 0: + print "Running job with non-empty deps: %s" % pkg + assert(False) + + print "Running job %s" % (self.job) + while True: + retcode = subprocess.call(["/usr/bin/env", "FD=%s" % " ".join(["%s.tbz" % i for i in pkg['fdep']]), "ED=%s" % " ".join(["%s.tbz" % i for i in pkg['edep']]), "PD=%s" % " ".join(["%s.tbz" % i for i in pkg['pdep']]), "BD=%s" % " ".join(["%s.tbz" % i for i in pkg['bdep']]), "RD=%s" % " ".join(["%s.tbz" % i for i in pkg['rdep']]), "/var/portbuild/scripts/pdispatch2", self.mach, self.arch, self.branch, "/var/portbuild/scripts/portbuild", "%s.tbz" % self.job, pkg['path']]) + self.queue.release(self.mach) + if retcode != 254: + break + + # Failed to obtain job slot + time.sleep(15) + (self.mach, dummy) = self.queue.pick() + print "Retrying on %s" % self.mach + + print "Finished job %s" % self.job, + + if retcode == 0: + status = True + print + else: + status = False + print " with status %d" % retcode + + worker.lock.acquire() + worker.running.remove(self.job) + worker.tlist.remove(self) + if status == True: + jobsuccess(index, self.job) + else: + jobfailure(index, self.job) + + # Wake up dispatcher in case it was blocked + worker.event.set() + worker.event.clear() + + worker.lock.release() + + @staticmethod + def dispatch(mach, job, queue, arch, branch): + worker.lock.acquire() + wrk = worker(mach, job, queue, arch, branch) + worker.tlist.append(wrk) + worker.lock.release() + wrk.start() + +class machqueue(object): + path = ''; + fd = -1; + + # fcntl locks are per-process, so the fcntl lock acquisition will + # succeed if another thread already holds it. We need the fcntl + # lock for external visibility between processes but also need an + # internal lock for protecting against out own threads. + ilock = threading.Lock() + + def __init__(self, path): + super(machqueue, self).__init__() + self.path = path + self.fd = os.open("%s.lock" % self.path, os.O_RDWR|os.O_CREAT) + +# print "Initializing with %s %d" % (self.path, self.fd) + + def lock(self): + print "Locking...", +# ret = fcntl.lockf(self.fd, fcntl.LOCK_EX) + self.ilock.acquire() + print "success" + + def unlock(self): + print "Unlocking fd" + self.ilock.release() +# ret = fcntl.lockf(self.fd, fcntl.LOCK_UN) + + def poll(self): + """ Return currently available machines """ + + mfile = file(self.path + "../mlist", "r") + mlist = mfile.readlines() + mfile.close() + mlist = [i.rstrip() for i in mlist] # Chop \n + + list = os.listdir(self.path) + special = [] + machines = [] + for i in list: + if i.startswith('.'): + special.append(i) + else: + if i in mlist: + machines.append(i) + else: + os.unlink(self.path + i) + + print "Found machines %s" % machines + return (machines, special) + + def pick(self): + """ Choose a random machine from the queue """ + + min = 999 + while min == 999: + while True: + self.lock() + (machines, special) = self.poll() + if len(machines): + break + else: + self.unlock() + time.sleep(15) + # XXX Use kqueue to monitor for changes + + list = [] + # XXX Choose as fraction of capacity + for i in machines: + f = file(self.path + i, "r") + out = f.readline().rstrip() + try: + load = int(out) + except ValueError: + print "Bad value for %s: %s" % (i, out) + load = 999 + f.close() + if load < min: + min = load + list=[] + if load == min: + list.append(i) + print "(%s, %d)" % (list, load) + + if min == 999: + print "Bad queue length for %s" % list + self.unlock() + + machine = choice(list) + # XXX hook up config files + if min == 2: + # Queue full + os.unlink(self.path + machine) + else: + f = file(self.path + machine, "w") + f.write("%d\n" % (min + 1)) + f.flush() + f.close() + + self.unlock() + return (machine, special) + + def release(self, mach): + self.lock() + print "Releasing %s" % mach, + if os.path.exists(self.path + mach): + f = file(self.path + mach, "r+") + out = f.readline().rstrip() + try: + load = int(out) + except ValueError: + print "Queue error on release of %s: %s" % (mach, out) + load = 3 #XXX + else: + f = file(self.path + mach, "w") + load = 3 #XXX + +# f.truncate(0) + f.write("%d\n" % (load - 1)) + print "...now %d" % (load - 1) + f.flush() + f.close() + self.unlock() + +def main(arch, branch, args): + global index + + basedir="/var/portbuild/"+arch+"/"+branch + portsdir=basedir+"/ports" + indexfile=portsdir+"/INDEX-"+branch + indexfile="/var/portbuild/i386/7-exp/ports/INDEX-7" + + qlen = 100 + + q = machqueue("/var/portbuild/%s/queue/" % arch) + + print "parseindex..." + index=parseindex(indexfile) + print "length = %s" % len(index) + + targets = gettargets(index, args) + + print "heightindex..." + heightindex(index, targets) + + sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True) + wqueue = rank(index, selectheights(index, 1), (i[0] for i in sortd), qlen) + + # Main work loop + while len(sortd) > 0: + worker.lock.acquire() + print "Remaining %s" % len(sortd) + while len(wqueue) > 0: + job = wqueue.pop(0) + + if os.path.exists("/var/portbuild/%s/%s/packages/All/%s.tbz" % (arch, branch, job)): + print "Skipping %s since it already exists" % job + jobsuccess(index, job) + else: + worker.running.append(job) # Protect against a queue + # rebalance adding this + # back during build + worker.lock.release() + (machine, specials) = q.pick() + worker.dispatch(machine, job, q, arch, branch) + worker.lock.acquire() + + if len(wqueue) == 0: + if len(sortd) == 0: + # All jobs in progress, wait for children to exit + break + print "Rebalancing queue...", + sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True) + if len(sortd) == 0: + break + + print sortd[0:3] + if sortd[0][0] == 1: + # Everything left is depth 1, no need to waste time rebalancing further + qlen = len(index) + + # Don't add too many deps at once (e.g. after we build a + # package like gmake), or we will switch to buildinglots + # of shallow packages + ready = [i for i in selectheights(index, 1) if i not in worker.running] + wqueue = rank(index, ready, (i[0] for i in sortd), qlen)[:2*qlen] + print "now %s (%s ready)" % (wqueue, len(ready)) + + worker.lock.release() + + if len(wqueue) == 0: + # Ran out of work, wait for workers to free up some more + print "No work to do, sleeping on workers" + worker.event.wait() + + for i in worker.tlist: + i.join() + + print "Finished" + +if __name__ == "__main__": +# from guppy import hpy; h = hpy() + + main(sys.argv[1], sys.argv[2], sys.argv[3:]) + +# index = parseindex("/var/portbuild/i386/7-exp/ports/INDEX-7") +# print index['gmake-3.81_2'] + |