Commit 009e73d0 authored by Iustin Pop

Optimise multi-job submit



Currently, on multi-job submits we simply iterate over the
single-job-submit function. This means we grab a new serial, write and
replicate the serial file (waiting for the remote nodes to ack it), and
only then create the job file; this is repeated N times, once for each
job.

Since job identifiers are ‘cheap’, it's simpler to grab a block of new
IDs at the start, write and replicate the serial count file a single
time, and then proceed with the jobs as before. This is a cheap change
that reduces I/O and slightly reduces the CPU consumption of the master
daemon: submit time seems to be cut in half for big batches of jobs,
and masterd CPU time drops by somewhere between 15% and 50% (I can't
get consistent numbers).

Note that this doesn't change anything for single-job submits, and most
probably nothing for submits of fewer than five jobs either.
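As an illustration of the idea (a minimal sketch, not Ganeti code:
FakeQueue, _replicate_serial_file and new_serials are made-up
stand-ins), reserving a block of N serials up front turns N synchronous
serial-file replications into a single one, while the job files
themselves are still written once per job:

class FakeQueue(object):
  # Toy model of the job queue; only serial allocation is simulated.

  def __init__(self):
    self.last_serial = 0
    self.replications = 0  # each replication waits for remote-node acks

  def _replicate_serial_file(self):
    # Stand-in for writing and replicating the serial file to all nodes
    self.replications += 1

  def new_serials(self, count):
    # Reserve a block of `count` fresh job IDs with one replication
    assert count > 0
    serial = self.last_serial + count
    self._replicate_serial_file()
    ids = list(range(self.last_serial + 1, serial + 1))
    self.last_serial = serial
    return ids

q = FakeQueue()
assert q.new_serials(100) == list(range(1, 101))  # IDs 1..100
assert q.replications == 1  # one serial-file write instead of 100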
Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>
parent 9d95c3af
@@ -796,26 +796,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

-  def _NewSerialUnlocked(self):
-    """Generates a new job identifier.
+  def _NewSerialsUnlocked(self, count):
+    """Generates new job identifiers.

     Job identifiers are unique during the lifetime of a cluster.

-    @rtype: str
-    @return: a string representing the job identifier.
+    @type count: integer
+    @param count: how many serials to return
+    @rtype: list of str
+    @return: a list of job identifiers

     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count

     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)

+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial

-    return self._FormatJobID(serial)
+    return result

   @staticmethod
   def _GetJobPath(job_id):
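To make the serial arithmetic above concrete, here is a small worked
example with hypothetical numbers; the range must start at
_last_serial + 1 because _last_serial itself was already handed out to
the previous job:

last_serial = 42
count = 3
serial = last_serial + count  # 45, the value written to the serial file
ids = list(range(last_serial + 1, serial + 1))
assert ids == [43, 44, 45]  # exactly `count` fresh identifiers
# Starting the range at last_serial instead would produce [42, 43, 44, 45]:
# count + 1 values, the first of which reuses the previous job's ID.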
@@ -981,12 +986,14 @@ class JobQueue(object):
     return True

   @_RequireOpenQueue
-  def _SubmitJobUnlocked(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.

     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.

+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -1008,8 +1015,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()

-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)

     # Write to disk
@@ -1031,7 +1036,8 @@ class JobQueue(object):
     @see: L{_SubmitJobUnlocked}

     """
-    return self._SubmitJobUnlocked(ops)
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)

   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1042,9 +1048,10 @@ class JobQueue(object):

     """
     results = []
-    for ops in jobs:
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
       try:
-        data = self._SubmitJobUnlocked(ops)
+        data = self._SubmitJobUnlocked(job_id, ops)
         status = True
       except errors.GenericError, err:
         data = str(err)
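From the caller's point of view, the multi-job entry point now hands
back one (status, data) pair per requested job, in order. A
hypothetical caller (the queue object and opcode lists below are
placeholders) would consume the result like this:

# `queue`, `op_a`, `op_b` and `op_c` are placeholders for illustration.
results = queue.SubmitManyJobs([[op_a, op_b], [op_c]])
for status, data in results:
  if status:
    print("submitted job %s" % data)  # data is the new job ID
  else:
    print("submission failed: %s" % data)  # data is the error message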