Commit 2467e0d3 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Remove old job queue code

Reviewed-by: iustinp
parent 0bbe448c
......@@ -66,8 +66,6 @@ class IOServer(SocketServer.UnixStreamServer):
cleanup at shutdown.
"""
QUEUE_PROCESSOR_SIZE = 5
def __init__(self, address, rqhandler, context):
"""IOServer constructor
......@@ -79,9 +77,7 @@ class IOServer(SocketServer.UnixStreamServer):
"""
SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
self.do_quit = False
self.queue = jqueue.QueueManager()
self.context = context
self.processors = []
# We'll only start threads once we've forked.
self.jobqueue = None
......@@ -92,21 +88,6 @@ class IOServer(SocketServer.UnixStreamServer):
def setup_queue(self):
self.jobqueue = jqueue.JobQueue(self.context)
def setup_processors(self):
  """Start the queue processor threads.

  This is kept separate from the constructor on purpose: the
  thread-creating clone() syscalls must happen only after the
  daemonize step has forked.

  """
  for idx in range(self.QUEUE_PROCESSOR_SIZE):
    worker = threading.Thread(target=PoolWorker,
                              args=(idx, self.queue.new_queue, self.context))
    self.processors.append(worker)
  for worker in self.processors:
    worker.start()
def process_request_thread(self, request, client_address):
"""Process the request.
......@@ -150,12 +131,6 @@ class IOServer(SocketServer.UnixStreamServer):
try:
self.server_close()
utils.RemoveFile(constants.MASTER_SOCKET)
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.queue.new_queue.put(None)
for idx, t in enumerate(self.processors):
logging.debug("waiting for processor thread %s...", idx)
t.join()
logging.debug("threads done")
finally:
if self.jobqueue:
self.jobqueue.Shutdown()
......@@ -428,7 +403,6 @@ def main():
return
try:
master.setup_processors()
master.setup_queue()
try:
master.serve_forever()
......
......@@ -22,12 +22,10 @@
"""Module implementing the job queue handling."""
import logging
import Queue
import threading
from ganeti import constants
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
......@@ -35,110 +33,6 @@ from ganeti import mcpu
JOBQUEUE_THREADS = 5
class JobObject:
  """In-memory job representation.

  This is what we use to track the user-submitted jobs (which are of
  class opcodes.Job). All access to the mutable job data goes through
  a per-job lock.

  """
  def __init__(self, jid, jdesc):
    """Initialize the job wrapper.

    Marks the job and all its opcodes as pending and stores the
    assigned job id on the job description itself.

    """
    self.data = jdesc
    jdesc.status = opcodes.Job.STATUS_PENDING
    jdesc.job_id = jid
    jdesc.op_status = [opcodes.Job.STATUS_PENDING for _ in jdesc.op_list]
    jdesc.op_result = [None for _ in jdesc.op_list]
    self.lock = threading.Lock()

  def SetStatus(self, status, result=None):
    """Update the job status and, optionally, the per-opcode results.

    The 'with' statement guarantees the lock is released even if an
    assignment raises (the bare acquire/release pair did not).

    """
    with self.lock:
      self.data.status = status
      if result is not None:
        self.data.op_result = result

  def GetData(self):
    """Return the job data under the lock.

    """
    with self.lock:
      #FIXME(iustin): make a deep copy of result
      result = self.data
    return result
class QueueManager:
  """Example queue implementation.

  Jobs get increasing numeric ids and are kept in an in-memory dict;
  newly submitted jobs are also pushed onto new_queue for the
  processor threads to pick up. The internal lock protects both
  job_queue and the id counter.

  """
  def __init__(self):
    self.job_queue = {}           # job id -> JobObject
    self.jid = 1                  # next job id to hand out
    self.lock = threading.Lock()  # guards job_queue and jid
    self.new_queue = Queue.Queue()

  def put(self, item):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.
    Returns the assigned job id.

    """
    self.lock.acquire()
    try:
      rid = self.jid
      self.jid += 1
      job = JobObject(rid, item)
      self.job_queue[rid] = job
    finally:
      self.lock.release()
    # Publish outside the lock: Queue.Queue is thread-safe by itself
    self.new_queue.put(job)
    return rid

  def query(self, rid):
    """Query a given job ID, returning None for unknown ids.

    """
    self.lock.acquire()
    try:
      result = self.job_queue.get(rid, None)
    finally:
      self.lock.release()
    return result

  def query_jobs(self, fields, names):
    """Query all jobs.

    The fields and names parameters are similar to the ones passed to
    the OpQueryInstances.

    """
    result = []
    self.lock.acquire()
    try:
      # The name lookup must happen inside the try block: previously an
      # unknown job id raised KeyError after acquire() but before the
      # protected region, leaving the lock held forever.
      if names:
        values = [self.job_queue[j_id] for j_id in names]
      else:
        values = self.job_queue.itervalues()
      for jobj in values:
        row = []
        jdata = jobj.data
        for fname in fields:
          if fname == "id":
            row.append(jdata.job_id)
          elif fname == "status":
            row.append(jdata.status)
          elif fname == "op_list":
            row.append([op.__getstate__() for op in jdata.op_list])
          elif fname == "op_status":
            row.append(jdata.op_status)
          elif fname == "op_result":
            row.append(jdata.op_result)
          else:
            raise errors.OpExecError("Invalid job query field '%s'" %
                                     fname)
        result.append(row)
    finally:
      self.lock.release()
    return result
class _QueuedOpCode(object):
"""Encasulates an opcode object.
......
......@@ -96,24 +96,6 @@ class NoMasterError(ProtocolError):
"""
def SerializeJob(job):
  """Convert a job description to its JSON string representation.

  """
  state = job.__getstate__()
  return simplejson.dumps(state)
def UnserializeJob(data):
"""Load a job from a string format"""
try:
new_data = simplejson.loads(data)
except Exception, err:
raise DecodingError("Error while unserializing: %s" % str(err))
job = opcodes.Job()
job.__setstate__(new_data)
return job
class Transport:
"""Low-level transport class.
......
......@@ -73,51 +73,6 @@ class BaseJO(object):
setattr(self, name, state[name])
class Job(BaseJO):
  """Job definition structure.

  A job carries two sets of parameters:
    - the parameters of the job itself (all filled by the server):
      - job_id
      - status: pending, running, successful, failed, aborted
    - opcode parameters:
      - op_list, list of opcodes, created by the client
      - op_status, per-opcode status, filled in by the server
      - op_result, per-opcode result, filled in by the server

  """
  STATUS_PENDING = 1
  STATUS_RUNNING = 2
  STATUS_SUCCESS = 3
  STATUS_FAIL = 4
  STATUS_ABORT = 5

  __slots__ = [
    "job_id",
    "status",
    "op_list",
    "op_status",
    "op_result",
    ]

  def __getstate__(self):
    """Specialized getstate: opcodes are flattened to their own state.

    """
    state = BaseJO.__getstate__(self)
    ops = state.get("op_list", None)
    if ops is not None:
      state["op_list"] = [op.__getstate__() for op in ops]
    return state

  def __setstate__(self, state):
    """Specialized setstate: opcode objects are rebuilt from state.

    """
    BaseJO.__setstate__(self, state)
    if "op_list" in state:
      self.op_list = [OpCode.LoadOpCode(op) for op in state["op_list"]]
class OpCode(BaseJO):
"""Abstract OpCode"""
OP_ID = "OP_ABSTRACT"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.