diff --git a/lib/jqueue.py b/lib/jqueue.py index 001bc43e2bd86efd4ee9edd547556a50f7f6c363..f7399c866650964e4133bcad8b2972d3543f9d01 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1090,9 +1090,10 @@ class JobQueue(object): @param job_id: the job ID for the new job @type ops: list @param ops: The list of OpCodes that will become the new job. - @rtype: job ID - @return: the job ID of the newly created job - @raise errors.JobQueueDrainError: if the job is marked for draining + @rtype: L{_QueuedJob} + @return: the job object to be queued + @raise errors.JobQueueDrainError: if the job queue is marked for draining + @raise errors.JobQueueFull: if the job queue has too many jobs in it """ # Ok when sharing the big job queue lock, as the drain file is created when @@ -1113,10 +1114,7 @@ class JobQueue(object): logging.debug("Adding new job %s to the cache", job_id) self._memcache[job_id] = job - # Add to worker pool - self._wpool.AddTask(job) - - return job.id + return job @utils.LockedMethod @_RequireOpenQueue @@ -1127,7 +1125,8 @@ class JobQueue(object): """ job_id = self._NewSerialsUnlocked(1)[0] - return self._SubmitJobUnlocked(job_id, ops) + self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops)) + return job_id @utils.LockedMethod @_RequireOpenQueue @@ -1138,15 +1137,18 @@ class JobQueue(object): """ results = [] + tasks = [] all_job_ids = self._NewSerialsUnlocked(len(jobs)) for job_id, ops in zip(all_job_ids, jobs): try: - data = self._SubmitJobUnlocked(job_id, ops) + tasks.append((self._SubmitJobUnlocked(job_id, ops), )) status = True + data = job_id except errors.GenericError, err: data = str(err) status = False results.append((status, data)) + self._wpool.AddManyTasks(tasks) return results