Commit 704b51ff authored by Klaus Aehlig's avatar Klaus Aehlig

Make masterd call luxid to write jobs to queue

As the responsibility for writing the job queue changed
to luxid, make masterd call to luxid when it is necessary
to write a job.
Signed-off-by: default avatarKlaus Aehlig <>
Reviewed-by: default avatarMichele Tartara <>
parent e9cf6af0
......@@ -48,6 +48,7 @@ from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
......@@ -1937,36 +1938,6 @@ class JobQueue(object):
result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
def _NewSerialsUnlocked(self, count):
"""Generates a new job identifier.
Job identifiers are unique during the lifetime of a cluster.
@type count: integer
@param count: how many serials to return
@rtype: list of int
@return: a list of job identifiers.
assert ht.TNonNegativeInt(count)
# New number
serial = self._last_serial + count
# Write to file
"%s\n" % serial, True)
result = [jstore.FormatJobID(v)
for v in range(self._last_serial + 1, serial + 1)]
# Keep it only if we were able to write the file
self._last_serial = serial
assert len(result) == count
return result
def _GetJobPath(job_id):
"""Returns the job file for a given job id.
......@@ -2174,96 +2145,29 @@ class JobQueue(object):
return True
def _SubmitJobUnlocked(self, job_id, ops):
"""Create and store a new job.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
@type job_id: job ID
@param job_id: the job ID for the new job
@type ops: list
@param ops: The list of OpCodes that will become the new job.
@rtype: L{_QueuedJob}
@return: the job object to be queued
@raise errors.JobQueueFull: if the job queue has too many jobs in it
@raise errors.GenericError: If an opcode is not valid
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull()
job = _QueuedJob(self, job_id, ops, True)
for idx, op in enumerate(job.ops):
# Check priority
if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
" are %s" % (idx, op.priority, allowed))
# Check job dependencies
dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
if not opcodes_base.TNoRelativeJobDependencies(dependencies):
raise errors.GenericError("Opcode %s has invalid dependencies, must"
" match %s: %s" %
(idx, opcodes_base.TNoRelativeJobDependencies,
# Write to disk
self._queue_size += 1
logging.debug("Adding new job %s to the cache", job_id)
self._memcache[job_id] = job
return job
def SubmitJob(self, ops):
def SubmitJob(cls, ops):
"""Create and store a new job.
@see: L{_SubmitJobUnlocked}
(job_id, ) = self._NewSerialsUnlocked(1)
self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
return job_id
return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
def SubmitJobToDrainedQueue(self, ops):
def SubmitJobToDrainedQueue(cls, ops):
"""Forcefully create and store a new job.
Do so, even if the job queue is drained.
@see: L{_SubmitJobUnlocked}
(job_id, ) = self._NewSerialsUnlocked(1)
self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
return job_id
return luxi.Client(address=pathutils.QUERY_SOCKET)\
def SubmitManyJobs(self, jobs):
def SubmitManyJobs(cls, jobs):
"""Create and store multiple jobs.
@see: L{_SubmitJobUnlocked}
all_job_ids = self._NewSerialsUnlocked(len(jobs))
(results, added_jobs) = \
self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
return results
return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
def _FormatSubmitError(msg, ops):
......@@ -2304,46 +2208,6 @@ class JobQueue(object):
return (True, result)
def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
"""Create and store multiple jobs.
@see: L{_SubmitJobUnlocked}
results = []
added_jobs = []
def resolve_fn(job_idx, reljobid):
assert reljobid < 0
return (previous_job_ids + job_ids[:job_idx])[reljobid]
for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
for op in ops:
if getattr(op, opcodes_base.DEPEND_ATTR, None):
(status, data) = \
self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
if not status:
# Abort resolving dependencies
assert ht.TNonEmptyString(data), "No error message"
# Use resolved dependencies
op.depends = data
job = self._SubmitJobUnlocked(job_id, ops)
except errors.GenericError, err:
status = False
data = self._FormatSubmitError(str(err), ops)
status = True
data = job_id
results.append((status, data))
return (results, added_jobs)
def _EnqueueJobs(self, jobs):
"""Helper function to add jobs to worker pool's queue.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment