Commit ae1a845c authored by Michael Hanselmann

Change OpClusterVerifyDisks to per-group opcodes



Until now verifying disks, which is also used by the watcher,
would lock all nodes and instances. With this patch the opcode
is changed to operate per node group, requiring fewer locks.

Both “gnt-cluster” and “ganeti-watcher” are changed for the
new interface.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent c4ff2275
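
Before the diff itself, a minimal sketch of what the new interface looks like from a client's point of view, assuming the same cli helpers used in the hunks below (the wrapper function name here is invented): OpClusterVerifyDisks now only submits one OpGroupVerifyDisks job per node group and returns the job IDs under constants.JOB_IDS_KEY; each of those jobs returns the familiar (bad_nodes, instances, missing) tuple.

```python
# Sketch of consuming the new per-group results; mirrors the logic added to
# gnt-cluster and ganeti-watcher in this commit.
from ganeti import cli, constants, opcodes

def verify_cluster_disks(client):
  """Submits OpClusterVerifyDisks and yields one result tuple per node group."""
  result = cli.SubmitOpCode(opcodes.OpClusterVerifyDisks(), cl=client)

  # The parent opcode only submits per-group jobs and reports their IDs
  jex = cli.JobExecutor(cl=client)
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  # Each per-group job returns a one-element list containing the usual tuple
  for (status, job_result) in jex.GetResults():
    if not status:
      continue  # job failed; real callers report this (see gnt-cluster/watcher)
    ((bad_nodes, offline_instances, missing), ) = job_result
    # bad_nodes:         {node_name: error message}
    # offline_instances: instances whose disks should be (re)activated
    # missing:           {instance_name: [(node_name, volume), ...]}
    yield bad_nodes, offline_instances, missing
```

The gnt-cluster and watcher hunks below implement exactly this flow, with error reporting added.
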
@@ -503,22 +503,30 @@ def VerifyDisks(opts, args):
cl = GetClient()
op = opcodes.OpClusterVerifyDisks()
result = SubmitOpCode(op, opts=opts, cl=cl)
if not isinstance(result, (list, tuple)) or len(result) != 3:
raise errors.ProgrammerError("Unknown result type for OpClusterVerifyDisks")
bad_nodes, instances, missing = result
result = SubmitOpCode(op, cl=cl, opts=opts)
# Keep track of submitted jobs
jex = JobExecutor(cl=cl, opts=opts)
for (status, job_id) in result[constants.JOB_IDS_KEY]:
jex.AddJobId(None, status, job_id)
retcode = constants.EXIT_SUCCESS
if bad_nodes:
for (status, result) in jex.GetResults():
if not status:
ToStdout("Job failed: %s", result)
continue
((bad_nodes, instances, missing), ) = result
for node, text in bad_nodes.items():
ToStdout("Error gathering data on node %s: %s",
node, utils.SafeEncode(text[-400:]))
retcode |= 1
retcode = constants.EXIT_FAILURE
ToStdout("You need to fix these nodes first before fixing instances")
if instances:
for iname in instances:
if iname in missing:
continue
@@ -531,24 +539,24 @@ def VerifyDisks(opts, args):
retcode |= nret
ToStderr("Error activating disks for instance %s: %s", iname, msg)
if missing:
for iname, ival in missing.iteritems():
all_missing = compat.all(x[0] in bad_nodes for x in ival)
if all_missing:
ToStdout("Instance %s cannot be verified as it lives on"
" broken nodes", iname)
else:
ToStdout("Instance %s has missing logical volumes:", iname)
ival.sort()
for node, vol in ival:
if node in bad_nodes:
ToStdout("\tbroken node %s /dev/%s", node, vol)
else:
ToStdout("\t%s /dev/%s", node, vol)
ToStdout("You need to run replace or recreate disks for all the above"
" instances, if this message persist after fixing nodes.")
retcode |= 1
if missing:
for iname, ival in missing.iteritems():
all_missing = compat.all(x[0] in bad_nodes for x in ival)
if all_missing:
ToStdout("Instance %s cannot be verified as it lives on"
" broken nodes", iname)
else:
ToStdout("Instance %s has missing logical volumes:", iname)
ival.sort()
for node, vol in ival:
if node in bad_nodes:
ToStdout("\tbroken node %s /dev/%s", node, vol)
else:
ToStdout("\t%s /dev/%s", node, vol)
ToStdout("You need to replace or recreate disks for all the above"
" instances if this message persists after fixing broken nodes.")
retcode = constants.EXIT_FAILURE
return retcode
......
@@ -2867,11 +2867,109 @@ class LUClusterVerifyDisks(NoHooksLU):
REQ_BGL = False
def ExpandNames(self):
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
locking.LEVEL_NODEGROUP: locking.ALL_SET,
}
def Exec(self, feedback_fn):
group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
# Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
for group in group_names])
class LUGroupVerifyDisks(NoHooksLU):
"""Verifies the status of all disks in a node group.
"""
REQ_BGL = False
def ExpandNames(self):
# Raises errors.OpPrereqError on its own if group can't be found
self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
self.needed_locks = {
locking.LEVEL_INSTANCE: [],
locking.LEVEL_NODEGROUP: [],
locking.LEVEL_NODE: [],
}
def DeclareLocks(self, level):
if level == locking.LEVEL_INSTANCE:
assert not self.needed_locks[locking.LEVEL_INSTANCE]
# Lock instances optimistically, needs verification once node and group
# locks have been acquired
self.needed_locks[locking.LEVEL_INSTANCE] = \
self.cfg.GetNodeGroupInstances(self.group_uuid)
elif level == locking.LEVEL_NODEGROUP:
assert not self.needed_locks[locking.LEVEL_NODEGROUP]
self.needed_locks[locking.LEVEL_NODEGROUP] = \
set([self.group_uuid] +
# Lock all groups used by instances optimistically; this requires
# going via the node before it's locked, requiring verification
# later on
[group_uuid
for instance_name in
self.glm.list_owned(locking.LEVEL_INSTANCE)
for group_uuid in
self.cfg.GetInstanceNodeGroups(instance_name)])
elif level == locking.LEVEL_NODE:
# This will only lock the nodes in the group to be verified which contain
# actual instances
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
self._LockInstancesNodes()
# Lock all nodes in group to be verified
assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
def CheckPrereq(self):
owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
assert self.group_uuid in owned_groups
# Check if locked instances are still correct
wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
if owned_instances != wanted_instances:
raise errors.OpPrereqError("Instances in node group %s changed since"
" locks were acquired, wanted %s, have %s;"
" retry the operation" %
(self.op.group_name,
utils.CommaJoin(wanted_instances),
utils.CommaJoin(owned_instances)),
errors.ECODE_STATE)
# Get instance information
self.instances = dict((name, self.cfg.GetInstanceInfo(name))
for name in owned_instances)
# Check if node groups for locked instances are still correct
for (instance_name, inst) in self.instances.items():
assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
"Instance %s has no node in group %s" % (instance_name, self.group_uuid)
assert owned_nodes.issuperset(inst.all_nodes), \
"Instance %s's nodes changed while we kept the lock" % instance_name
inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
if not owned_groups.issuperset(inst_groups):
raise errors.OpPrereqError("Instance %s's node groups changed since"
" locks were acquired, current groups are"
" are '%s', owning groups '%s'; retry the"
" operation" %
(instance_name,
utils.CommaJoin(inst_groups),
utils.CommaJoin(owned_groups)),
errors.ECODE_STATE)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
@@ -2882,50 +2980,41 @@ class LUClusterVerifyDisks(NoHooksLU):
missing volumes
"""
result = res_nodes, res_instances, res_missing = {}, [], {}
res_nodes = {}
res_instances = set()
res_missing = {}
nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
instances = self.cfg.GetAllInstancesInfo().values()
nv_dict = _MapInstanceDisksToNodes([inst
for inst in self.instances.values()
if inst.admin_up])
nv_dict = {}
for inst in instances:
inst_lvs = {}
if not inst.admin_up:
continue
inst.MapLVsByNode(inst_lvs)
# transform { iname: {node: [vol,],},} to {(node, vol): iname}
for node, vol_list in inst_lvs.iteritems():
for vol in vol_list:
nv_dict[(node, vol)] = inst
if not nv_dict:
return result
node_lvs = self.rpc.call_lv_list(nodes, [])
for node, node_res in node_lvs.items():
if node_res.offline:
continue
msg = node_res.fail_msg
if msg:
logging.warning("Error enumerating LVs on node %s: %s", node, msg)
res_nodes[node] = msg
continue
if nv_dict:
nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
set(self.cfg.GetVmCapableNodeList()))
lvs = node_res.payload
for lv_name, (_, _, lv_online) in lvs.items():
inst = nv_dict.pop((node, lv_name), None)
if (not lv_online and inst is not None
and inst.name not in res_instances):
res_instances.append(inst.name)
node_lvs = self.rpc.call_lv_list(nodes, [])
# any leftover items in nv_dict are missing LVs, let's arrange the
# data better
for key, inst in nv_dict.iteritems():
if inst.name not in res_missing:
res_missing[inst.name] = []
res_missing[inst.name].append(key)
for (node, node_res) in node_lvs.items():
if node_res.offline:
continue
return result
msg = node_res.fail_msg
if msg:
logging.warning("Error enumerating LVs on node %s: %s", node, msg)
res_nodes[node] = msg
continue
for lv_name, (_, _, lv_online) in node_res.payload.items():
inst = nv_dict.pop((node, lv_name), None)
if not (lv_online or inst is None):
res_instances.add(inst)
# any leftover items in nv_dict are missing LVs, let's arrange the data
# better
for key, inst in nv_dict.iteritems():
res_missing.setdefault(inst, []).append(key)
return (res_nodes, list(res_instances), res_missing)
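
A note on the locking scheme in LUGroupVerifyDisks above: the instance and group locks are filled in optimistically in DeclareLocks from configuration data read before those locks are held, and CheckPrereq then re-validates the snapshot once the locks are owned, raising OpPrereqError with ECODE_STATE so the whole operation can simply be retried. The sketch below illustrates that check-after-lock idea with invented stand-ins for the config and lock manager; it is not Ganeti code.

```python
# Illustrative only: "Config" is an invented stand-in for cfg/glm; the real LU
# uses self.cfg.GetNodeGroupInstances() and self.glm.list_owned() as shown above.
class StaleConfigError(Exception):
  """Raised when the optimistic snapshot no longer matches reality."""

class Config(object):
  """Stand-in config: maps group UUIDs to sets of instance names."""
  def __init__(self, group_instances):
    self._group_instances = group_instances

  def group_instances(self, group_uuid):
    return set(self._group_instances[group_uuid])

def declare_locks(cfg, group_uuid):
  # Optimistic step: decide which instance locks to take based on a config
  # read done before the group lock is actually held (cf. DeclareLocks above).
  return cfg.group_instances(group_uuid)

def check_prereq(cfg, group_uuid, owned_instances):
  # Verification step: with the locks held, the earlier snapshot must still
  # match the config; otherwise fail so the operation can be retried
  # (the real LU raises OpPrereqError with errors.ECODE_STATE).
  wanted = cfg.group_instances(group_uuid)
  if owned_instances != wanted:
    raise StaleConfigError("instances in group %s changed since locks were"
                           " acquired; retry the operation" % group_uuid)

cfg = Config({"uuid-1": set(["inst1.example.com", "inst2.example.com"])})
owned = declare_locks(cfg, "uuid-1")  # pretend these instance locks are now held
check_prereq(cfg, "uuid-1", owned)    # passes because nothing changed in between
```
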
class LUClusterRepairDiskSizes(NoHooksLU):
......
@@ -594,11 +594,14 @@ class OpClusterVerifyGroup(OpCode):
class OpClusterVerifyDisks(OpCode):
"""Verify the cluster disks.
Parameters: none
"""
Result: a tuple of four elements:
- list of node names with bad data returned (unreachable, etc.)
- dict of node names with broken volume groups (values: error msg)
class OpGroupVerifyDisks(OpCode):
"""Verifies the status of all disks in a node group.
Result: a tuple of three elements:
- dict of node names with issues (values: error msg)
- list of instances with degraded disks (that should be activated)
- dict of instances with missing logical volumes (values: (node, vol)
pairs with details about the missing volumes)
@@ -612,6 +615,10 @@ class OpClusterVerifyDisks(OpCode):
consideration. This might need to be revisited in the future.
"""
OP_DSC_FIELD = "group_name"
OP_PARAMS = [
_PGroupName,
]
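
To make the documented result concrete, one OpGroupVerifyDisks job returns a tuple shaped like the following (all values are made-up example data, not output from a real cluster):

```python
# Illustrative result of a single OpGroupVerifyDisks job (fabricated values):
group_result = (
  # dict of node names with issues (values: error msg)
  {"node3.example.com": "Error enumerating LVs: volume group not found"},
  # list of instances with degraded disks (that should be activated)
  ["instance2.example.com"],
  # dict of instances with missing logical volumes, as (node, vol) pairs
  {"instance5.example.com": [("node3.example.com", "xenvg/disk0")]},
  )
```
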
class OpClusterRepairDiskSizes(OpCode):
......
@@ -597,20 +597,41 @@ class Watcher(object):
"""Run gnt-cluster verify-disks.
"""
op = opcodes.OpClusterVerifyDisks()
job_id = client.SubmitJob([op])
job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
client.ArchiveJob(job_id)
if not isinstance(result, (tuple, list)):
logging.error("Can't get a valid result from verify-disks")
return
offline_disk_instances = result[1]
# Keep track of submitted jobs
jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
archive_jobs = set()
for (status, job_id) in result[constants.JOB_IDS_KEY]:
jex.AddJobId(None, status, job_id)
if status:
archive_jobs.add(job_id)
offline_disk_instances = set()
for (status, result) in jex.GetResults():
if not status:
logging.error("Verify-disks job failed: %s", result)
continue
((_, instances, _), ) = result
offline_disk_instances.update(instances)
for job_id in archive_jobs:
client.ArchiveJob(job_id)
if not offline_disk_instances:
# nothing to do
logging.debug("verify-disks reported no offline disks, nothing to do")
return
logging.debug("Will activate disks for instance(s) %s",
utils.CommaJoin(offline_disk_instances))
# we submit only one job, and wait for it. not optimal, but spams
# less the job queue
job = []
......