Commit 85f03e0d authored by Michael Hanselmann

Rewrite job queue

We found several issues in the old job queue implementation. It had race
conditions, deadlocks and other deficiencies.

Short summary:
- _QueuedOpCode and _QueuedJob are now more or less data structures with a few
  utility functions. __Setup is gone.
- DiskJobStorage and JobQueue classes merged into one to reduce code complexity.
- One lock in JobQueue for almost everything. There's also a per-opcode lock
  for log messages; a sketch of this locking model follows below.
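
For illustration, a minimal sketch of that locking model, condensed from the
code in this patch (not complete classes):

  import threading

  class JobQueue(object):
    def __init__(self, context):
      # One lock guards almost all queue state: the job files on disk,
      # the in-memory job cache and the serial counter.
      self._lock = threading.Lock()
      self.acquire = self._lock.acquire
      self.release = self._lock.release

  class _QueuedOpCode(object):
    def __new__(cls, *args, **kwargs):
      obj = object.__new__(cls, *args, **kwargs)
      # A separate per-opcode lock lets log appends and reads proceed
      # without holding the big queue lock.
      obj._log_lock = threading.Lock()
      return obj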

Reviewed-by: iustinp
parent c0a8eb9e
@@ -50,81 +50,66 @@ class _QueuedOpCode(object):
of the form (timestamp, level, message).
"""
def __init__(self, op):
self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
def __new__(cls, *args, **kwargs):
obj = object.__new__(cls, *args, **kwargs)
# Create a special lock for logging
obj._log_lock = threading.Lock()
return obj
def __Setup(self, input_, status, result, log):
self._lock = threading.Lock()
self.input = input_
self.status = status
self.result = result
self.log = log
def __init__(self, op):
self.input = op
self.status = constants.OP_STATUS_QUEUED
self.result = None
self.log = []
@classmethod
def Restore(cls, state):
obj = object.__new__(cls)
obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
state["status"], state["result"], state["log"])
obj = _QueuedOpCode.__new__(cls)
obj.input = opcodes.OpCode.LoadOpCode(state["input"])
obj.status = state["status"]
obj.result = state["result"]
obj.log = state["log"]
return obj
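  # `utils.LockedMethod` (used below) is defined elsewhere in Ganeti. A
  # plausible sketch of its shape, assuming it simply serializes calls on
  # the instance's lock attribute:
  #
  #   def LockedMethod(fn):
  #     def wrapper(self, *args, **kwargs):
  #       self._lock.acquire()
  #       try:
  #         return fn(self, *args, **kwargs)
  #       finally:
  #         self._lock.release()
  #     return wrapper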
@utils.LockedMethod
def Serialize(self):
self._log_lock.acquire()
try:
return {
"input": self.input.__getstate__(),
"status": self.status,
"result": self.result,
"log": self.log,
}
finally:
self._log_lock.release()
@utils.LockedMethod
def GetInput(self):
"""Returns the original opcode.
"""
return self.input
@utils.LockedMethod
def SetStatus(self, status, result):
"""Update the opcode status and result.
"""
self.status = status
self.result = result
@utils.LockedMethod
def GetStatus(self):
"""Get the opcode status.
"""
return self.status
@utils.LockedMethod
def GetResult(self):
"""Get the opcode result.
"""
return self.result
@utils.LockedMethod
def Log(self, *args):
"""Append a log entry.
"""
assert len(args) < 2
assert len(args) < 3
if len(args) == 1:
log_type = constants.ELOG_MESSAGE
log_msg = args[0]
else:
log_type, log_msg = args
self._log_lock.acquire()
try:
self.log.append((time.time(), log_type, log_msg))
finally:
self._log_lock.release()
@utils.LockedMethod
def RetrieveLog(self, start_at=0):
"""Retrieve (a part of) the execution log.
"""
self._log_lock.acquire()
try:
return self.log[start_at:]
finally:
self._log_lock.release()
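# A Serialize/Restore round trip, sketched (OpTestDelay stands in for any
# opcode; the state values shown are illustrative):
#
#   op = _QueuedOpCode(opcodes.OpTestDelay(duration=10))
#   state = op.Serialize()
#   # state == {"input": ..., "status": "queued", "result": None, "log": []}
#   clone = _QueuedOpCode.Restore(state)
#   assert clone.status == constants.OP_STATUS_QUEUED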
class _QueuedJob(object):
@@ -133,67 +118,51 @@ class _QueuedJob(object):
This is what we use to track the user-submitted jobs.
"""
def __init__(self, storage, job_id, ops):
def __init__(self, queue, job_id, ops):
if not ops:
# TODO
raise Exception("No opcodes")
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
def __Setup(self, storage, job_id, ops, run_op_index):
self._lock = threading.Lock()
self.storage = storage
self.queue = queue
self.id = job_id
self._ops = ops
self.run_op_index = run_op_index
self.ops = [_QueuedOpCode(op) for op in ops]
self.run_op_index = -1
@classmethod
def Restore(cls, storage, state):
obj = object.__new__(cls)
op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
def Restore(cls, queue, state):
obj = _QueuedJob.__new__(cls)
obj.queue = queue
obj.id = state["id"]
obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
obj.run_op_index = state["run_op_index"]
return obj
def Serialize(self):
return {
"id": self.id,
"ops": [op.Serialize() for op in self._ops],
"ops": [op.Serialize() for op in self.ops],
"run_op_index": self.run_op_index,
}
def _SetStatus(self, status, msg):
try:
for op in self._ops:
op.SetStatus(status, msg)
finally:
self.storage.UpdateJob(self)
def SetUnclean(self, msg):
return self._SetStatus(constants.OP_STATUS_ERROR, msg)
def SetCanceled(self, msg):
return self._SetStatus(constants.JOB_STATUS_CANCELED, msg)
def GetStatus(self):
def CalcStatus(self):
status = constants.JOB_STATUS_QUEUED
all_success = True
for op in self._ops:
op_status = op.GetStatus()
if op_status == constants.OP_STATUS_SUCCESS:
for op in self.ops:
if op.status == constants.OP_STATUS_SUCCESS:
continue
all_success = False
if op_status == constants.OP_STATUS_QUEUED:
if op.status == constants.OP_STATUS_QUEUED:
pass
elif op_status == constants.OP_STATUS_RUNNING:
elif op.status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING
elif op_status == constants.OP_STATUS_ERROR:
elif op.status == constants.OP_STATUS_ERROR:
status = constants.JOB_STATUS_ERROR
# The whole job fails if one opcode failed
break
elif op_status == constants.OP_STATUS_CANCELED:
elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
break
@@ -202,66 +171,74 @@ class _QueuedJob(object):
return status
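  # Aggregation rules of CalcStatus, illustrated (the lines elided by the
  # hunk above presumably return JOB_STATUS_SUCCESS when all_success holds):
  #   all opcodes "success"              -> JOB_STATUS_SUCCESS
  #   an opcode "running", none failed   -> JOB_STATUS_RUNNING
  #   an opcode "error"                  -> JOB_STATUS_ERROR (loop stops)
  #   nothing has started yet            -> JOB_STATUS_QUEUED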
@utils.LockedMethod
def GetRunOpIndex(self):
return self.run_op_index
def Run(self, proc):
class _JobQueueWorker(workerpool.BaseWorker):
def RunTask(self, job):
"""Job executor.
    This function processes this job in the context of the given
    processor instance.
Args:
- proc: Ganeti Processor to run the job with
    This function processes a job.
"""
logging.debug("Worker %s processing job %s",
self.worker_id, job.id)
proc = mcpu.Processor(self.pool.context)
queue = job.queue
try:
try:
count = len(self._ops)
for idx, op in enumerate(self._ops):
count = len(job.ops)
for idx, op in enumerate(job.ops):
try:
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
self._lock.acquire()
queue.acquire()
try:
self.run_op_index = idx
job.run_op_index = idx
op.status = constants.OP_STATUS_RUNNING
op.result = None
queue.UpdateJobUnlocked(job)
input = op.input
finally:
self._lock.release()
queue.release()
op.SetStatus(constants.OP_STATUS_RUNNING, None)
self.storage.UpdateJob(self)
result = proc.ExecOpCode(input, op.Log)
result = proc.ExecOpCode(op.input, op.Log)
queue.acquire()
try:
op.status = constants.OP_STATUS_SUCCESS
op.result = result
queue.UpdateJobUnlocked(job)
finally:
queue.release()
op.SetStatus(constants.OP_STATUS_SUCCESS, result)
self.storage.UpdateJob(self)
logging.debug("Op %s/%s: Successfully finished %s",
idx + 1, count, op)
except Exception, err:
queue.acquire()
try:
op.SetStatus(constants.OP_STATUS_ERROR, str(err))
try:
op.status = constants.OP_STATUS_ERROR
op.result = str(err)
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
finally:
self.storage.UpdateJob(self)
queue.UpdateJobUnlocked(job)
finally:
queue.release()
raise
except errors.GenericError, err:
logging.exception("Ganeti exception")
except:
logging.exception("Unhandled exception")
class _JobQueueWorker(workerpool.BaseWorker):
def RunTask(self, job):
logging.debug("Worker %s processing job %s",
self.worker_id, job.id)
# TODO: feedback function
proc = mcpu.Processor(self.pool.context)
finally:
queue.acquire()
try:
job.Run(proc)
job_id = job.id
status = job.CalcStatus()
finally:
queue.release()
logging.debug("Worker %s finished job %s, status = %s",
self.worker_id, job.id, job.GetStatus())
self.worker_id, job_id, status)
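# How a job reaches RunTask, sketched: AddTask is inherited from
# workerpool.WorkerPool, and a free _JobQueueWorker then calls RunTask(job).
#
#   pool = _JobQueueWorkerPool(context)
#   pool.AddTask(job)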
class _JobQueueWorkerPool(workerpool.WorkerPool):
@@ -271,53 +248,18 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
self.context = context
class JobStorageBase(object):
def __init__(self, id_prefix):
self.id_prefix = id_prefix
if id_prefix:
prefix_pattern = re.escape("%s-" % id_prefix)
else:
prefix_pattern = ""
# Apart from the prefix, all job IDs are numeric
self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
def OwnsJobId(self, job_id):
return self._re_job_id.match(job_id)
def FormatJobID(self, job_id):
if not isinstance(job_id, (int, long)):
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
if job_id < 0:
raise errors.ProgrammerError("Job ID %s is negative" % job_id)
if self.id_prefix:
prefix = "%s-" % self.id_prefix
else:
prefix = ""
return "%s%010d" % (prefix, job_id)
def _ShouldJobBeArchivedUnlocked(self, job):
if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
constants.JOB_STATUS_SUCCESS,
constants.JOB_STATUS_ERROR):
logging.debug("Job %s is not yet done", job.id)
return False
return True
class DiskJobStorage(JobStorageBase):
class JobQueue(object):
_RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
def __init__(self, id_prefix):
JobStorageBase.__init__(self, id_prefix)
self._lock = threading.Lock()
def __init__(self, context):
self._memcache = {}
self._my_hostname = utils.HostInfo().name
# Locking
self._lock = threading.Lock()
self.acquire = self._lock.acquire
self.release = self._lock.release
# Make sure our directories exists
for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
try:
@@ -363,6 +305,30 @@ class DiskJobStorage(JobStorageBase):
raise errors.ConfigurationError("Can't read/parse the job queue serial"
" file")
# Setup worker pool
self._wpool = _JobQueueWorkerPool(context)
# We need to lock here because WorkerPool.AddTask() may start a job while
# we're still doing our work.
self.acquire()
try:
for job in self._GetJobsUnlocked(None):
status = job.CalcStatus()
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask(job)
elif status in (constants.JOB_STATUS_RUNNING, ):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
for op in job.ops:
op.status = constants.OP_STATUS_ERROR
op.result = "Unclean master daemon shutdown"
finally:
self.UpdateJobUnlocked(job)
finally:
self.release()
@staticmethod
def _ReadSerial():
"""Try to read the job serial file.
@@ -384,12 +350,6 @@ class DiskJobStorage(JobStorageBase):
return serial
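  # The body of _ReadSerial is elided by the hunk above; a plausible sketch,
  # an assumption rather than the verbatim implementation:
  #
  #   try:
  #     fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
  #     try:
  #       serial = int(fd.read(1024).strip())
  #     finally:
  #       fd.close()
  #   except (ValueError, EnvironmentError):
  #     serial = None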
def Close(self):
assert self.lock_fd, "Queue should be open"
self.lock_fd.close()
self.lock_fd = None
def _InitQueueUnlocked(self):
assert self.lock_fd, "Queue should be open"
@@ -399,6 +359,14 @@ class DiskJobStorage(JobStorageBase):
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
def _FormatJobID(self, job_id):
if not isinstance(job_id, (int, long)):
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
if job_id < 0:
raise errors.ProgrammerError("Job ID %s is negative" % job_id)
return str(job_id)
def _NewSerialUnlocked(self, nodes):
"""Generates a new job identifier.
@@ -430,16 +398,19 @@ class DiskJobStorage(JobStorageBase):
if not result[node]:
logging.error("copy of job queue file to node %s failed", node)
return self.FormatJobID(serial)
return self._FormatJobID(serial)
def _GetJobPath(self, job_id):
@staticmethod
def _GetJobPath(job_id):
return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
def _GetArchivedJobPath(self, job_id):
@staticmethod
def _GetArchivedJobPath(job_id):
return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
def _ExtractJobID(self, name):
m = self._RE_JOB_FILE.match(name)
@classmethod
def _ExtractJobID(cls, name):
m = cls._RE_JOB_FILE.match(name)
if m:
return m.group(1)
else:
@@ -498,12 +469,11 @@ class DiskJobStorage(JobStorageBase):
return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
@utils.LockedMethod
def GetJobs(self, job_ids):
return self._GetJobsUnlocked(job_ids)
def SubmitJob(self, ops, nodes):
"""Create and store a new job.
@utils.LockedMethod
def AddJob(self, ops, nodes):
"""Create and store on disk a new job.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
@type ops: list
@param ops: The list of OpCodes that will become the new job.
@@ -519,14 +489,17 @@ class DiskJobStorage(JobStorageBase):
job = _QueuedJob(self, job_id, ops)
# Write to disk
self._UpdateJobUnlocked(job)
self.UpdateJobUnlocked(job)
logging.debug("Added new job %s to the cache", job_id)
self._memcache[job_id] = job
return job
# Add to worker pool
self._wpool.AddTask(job)
def _UpdateJobUnlocked(self, job):
return job.id
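  # Caller-side sketch for submitting a job, assuming the merged class keeps
  # the public SubmitJob name (OpTestDelay stands in for any opcode; `nodes`
  # is the list of nodes the queue files are replicated to):
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=5)], nodes)
  #   # The job is written to disk, handed to the worker pool, and its new
  #   # ID is returned to the caller.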
def UpdateJobUnlocked(self, job):
assert self.lock_fd, "Queue should be open"
filename = self._GetJobPath(job.id)
@@ -543,10 +516,11 @@ class DiskJobStorage(JobStorageBase):
"""
assert isinstance(exclude, list)
for job in self._memcache.values():
if job.id in exclude:
continue
if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING):
logging.debug("Cleaning job %s from the cache", job.id)
try:
@@ -555,11 +529,6 @@ class DiskJobStorage(JobStorageBase):
pass
@utils.LockedMethod
def UpdateJob(self, job):
return self._UpdateJobUnlocked(job)
# TODO: Figure out locking
#@utils.LockedMethod
def CancelJob(self, job_id):
"""Cancels a job.
@@ -569,20 +538,21 @@ class DiskJobStorage(JobStorageBase):
"""
logging.debug("Cancelling job %s", job_id)
self._lock.acquire()
try:
job = self._LoadJobUnlocked(job_id)
finally:
self._lock.release()
if not job:
logging.debug("Job %s not found", job_id)
return
if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,):
if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
logging.debug("Job %s is no longer in the queue", job.id)
return
job.SetCanceled("Job cancelled by request")
try:
for op in job.ops:
op.status = constants.OP_STATUS_ERROR
op.result = "Job cancelled by request"
finally:
self.UpdateJobUnlocked(job)
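  # Note: cancellation only applies while a job is still fully queued; once
  # an opcode has started running, CancelJob logs a message and returns
  # without touching the job (see the CalcStatus check above).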
@utils.LockedMethod
def ArchiveJob(self, job_id):
@@ -599,7 +569,10 @@ class DiskJobStorage(JobStorageBase):
logging.debug("Job %s not found", job_id)
return
if not self._ShouldJobBeArchivedUnlocked(job):
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
constants.JOB_STATUS_SUCCESS,
constants.JOB_STATUS_ERROR):
logging.debug("Job %s is not yet done", job.id)
return
try:
@@ -614,72 +587,25 @@ class DiskJobStorage(JobStorageBase):
# and to be on the safe side.
self._CleanCacheUnlocked([])
class JobQueue:
"""The job queue.
"""
def __init__(self, context):
self._lock = threading.Lock()
self._jobs = DiskJobStorage("")
self._wpool = _JobQueueWorkerPool(context)
for job in self._jobs.GetJobs(None):
status = job.GetStatus()
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask(job)
elif status in (constants.JOB_STATUS_RUNNING, ):
logging.warning("Unfinished job %s found: %s", job.id, job)
job.SetUnclean("Unclean master daemon shutdown")
@utils.LockedMethod
def SubmitJob(self, ops, nodes):
"""Add a new job to the queue.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
@type ops: list
@param ops: the sequence of opcodes that will become the new job
@type nodes: list
@param nodes: the list of nodes to which the queue should be
distributed
"""
job = self._jobs.AddJob(ops, nodes)
# Add to worker pool
self._wpool.AddTask(job)
return job.id
def ArchiveJob(self, job_id):
self._jobs.ArchiveJob(job_id)
@utils.LockedMethod
def CancelJob(self, job_id):
self._jobs.CancelJob(job_id)
def _GetJobInfo(self, job, fields):
def _GetJobInfoUnlocked(self, job, fields):
row = []
for fname in fields:
if fname == "id":
row.append(job.id)
elif fname == "status":
row.append(job.GetStatus())
row.append(job.CalcStatus())
elif fname == "ops":
row.append([op.GetInput().__getstate__() for op in job._ops])
row.append([op.input.__getstate__() for op in job.ops])
elif fname == "opresult":
row.append([op.GetResult() for op in job._ops])
row.append([op.result for op in job.ops])
elif fname == "opstatus":
row.append([op.GetStatus() for op in job._ops])
row.append([op.status for op in job.ops])
elif fname == "ticker":
ji = job.GetRunOpIndex()
ji = job.run_op_index
if ji < 0:
lmsg = None
else:
lmsg = job._ops[ji].RetrieveLog(-1)
lmsg = job.ops[ji].RetrieveLog(-1)
# message might be empty here
if lmsg:
lmsg = lmsg[0]
@@ -690,6 +616,7 @@ class JobQueue:
raise errors.OpExecError("Invalid job query field '%s'" % fname)
return row
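  # Query example, sketched (field values are illustrative):
  #
  #   queue.QueryJobs(["1"], ["id", "status", "opstatus"])
  #   # -> [["1", "running", ["success", "running", "queued"]]]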
@utils.LockedMethod
def QueryJobs(self, job_ids, fields):