From 39dcf2efae0b4dcdd9dfb8b0ffdd3654e450ed47 Mon Sep 17 00:00:00 2001
From: Guido Trotter <ultrotter@google.com>
Date: Mon, 30 Jun 2008 12:37:48 +0000
Subject: [PATCH] ganeti-masterd: init and distribute common context

This patch creates a new GanetiContext class, which is used to hold
context common to all ganeti worker threads. As for the
GanetiLockingManager class it is paramount that there is only one such
class throughout the execution of Ganeti, so the class checks for that,
and also forbids its own modification after it's been initialized. The
context for now contains a ConfigWriter and a GanetiLockingManager and
is created by the daemon and propagated to PoolWorker(s) and
JobRunner(s).

Reviewed-by: iustinp
---
 daemons/ganeti-masterd | 55 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 49 insertions(+), 6 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index 92718b9af..885db2fcb 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -41,10 +41,12 @@ import simplejson
 from cStringIO import StringIO
 from optparse import OptionParser
 
+from ganeti import config
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
+from ganeti import locking
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
@@ -65,17 +67,19 @@ class IOServer(SocketServer.UnixStreamServer):
   """
   QUEUE_PROCESSOR_SIZE = 1
 
-  def __init__(self, address, rqhandler):
+  def __init__(self, address, rqhandler, context):
     """IOServer constructor
 
     Args:
       address: the address to bind this IOServer to
       rqhandler: RequestHandler type object
+      context: Context Object common to all worker threads
 
     """
     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
     self.do_quit = False
     self.queue = jqueue.QueueManager()
+    self.context = context
     self.processors = []
     signal.signal(signal.SIGINT, self.handle_quit_signals)
     signal.signal(signal.SIGTERM, self.handle_quit_signals)
@@ -90,7 +94,8 @@ class IOServer(SocketServer.UnixStreamServer):
     """
     for i in range(self.QUEUE_PROCESSOR_SIZE):
       self.processors.append(threading.Thread(target=PoolWorker,
-                                              args=(i, self.queue.new_queue)))
+                                              args=(i, self.queue.new_queue,
+                                                    self.context)))
     for t in self.processors:
       t.start()
 
@@ -235,7 +240,7 @@ class ClientOps:
     return self.server.queue.query_jobs(fields, names)
 
 
-def JobRunner(proc, job):
+def JobRunner(proc, job, context):
   """Job executor.
 
   This functions processes a single job in the context of given
@@ -244,6 +249,7 @@ def JobRunner(proc, job):
   Args:
     proc: Ganeti Processor to run the job on
     job: The job to run (unserialized format)
+    context: Ganeti shared context
 
   """
   job.SetStatus(opcodes.Job.STATUS_RUNNING)
@@ -263,7 +269,7 @@ def JobRunner(proc, job):
     job.SetStatus(opcodes.Job.STATUS_SUCCESS)
 
 
-def PoolWorker(worker_id, incoming_queue):
+def PoolWorker(worker_id, incoming_queue, context):
   """A worker thread function.
 
   This is the actual processor of a single thread of Job execution.
@@ -271,6 +277,8 @@ def PoolWorker(worker_id, incoming_queue):
   Args:
     worker_id: the unique id for this worker
     incoming_queue: a queue to get jobs from
+    context: the common server context, containing all shared data and
+             synchronization structures.
 
   """
   while True:
@@ -283,7 +291,7 @@ def PoolWorker(worker_id, incoming_queue):
     try:
       proc = mcpu.Processor(feedback=lambda x: None)
       try:
-        JobRunner(proc, item)
+        JobRunner(proc, item, context)
       except errors.GenericError, err:
         msg = "ganeti exception %s" % err
         item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
@@ -305,6 +313,41 @@ def PoolWorker(worker_id, incoming_queue):
   print "worker %s exiting" % worker_id
 
 
+class GanetiContext(object):
+  """Context common to all ganeti threads.
+
+  This class creates and holds common objects shared by all threads.
+
+  """
+  _instance = None
+
+  def __init__(self):
+    """Constructs a new GanetiContext object.
+
+    There should be only a GanetiContext object at any time, so this
+    function raises an error if this is not the case.
+
+    """
+    assert self.__class__._instance is None, "double GanetiContext instance"
+
+    # Create a ConfigWriter...
+    self.cfg = config.ConfigWriter()
+    # And a GanetiLockingManager...
+    self.GLM = locking.GanetiLockManager(
+                self.cfg.GetNodeList(),
+                self.cfg.GetInstanceList())
+
+    # setting this also locks the class against attribute modifications
+    self.__class__._instance = self
+
+  def __setattr__(self, name, value):
+    """Setting GanetiContext attributes is forbidden after initialization.
+
+    """
+    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
+    object.__setattr__(self, name, value)
+
+
 def CheckMaster(debug):
   """Checks the node setup.
 
@@ -362,7 +405,7 @@ def main():
 
   CheckMaster(options.debug)
 
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
 
   # become a daemon
   if options.fork:
-- 
GitLab