diff --git a/lib/jqueue.py b/lib/jqueue.py index 62a90662b70a18013584e14a296f5631f6e7f3bb..25eb3d9fbf230d719d39f3f9d450926a6fdec4af 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -703,24 +703,23 @@ class JobQueue(object): self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) - def _RenameFileUnlocked(self, old, new): + def _RenameFilesUnlocked(self, rename): """Renames a file locally and then replicate the change. This function will rename a file in the local queue directory and then replicate this rename to all the other nodes we have. - @type old: str - @param old: the current name of the file - @type new: str - @param new: the new name of the file + @type rename: list of (old, new) + @param rename: List containing tuples mapping old to new names """ - utils.RenameFile(old, new, mkdir=True) + for old, new in rename: + utils.RenameFile(old, new, mkdir=True) - names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new) - self._CheckRpcResult(result, self._nodes, - "Moving %s to %s" % (old, new)) + names, addrs = self._GetNodeIp() + result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new) + self._CheckRpcResult(result, self._nodes, + "Moving %s to %s" % (old, new)) def _FormatJobID(self, job_id): """Convert a job ID to string format. @@ -886,7 +885,7 @@ class JobQueue(object): else: # non-archived case logging.exception("Can't parse job %s, will archive.", job_id) - self._RenameFileUnlocked(filepath, new_path) + self._RenameFilesUnlocked([(filepath, new_path)]) return None self._memcache[job_id] = job @@ -1123,29 +1122,37 @@ class JobQueue(object): self.UpdateJobUnlocked(job) @_RequireOpenQueue - def _ArchiveJobUnlocked(self, job): - """Archives a job. + def _ArchiveJobsUnlocked(self, jobs): + """Archives jobs. - @type job: L{_QueuedJob} - @param job: Job object - @rtype bool - @return Whether job was archived + @type jobs: list of L{_QueuedJob} + @param job: Job objects + @rtype: int + @return: Number of archived jobs """ - if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, - constants.JOB_STATUS_SUCCESS, - constants.JOB_STATUS_ERROR): - logging.debug("Job %s is not yet done", job.id) - return False + archive_jobs = [] + rename_files = [] + for job in jobs: + if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, + constants.JOB_STATUS_SUCCESS, + constants.JOB_STATUS_ERROR): + logging.debug("Job %s is not yet done", job.id) + continue - old = self._GetJobPath(job.id) - new = self._GetArchivedJobPath(job.id) + archive_jobs.append(job) - self._RenameFileUnlocked(old, new) + old = self._GetJobPath(job.id) + new = self._GetArchivedJobPath(job.id) + rename_files.append((old, new)) - logging.debug("Successfully archived job %s", job.id) + # TODO: What if 1..n files fail to rename? + self._RenameFilesUnlocked(rename_files) - return True + logging.debug("Successfully archived job(s) %s", + ", ".join(job.id for job in archive_jobs)) + + return len(archive_jobs) @utils.LockedMethod @_RequireOpenQueue @@ -1167,7 +1174,7 @@ class JobQueue(object): logging.debug("Job %s not found", job_id) return False - return self._ArchiveJobUnlocked(job) + return self._ArchiveJobUnlocked([job]) == 1 @utils.LockedMethod @_RequireOpenQueue @@ -1191,9 +1198,13 @@ class JobQueue(object): last_touched = 0 all_job_ids = self._GetJobIDsUnlocked(archived=False) + pending = [] for idx, job_id in enumerate(all_job_ids): last_touched = idx + # Not optimal because jobs could be pending + # TODO: Measure average duration for job archival and take number of + # pending jobs into account. if time.time() > end_time: break @@ -1209,11 +1220,15 @@ class JobQueue(object): job_age = job.end_timestamp if age == -1 or now - job_age[0] > age: - archived = self._ArchiveJobUnlocked(job) - if archived: - archived_count += 1 - continue + pending.append(job) + + # Archive 10 jobs at a time + if len(pending) >= 10: + archived_count += self._ArchiveJobsUnlocked(pending) + pending = [] + if pending: + archived_count += self._ArchiveJobsUnlocked(pending) return (archived_count, len(all_job_ids) - last_touched - 1)