diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 92718b9afb70f2febd05555fa5208c73ba73813b..885db2fcbcf3dc43686a3dab90367382f07e0b54 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -41,10 +41,12 @@ import simplejson from cStringIO import StringIO from optparse import OptionParser +from ganeti import config from ganeti import constants from ganeti import mcpu from ganeti import opcodes from ganeti import jqueue +from ganeti import locking from ganeti import luxi from ganeti import utils from ganeti import errors @@ -65,17 +67,19 @@ class IOServer(SocketServer.UnixStreamServer): """ QUEUE_PROCESSOR_SIZE = 1 - def __init__(self, address, rqhandler): + def __init__(self, address, rqhandler, context): """IOServer constructor Args: address: the address to bind this IOServer to rqhandler: RequestHandler type object + context: Context Object common to all worker threads """ SocketServer.UnixStreamServer.__init__(self, address, rqhandler) self.do_quit = False self.queue = jqueue.QueueManager() + self.context = context self.processors = [] signal.signal(signal.SIGINT, self.handle_quit_signals) signal.signal(signal.SIGTERM, self.handle_quit_signals) @@ -90,7 +94,8 @@ class IOServer(SocketServer.UnixStreamServer): """ for i in range(self.QUEUE_PROCESSOR_SIZE): self.processors.append(threading.Thread(target=PoolWorker, - args=(i, self.queue.new_queue))) + args=(i, self.queue.new_queue, + self.context))) for t in self.processors: t.start() @@ -235,7 +240,7 @@ class ClientOps: return self.server.queue.query_jobs(fields, names) -def JobRunner(proc, job): +def JobRunner(proc, job, context): """Job executor. This functions processes a single job in the context of given @@ -244,6 +249,7 @@ def JobRunner(proc, job): Args: proc: Ganeti Processor to run the job on job: The job to run (unserialized format) + context: Ganeti shared context """ job.SetStatus(opcodes.Job.STATUS_RUNNING) @@ -263,7 +269,7 @@ def JobRunner(proc, job): job.SetStatus(opcodes.Job.STATUS_SUCCESS) -def PoolWorker(worker_id, incoming_queue): +def PoolWorker(worker_id, incoming_queue, context): """A worker thread function. This is the actual processor of a single thread of Job execution. @@ -271,6 +277,8 @@ def PoolWorker(worker_id, incoming_queue): Args: worker_id: the unique id for this worker incoming_queue: a queue to get jobs from + context: the common server context, containing all shared data and + synchronization structures. """ while True: @@ -283,7 +291,7 @@ def PoolWorker(worker_id, incoming_queue): try: proc = mcpu.Processor(feedback=lambda x: None) try: - JobRunner(proc, item) + JobRunner(proc, item, context) except errors.GenericError, err: msg = "ganeti exception %s" % err item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg]) @@ -305,6 +313,41 @@ def PoolWorker(worker_id, incoming_queue): print "worker %s exiting" % worker_id +class GanetiContext(object): + """Context common to all ganeti threads. + + This class creates and holds common objects shared by all threads. + + """ + _instance = None + + def __init__(self): + """Constructs a new GanetiContext object. + + There should be only a GanetiContext object at any time, so this + function raises an error if this is not the case. + + """ + assert self.__class__._instance is None, "double GanetiContext instance" + + # Create a ConfigWriter... + self.cfg = config.ConfigWriter() + # And a GanetiLockingManager... + self.GLM = locking.GanetiLockManager( + self.cfg.GetNodeList(), + self.cfg.GetInstanceList()) + + # setting this also locks the class against attribute modifications + self.__class__._instance = self + + def __setattr__(self, name, value): + """Setting GanetiContext attributes is forbidden after initialization. + + """ + assert self.__class__._instance is None, "Attempt to modify Ganeti Context" + object.__setattr__(self, name, value) + + def CheckMaster(debug): """Checks the node setup. @@ -362,7 +405,7 @@ def main(): CheckMaster(options.debug) - master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) + master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext()) # become a daemon if options.fork: