Commit 479636a3 authored by Iustin Pop's avatar Iustin Pop
Browse files

Rework the multi-instance gnt commands

This patch changes the multi-instance gnt-* commands (gnt-instance
start/stop, gnt-node evacuate/failover) such that the individual
operations are submitted in parallel, ideally improving the speed of the
execution.

The patch does this by abstracting the job set functionality into a new
class in cli.py, that takes care of the job submit, job poll and error
handling.

Reviewed-by: ultrotter
parent 5278185a
......@@ -50,10 +50,8 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
"ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
"FormatError", "SplitNodeOption", "SubmitOrSend",
"JobSubmittedException", "FormatTimestamp", "ParseTimespec",
"ValidateBeParams",
"ToStderr", "ToStdout",
"UsesRPC",
"GetOnlineNodes",
"ValidateBeParams", "ToStderr", "ToStdout", "UsesRPC",
"GetOnlineNodes", "JobExecutor",
]
......@@ -989,3 +987,67 @@ def ToStderr(txt, *args):
"""
_ToStream(sys.stderr, txt, *args)
class JobExecutor(object):
"""Class which manages the submission and execution of multiple jobs.
Note that instances of this class should not be reused between
GetResults() calls.
"""
def __init__(self, cl=None, verbose=True):
self.queue = []
if cl is None:
cl = GetClient()
self.cl = cl
self.verbose = verbose
def QueueJob(self, name, *ops):
"""Submit a job for execution.
@type name: string
@param name: a description of the job, will be used in WaitJobSet
"""
job_id = SendJob(ops, cl=self.cl)
self.queue.append((job_id, name))
def GetResults(self):
"""Wait for and return the results of all jobs.
@rtype: list
@return: list of tuples (success, job results), in the same order
as the submitted jobs; if a job has failed, instead of the result
there will be the error message
"""
results = []
if self.verbose:
ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
for jid, name in self.queue:
if self.verbose:
ToStdout("Waiting for job %s for %s...", jid, name)
try:
job_result = PollJob(jid, cl=self.cl)
success = True
except (errors.GenericError, luxi.ProtocolError), err:
_, job_result = FormatError(err)
success = False
# the error message will always be shown, verbose or not
ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
results.append((success, job_result))
return results
def WaitOrShow(self, wait):
"""Wait for job results or only print the job IDs.
@type wait: boolean
@param wait: whether to wait or not
"""
if wait:
return self.GetResults()
else:
for jid, name in self.queue:
ToStdout("%s: %s", jid, name)
......@@ -53,7 +53,7 @@ _LIST_DEF_FIELDS = [
]
def _ExpandMultiNames(mode, names):
def _ExpandMultiNames(mode, names, client=None):
"""Expand the given names using the passed mode.
For _SHUTDOWN_CLUSTER, all instances will be returned. For
......@@ -76,10 +76,11 @@ def _ExpandMultiNames(mode, names):
@raise errors.OpPrereqError: for invalid input parameters
"""
if client is None:
client = GetClient()
if mode == _SHUTDOWN_CLUSTER:
if names:
raise errors.OpPrereqError("Cluster filter mode takes no arguments")
client = GetClient()
idata = client.QueryInstances([], ["name"])
inames = [row[0] for row in idata]
......@@ -88,7 +89,6 @@ def _ExpandMultiNames(mode, names):
_SHUTDOWN_NODES_SEC):
if not names:
raise errors.OpPrereqError("No node names passed")
client = GetClient()
ndata = client.QueryNodes(names, ["name", "pinst_list", "sinst_list"])
ipri = [row[1] for row in ndata]
pri_names = list(itertools.chain(*ipri))
......@@ -106,7 +106,6 @@ def _ExpandMultiNames(mode, names):
elif mode == _SHUTDOWN_INSTANCES:
if not names:
raise errors.OpPrereqError("No instance names passed")
client = GetClient()
idata = client.QueryInstances(names, ["name"])
inames = [row[0] for row in idata]
......@@ -673,26 +672,23 @@ def StartupInstance(opts, args):
@return: the desired exit code
"""
cl = GetClient()
if opts.multi_mode is None:
opts.multi_mode = _SHUTDOWN_INSTANCES
inames = _ExpandMultiNames(opts.multi_mode, args)
inames = _ExpandMultiNames(opts.multi_mode, args, client=cl)
if not inames:
raise errors.OpPrereqError("Selection filter does not match any instances")
multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
if not (opts.force_multi or not multi_on
or _ConfirmOperation(inames, "startup")):
return 1
jex = cli.JobExecutor(verbose=multi_on, cl=cl)
for name in inames:
op = opcodes.OpStartupInstance(instance_name=name,
force=opts.force,
extra_args=opts.extra_args)
if multi_on:
ToStdout("Starting up %s", name)
try:
SubmitOrSend(op, opts)
except JobSubmittedException, err:
_, txt = FormatError(err)
ToStdout("%s", txt)
jex.QueueJob(name, op)
jex.WaitOrShow(not opts.submit_only)
return 0
......@@ -711,21 +707,23 @@ def RebootInstance(opts, args):
@return: the desired exit code
"""
cl = GetClient()
if opts.multi_mode is None:
opts.multi_mode = _SHUTDOWN_INSTANCES
inames = _ExpandMultiNames(opts.multi_mode, args)
inames = _ExpandMultiNames(opts.multi_mode, args, client=cl)
if not inames:
raise errors.OpPrereqError("Selection filter does not match any instances")
multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
if not (opts.force_multi or not multi_on
or _ConfirmOperation(inames, "reboot")):
return 1
jex = JobExecutor(verbose=multi_on, cl=cl)
for name in inames:
op = opcodes.OpRebootInstance(instance_name=name,
reboot_type=opts.reboot_type,
ignore_secondaries=opts.ignore_secondaries)
SubmitOrSend(op, opts)
jex.QueueJob(name, op)
jex.WaitOrShow(not opts.submit_only)
return 0
......@@ -741,24 +739,22 @@ def ShutdownInstance(opts, args):
@return: the desired exit code
"""
cl = GetClient()
if opts.multi_mode is None:
opts.multi_mode = _SHUTDOWN_INSTANCES
inames = _ExpandMultiNames(opts.multi_mode, args)
inames = _ExpandMultiNames(opts.multi_mode, args, client=cl)
if not inames:
raise errors.OpPrereqError("Selection filter does not match any instances")
multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
if not (opts.force_multi or not multi_on
or _ConfirmOperation(inames, "shutdown")):
return 1
jex = cli.JobExecutor(verbose=multi_on, cl=cl)
for name in inames:
op = opcodes.OpShutdownInstance(instance_name=name)
if multi_on:
ToStdout("Shutting down %s", name)
try:
SubmitOrSend(op, opts)
except JobSubmittedException, err:
_, txt = FormatError(err)
ToStdout("%s", txt)
jex.QueueJob(name, op)
jex.WaitOrShow(not opts.submit_only)
return 0
......
......@@ -167,15 +167,16 @@ def EvacuateNode(opts, args):
@return: the desired exit code
"""
cl = GetClient()
force = opts.force
selected_fields = ["name", "sinst_list"]
src_node, dst_node = args
op = opcodes.OpQueryNodes(output_fields=selected_fields, names=[src_node])
result = SubmitOpCode(op)
result = SubmitOpCode(op, cl=cl)
src_node, sinst = result[0]
op = opcodes.OpQueryNodes(output_fields=["name"], names=[dst_node])
result = SubmitOpCode(op)
result = SubmitOpCode(op, cl=cl)
dst_node = result[0][0]
if src_node == dst_node:
......@@ -189,36 +190,30 @@ def EvacuateNode(opts, args):
sinst = utils.NiceSort(sinst)
retcode = constants.EXIT_SUCCESS
if not force and not AskUser("Relocate instance(s) %s from node\n"
" %s to node\n %s?" %
(",".join("'%s'" % name for name in sinst),
src_node, dst_node)):
return constants.EXIT_CONFIRMATION
good_cnt = bad_cnt = 0
jex = JobExecutor()
for iname in sinst:
op = opcodes.OpReplaceDisks(instance_name=iname,
remote_node=dst_node,
mode=constants.REPLACE_DISK_ALL,
disks=["sda", "sdb"])
try:
ToStdout("Replacing disks for instance %s", iname)
SubmitOpCode(op)
ToStdout("Instance %s has been relocated", iname)
good_cnt += 1
except errors.GenericError, err:
nret, msg = FormatError(err)
retcode |= nret
ToStderr("Error replacing disks for instance %s: %s", iname, msg)
bad_cnt += 1
if retcode == constants.EXIT_SUCCESS:
ToStdout("All %d instance(s) relocated successfully.", good_cnt)
mode=constants.REPLACE_DISK_CHG,
disks=[])
jex.QueueJob(iname, op)
results = jex.GetResults()
bad_cnt = len([row for row in results if not row[0]])
if bad_cnt == 0:
ToStdout("All %d instance(s) relocated successfully.", len(results))
retcode = constants.EXIT_SUCCESS
else:
ToStdout("There were errors during the relocation:\n"
"%d error(s) out of %d instance(s).", bad_cnt, good_cnt + bad_cnt)
"%d error(s) out of %d instance(s).", bad_cnt, len(results))
retcode = constants.EXIT_FAILURE
return retcode
......@@ -232,11 +227,12 @@ def FailoverNode(opts, args):
@return: the desired exit code
"""
cl = GetClient()
force = opts.force
selected_fields = ["name", "pinst_list"]
op = opcodes.OpQueryNodes(output_fields=selected_fields, names=args)
result = SubmitOpCode(op)
result = SubmitOpCode(op, cl=cl)
node, pinst = result[0]
if not pinst:
......@@ -251,26 +247,18 @@ def FailoverNode(opts, args):
(",".join("'%s'" % name for name in pinst))):
return 2
good_cnt = bad_cnt = 0
jex = JobExecutor(cl=cl)
for iname in pinst:
op = opcodes.OpFailoverInstance(instance_name=iname,
ignore_consistency=opts.ignore_consistency)
try:
ToStdout("Failing over instance %s", iname)
SubmitOpCode(op)
ToStdout("Instance %s has been failed over", iname)
good_cnt += 1
except errors.GenericError, err:
nret, msg = FormatError(err)
retcode |= nret
ToStderr("Error failing over instance %s: %s", iname, msg)
bad_cnt += 1
if retcode == 0:
ToStdout("All %d instance(s) failed over successfully.", good_cnt)
jex.QueueJob(iname, op)
results = jex.GetResults()
bad_cnt = len([row for row in results if not row[0]])
if bad_cnt == 0:
ToStdout("All %d instance(s) failed over successfully.", len(results))
else:
ToStdout("There were errors during the failover:\n"
"%d error(s) out of %d instance(s).", bad_cnt, good_cnt + bad_cnt)
"%d error(s) out of %d instance(s).", bad_cnt, len(results))
return retcode
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment