--- pickdist.py.ORIG 2010-04-28 01:55:09.000000000 -0700 +++ pickdist.py 2010-04-28 01:57:15.000000000 -0700 @@ -20,16 +20,34 @@ * Reserved. * * Contributor(s): AGNITAS AG. +* +* +* Added mutliprocessing support (forking) to improve performance via the py2.6 +* multiprocessing library. Modify numthread below to specify how many +* processes to fork out. +* +* Liam Slusser liam@slacker.com +* April 26th 2010 +* ********************************************************************************** """ # import os, time, signal import shutil import agn +# import multiprocessing goodness +from multiprocessing import Process agn.require ('1.5.3') agn.loglevel = agn.LV_INFO if agn.iswin: import subprocess + +# the number of forks to spawn to do work +numthread = 12 + +# some debugging messages if you want +debug = 0 + # class Block: def __init__ (self, path): @@ -159,7 +177,9 @@ class Pickdist: # adjust queue size to be processed def queueIsFree (self): - return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000 + #return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000 + # Why wait, just keep going + return True def hasData (self): return len (self.data) > 0 @@ -177,42 +197,68 @@ def handler (sig, stack): global term term = True -signal.signal (signal.SIGINT, handler) -signal.signal (signal.SIGTERM, handler) -if not agn.iswin: - signal.signal (signal.SIGHUP, signal.SIG_IGN) - signal.signal (signal.SIGPIPE, signal.SIG_IGN) -# -agn.lock () -agn.log (agn.LV_INFO, 'main', 'Starting up') -# -pd = Pickdist () -while not term: - time.sleep (1) - agn.mark (agn.LV_INFO, 'loop', 180) - if pd.scanForData () == 0: - delay = 30 - agn.log (agn.LV_VERBOSE, 'loop', 'No ready to send data file found') +def doPackMove(pd,blk,agn): + + if blk.unpack (pd.queue): + blk.moveTo (agn.mkArchiveDirectory (pd.archive)) else: - delay = 0 - while not term and pd.hasData (): - if not pd.queueIsFree (): - agn.log (agn.LV_INFO, 'loop', 'Queue is already filled up') - delay = 180 - break - blk = pd.getNextBlock () - if blk.unpack (pd.queue): - blk.moveTo (agn.mkArchiveDirectory (pd.archive)) - else: - blk.moveTo (pd.recover) - while not term and delay > 0: - - if agn.iswin and agn.winstop (): - term = True - break + blk.moveTo (pd.recover) + +if __name__ == '__main__': + signal.signal (signal.SIGINT, handler) + signal.signal (signal.SIGTERM, handler) + + if not agn.iswin: + signal.signal (signal.SIGHUP, signal.SIG_IGN) + signal.signal (signal.SIGPIPE, signal.SIG_IGN) + # + agn.lock () + agn.log (agn.LV_INFO, 'main', 'Starting up') + + # + pd = Pickdist () + while not term: time.sleep (1) - delay -= 1 -# -agn.log (agn.LV_INFO, 'main', 'Going down') -agn.unlock () + agn.mark (agn.LV_INFO, 'loop', 180) + if pd.scanForData () == 0: + delay = 30 + agn.log (agn.LV_VERBOSE, 'loop', 'No ready to send data file found') + else: + delay = 0 + while not term and pd.hasData (): + if not pd.queueIsFree (): + agn.log (agn.LV_INFO, 'loop', 'Queue is already filled up') + delay = 180 + break + + p = {} + # spawn a whole bunch of workers! + for proc in range(0,numthread): + if not pd.hasData(): + break + else: + if debug: + print "%s spawning process" % (proc) + blk = pd.getNextBlock () + p[proc] = Process(target=doPackMove, args=(pd,blk,agn)) + p[proc].start() + + # wait for workers to finish + for proc in range(0,numthread): + if p.has_key(proc): + if debug: + print "%s joining process" % (proc) + p[proc].join() + + + while not term and delay > 0: + + if agn.iswin and agn.winstop (): + term = True + break + time.sleep (1) + delay -= 1 + # + agn.log (agn.LV_INFO, 'main', 'Going down') + agn.unlock ()