Commit 5299e61f authored by Iustin Pop's avatar Iustin Pop
Browse files

cli.JobExecutor: poll jobs in execution order



… rather than submission order. The results are still returned in the
submission order, and for this we needed to track internally the index
of the submission.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 2de64672
......@@ -1813,8 +1813,31 @@ class JobExecutor(object):
"""
results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
for ((status, data), (name, _)) in zip(results, self.queue):
self.jobs.append((status, data, name))
for (idx, ((status, data), (name, _))) in enumerate(zip(results,
self.queue)):
self.jobs.append((idx, status, data, name))
def _ChooseJob(self):
"""Choose a non-waiting/queued job to poll next.
"""
assert self.jobs, "_ChooseJob called with empty job list"
result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
assert result
for job_data, status in zip(self.jobs, result):
if status[0] in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
# job is still waiting
continue
# good candidate found
self.jobs.remove(job_data)
return job_data
# no job found
return self.jobs.pop(0)
def GetResults(self):
"""Wait for and return the results of all jobs.
......@@ -1829,16 +1852,19 @@ class JobExecutor(object):
self.SubmitPending()
results = []
if self.verbose:
ok_jobs = [row[1] for row in self.jobs if row[0]]
ok_jobs = [row[2] for row in self.jobs if row[1]]
if ok_jobs:
ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
for submit_status, jid, name in self.jobs:
if not submit_status:
# first, remove any non-submitted jobs
self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
for idx, _, jid, name in failures:
ToStderr("Failed to submit job for %s: %s", name, jid)
results.append((False, jid))
continue
if self.verbose:
ToStdout("Waiting for job %s for %s...", jid, name)
results.append((idx, False, jid))
while self.jobs:
(idx, _, jid, name) = self._ChooseJob()
ToStdout("Waiting for job %s for %s...", jid, name)
try:
job_result = PollJob(jid, cl=self.cl)
success = True
......@@ -1848,7 +1874,12 @@ class JobExecutor(object):
# the error message will always be shown, verbose or not
ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
results.append((success, job_result))
results.append((idx, success, job_result))
# sort based on the index, then drop it
results.sort()
results = [i[1:] for i in results]
return results
def WaitOrShow(self, wait):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment