diff --git a/lib/jqueue.py b/lib/jqueue.py index e06c5f866550660baa71c96d6912234a57c1950c..9219caedecee7ca141ecdf5b78140ad394aebf9a 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -796,26 +796,31 @@ class JobQueue(object): """ return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) - def _NewSerialUnlocked(self): + def _NewSerialsUnlocked(self, count): """Generates a new job identifier. Job identifiers are unique during the lifetime of a cluster. + @type count: integer + @param count: how many serials to return @rtype: str @return: a string representing the job identifier. """ + assert count > 0 # New number - serial = self._last_serial + 1 + serial = self._last_serial + count # Write to file self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, "%s\n" % serial) + result = [self._FormatJobID(v) + for v in range(self._last_serial, serial + 1)] # Keep it only if we were able to write the file self._last_serial = serial - return self._FormatJobID(serial) + return result @staticmethod def _GetJobPath(job_id): @@ -981,12 +986,14 @@ class JobQueue(object): return True @_RequireOpenQueue - def _SubmitJobUnlocked(self, ops): + def _SubmitJobUnlocked(self, job_id, ops): """Create and store a new job. This enters the job into our job queue and also puts it on the new queue, in order for it to be picked up by the queue processors. + @type job_id: job ID + @param jod_id: the job ID for the new job @type ops: list @param ops: The list of OpCodes that will become the new job. @rtype: job ID @@ -1008,8 +1015,6 @@ class JobQueue(object): if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: raise errors.JobQueueFull() - # Get job identifier - job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops) # Write to disk @@ -1031,7 +1036,8 @@ class JobQueue(object): @see: L{_SubmitJobUnlocked} """ - return self._SubmitJobUnlocked(ops) + job_id = self._NewSerialsUnlocked(1)[0] + return self._SubmitJobUnlocked(job_id, ops) @utils.LockedMethod @_RequireOpenQueue @@ -1042,9 +1048,10 @@ class JobQueue(object): """ results = [] - for ops in jobs: + all_job_ids = self._NewSerialsUnlocked(len(jobs)) + for job_id, ops in zip(all_job_ids, jobs): try: - data = self._SubmitJobUnlocked(ops) + data = self._SubmitJobUnlocked(job_id, ops) status = True except errors.GenericError, err: data = str(err)