Skip to content
Snippets Groups Projects
Commit 23b4b983 authored by Iustin Pop's avatar Iustin Pop
Browse files

Modify cli.JobExecutor to use SubmitManyJobs


This patch changes the generic "multiple job executor" to use the many
jobs submit model, which automatically makes all its users use the new
model.

This makes, for example, startup/shutdown of a full cluster much more
logical (all the submitted job IDs are visible fast, and then waiting
for them proceeds normally).

Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 64bfbc08
No related branches found
No related tags found
No related merge requests found
......@@ -992,15 +992,24 @@ class JobExecutor(object):
cl = GetClient()
self.cl = cl
self.verbose = verbose
self.jobs = []
def QueueJob(self, name, *ops):
"""Submit a job for execution.
"""Record a job for later submit.
@type name: string
@param name: a description of the job, will be used in WaitJobSet
"""
job_id = SendJob(ops, cl=self.cl)
self.queue.append((job_id, name))
self.queue.append((name, ops))
def SubmitPending(self):
"""Submit all pending jobs.
"""
results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
for ((status, data), (name, _)) in zip(results, self.queue):
self.jobs.append((status, data, name))
def GetResults(self):
"""Wait for and return the results of all jobs.
......@@ -1011,10 +1020,18 @@ class JobExecutor(object):
there will be the error message
"""
if not self.jobs:
self.SubmitPending()
results = []
if self.verbose:
ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
for jid, name in self.queue:
ok_jobs = [row[1] for row in self.jobs if row[0]]
if ok_jobs:
ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
for submit_status, jid, name in self.jobs:
if not submit_status:
ToStderr("Failed to submit job for %s: %s", name, jid)
results.append((False, jid))
continue
if self.verbose:
ToStdout("Waiting for job %s for %s...", jid, name)
try:
......@@ -1039,5 +1056,10 @@ class JobExecutor(object):
if wait:
return self.GetResults()
else:
for jid, name in self.queue:
ToStdout("%s: %s", jid, name)
if not self.jobs:
self.SubmitPending()
for status, result, name in self.jobs:
if status:
ToStdout("%s: %s", result, name)
else:
ToStderr("Failure for %s: %s", name, result)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment