Skip to content
Snippets Groups Projects
Commit d7fd1f28 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

ganeti.jqueue: Group job archivals to reduce number of RPC calls

Reducing the actual number of RPC calls will come in another patch.

Reviewed-by: ultrotter
parent f8ad5591
No related branches found
No related tags found
No related merge requests found
......@@ -703,24 +703,23 @@ class JobQueue(object):
self._CheckRpcResult(result, self._nodes,
"Updating %s" % file_name)
def _RenameFileUnlocked(self, old, new):
def _RenameFilesUnlocked(self, rename):
"""Renames a file locally and then replicate the change.
This function will rename a file in the local queue directory
and then replicate this rename to all the other nodes we have.
@type old: str
@param old: the current name of the file
@type new: str
@param new: the new name of the file
@type rename: list of (old, new)
@param rename: List containing tuples mapping old to new names
"""
utils.RenameFile(old, new, mkdir=True)
for old, new in rename:
utils.RenameFile(old, new, mkdir=True)
names, addrs = self._GetNodeIp()
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
self._CheckRpcResult(result, self._nodes,
"Moving %s to %s" % (old, new))
names, addrs = self._GetNodeIp()
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
self._CheckRpcResult(result, self._nodes,
"Moving %s to %s" % (old, new))
def _FormatJobID(self, job_id):
"""Convert a job ID to string format.
......@@ -886,7 +885,7 @@ class JobQueue(object):
else:
# non-archived case
logging.exception("Can't parse job %s, will archive.", job_id)
self._RenameFileUnlocked(filepath, new_path)
self._RenameFilesUnlocked([(filepath, new_path)])
return None
self._memcache[job_id] = job
......@@ -1123,29 +1122,37 @@ class JobQueue(object):
self.UpdateJobUnlocked(job)
@_RequireOpenQueue
def _ArchiveJobUnlocked(self, job):
"""Archives a job.
def _ArchiveJobsUnlocked(self, jobs):
"""Archives jobs.
@type job: L{_QueuedJob}
@param job: Job object
@rtype bool
@return Whether job was archived
@type jobs: list of L{_QueuedJob}
@param job: Job objects
@rtype: int
@return: Number of archived jobs
"""
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
constants.JOB_STATUS_SUCCESS,
constants.JOB_STATUS_ERROR):
logging.debug("Job %s is not yet done", job.id)
return False
archive_jobs = []
rename_files = []
for job in jobs:
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
constants.JOB_STATUS_SUCCESS,
constants.JOB_STATUS_ERROR):
logging.debug("Job %s is not yet done", job.id)
continue
old = self._GetJobPath(job.id)
new = self._GetArchivedJobPath(job.id)
archive_jobs.append(job)
self._RenameFileUnlocked(old, new)
old = self._GetJobPath(job.id)
new = self._GetArchivedJobPath(job.id)
rename_files.append((old, new))
logging.debug("Successfully archived job %s", job.id)
# TODO: What if 1..n files fail to rename?
self._RenameFilesUnlocked(rename_files)
return True
logging.debug("Successfully archived job(s) %s",
", ".join(job.id for job in archive_jobs))
return len(archive_jobs)
@utils.LockedMethod
@_RequireOpenQueue
......@@ -1167,7 +1174,7 @@ class JobQueue(object):
logging.debug("Job %s not found", job_id)
return False
return self._ArchiveJobUnlocked(job)
return self._ArchiveJobUnlocked([job]) == 1
@utils.LockedMethod
@_RequireOpenQueue
......@@ -1191,9 +1198,13 @@ class JobQueue(object):
last_touched = 0
all_job_ids = self._GetJobIDsUnlocked(archived=False)
pending = []
for idx, job_id in enumerate(all_job_ids):
last_touched = idx
# Not optimal because jobs could be pending
# TODO: Measure average duration for job archival and take number of
# pending jobs into account.
if time.time() > end_time:
break
......@@ -1209,11 +1220,15 @@ class JobQueue(object):
job_age = job.end_timestamp
if age == -1 or now - job_age[0] > age:
archived = self._ArchiveJobUnlocked(job)
if archived:
archived_count += 1
continue
pending.append(job)
# Archive 10 jobs at a time
if len(pending) >= 10:
archived_count += self._ArchiveJobsUnlocked(pending)
pending = []
if pending:
archived_count += self._ArchiveJobsUnlocked(pending)
return (archived_count, len(all_job_ids) - last_touched - 1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment