Commit f8ad5591 authored by Michael Hanselmann's avatar Michael Hanselmann

Prevent RPC timeout on auto-archiving jobs

With a large job queue, auto-archiving jobs can take a very long time,
causing timeouts on the luxi RPC layer. With this change, auto-
archive returns after half of the RPC timeout has passed. The user
will see how many jobs are left unchecked.

Reviewed-by: ultrotter
parent 78d12585
......@@ -221,8 +221,8 @@ class ClientOps:
return queue.ArchiveJob(job_id)
elif method == luxi.REQ_AUTOARCHIVE_JOBS:
age = args
return queue.AutoArchiveJobs(age)
(age, timeout) = args
return queue.AutoArchiveJobs(age, timeout)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
(job_id, fields, prev_job_info, prev_log_serial, timeout) = args
......
......@@ -1171,7 +1171,7 @@ class JobQueue(object):
@utils.LockedMethod
@_RequireOpenQueue
def AutoArchiveJobs(self, age):
def AutoArchiveJobs(self, age, timeout):
"""Archives all jobs based on age.
The method will archive all jobs which are older than the age
......@@ -1186,22 +1186,36 @@ class JobQueue(object):
logging.info("Archiving jobs with age more than %s seconds", age)
now = time.time()
for job_id in self._GetJobIDsUnlocked(archived=False):
end_time = now + timeout
archived_count = 0
last_touched = 0
all_job_ids = self._GetJobIDsUnlocked(archived=False)
for idx, job_id in enumerate(all_job_ids):
last_touched = idx
if time.time() > end_time:
break
# Returns None if the job failed to load
job = self._LoadJobUnlocked(job_id)
if not job:
continue
if job.end_timestamp is None:
if job.start_timestamp is None:
job_age = job.received_timestamp
if job:
if job.end_timestamp is None:
if job.start_timestamp is None:
job_age = job.received_timestamp
else:
job_age = job.start_timestamp
else:
job_age = job.start_timestamp
else:
job_age = job.end_timestamp
job_age = job.end_timestamp
if age == -1 or now - job_age[0] > age:
archived = self._ArchiveJobUnlocked(job)
if archived:
archived_count += 1
continue
if age == -1 or now - job_age[0] > age:
self._ArchiveJobUnlocked(job)
return (archived_count, len(all_job_ids) - last_touched - 1)
def _GetJobInfoUnlocked(self, job, fields):
"""Returns information about a job.
......
......@@ -306,7 +306,8 @@ class Client(object):
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def AutoArchiveJobs(self, age):
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
timeout = (DEF_RWTO - 1) / 2
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
timeout = (DEF_RWTO - 1) / 2
......
......@@ -156,7 +156,9 @@ def AutoArchiveJobs(opts, args):
else:
age = ParseTimespec(age)
client.AutoArchiveJobs(age)
(archived_count, jobs_left) = client.AutoArchiveJobs(age)
ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)
return 0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment