summaryrefslogtreecommitdiffstats
path: root/Tools/portbuild/scripts/build
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/portbuild/scripts/build')
-rwxr-xr-xTools/portbuild/scripts/build520
1 files changed, 520 insertions, 0 deletions
diff --git a/Tools/portbuild/scripts/build b/Tools/portbuild/scripts/build
new file mode 100755
index 0000000..9beeeb1
--- /dev/null
+++ b/Tools/portbuild/scripts/build
@@ -0,0 +1,520 @@
+#!/usr/bin/env python
+
+# Improved build scheduler. We try to build leaf packages (those
+# which can be built immediately without requiring additional
+# dependencies to be built) in the order such that the ones required
+# by the longest dependency chains are built first.
+#
+# This has the effect of favouring deep parts of the package tree and
+# evening out the depth over time, hopefully avoiding the situation
+# where the entire cluster waits for a deep part of the tree to
+# build on a small number of machines
+#
+# Other advantages are that this system is easily customizable and
+# will let us customize things like the matching policy of jobs to
+# machines.
+#
+# TODO:
+# * External queue manager
+# * Mark completed packages instead of deleting them
+# * check mtime for package staleness (cf make)
+# * Check for parent mtimes after finishing child
+
+import os, sys, threading, time, subprocess, fcntl, operator
+#from itertools import ifilter, imap
+from random import choice
+
+def parseindex(indexfile):
+
+ tmp={}
+ pkghash={}
+ for i in file(indexfile):
+ line=i.rstrip().split("|")
+ pkg = line[0]
+ tmp[pkg] = line[1:]
+
+ # XXX hash category names too
+
+ # Trick python into storing package names by reference instead of copying strings and wasting 60MB
+ pkghash[pkg] = pkg
+
+ index=dict.fromkeys(tmp.keys())
+ for pkg in tmp.iterkeys():
+ line = tmp[pkg]
+ data={'name': pkg, 'path':line[0],
+ #'prefix':line[1],
+ #'comment':line[2],
+ #'descr':line[3],
+ #'maintainer':line[4],
+ 'categories':line[5], # XXX duplicates strings
+ 'bdep':[pkghash[i] for i in line[6].split(None)],
+ 'rdep':[pkghash[i] for i in line[7].split(None)],
+ #'www':line[8],
+ 'edep':[pkghash[i] for i in line[9].split(None)],
+ 'pdep':[pkghash[i] for i in line[10].split(None)],
+ 'fdep':[pkghash[i] for i in line[11].split(None)],
+ 'height':None}
+ if index[pkg] is None:
+ index[pkg] = data
+ else:
+ index[pkg].update(data)
+ if not index[pkg].has_key('parents'):
+ index[pkg]['parents'] = []
+
+ # XXX iter?
+ deps=set()
+ for j in ['bdep','rdep','edep','fdep','pdep']:
+ deps.update(set(index[pkg][j]))
+ index[pkg]['deps'] = [pkghash[i] for i in deps]
+
+ for j in deps:
+ # This grossness is needed to avoid a second pass through
+ # the index, because we might be about to refer to
+ # packages that have not yet been processed
+ if index[j] is not None:
+ if index[j].has_key('parents'):
+ index[j]['parents'].append(pkghash[pkg])
+ else:
+ index[j]['parents'] = [pkghash[pkg]]
+ else:
+ index[j] = {'parents':[pkghash[pkg]]}
+
+ return index
+
+def gettargets(index, targets):
+ """ split command line arguments into list of packages to build. Returns set or iterable """
+ # XXX make this return the full recursive list and use this later for processing wqueue
+
+ plist = set()
+ if len(targets) == 0:
+ targets = ["all"]
+ for i in targets:
+ if i == "all":
+ plist = index.iterkeys()
+ break
+ if i.endswith("-all"):
+ cat = i.rpartition("-")[0]
+ plist.update(j for j in index.iterkeys() if cat in index[j]['categories'])
+ elif i.rstrip(".tbz") in index.iterkeys():
+ plist.update([i.rstrip(".tbz")])
+
+ return plist
+
+def heightindex(index, targets):
+ """ Initial population of height tree """
+
+ for i in targets:
+ heightdown(index, i)
+
+def heightdown(index, pkgname):
+ """
+ Recursively populate the height tree down from a given package,
+ assuming empty values on entries not yet visited
+ """
+
+ pkg=index[pkgname]
+ if pkg['height'] is None:
+ if len(pkg['deps']) > 0:
+ max = 0
+ for i in pkg['deps']:
+ w = heightdown(index, i)
+ if w > max:
+ max = w
+ pkg['height'] = max + 1
+ else:
+ pkg['height'] = 1
+ return pkg['height']
+
+def heightup(index, pkgname):
+ """ Recalculate the height tree going upwards from a package """
+
+ if not index.has_key(pkgname):
+ raise KeyError
+
+ parents=set(index[pkgname]['parents'])
+
+ while len(parents) > 0:
+ # XXX use a deque?
+ pkgname = parents.pop()
+ if not index.has_key(pkgname):
+ # XXX can this happen?
+ continue
+ pkg=index[pkgname]
+ oldheight=pkg['height']
+ if oldheight is None:
+ # Parent is in our build target list
+ continue
+ if len(pkg['deps']) == 0:
+ newheight = 1
+ else:
+ newheight=max(index[j]['height'] for j in pkg['deps']) + 1
+ if newheight > oldheight:
+ print "%s height increasing: %d -> %d", pkg, oldheight, newheight
+ assert(False)
+ if newheight != oldheight:
+ pkg['height'] = newheight
+ parents.update(pkg['parents'])
+
+def deleteup(index, pkgname):
+ if not index.has_key(pkgname):
+ raise KeyError
+
+ parents=set([pkgname])
+
+ children=[]
+ removed=[]
+ while len(parents) > 0:
+ pkgname = parents.pop()
+ if not index.has_key(pkgname):
+ # Parent was already deleted via another path
+ # XXX can happen?
+ print "YYYYYYYYYYYYYYYYYYYYYY %s deleted" % pkgname
+ continue
+ if index[pkgname]['height'] is None:
+ # parent is not in our list of build targets
+ continue
+ pkg=index[pkgname]
+
+ children.extend(pkg['deps'])
+ parents.update(pkg['parents'])
+ removed.append(pkgname)
+ del index[pkgname]
+
+ removed = set(removed)
+ children = set(children)
+# print "Removed %d packages, touching %d children" % (len(removed), len(children))
+
+ for i in children.difference(removed):
+ par=index[i]['parents']
+ index[i]['parents'] = list(set(par).difference(removed))
+
+# XXX return an iter
+def selectheights(index, level):
+ return [i for i in index.iterkeys() if index[i]['height'] == level]
+
+def rank(index, ready, sortd, max = None):
+ """ rank the list of ready packages according to those listed as
+ dependencies in successive entries of the sorted list """
+
+ input=set(ready)
+ output = []
+ count = 0
+ print "Working on depth ",
+ for i in sortd:
+ deps = set(index[i]['deps'])
+ both = deps.intersection(input)
+ if len(both) > 0:
+ print "%d " % index[i]['height'],
+ input.difference_update(both)
+ output.extend(list(both))
+ if len(input) == 0:
+ break
+ if max:
+ count+=len(both)
+ if count > max:
+ return output
+ print
+ output.extend(list(input))
+
+ return output
+
+def jobsuccess(index, job):
+
+ pkg = index[job]
+ # Build succeeded
+ for i in pkg['parents']:
+ index[i]['deps'].remove(job)
+
+ # deps/parents tree now partially inconsistent but this is
+ # what we need to avoid counting the height of the entry
+ # we are about to remove (which would make it a NOP)
+ heightup(index, job)
+
+ del index[job]
+
+def jobfailure(index, job):
+
+ # Build failed
+ deleteup(index, job)
+
+class worker(threading.Thread):
+
+ lock = threading.Lock()
+
+ # List of running threads
+ tlist = []
+
+ # List of running jobs
+ running = []
+
+ # Used to signal dispatcher when we finish a job
+ event = threading.Event()
+
+ def __init__(self, mach, job, queue, arch, branch):
+ threading.Thread.__init__(self)
+ self.job = job
+ self.mach = mach
+ self.queue = queue
+ self.arch = arch
+ self.branch = branch
+
+ def run(self):
+ global index
+
+ pkg = index[self.job]
+
+ if len(pkg['deps']) != 0:
+ print "Running job with non-empty deps: %s" % pkg
+ assert(False)
+
+ print "Running job %s" % (self.job)
+ while True:
+ retcode = subprocess.call(["/usr/bin/env", "FD=%s" % " ".join(["%s.tbz" % i for i in pkg['fdep']]), "ED=%s" % " ".join(["%s.tbz" % i for i in pkg['edep']]), "PD=%s" % " ".join(["%s.tbz" % i for i in pkg['pdep']]), "BD=%s" % " ".join(["%s.tbz" % i for i in pkg['bdep']]), "RD=%s" % " ".join(["%s.tbz" % i for i in pkg['rdep']]), "/var/portbuild/scripts/pdispatch2", self.mach, self.arch, self.branch, "/var/portbuild/scripts/portbuild", "%s.tbz" % self.job, pkg['path']])
+ self.queue.release(self.mach)
+ if retcode != 254:
+ break
+
+ # Failed to obtain job slot
+ time.sleep(15)
+ (self.mach, dummy) = self.queue.pick()
+ print "Retrying on %s" % self.mach
+
+ print "Finished job %s" % self.job,
+
+ if retcode == 0:
+ status = True
+ print
+ else:
+ status = False
+ print " with status %d" % retcode
+
+ worker.lock.acquire()
+ worker.running.remove(self.job)
+ worker.tlist.remove(self)
+ if status == True:
+ jobsuccess(index, self.job)
+ else:
+ jobfailure(index, self.job)
+
+ # Wake up dispatcher in case it was blocked
+ worker.event.set()
+ worker.event.clear()
+
+ worker.lock.release()
+
+ @staticmethod
+ def dispatch(mach, job, queue, arch, branch):
+ worker.lock.acquire()
+ wrk = worker(mach, job, queue, arch, branch)
+ worker.tlist.append(wrk)
+ worker.lock.release()
+ wrk.start()
+
+class machqueue(object):
+ path = '';
+ fd = -1;
+
+ # fcntl locks are per-process, so the fcntl lock acquisition will
+ # succeed if another thread already holds it. We need the fcntl
+ # lock for external visibility between processes but also need an
+ # internal lock for protecting against out own threads.
+ ilock = threading.Lock()
+
+ def __init__(self, path):
+ super(machqueue, self).__init__()
+ self.path = path
+ self.fd = os.open("%s.lock" % self.path, os.O_RDWR|os.O_CREAT)
+
+# print "Initializing with %s %d" % (self.path, self.fd)
+
+ def lock(self):
+ print "Locking...",
+# ret = fcntl.lockf(self.fd, fcntl.LOCK_EX)
+ self.ilock.acquire()
+ print "success"
+
+ def unlock(self):
+ print "Unlocking fd"
+ self.ilock.release()
+# ret = fcntl.lockf(self.fd, fcntl.LOCK_UN)
+
+ def poll(self):
+ """ Return currently available machines """
+
+ mfile = file(self.path + "../mlist", "r")
+ mlist = mfile.readlines()
+ mfile.close()
+ mlist = [i.rstrip() for i in mlist] # Chop \n
+
+ list = os.listdir(self.path)
+ special = []
+ machines = []
+ for i in list:
+ if i.startswith('.'):
+ special.append(i)
+ else:
+ if i in mlist:
+ machines.append(i)
+ else:
+ os.unlink(self.path + i)
+
+ print "Found machines %s" % machines
+ return (machines, special)
+
+ def pick(self):
+ """ Choose a random machine from the queue """
+
+ min = 999
+ while min == 999:
+ while True:
+ self.lock()
+ (machines, special) = self.poll()
+ if len(machines):
+ break
+ else:
+ self.unlock()
+ time.sleep(15)
+ # XXX Use kqueue to monitor for changes
+
+ list = []
+ # XXX Choose as fraction of capacity
+ for i in machines:
+ f = file(self.path + i, "r")
+ out = f.readline().rstrip()
+ try:
+ load = int(out)
+ except ValueError:
+ print "Bad value for %s: %s" % (i, out)
+ load = 999
+ f.close()
+ if load < min:
+ min = load
+ list=[]
+ if load == min:
+ list.append(i)
+ print "(%s, %d)" % (list, load)
+
+ if min == 999:
+ print "Bad queue length for %s" % list
+ self.unlock()
+
+ machine = choice(list)
+ # XXX hook up config files
+ if min == 2:
+ # Queue full
+ os.unlink(self.path + machine)
+ else:
+ f = file(self.path + machine, "w")
+ f.write("%d\n" % (min + 1))
+ f.flush()
+ f.close()
+
+ self.unlock()
+ return (machine, special)
+
+ def release(self, mach):
+ self.lock()
+ print "Releasing %s" % mach,
+ if os.path.exists(self.path + mach):
+ f = file(self.path + mach, "r+")
+ out = f.readline().rstrip()
+ try:
+ load = int(out)
+ except ValueError:
+ print "Queue error on release of %s: %s" % (mach, out)
+ load = 3 #XXX
+ else:
+ f = file(self.path + mach, "w")
+ load = 3 #XXX
+
+# f.truncate(0)
+ f.write("%d\n" % (load - 1))
+ print "...now %d" % (load - 1)
+ f.flush()
+ f.close()
+ self.unlock()
+
+def main(arch, branch, args):
+ global index
+
+ basedir="/var/portbuild/"+arch+"/"+branch
+ portsdir=basedir+"/ports"
+ indexfile=portsdir+"/INDEX-"+branch
+ indexfile="/var/portbuild/i386/7-exp/ports/INDEX-7"
+
+ qlen = 100
+
+ q = machqueue("/var/portbuild/%s/queue/" % arch)
+
+ print "parseindex..."
+ index=parseindex(indexfile)
+ print "length = %s" % len(index)
+
+ targets = gettargets(index, args)
+
+ print "heightindex..."
+ heightindex(index, targets)
+
+ sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True)
+ wqueue = rank(index, selectheights(index, 1), (i[0] for i in sortd), qlen)
+
+ # Main work loop
+ while len(sortd) > 0:
+ worker.lock.acquire()
+ print "Remaining %s" % len(sortd)
+ while len(wqueue) > 0:
+ job = wqueue.pop(0)
+
+ if os.path.exists("/var/portbuild/%s/%s/packages/All/%s.tbz" % (arch, branch, job)):
+ print "Skipping %s since it already exists" % job
+ jobsuccess(index, job)
+ else:
+ worker.running.append(job) # Protect against a queue
+ # rebalance adding this
+ # back during build
+ worker.lock.release()
+ (machine, specials) = q.pick()
+ worker.dispatch(machine, job, q, arch, branch)
+ worker.lock.acquire()
+
+ if len(wqueue) == 0:
+ if len(sortd) == 0:
+ # All jobs in progress, wait for children to exit
+ break
+ print "Rebalancing queue...",
+ sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems() if val["height"] is not None), key=operator.itemgetter(1), reverse=True)
+ if len(sortd) == 0:
+ break
+
+ print sortd[0:3]
+ if sortd[0][0] == 1:
+ # Everything left is depth 1, no need to waste time rebalancing further
+ qlen = len(index)
+
+ # Don't add too many deps at once (e.g. after we build a
+ # package like gmake), or we will switch to buildinglots
+ # of shallow packages
+ ready = [i for i in selectheights(index, 1) if i not in worker.running]
+ wqueue = rank(index, ready, (i[0] for i in sortd), qlen)[:2*qlen]
+ print "now %s (%s ready)" % (wqueue, len(ready))
+
+ worker.lock.release()
+
+ if len(wqueue) == 0:
+ # Ran out of work, wait for workers to free up some more
+ print "No work to do, sleeping on workers"
+ worker.event.wait()
+
+ for i in worker.tlist:
+ i.join()
+
+ print "Finished"
+
+if __name__ == "__main__":
+# from guppy import hpy; h = hpy()
+
+ main(sys.argv[1], sys.argv[2], sys.argv[3:])
+
+# index = parseindex("/var/portbuild/i386/7-exp/ports/INDEX-7")
+# print index['gmake-3.81_2']
+
OpenPOWER on IntegriCloud