Skip to content
Snippets Groups Projects
Commit 39dcf2ef authored by Guido Trotter's avatar Guido Trotter
Browse files

ganeti-masterd: init and distribute common context

This patch creates a new GanetiContext class, which is used to hold
context common to all ganeti worker threads. As for the
GanetiLockingManager class it is paramount that there is only one such
class throughout the execution of Ganeti, so the class checks for that,
and also forbids its own modification after it's been initialized. The
context for now contains a ConfigWriter and a GanetiLockingManager and
is created by the daemon and propagated to PoolWorker(s) and
JobRunner(s).

Reviewed-by: iustinp
parent 827f753e
No related branches found
No related tags found
No related merge requests found
......@@ -41,10 +41,12 @@ import simplejson
from cStringIO import StringIO
from optparse import OptionParser
from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
......@@ -65,17 +67,19 @@ class IOServer(SocketServer.UnixStreamServer):
"""
QUEUE_PROCESSOR_SIZE = 1
def __init__(self, address, rqhandler):
def __init__(self, address, rqhandler, context):
"""IOServer constructor
Args:
address: the address to bind this IOServer to
rqhandler: RequestHandler type object
context: Context Object common to all worker threads
"""
SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
self.do_quit = False
self.queue = jqueue.QueueManager()
self.context = context
self.processors = []
signal.signal(signal.SIGINT, self.handle_quit_signals)
signal.signal(signal.SIGTERM, self.handle_quit_signals)
......@@ -90,7 +94,8 @@ class IOServer(SocketServer.UnixStreamServer):
"""
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.processors.append(threading.Thread(target=PoolWorker,
args=(i, self.queue.new_queue)))
args=(i, self.queue.new_queue,
self.context)))
for t in self.processors:
t.start()
......@@ -235,7 +240,7 @@ class ClientOps:
return self.server.queue.query_jobs(fields, names)
def JobRunner(proc, job):
def JobRunner(proc, job, context):
"""Job executor.
This functions processes a single job in the context of given
......@@ -244,6 +249,7 @@ def JobRunner(proc, job):
Args:
proc: Ganeti Processor to run the job on
job: The job to run (unserialized format)
context: Ganeti shared context
"""
job.SetStatus(opcodes.Job.STATUS_RUNNING)
......@@ -263,7 +269,7 @@ def JobRunner(proc, job):
job.SetStatus(opcodes.Job.STATUS_SUCCESS)
def PoolWorker(worker_id, incoming_queue):
def PoolWorker(worker_id, incoming_queue, context):
"""A worker thread function.
This is the actual processor of a single thread of Job execution.
......@@ -271,6 +277,8 @@ def PoolWorker(worker_id, incoming_queue):
Args:
worker_id: the unique id for this worker
incoming_queue: a queue to get jobs from
context: the common server context, containing all shared data and
synchronization structures.
"""
while True:
......@@ -283,7 +291,7 @@ def PoolWorker(worker_id, incoming_queue):
try:
proc = mcpu.Processor(feedback=lambda x: None)
try:
JobRunner(proc, item)
JobRunner(proc, item, context)
except errors.GenericError, err:
msg = "ganeti exception %s" % err
item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
......@@ -305,6 +313,41 @@ def PoolWorker(worker_id, incoming_queue):
print "worker %s exiting" % worker_id
class GanetiContext(object):
"""Context common to all ganeti threads.
This class creates and holds common objects shared by all threads.
"""
_instance = None
def __init__(self):
"""Constructs a new GanetiContext object.
There should be only a GanetiContext object at any time, so this
function raises an error if this is not the case.
"""
assert self.__class__._instance is None, "double GanetiContext instance"
# Create a ConfigWriter...
self.cfg = config.ConfigWriter()
# And a GanetiLockingManager...
self.GLM = locking.GanetiLockManager(
self.cfg.GetNodeList(),
self.cfg.GetInstanceList())
# setting this also locks the class against attribute modifications
self.__class__._instance = self
def __setattr__(self, name, value):
"""Setting GanetiContext attributes is forbidden after initialization.
"""
assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
object.__setattr__(self, name, value)
def CheckMaster(debug):
"""Checks the node setup.
......@@ -362,7 +405,7 @@ def main():
CheckMaster(options.debug)
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
# become a daemon
if options.fork:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment