diff options
Diffstat (limited to 'Tools/portbuild/scripts/build')
-rwxr-xr-x | Tools/portbuild/scripts/build | 520 |
1 files changed, 520 insertions, 0 deletions
diff --git a/Tools/portbuild/scripts/build b/Tools/portbuild/scripts/build new file mode 100755 index 0000000..9beeeb1 --- /dev/null +++ b/Tools/portbuild/scripts/build @@ -0,0 +1,520 @@ +#!/usr/bin/env python + +# Improved build scheduler. We try to build leaf packages (those +# which can be built immediately without requiring additional +# dependencies to be built) in the order such that the ones required +# by the longest dependency chains are built first. +# +# This has the effect of favouring deep parts of the package tree and +# evening out the depth over time, hopefully avoiding the situation +# where the entire cluster waits for a deep part of the tree to +# build on a small number of machines +# +# Other advantages are that this system is easily customizable and +# will let us customize things like the matching policy of jobs to +# machines. +# +# TODO: +# * External queue manager +# * Mark completed packages instead of deleting them +# * check mtime for package staleness (cf make) +# * Check for parent mtimes after finishing child + +import os, sys, threading, time, subprocess, fcntl, operator +#from itertools import ifilter, imap +from random import choice + +def parseindex(indexfile): + + tmp={} + pkghash={} + for i in file(indexfile): + line=i.rstrip().split("|") + pkg = line[0] + tmp[pkg] = line[1:] + + # XXX hash category names too + + # Trick python into storing package names by reference instead of copying strings and wasting 60MB + pkghash[pkg] = pkg + + index=dict.fromkeys(tmp.keys()) + for pkg in tmp.iterkeys(): + line = tmp[pkg] + data={'name': pkg, 'path':line[0], + #'prefix':line[1], + #'comment':line[2], + #'descr':line[3], + #'maintainer':line[4], + 'categories':line[5], # XXX duplicates strings + 'bdep':[pkghash[i] for i in line[6].split(None)], + 'rdep':[pkghash[i] for i in line[7].split(None)], + #'www':line[8], + 'edep':[pkghash[i] for i in line[9].split(None)], + 'pdep':[pkghash[i] for i in line[10].split(None)], + 'fdep':[pkghash[i] for i in line[11].split(None)], + 'height':None} + if index[pkg] is None: + index[pkg] = data + else: + index[pkg].update(data) + if not index[pkg].has_key('parents'): + index[pkg]['parents'] = [] + + # XXX iter? + deps=set() + for j in ['bdep','rdep','edep','fdep','pdep']: + deps.update(set(index[pkg][j])) + index[pkg]['deps'] = [pkghash[i] for i in deps] + + for j in deps: + # This grossness is needed to avoid a second pass through + # the index, because we might be about to refer to + # packages that have not yet been processed + if index[j] is not None: + if index[j].has_key('parents'): + index[j]['parents'].append(pkghash[pkg]) + else: + index[j]['parents'] = [pkghash[pkg]] + else: + index[j] = {'parents':[pkghash[pkg]]} + + return index + +def gettargets(index, targets): + """ split command line arguments into list of packages to build. Returns set or iterable """ + # XXX make this return the full recursive list and use this later for processing wqueue + + plist = set() + if len(targets) == 0: + targets = ["all"] + for i in targets: + if i == "all": + plist = index.iterkeys() + break + if i.endswith("-all"): + cat = i.rpartition("-")[0] + plist.update(j for j in index.iterkeys() if cat in index[j]['categories']) + elif i.rstrip(".tbz") in index.iterkeys(): + plist.update([i.rstrip(".tbz")]) + + return plist + +def heightindex(index, targets): + """ Initial population of height tree """ + + for i in targets: + heightdown(index, i) + +def heightdown(index, pkgname): + """ + Recursively populate the height tree down from a given package, + assuming empty values on entries not yet visited + """ + + pkg=index[pkgname] + if pkg['height'] is None: + if len(pkg['deps']) > 0: + max = 0 + for i in pkg['deps']: + w = heightdown(index, i) + if w > max: + max = w + pkg['height'] = max + 1 + else: + pkg['height'] = 1 + return pkg['height'] + +def heightup(index, pkgname): + """ Recalculate the height tree going upwards from a package """ + + if not index.has_key(pkgname): + raise KeyError + + parents=set(index[pkgname]['parents']) + + while len(parents) > 0: + # XXX use a deque? + pkgname = parents.pop() + if not index.has_key(pkgname): + # XXX can this happen? + continue + pkg=index[pkgname] + oldheight=pkg['height'] + if oldheight is None: + # Parent is in our build target list + continue + if len(pkg['deps']) == 0: + newheight = 1 + else: + newheight=max(index[j]['height'] for j in pkg['deps']) + 1 + if newheight > oldheight: + print "%s height increasing: %d -> %d", pkg, oldheight, newheight + assert(False) + if newheight != oldheight: + pkg['height'] = newheight + parents.update(pkg['parents']) + +def deleteup(index, pkgname): + if not index.has_key(pkgname): + raise KeyError + + parents=set([pkgname]) + + children=[] + removed=[] + while len(parents) > 0: + pkgname = parents.pop() + if not index.has_key(pkgname): + # Parent was already deleted via another path + # XXX can happen? + print "YYYYYYYYYYYYYYYYYYYYYY %s deleted" % pkgname + continue + if index[pkgname]['height'] is None: + # parent is not in our list of build targets + continue + pkg=index[pkgname] + + children.extend(pkg['deps']) + parents.update(pkg['parents']) + removed.append(pkgname) + del index[pkgname] + + removed = set(removed) + children = set(children) +# print "Removed %d packages, touching %d children" % (len(removed), len(children)) + + for i in children.difference(removed): + par=index[i]['parents'] + index[i]['parents'] = list(set(par).difference(removed)) + +# XXX return an iter +def selectheights(index, level): + return [i for i in index.iterkeys() if index[i]['height'] == level] + +def rank(index, ready, sortd, max = None): + """ rank the list of ready packages according to those listed as + dependencies in successive entries of the sorted list """ + + input=set(ready) + output = [] + count = 0 + print "Working on depth ", + for i in sortd: + deps = set(index[i]['deps']) + both = deps.intersection(input) + if len(both) > 0: + print "%d " % index[i]['height'], + input.difference_update(both) + output.extend(list(both)) + if len(input) == 0: + break + if max: + count+=len(both) + if count > max: + return output + print + output.extend(list(input)) + + return output + +def jobsuccess(index, job): + + pkg = index[job] + # Build succeeded + for i in pkg['parents']: + index[i]['deps'].remove(job) + + # deps/parents tree now partially inconsistent but this is + # what we need to avoid counting the height of the entry + # we are about to remove (which would make it a NOP) + heightup(index, job) + + del index[job] + +def jobfailure(index, job): + + # Build failed + deleteup(index, job) + +class worker(threading.Thread): + + lock = threading.Lock() + + # List of running threads + tlist = [] + + # List of running jobs + running = [] + + # Used to signal dispatcher when we finish a job + event = threading.Event() + + def __init__(self, mach, job, queue, arch, branch): + threading.Thread.__init__(self) + self.job = job + self.mach = mach + self.queue = queue + self.arch = arch + self.branch = branch + + def run(self): + global index + + pkg = index[self.job] + + if len(pkg['deps']) != 0: + print "Running job with non-empty deps: %s" % pkg + assert(False) + + print "Running job %s" % (self.job) + while True: + retcode = subprocess.call(["/usr/bin/env", "FD=%s" % " ".join(["%s.tbz" % i for i in pkg['fdep']]), "ED=%s" % " ".join(["%s.tbz" % i for i in pkg['edep']]), "PD=%s" % " ".join(["%s.tbz" % i for i in pkg['pdep']]), "BD=%s" % " ".join(["%s.tbz" % i for i in pkg['bdep']]), "RD=%s" % " ".join(["%s.tbz" % i for i in pkg['rdep']]), "/var/portbuild/scripts/pdispatch2", self.mach, self.arch, self.branch, "/var/portbuild/scripts/portbuild", "%s.tbz" % self.job, pkg['path']]) + self.queue.release(self.mach) + if retcode != 254: + break + + # Failed to obtain job slot + time.sleep(15) + (self.mach, dummy) = self.queue.pick() + print "Retrying on %s" % self.mach + + print "Finished job %s" % self.job, + + if retcode == 0: + status = True + print + else: + status = False + print " with status %d" % retcode + + worker.lock.acquire() + worker.running.remove(self.job) + worker.tlist.remove(self) + if status == True: + jobsuccess(index, self.job) + else: + jobfailure(index, self.job) + + # Wake up dispatcher in case it was blocked + worker.event.set() + worker.event.clear() + + worker.lock.release() + + @staticmethod + def dispatch(mach, job, queue, arch, branch): + worker.lock.acquire() + wrk = worker(mach, job, queue, arch, branch) + worker.tlist.append(wrk) + worker.lock.release() + wrk.start() + +class machqueue(object): + path = ''; + fd = -1; + + # fcntl locks are per-process, so the fcntl lock acquisition will + # succeed if another thread already holds it. We need the fcntl + # lock for external visibility between processes but also need an + # internal lock for protecting against out own threads. + ilock = threading.Lock() + + def __init__(self, path): + super(machqueue, self).__init__() + self.path = path + self.fd = os.open("%s.lock" % self.path, os.O_RDWR|os.O_CREAT) + +# print "Initializing with %s %d" % (self.path, self.fd) + + def lock(self): + print "Locking...", +# ret = fcntl.lockf(self.fd, fcntl.LOCK_EX) + self.ilock.acquire() + print "success" + + def unlock(self): + print "Unlocking fd" + self.ilock.release() +# ret = fcntl.lockf(self.fd, fcntl.LOCK_UN) + + def poll(self): + """ Return currently available machines """ + + mfile = file(self.path + "../mlist", "r") + mlist = mfile.readlines() + mfile.close() + mlist = [i.rstrip() for i in mlist] # Chop \n + + list = os.listdir(self.path) + special = [] + machines = [] + for i in list: + if i.startswith('.'): + special.append(i) + else: + if i in mlist: + machines.append(i) + else: + os.unlink(self.path + i) + + print "Found machines %s" % machines + return (machines, special) + + def pick(self): + """ Choose a random machine from the queue """ + + min = 999 + while min == 999: + while True: + self.lock() + (machines, special) = self.poll() + if len(machines): + break + else: + self.unlock() + time.sleep(15) + # XXX Use kqueue to monitor for changes + + list = [] + # XXX Choose as fraction of capacity + for i in machines: + f = file(self.path + i, "r") + out = f.readline().rstrip() + try: + load = int(out) + except ValueError: + print "Bad value for %s: %s" % (i, out) + load = 999 + f.close() + if load < min: + min = load + list=[] + if load == min: + list.append(i) + print "(%s, %d)" % (list, load) + + if min == 999: + print "Bad queue length for %s" % list + self.unlock() + + machine = choice(list) + # XXX hook up config files + if min == 2: + # Queue full + os.unlink(self.path + machine) + else: + f = file(self.path + machine, "w") + f.write("%d\n" % (min + 1)) + f.flush() + f.close() + + self.unlock() + return (machine, special) + + def release(self, mach): + self.lock() + print "Releasing %s" % mach, + if os.path.exists(self.path + mach): + f = file(self.path + mach, "r+") + out = f.readline().rstrip() + try: + load = int(out) + except ValueError: + print "Queue error on release of %s: %s" % (mach, out) + load = 3 #XXX + else: + f = file(self.path + mach, "w") + load = 3 #XXX + +# f.truncate(0) + f.write("%d\n" % (load - 1)) + print "...now %d" % (load - 1) + f.flush() + f.close() + self.unlock() + +def main(arch, branch, args): + global index + + basedir="/var/portbuild/"+arch+"/"+branch + portsdir=basedir+"/ports" + indexfile=portsdir+"/INDEX-"+branch + indexfile="/var/portbuild/i386/7-exp/ports/INDEX-7" + + qlen = 100 + + q = machqueue("/var/portbuild/%s/queue/" % arch) + + print "parseindex..." + index=parseindex(indexfile) + print "length = %s" % len(index) + + targets = gettargets(index, args) + + print "heightindex..." + heightindex(index, targets) + + sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True) + wqueue = rank(index, selectheights(index, 1), (i[0] for i in sortd), qlen) + + # Main work loop + while len(sortd) > 0: + worker.lock.acquire() + print "Remaining %s" % len(sortd) + while len(wqueue) > 0: + job = wqueue.pop(0) + + if os.path.exists("/var/portbuild/%s/%s/packages/All/%s.tbz" % (arch, branch, job)): + print "Skipping %s since it already exists" % job + jobsuccess(index, job) + else: + worker.running.append(job) # Protect against a queue + # rebalance adding this + # back during build + worker.lock.release() + (machine, specials) = q.pick() + worker.dispatch(machine, job, q, arch, branch) + worker.lock.acquire() + + if len(wqueue) == 0: + if len(sortd) == 0: + # All jobs in progress, wait for children to exit + break + print "Rebalancing queue...", + sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True) + if len(sortd) == 0: + break + + print sortd[0:3] + if sortd[0][0] == 1: + # Everything left is depth 1, no need to waste time rebalancing further + qlen = len(index) + + # Don't add too many deps at once (e.g. after we build a + # package like gmake), or we will switch to buildinglots + # of shallow packages + ready = [i for i in selectheights(index, 1) if i not in worker.running] + wqueue = rank(index, ready, (i[0] for i in sortd), qlen)[:2*qlen] + print "now %s (%s ready)" % (wqueue, len(ready)) + + worker.lock.release() + + if len(wqueue) == 0: + # Ran out of work, wait for workers to free up some more + print "No work to do, sleeping on workers" + worker.event.wait() + + for i in worker.tlist: + i.join() + + print "Finished" + +if __name__ == "__main__": +# from guppy import hpy; h = hpy() + + main(sys.argv[1], sys.argv[2], sys.argv[3:]) + +# index = parseindex("/var/portbuild/i386/7-exp/ports/INDEX-7") +# print index['gmake-3.81_2'] + |