diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index db81b6da3ac2239f33d730bb5692ca42755b0f84..80380cf0740aa8177afeb80ed18412bc2cb5fb77 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -66,8 +66,6 @@ class IOServer(SocketServer.UnixStreamServer): cleanup at shutdown. """ - QUEUE_PROCESSOR_SIZE = 5 - def __init__(self, address, rqhandler, context): """IOServer constructor @@ -79,9 +77,7 @@ class IOServer(SocketServer.UnixStreamServer): """ SocketServer.UnixStreamServer.__init__(self, address, rqhandler) self.do_quit = False - self.queue = jqueue.QueueManager() self.context = context - self.processors = [] # We'll only start threads once we've forked. self.jobqueue = None @@ -92,21 +88,6 @@ class IOServer(SocketServer.UnixStreamServer): def setup_queue(self): self.jobqueue = jqueue.JobQueue(self.context) - def setup_processors(self): - """Spawn the processors threads. - - This initializes the queue and the thread processors. It is done - separately from the constructor because we want the clone() - syscalls to happen after the daemonize part. - - """ - for i in range(self.QUEUE_PROCESSOR_SIZE): - self.processors.append(threading.Thread(target=PoolWorker, - args=(i, self.queue.new_queue, - self.context))) - for t in self.processors: - t.start() - def process_request_thread(self, request, client_address): """Process the request. @@ -150,12 +131,6 @@ class IOServer(SocketServer.UnixStreamServer): try: self.server_close() utils.RemoveFile(constants.MASTER_SOCKET) - for i in range(self.QUEUE_PROCESSOR_SIZE): - self.queue.new_queue.put(None) - for idx, t in enumerate(self.processors): - logging.debug("waiting for processor thread %s...", idx) - t.join() - logging.debug("threads done") finally: if self.jobqueue: self.jobqueue.Shutdown() @@ -428,7 +403,6 @@ def main(): return try: - master.setup_processors() master.setup_queue() try: master.serve_forever() diff --git a/lib/jqueue.py b/lib/jqueue.py index 910abac2e3889a89871a7bf0874daefeae8aec31..6dccf185e1826047bb8680266b43cecf2599cee0 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -22,12 +22,10 @@ """Module implementing the job queue handling.""" import logging -import Queue import threading from ganeti import constants from ganeti import workerpool -from ganeti import opcodes from ganeti import errors from ganeti import mcpu @@ -35,110 +33,6 @@ from ganeti import mcpu JOBQUEUE_THREADS = 5 -class JobObject: - """In-memory job representation. - - This is what we use to track the user-submitted jobs (which are of - class opcodes.Job). - - """ - def __init__(self, jid, jdesc): - self.data = jdesc - jdesc.status = opcodes.Job.STATUS_PENDING - jdesc.job_id = jid - jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list] - jdesc.op_result = [None for i in jdesc.op_list] - self.lock = threading.Lock() - - def SetStatus(self, status, result=None): - self.lock.acquire() - self.data.status = status - if result is not None: - self.data.op_result = result - self.lock.release() - - def GetData(self): - self.lock.acquire() - #FIXME(iustin): make a deep copy of result - result = self.data - self.lock.release() - return result - - -class QueueManager: - """Example queue implementation. - - """ - def __init__(self): - self.job_queue = {} - self.jid = 1 - self.lock = threading.Lock() - self.new_queue = Queue.Queue() - - def put(self, item): - """Add a new job to the queue. - - This enters the job into our job queue and also puts it on the new - queue, in order for it to be picked up by the queue processors. - - """ - self.lock.acquire() - try: - rid = self.jid - self.jid += 1 - job = JobObject(rid, item) - self.job_queue[rid] = job - finally: - self.lock.release() - self.new_queue.put(job) - return rid - - def query(self, rid): - """Query a given job ID. - - """ - self.lock.acquire() - result = self.job_queue.get(rid, None) - self.lock.release() - return result - - def query_jobs(self, fields, names): - """Query all jobs. - - The fields and names parameters are similar to the ones passed to - the OpQueryInstances. - - """ - result = [] - self.lock.acquire() - if names: - values = [self.job_queue[j_id] for j_id in names] - else: - values = self.job_queue.itervalues() - try: - for jobj in values: - row = [] - jdata = jobj.data - for fname in fields: - if fname == "id": - row.append(jdata.job_id) - elif fname == "status": - row.append(jdata.status) - elif fname == "op_list": - row.append([op.__getstate__() for op in jdata.op_list]) - elif fname == "op_status": - row.append(jdata.op_status) - elif fname == "op_result": - row.append(jdata.op_result) - else: - raise errors.OpExecError("Invalid job query field '%s'" % - fname) - result.append(row) - finally: - self.lock.release() - return result - - class _QueuedOpCode(object): """Encasulates an opcode object. diff --git a/lib/luxi.py b/lib/luxi.py index db40fd257484f521b7aec5823bec4694ddf7e032..e109138d7aea3df17370672c43dceca5a4f09660 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -96,24 +96,6 @@ class NoMasterError(ProtocolError): """ -def SerializeJob(job): - """Convert a job description to a string format. - - """ - return simplejson.dumps(job.__getstate__()) - - -def UnserializeJob(data): - """Load a job from a string format""" - try: - new_data = simplejson.loads(data) - except Exception, err: - raise DecodingError("Error while unserializing: %s" % str(err)) - job = opcodes.Job() - job.__setstate__(new_data) - return job - - class Transport: """Low-level transport class. diff --git a/lib/opcodes.py b/lib/opcodes.py index 906573880dc9290cd59dc61588544a1ae1434f93..0c8ad185fca0a75fb5950cf196e0749610c3343e 100644 --- a/lib/opcodes.py +++ b/lib/opcodes.py @@ -73,51 +73,6 @@ class BaseJO(object): setattr(self, name, state[name]) -class Job(BaseJO): - """Job definition structure - - The Job definitions has two sets of parameters: - - the parameters of the job itself (all filled by server): - - job_id, - - status: pending, running, successfull, failed, aborted - - opcode parameters: - - op_list, list of opcodes, clients creates this - - op_status, status for each opcode, server fills in - - op_result, result for each opcode, server fills in - - """ - STATUS_PENDING = 1 - STATUS_RUNNING = 2 - STATUS_SUCCESS = 3 - STATUS_FAIL = 4 - STATUS_ABORT = 5 - - __slots__ = [ - "job_id", - "status", - "op_list", - "op_status", - "op_result", - ] - - def __getstate__(self): - """Specialized getstate for jobs - - """ - data = BaseJO.__getstate__(self) - if "op_list" in data: - data["op_list"] = [op.__getstate__() for op in data["op_list"]] - return data - - def __setstate__(self, state): - """Specialized setstate for jobs - - """ - BaseJO.__setstate__(self, state) - if "op_list" in state: - self.op_list = [OpCode.LoadOpCode(op) for op in state["op_list"]] - - class OpCode(BaseJO): """Abstract OpCode""" OP_ID = "OP_ABSTRACT"