From 31b836b8b1d17c70023ef5b8307a5df1b9a0d7db Mon Sep 17 00:00:00 2001 From: Thomas Thrainer <thomasth@google.com> Date: Mon, 13 May 2013 15:16:27 +0200 Subject: [PATCH] Extract node related logical units from cmdlib All LUNode* classes are extracted to node.py. Common functions are moved to common.py if used by non-node logical units as well. Signed-off-by: Thomas Thrainer <thomasth@google.com> Reviewed-by: Bernardo Dal Seno <bdalseno@google.com> --- Makefile.am | 1 + lib/cmdlib/__init__.py | 1874 ++--------------------------- lib/cmdlib/common.py | 139 +++ lib/cmdlib/node.py | 1569 ++++++++++++++++++++++++ test/py/ganeti.cmdlib_unittest.py | 2 +- 5 files changed, 1821 insertions(+), 1764 deletions(-) create mode 100644 lib/cmdlib/node.py diff --git a/Makefile.am b/Makefile.am index 594fe3175..cc9d2d21e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -313,6 +313,7 @@ cmdlib_PYTHON = \ lib/cmdlib/base.py \ lib/cmdlib/cluster.py \ lib/cmdlib/group.py \ + lib/cmdlib/node.py \ lib/cmdlib/tags.py \ lib/cmdlib/network.py \ lib/cmdlib/test.py diff --git a/lib/cmdlib/__init__.py b/lib/cmdlib/__init__.py index 705038bd9..f37230dde 100644 --- a/lib/cmdlib/__init__.py +++ b/lib/cmdlib/__init__.py @@ -56,7 +56,9 @@ from ganeti.masterd import iallocator from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \ Tasklet, _QueryBase -from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \ +from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \ + INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \ + _ExpandInstanceName, _ExpandItemName, \ _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \ _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \ _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \ @@ -65,7 +67,9 @@ from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \ _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \ _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \ _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \ - _CheckInstanceNodeGroups + _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \ + _IsExclusiveStorageEnabledNode, _CheckInstanceState, \ + _CheckIAllocatorOrNode, _FindFaultyInstanceDisks from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \ LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \ @@ -76,6 +80,10 @@ from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \ from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \ _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \ LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks +from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \ + LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \ + _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \ + LUNodeRemove, LURepairNodeStorage from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \ LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \ @@ -85,31 +93,6 @@ from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator import ganeti.masterd.instance # pylint: disable=W0611 -# States of instance -INSTANCE_DOWN = [constants.ADMINST_DOWN] -INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] -INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE] - -#: Instance status in which an instance can be marked as 
offline/online -CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([ - constants.ADMINST_OFFLINE, - ])) - - -def _IsExclusiveStorageEnabledNode(cfg, node): - """Whether exclusive_storage is in effect for the given node. - - @type cfg: L{config.ConfigWriter} - @param cfg: The cluster configuration - @type node: L{objects.Node} - @param node: The node - @rtype: bool - @return: The effective value of exclusive_storage - - """ - return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE] - - def _IsExclusiveStorageEnabledNodeName(cfg, nodename): """Whether exclusive_storage is in effect for the given node. @@ -191,54 +174,6 @@ def _ReleaseLocks(lu, level, names=None, keep=None): assert not lu.glm.is_owned(level), "No locks should be owned" -def _CheckOutputFields(static, dynamic, selected): - """Checks whether all selected fields are valid. - - @type static: L{utils.FieldSet} - @param static: static fields set - @type dynamic: L{utils.FieldSet} - @param dynamic: dynamic fields set - - """ - f = utils.FieldSet() - f.Extend(static) - f.Extend(dynamic) - - delta = f.NonMatching(selected) - if delta: - raise errors.OpPrereqError("Unknown output fields selected: %s" - % ",".join(delta), errors.ECODE_INVAL) - - -def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels): - """Make sure that none of the given paramters is global. - - If a global parameter is found, an L{errors.OpPrereqError} exception is - raised. This is used to avoid setting global parameters for individual nodes. - - @type params: dictionary - @param params: Parameters to check - @type glob_pars: dictionary - @param glob_pars: Forbidden parameters - @type kind: string - @param kind: Kind of parameters (e.g. "node") - @type bad_levels: string - @param bad_levels: Level(s) at which the parameters are forbidden (e.g. - "instance") - @type good_levels: strings - @param good_levels: Level(s) at which the parameters are allowed (e.g. - "cluster or group") - - """ - used_globals = glob_pars.intersection(params) - if used_globals: - msg = ("The following %s parameters are global and cannot" - " be customized at %s level, please modify them at" - " %s level: %s" % - (kind, bad_levels, good_levels, utils.CommaJoin(used_globals))) - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - - def _CheckNodeOnline(lu, node, msg=None): """Ensure that a given node is online. @@ -298,33 +233,6 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant): _CheckOSVariant(result.payload, os_name) -def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq): - """Ensure that a node has the given secondary ip. 
- - @type lu: L{LogicalUnit} - @param lu: the LU on behalf of which we make the check - @type node: string - @param node: the node to check - @type secondary_ip: string - @param secondary_ip: the ip to check - @type prereq: boolean - @param prereq: whether to throw a prerequisite or an execute error - @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True - @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False - - """ - result = lu.rpc.call_node_has_ip_address(node, secondary_ip) - result.Raise("Failure checking secondary ip on node %s" % node, - prereq=prereq, ecode=errors.ECODE_ENVIRON) - if not result.payload: - msg = ("Node claims it doesn't have the secondary ip you gave (%s)," - " please fix and re-run this command" % secondary_ip) - if prereq: - raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) - else: - raise errors.OpExecError(msg) - - def _GetClusterDomainSecret(): """Reads the cluster domain secret. @@ -333,37 +241,6 @@ def _GetClusterDomainSecret(): strict=True) -def _CheckInstanceState(lu, instance, req_states, msg=None): - """Ensure that an instance is in one of the required states. - - @param lu: the LU on behalf of which we make the check - @param instance: the instance to check - @param msg: if passed, should be a message to replace the default one - @raise errors.OpPrereqError: if the instance is not in the required state - - """ - if msg is None: - msg = ("can't use instance from outside %s states" % - utils.CommaJoin(req_states)) - if instance.admin_state not in req_states: - raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % - (instance.name, instance.admin_state, msg), - errors.ECODE_STATE) - - if constants.ADMINST_UP not in req_states: - pnode = instance.primary_node - if not lu.cfg.GetNodeInfo(pnode).offline: - ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] - ins_l.Raise("Can't contact node %s for instance information" % pnode, - prereq=True, ecode=errors.ECODE_ENVIRON) - if instance.name in ins_l.payload: - raise errors.OpPrereqError("Instance %s is running, %s" % - (instance.name, msg), errors.ECODE_STATE) - else: - lu.LogWarning("Primary node offline, ignoring check that instance" - " is down") - - def _ComputeIPolicyInstanceSpecViolation( ipolicy, instance_spec, disk_template, _compute_fn=_ComputeIPolicySpecViolation): @@ -621,17 +498,6 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): return _BuildInstanceHookEnv(**args) # pylint: disable=W0142 -def _DecideSelfPromotion(lu, exceptions=None): - """Decide whether I should promote myself as a master candidate. - - """ - cp_size = lu.cfg.GetClusterInfo().candidate_pool_size - mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) - # the new node will increase mc_max with one, so: - mc_should = min(mc_should + 1, cp_size) - return mc_now < mc_should - - def _CheckNicsBridgesExist(lu, target_nics, target_node): """Check that the brigdes needed by a list of nics exist. @@ -679,100 +545,6 @@ def _CheckOSVariant(os_obj, name): raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL) -def _GetNodeInstancesInner(cfg, fn): - return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] - - -def _GetNodeInstances(cfg, node_name): - """Returns a list of all primary and secondary instances on a node. - - """ - - return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes) - - -def _GetNodePrimaryInstances(cfg, node_name): - """Returns primary instances on a node. 
- - """ - return _GetNodeInstancesInner(cfg, - lambda inst: node_name == inst.primary_node) - - -def _GetNodeSecondaryInstances(cfg, node_name): - """Returns secondary instances on a node. - - """ - return _GetNodeInstancesInner(cfg, - lambda inst: node_name in inst.secondary_nodes) - - -def _GetStorageTypeArgs(cfg, storage_type): - """Returns the arguments for a storage type. - - """ - # Special case for file storage - if storage_type == constants.ST_FILE: - # storage.FileStorage wants a list of storage directories - return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]] - - return [] - - -def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq): - faulty = [] - - for dev in instance.disks: - cfg.SetDiskID(dev, node_name) - - result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks, - instance)) - result.Raise("Failed to get disk status from node %s" % node_name, - prereq=prereq, ecode=errors.ECODE_ENVIRON) - - for idx, bdev_status in enumerate(result.payload): - if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: - faulty.append(idx) - - return faulty - - -def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): - """Check the sanity of iallocator and node arguments and use the - cluster-wide iallocator if appropriate. - - Check that at most one of (iallocator, node) is specified. If none is - specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT}, - then the LU's opcode's iallocator slot is filled with the cluster-wide - default iallocator. - - @type iallocator_slot: string - @param iallocator_slot: the name of the opcode iallocator slot - @type node_slot: string - @param node_slot: the name of the opcode target node slot - - """ - node = getattr(lu.op, node_slot, None) - ialloc = getattr(lu.op, iallocator_slot, None) - if node == []: - node = None - - if node is not None and ialloc is not None: - raise errors.OpPrereqError("Do not specify both, iallocator and node", - errors.ECODE_INVAL) - elif ((node is None and ialloc is None) or - ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT): - default_iallocator = lu.cfg.GetDefaultIAllocator() - if default_iallocator: - setattr(lu.op, iallocator_slot, default_iallocator) - else: - raise errors.OpPrereqError("No iallocator or node given and no" - " cluster-wide default iallocator found;" - " please specify either an iallocator or a" - " node, or set a cluster-wide default" - " iallocator", errors.ECODE_INVAL) - - def _CheckHostnameSane(lu, name): """Ensures that a given hostname resolves to a 'sane' name. @@ -1431,358 +1203,6 @@ class LUExtStorageDiagnose(NoHooksLU): return self.eq.OldStyleQuery(self) -class LUNodeRemove(LogicalUnit): - """Logical unit for removing a node. - - """ - HPATH = "node-remove" - HTYPE = constants.HTYPE_NODE - - def BuildHooksEnv(self): - """Build hooks env. - - """ - return { - "OP_TARGET": self.op.node_name, - "NODE_NAME": self.op.node_name, - } - - def BuildHooksNodes(self): - """Build hooks nodes. - - This doesn't run on the target node in the pre phase as a failed - node would then be impossible to remove. - - """ - all_nodes = self.cfg.GetNodeList() - try: - all_nodes.remove(self.op.node_name) - except ValueError: - pass - return (all_nodes, all_nodes) - - def CheckPrereq(self): - """Check prerequisites. - - This checks: - - the node exists in the configuration - - it does not have primary or secondary instances - - it's not the master - - Any errors are signaled by raising errors.OpPrereqError. 
- - """ - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - node = self.cfg.GetNodeInfo(self.op.node_name) - assert node is not None - - masternode = self.cfg.GetMasterNode() - if node.name == masternode: - raise errors.OpPrereqError("Node is the master node, failover to another" - " node is required", errors.ECODE_INVAL) - - for instance_name, instance in self.cfg.GetAllInstancesInfo().items(): - if node.name in instance.all_nodes: - raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first" % instance_name, - errors.ECODE_INVAL) - self.op.node_name = node.name - self.node = node - - def Exec(self, feedback_fn): - """Removes the node from the cluster. - - """ - node = self.node - logging.info("Stopping the node daemon and removing configs from node %s", - node.name) - - modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup - - assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ - "Not owning BGL" - - # Promote nodes to master candidate as needed - _AdjustCandidatePool(self, exceptions=[node.name]) - self.context.RemoveNode(node.name) - - # Run post hooks on the node before it's removed - _RunPostHook(self, node.name) - - result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) - msg = result.fail_msg - if msg: - self.LogWarning("Errors encountered on the remote node while leaving" - " the cluster: %s", msg) - - # Remove node from our /etc/hosts - if self.cfg.GetClusterInfo().modify_etc_hosts: - master_node = self.cfg.GetMasterNode() - result = self.rpc.call_etc_hosts_modify(master_node, - constants.ETC_HOSTS_REMOVE, - node.name, None) - result.Raise("Can't update hosts file with new host data") - _RedistributeAncillaryFiles(self) - - -class _NodeQuery(_QueryBase): - FIELDS = query.NODE_FIELDS - - def ExpandNames(self, lu): - lu.needed_locks = {} - lu.share_locks = _ShareAll() - - if self.names: - self.wanted = _GetWantedNodes(lu, self.names) - else: - self.wanted = locking.ALL_SET - - self.do_locking = (self.use_locking and - query.NQ_LIVE in self.requested_data) - - if self.do_locking: - # If any non-static field is requested we need to lock the nodes - lu.needed_locks[locking.LEVEL_NODE] = self.wanted - lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET - - def DeclareLocks(self, lu, level): - pass - - def _GetQueryData(self, lu): - """Computes the list of nodes and their attributes. 
- - """ - all_info = lu.cfg.GetAllNodesInfo() - - nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE) - - # Gather data as requested - if query.NQ_LIVE in self.requested_data: - # filter out non-vm_capable nodes - toquery_nodes = [name for name in nodenames if all_info[name].vm_capable] - - es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes) - node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], - [lu.cfg.GetHypervisorType()], es_flags) - live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload)) - for (name, nresult) in node_data.items() - if not nresult.fail_msg and nresult.payload) - else: - live_data = None - - if query.NQ_INST in self.requested_data: - node_to_primary = dict([(name, set()) for name in nodenames]) - node_to_secondary = dict([(name, set()) for name in nodenames]) - - inst_data = lu.cfg.GetAllInstancesInfo() - - for inst in inst_data.values(): - if inst.primary_node in node_to_primary: - node_to_primary[inst.primary_node].add(inst.name) - for secnode in inst.secondary_nodes: - if secnode in node_to_secondary: - node_to_secondary[secnode].add(inst.name) - else: - node_to_primary = None - node_to_secondary = None - - if query.NQ_OOB in self.requested_data: - oob_support = dict((name, bool(_SupportsOob(lu.cfg, node))) - for name, node in all_info.iteritems()) - else: - oob_support = None - - if query.NQ_GROUP in self.requested_data: - groups = lu.cfg.GetAllNodeGroupsInfo() - else: - groups = {} - - return query.NodeQueryData([all_info[name] for name in nodenames], - live_data, lu.cfg.GetMasterNode(), - node_to_primary, node_to_secondary, groups, - oob_support, lu.cfg.GetClusterInfo()) - - -class LUNodeQuery(NoHooksLU): - """Logical unit for querying nodes. - - """ - # pylint: disable=W0142 - REQ_BGL = False - - def CheckArguments(self): - self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names), - self.op.output_fields, self.op.use_locking) - - def ExpandNames(self): - self.nq.ExpandNames(self) - - def DeclareLocks(self, level): - self.nq.DeclareLocks(self, level) - - def Exec(self, feedback_fn): - return self.nq.OldStyleQuery(self) - - -class LUNodeQueryvols(NoHooksLU): - """Logical unit for getting volumes on node(s). - - """ - REQ_BGL = False - _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance") - _FIELDS_STATIC = utils.FieldSet("node") - - def CheckArguments(self): - _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=self._FIELDS_DYNAMIC, - selected=self.op.output_fields) - - def ExpandNames(self): - self.share_locks = _ShareAll() - - if self.op.nodes: - self.needed_locks = { - locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), - } - else: - self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_NODE_ALLOC: locking.ALL_SET, - } - - def Exec(self, feedback_fn): - """Computes the list of nodes and their attributes. 
- - """ - nodenames = self.owned_locks(locking.LEVEL_NODE) - volumes = self.rpc.call_node_volumes(nodenames) - - ilist = self.cfg.GetAllInstancesInfo() - vol2inst = _MapInstanceDisksToNodes(ilist.values()) - - output = [] - for node in nodenames: - nresult = volumes[node] - if nresult.offline: - continue - msg = nresult.fail_msg - if msg: - self.LogWarning("Can't compute volume data on node %s: %s", node, msg) - continue - - node_vols = sorted(nresult.payload, - key=operator.itemgetter("dev")) - - for vol in node_vols: - node_output = [] - for field in self.op.output_fields: - if field == "node": - val = node - elif field == "phys": - val = vol["dev"] - elif field == "vg": - val = vol["vg"] - elif field == "name": - val = vol["name"] - elif field == "size": - val = int(float(vol["size"])) - elif field == "instance": - val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-") - else: - raise errors.ParameterError(field) - node_output.append(str(val)) - - output.append(node_output) - - return output - - -class LUNodeQueryStorage(NoHooksLU): - """Logical unit for getting information on storage units on node(s). - - """ - _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) - REQ_BGL = False - - def CheckArguments(self): - _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), - selected=self.op.output_fields) - - def ExpandNames(self): - self.share_locks = _ShareAll() - - if self.op.nodes: - self.needed_locks = { - locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), - } - else: - self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_NODE_ALLOC: locking.ALL_SET, - } - - def Exec(self, feedback_fn): - """Computes the list of nodes and their attributes. - - """ - self.nodes = self.owned_locks(locking.LEVEL_NODE) - - # Always get name to sort by - if constants.SF_NAME in self.op.output_fields: - fields = self.op.output_fields[:] - else: - fields = [constants.SF_NAME] + self.op.output_fields - - # Never ask for node or type as it's only known to the LU - for extra in [constants.SF_NODE, constants.SF_TYPE]: - while extra in fields: - fields.remove(extra) - - field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) - name_idx = field_idx[constants.SF_NAME] - - st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) - data = self.rpc.call_storage_list(self.nodes, - self.op.storage_type, st_args, - self.op.name, fields) - - result = [] - - for node in utils.NiceSort(self.nodes): - nresult = data[node] - if nresult.offline: - continue - - msg = nresult.fail_msg - if msg: - self.LogWarning("Can't get storage data from node %s: %s", node, msg) - continue - - rows = dict([(row[name_idx], row) for row in nresult.payload]) - - for name in utils.NiceSort(rows.keys()): - row = rows[name] - - out = [] - - for field in self.op.output_fields: - if field == constants.SF_NODE: - val = node - elif field == constants.SF_TYPE: - val = self.op.storage_type - elif field in field_idx: - val = row[field_idx[field]] - else: - raise errors.ParameterError(field) - - out.append(val) - - result.append(out) - - return result - - class _InstanceQuery(_QueryBase): FIELDS = query.INSTANCE_FIELDS @@ -1833,887 +1253,140 @@ class _InstanceQuery(_QueryBase): owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP)) # Check if node groups for locked instances are still correct - for instance_name in owned_instances: - _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups) - - def _GetQueryData(self, lu): - """Computes 
the list of instances and their attributes. - - """ - if self.do_grouplocks: - self._CheckGroupLocks(lu) - - cluster = lu.cfg.GetClusterInfo() - all_info = lu.cfg.GetAllInstancesInfo() - - instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE) - - instance_list = [all_info[name] for name in instance_names] - nodes = frozenset(itertools.chain(*(inst.all_nodes - for inst in instance_list))) - hv_list = list(set([inst.hypervisor for inst in instance_list])) - bad_nodes = [] - offline_nodes = [] - wrongnode_inst = set() - - # Gather data as requested - if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]): - live_data = {} - node_data = lu.rpc.call_all_instances_info(nodes, hv_list) - for name in nodes: - result = node_data[name] - if result.offline: - # offline nodes will be in both lists - assert result.fail_msg - offline_nodes.append(name) - if result.fail_msg: - bad_nodes.append(name) - elif result.payload: - for inst in result.payload: - if inst in all_info: - if all_info[inst].primary_node == name: - live_data.update(result.payload) - else: - wrongnode_inst.add(inst) - else: - # orphan instance; we don't list it here as we don't - # handle this case yet in the output of instance listing - logging.warning("Orphan instance '%s' found on node %s", - inst, name) - # else no instance is alive - else: - live_data = {} - - if query.IQ_DISKUSAGE in self.requested_data: - gmi = ganeti.masterd.instance - disk_usage = dict((inst.name, - gmi.ComputeDiskSize(inst.disk_template, - [{constants.IDISK_SIZE: disk.size} - for disk in inst.disks])) - for inst in instance_list) - else: - disk_usage = None - - if query.IQ_CONSOLE in self.requested_data: - consinfo = {} - for inst in instance_list: - if inst.name in live_data: - # Instance is running - consinfo[inst.name] = _GetInstanceConsole(cluster, inst) - else: - consinfo[inst.name] = None - assert set(consinfo.keys()) == set(instance_names) - else: - consinfo = None - - if query.IQ_NODES in self.requested_data: - node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"), - instance_list))) - nodes = dict(lu.cfg.GetMultiNodeInfo(node_names)) - groups = dict((uuid, lu.cfg.GetNodeGroup(uuid)) - for uuid in set(map(operator.attrgetter("group"), - nodes.values()))) - else: - nodes = None - groups = None - - if query.IQ_NETWORKS in self.requested_data: - net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name) - for i in instance_list)) - networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids) - else: - networks = None - - return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(), - disk_usage, offline_nodes, bad_nodes, - live_data, wrongnode_inst, consinfo, - nodes, groups, networks) - - -class LUQuery(NoHooksLU): - """Query for resources/items of a certain kind. - - """ - # pylint: disable=W0142 - REQ_BGL = False - - def CheckArguments(self): - qcls = _GetQueryImplementation(self.op.what) - - self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking) - - def ExpandNames(self): - self.impl.ExpandNames(self) - - def DeclareLocks(self, level): - self.impl.DeclareLocks(self, level) - - def Exec(self, feedback_fn): - return self.impl.NewStyleQuery(self) - - -class LUQueryFields(NoHooksLU): - """Query for resources/items of a certain kind. 
- - """ - # pylint: disable=W0142 - REQ_BGL = False - - def CheckArguments(self): - self.qcls = _GetQueryImplementation(self.op.what) - - def ExpandNames(self): - self.needed_locks = {} - - def Exec(self, feedback_fn): - return query.QueryFields(self.qcls.FIELDS, self.op.fields) - - -class LUNodeModifyStorage(NoHooksLU): - """Logical unit for modifying a storage volume on a node. - - """ - REQ_BGL = False - - def CheckArguments(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - - storage_type = self.op.storage_type - - try: - modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] - except KeyError: - raise errors.OpPrereqError("Storage units of type '%s' can not be" - " modified" % storage_type, - errors.ECODE_INVAL) - - diff = set(self.op.changes.keys()) - modifiable - if diff: - raise errors.OpPrereqError("The following fields can not be modified for" - " storage units of type '%s': %r" % - (storage_type, list(diff)), - errors.ECODE_INVAL) - - def ExpandNames(self): - self.needed_locks = { - locking.LEVEL_NODE: self.op.node_name, - } - - def Exec(self, feedback_fn): - """Computes the list of nodes and their attributes. - - """ - st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) - result = self.rpc.call_storage_modify(self.op.node_name, - self.op.storage_type, st_args, - self.op.name, self.op.changes) - result.Raise("Failed to modify storage unit '%s' on %s" % - (self.op.name, self.op.node_name)) - - -class LUNodeAdd(LogicalUnit): - """Logical unit for adding node to the cluster. - - """ - HPATH = "node-add" - HTYPE = constants.HTYPE_NODE - _NFLAGS = ["master_capable", "vm_capable"] - - def CheckArguments(self): - self.primary_ip_family = self.cfg.GetPrimaryIPFamily() - # validate/normalize the node name - self.hostname = netutils.GetHostname(name=self.op.node_name, - family=self.primary_ip_family) - self.op.node_name = self.hostname.name - - if self.op.readd and self.op.node_name == self.cfg.GetMasterNode(): - raise errors.OpPrereqError("Cannot readd the master node", - errors.ECODE_STATE) - - if self.op.readd and self.op.group: - raise errors.OpPrereqError("Cannot pass a node group when a node is" - " being readded", errors.ECODE_INVAL) - - def BuildHooksEnv(self): - """Build hooks env. - - This will run on all nodes before, and on all nodes + the new node after. - - """ - return { - "OP_TARGET": self.op.node_name, - "NODE_NAME": self.op.node_name, - "NODE_PIP": self.op.primary_ip, - "NODE_SIP": self.op.secondary_ip, - "MASTER_CAPABLE": str(self.op.master_capable), - "VM_CAPABLE": str(self.op.vm_capable), - } - - def BuildHooksNodes(self): - """Build hooks nodes. - - """ - # Exclude added node - pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name])) - post_nodes = pre_nodes + [self.op.node_name, ] - - return (pre_nodes, post_nodes) - - def CheckPrereq(self): - """Check prerequisites. - - This checks: - - the new node is not already in the config - - it is resolvable - - its parameters (single/dual homed) matches the cluster - - Any errors are signaled by raising errors.OpPrereqError. 
- - """ - cfg = self.cfg - hostname = self.hostname - node = hostname.name - primary_ip = self.op.primary_ip = hostname.ip - if self.op.secondary_ip is None: - if self.primary_ip_family == netutils.IP6Address.family: - raise errors.OpPrereqError("When using a IPv6 primary address, a valid" - " IPv4 address must be given as secondary", - errors.ECODE_INVAL) - self.op.secondary_ip = primary_ip - - secondary_ip = self.op.secondary_ip - if not netutils.IP4Address.IsValid(secondary_ip): - raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" - " address" % secondary_ip, errors.ECODE_INVAL) - - node_list = cfg.GetNodeList() - if not self.op.readd and node in node_list: - raise errors.OpPrereqError("Node %s is already in the configuration" % - node, errors.ECODE_EXISTS) - elif self.op.readd and node not in node_list: - raise errors.OpPrereqError("Node %s is not in the configuration" % node, - errors.ECODE_NOENT) - - self.changed_primary_ip = False - - for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list): - if self.op.readd and node == existing_node_name: - if existing_node.secondary_ip != secondary_ip: - raise errors.OpPrereqError("Readded node doesn't have the same IP" - " address configuration as before", - errors.ECODE_INVAL) - if existing_node.primary_ip != primary_ip: - self.changed_primary_ip = True - - continue - - if (existing_node.primary_ip == primary_ip or - existing_node.secondary_ip == primary_ip or - existing_node.primary_ip == secondary_ip or - existing_node.secondary_ip == secondary_ip): - raise errors.OpPrereqError("New node ip address(es) conflict with" - " existing node %s" % existing_node.name, - errors.ECODE_NOTUNIQUE) - - # After this 'if' block, None is no longer a valid value for the - # _capable op attributes - if self.op.readd: - old_node = self.cfg.GetNodeInfo(node) - assert old_node is not None, "Can't retrieve locked node %s" % node - for attr in self._NFLAGS: - if getattr(self.op, attr) is None: - setattr(self.op, attr, getattr(old_node, attr)) - else: - for attr in self._NFLAGS: - if getattr(self.op, attr) is None: - setattr(self.op, attr, True) - - if self.op.readd and not self.op.vm_capable: - pri, sec = cfg.GetNodeInstances(node) - if pri or sec: - raise errors.OpPrereqError("Node %s being re-added with vm_capable" - " flag set to false, but it already holds" - " instances" % node, - errors.ECODE_STATE) - - # check that the type of the node (single versus dual homed) is the - # same as for the master - myself = cfg.GetNodeInfo(self.cfg.GetMasterNode()) - master_singlehomed = myself.secondary_ip == myself.primary_ip - newbie_singlehomed = secondary_ip == primary_ip - if master_singlehomed != newbie_singlehomed: - if master_singlehomed: - raise errors.OpPrereqError("The master has no secondary ip but the" - " new node has one", - errors.ECODE_INVAL) - else: - raise errors.OpPrereqError("The master has a secondary ip but the" - " new node doesn't have one", - errors.ECODE_INVAL) - - # checks reachability - if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("Node not reachable by ping", - errors.ECODE_ENVIRON) - - if not newbie_singlehomed: - # check reachability from my secondary ip to newbie's secondary ip - if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, - source=myself.secondary_ip): - raise errors.OpPrereqError("Node secondary ip not reachable by TCP" - " based ping to node daemon port", - errors.ECODE_ENVIRON) - - if self.op.readd: - exceptions = [node] - 
else: - exceptions = [] - - if self.op.master_capable: - self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) - else: - self.master_candidate = False - - if self.op.readd: - self.new_node = old_node - else: - node_group = cfg.LookupNodeGroup(self.op.group) - self.new_node = objects.Node(name=node, - primary_ip=primary_ip, - secondary_ip=secondary_ip, - master_candidate=self.master_candidate, - offline=False, drained=False, - group=node_group, ndparams={}) - - if self.op.ndparams: - utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) - _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", - "node", "cluster or group") - - if self.op.hv_state: - self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None) - - if self.op.disk_state: - self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) - - # TODO: If we need to have multiple DnsOnlyRunner we probably should make - # it a property on the base class. - rpcrunner = rpc.DnsOnlyRunner() - result = rpcrunner.call_version([node])[node] - result.Raise("Can't get version information from node %s" % node) - if constants.PROTOCOL_VERSION == result.payload: - logging.info("Communication to node %s fine, sw version %s match", - node, result.payload) - else: - raise errors.OpPrereqError("Version mismatch master version %s," - " node version %s" % - (constants.PROTOCOL_VERSION, result.payload), - errors.ECODE_ENVIRON) - - vg_name = cfg.GetVGName() - if vg_name is not None: - vparams = {constants.NV_PVLIST: [vg_name]} - excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node) - cname = self.cfg.GetClusterName() - result = rpcrunner.call_node_verify_light([node], vparams, cname)[node] - (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor) - if errmsgs: - raise errors.OpPrereqError("Checks on node PVs failed: %s" % - "; ".join(errmsgs), errors.ECODE_ENVIRON) - - def Exec(self, feedback_fn): - """Adds the new node to the cluster. 
- - """ - new_node = self.new_node - node = new_node.name - - assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ - "Not owning BGL" - - # We adding a new node so we assume it's powered - new_node.powered = True - - # for re-adds, reset the offline/drained/master-candidate flags; - # we need to reset here, otherwise offline would prevent RPC calls - # later in the procedure; this also means that if the re-add - # fails, we are left with a non-offlined, broken node - if self.op.readd: - new_node.drained = new_node.offline = False # pylint: disable=W0201 - self.LogInfo("Readding a node, the offline/drained flags were reset") - # if we demote the node, we do cleanup later in the procedure - new_node.master_candidate = self.master_candidate - if self.changed_primary_ip: - new_node.primary_ip = self.op.primary_ip - - # copy the master/vm_capable flags - for attr in self._NFLAGS: - setattr(new_node, attr, getattr(self.op, attr)) - - # notify the user about any possible mc promotion - if new_node.master_candidate: - self.LogInfo("Node will be a master candidate") - - if self.op.ndparams: - new_node.ndparams = self.op.ndparams - else: - new_node.ndparams = {} - - if self.op.hv_state: - new_node.hv_state_static = self.new_hv_state - - if self.op.disk_state: - new_node.disk_state_static = self.new_disk_state - - # Add node to our /etc/hosts, and add key to known_hosts - if self.cfg.GetClusterInfo().modify_etc_hosts: - master_node = self.cfg.GetMasterNode() - result = self.rpc.call_etc_hosts_modify(master_node, - constants.ETC_HOSTS_ADD, - self.hostname.name, - self.hostname.ip) - result.Raise("Can't update hosts file with new host data") - - if new_node.secondary_ip != new_node.primary_ip: - _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip, - False) - - node_verify_list = [self.cfg.GetMasterNode()] - node_verify_param = { - constants.NV_NODELIST: ([node], {}), - # TODO: do a node-net-test as well? - } - - result = self.rpc.call_node_verify(node_verify_list, node_verify_param, - self.cfg.GetClusterName()) - for verifier in node_verify_list: - result[verifier].Raise("Cannot communicate with node %s" % verifier) - nl_payload = result[verifier].payload[constants.NV_NODELIST] - if nl_payload: - for failed in nl_payload: - feedback_fn("ssh/hostname verification failed" - " (checking from %s): %s" % - (verifier, nl_payload[failed])) - raise errors.OpExecError("ssh/hostname verification failed") - - if self.op.readd: - _RedistributeAncillaryFiles(self) - self.context.ReaddNode(new_node) - # make sure we redistribute the config - self.cfg.Update(new_node, feedback_fn) - # and make sure the new node will not have old files around - if not new_node.master_candidate: - result = self.rpc.call_node_demote_from_mc(new_node.name) - msg = result.fail_msg - if msg: - self.LogWarning("Node failed to demote itself from master" - " candidate status: %s" % msg) - else: - _RedistributeAncillaryFiles(self, additional_nodes=[node], - additional_vm=self.op.vm_capable) - self.context.AddNode(new_node, self.proc.GetECId()) - - -class LUNodeSetParams(LogicalUnit): - """Modifies the parameters of a node. 
- - @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) - to the node role (as _ROLE_*) - @cvar _R2F: a dictionary from node role to tuples of flags - @cvar _FLAGS: a list of attribute names corresponding to the flags - - """ - HPATH = "node-modify" - HTYPE = constants.HTYPE_NODE - REQ_BGL = False - (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) - _F2R = { - (True, False, False): _ROLE_CANDIDATE, - (False, True, False): _ROLE_DRAINED, - (False, False, True): _ROLE_OFFLINE, - (False, False, False): _ROLE_REGULAR, - } - _R2F = dict((v, k) for k, v in _F2R.items()) - _FLAGS = ["master_candidate", "drained", "offline"] - - def CheckArguments(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, - self.op.master_capable, self.op.vm_capable, - self.op.secondary_ip, self.op.ndparams, self.op.hv_state, - self.op.disk_state] - if all_mods.count(None) == len(all_mods): - raise errors.OpPrereqError("Please pass at least one modification", - errors.ECODE_INVAL) - if all_mods.count(True) > 1: - raise errors.OpPrereqError("Can't set the node into more than one" - " state at the same time", - errors.ECODE_INVAL) - - # Boolean value that tells us whether we might be demoting from MC - self.might_demote = (self.op.master_candidate is False or - self.op.offline is True or - self.op.drained is True or - self.op.master_capable is False) - - if self.op.secondary_ip: - if not netutils.IP4Address.IsValid(self.op.secondary_ip): - raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" - " address" % self.op.secondary_ip, - errors.ECODE_INVAL) - - self.lock_all = self.op.auto_promote and self.might_demote - self.lock_instances = self.op.secondary_ip is not None - - def _InstanceFilter(self, instance): - """Filter for getting affected instances. - - """ - return (instance.disk_template in constants.DTS_INT_MIRROR and - self.op.node_name in instance.all_nodes) - - def ExpandNames(self): - if self.lock_all: - self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - - # Block allocations when all nodes are locked - locking.LEVEL_NODE_ALLOC: locking.ALL_SET, - } - else: - self.needed_locks = { - locking.LEVEL_NODE: self.op.node_name, - } - - # Since modifying a node can have severe effects on currently running - # operations the resource lock is at least acquired in shared mode - self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE] - - # Get all locks except nodes in shared mode; they are not used for anything - # but read-only access - self.share_locks = _ShareAll() - self.share_locks[locking.LEVEL_NODE] = 0 - self.share_locks[locking.LEVEL_NODE_RES] = 0 - self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 - - if self.lock_instances: - self.needed_locks[locking.LEVEL_INSTANCE] = \ - frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)) - - def BuildHooksEnv(self): - """Build hooks env. - - This runs on the master node. - - """ - return { - "OP_TARGET": self.op.node_name, - "MASTER_CANDIDATE": str(self.op.master_candidate), - "OFFLINE": str(self.op.offline), - "DRAINED": str(self.op.drained), - "MASTER_CAPABLE": str(self.op.master_capable), - "VM_CAPABLE": str(self.op.vm_capable), - } - - def BuildHooksNodes(self): - """Build hooks nodes. - - """ - nl = [self.cfg.GetMasterNode(), self.op.node_name] - return (nl, nl) - - def CheckPrereq(self): - """Check prerequisites. 
+ for instance_name in owned_instances: + _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups) - This only checks the instance list against the existing names. + def _GetQueryData(self, lu): + """Computes the list of instances and their attributes. """ - node = self.node = self.cfg.GetNodeInfo(self.op.node_name) + if self.do_grouplocks: + self._CheckGroupLocks(lu) - if self.lock_instances: - affected_instances = \ - self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) + cluster = lu.cfg.GetClusterInfo() + all_info = lu.cfg.GetAllInstancesInfo() - # Verify instance locks - owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) - wanted_instances = frozenset(affected_instances.keys()) - if wanted_instances - owned_instances: - raise errors.OpPrereqError("Instances affected by changing node %s's" - " secondary IP address have changed since" - " locks were acquired, wanted '%s', have" - " '%s'; retry the operation" % - (self.op.node_name, - utils.CommaJoin(wanted_instances), - utils.CommaJoin(owned_instances)), - errors.ECODE_STATE) - else: - affected_instances = None - - if (self.op.master_candidate is not None or - self.op.drained is not None or - self.op.offline is not None): - # we can't change the master's node flags - if self.op.node_name == self.cfg.GetMasterNode(): - raise errors.OpPrereqError("The master role can be changed" - " only via master-failover", - errors.ECODE_INVAL) + instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE) - if self.op.master_candidate and not node.master_capable: - raise errors.OpPrereqError("Node %s is not master capable, cannot make" - " it a master candidate" % node.name, - errors.ECODE_STATE) + instance_list = [all_info[name] for name in instance_names] + nodes = frozenset(itertools.chain(*(inst.all_nodes + for inst in instance_list))) + hv_list = list(set([inst.hypervisor for inst in instance_list])) + bad_nodes = [] + offline_nodes = [] + wrongnode_inst = set() - if self.op.vm_capable is False: - (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name) - if ipri or isec: - raise errors.OpPrereqError("Node %s hosts instances, cannot unset" - " the vm_capable flag" % node.name, - errors.ECODE_STATE) + # Gather data as requested + if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]): + live_data = {} + node_data = lu.rpc.call_all_instances_info(nodes, hv_list) + for name in nodes: + result = node_data[name] + if result.offline: + # offline nodes will be in both lists + assert result.fail_msg + offline_nodes.append(name) + if result.fail_msg: + bad_nodes.append(name) + elif result.payload: + for inst in result.payload: + if inst in all_info: + if all_info[inst].primary_node == name: + live_data.update(result.payload) + else: + wrongnode_inst.add(inst) + else: + # orphan instance; we don't list it here as we don't + # handle this case yet in the output of instance listing + logging.warning("Orphan instance '%s' found on node %s", + inst, name) + # else no instance is alive + else: + live_data = {} - if node.master_candidate and self.might_demote and not self.lock_all: - assert not self.op.auto_promote, "auto_promote set but lock_all not" - # check if after removing the current node, we're missing master - # candidates - (mc_remaining, mc_should, _) = \ - self.cfg.GetMasterCandidateStats(exceptions=[node.name]) - if mc_remaining < mc_should: - raise errors.OpPrereqError("Not enough master candidates, please" - " pass auto promote option to allow" - " promotion (--auto-promote or RAPI" - " 
auto_promote=True)", errors.ECODE_STATE) - - self.old_flags = old_flags = (node.master_candidate, - node.drained, node.offline) - assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) - self.old_role = old_role = self._F2R[old_flags] - - # Check for ineffective changes - for attr in self._FLAGS: - if (getattr(self.op, attr) is False and getattr(node, attr) is False): - self.LogInfo("Ignoring request to unset flag %s, already unset", attr) - setattr(self.op, attr, None) - - # Past this point, any flag change to False means a transition - # away from the respective state, as only real changes are kept - - # TODO: We might query the real power state if it supports OOB - if _SupportsOob(self.cfg, node): - if self.op.offline is False and not (node.powered or - self.op.powered is True): - raise errors.OpPrereqError(("Node %s needs to be turned on before its" - " offline status can be reset") % - self.op.node_name, errors.ECODE_STATE) - elif self.op.powered is not None: - raise errors.OpPrereqError(("Unable to change powered state for node %s" - " as it does not support out-of-band" - " handling") % self.op.node_name, - errors.ECODE_STATE) + if query.IQ_DISKUSAGE in self.requested_data: + gmi = ganeti.masterd.instance + disk_usage = dict((inst.name, + gmi.ComputeDiskSize(inst.disk_template, + [{constants.IDISK_SIZE: disk.size} + for disk in inst.disks])) + for inst in instance_list) + else: + disk_usage = None - # If we're being deofflined/drained, we'll MC ourself if needed - if (self.op.drained is False or self.op.offline is False or - (self.op.master_capable and not node.master_capable)): - if _DecideSelfPromotion(self): - self.op.master_candidate = True - self.LogInfo("Auto-promoting node to master candidate") - - # If we're no longer master capable, we'll demote ourselves from MC - if self.op.master_capable is False and node.master_candidate: - self.LogInfo("Demoting from master candidate") - self.op.master_candidate = False - - # Compute new role - assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 - if self.op.master_candidate: - new_role = self._ROLE_CANDIDATE - elif self.op.drained: - new_role = self._ROLE_DRAINED - elif self.op.offline: - new_role = self._ROLE_OFFLINE - elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: - # False is still in new flags, which means we're un-setting (the - # only) True flag - new_role = self._ROLE_REGULAR - else: # no new flags, nothing, keep old role - new_role = old_role - - self.new_role = new_role - - if old_role == self._ROLE_OFFLINE and new_role != old_role: - # Trying to transition out of offline status - result = self.rpc.call_version([node.name])[node.name] - if result.fail_msg: - raise errors.OpPrereqError("Node %s is being de-offlined but fails" - " to report its version: %s" % - (node.name, result.fail_msg), - errors.ECODE_STATE) - else: - self.LogWarning("Transitioning node from offline to online state" - " without using re-add. Please make sure the node" - " is healthy!") - - # When changing the secondary ip, verify if this is a single-homed to - # multi-homed transition or vice versa, and apply the relevant - # restrictions. 
- if self.op.secondary_ip: - # Ok even without locking, because this can't be changed by any LU - master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode()) - master_singlehomed = master.secondary_ip == master.primary_ip - if master_singlehomed and self.op.secondary_ip != node.primary_ip: - if self.op.force and node.name == master.name: - self.LogWarning("Transitioning from single-homed to multi-homed" - " cluster; all nodes will require a secondary IP" - " address") - else: - raise errors.OpPrereqError("Changing the secondary ip on a" - " single-homed cluster requires the" - " --force option to be passed, and the" - " target node to be the master", - errors.ECODE_INVAL) - elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: - if self.op.force and node.name == master.name: - self.LogWarning("Transitioning from multi-homed to single-homed" - " cluster; secondary IP addresses will have to be" - " removed") + if query.IQ_CONSOLE in self.requested_data: + consinfo = {} + for inst in instance_list: + if inst.name in live_data: + # Instance is running + consinfo[inst.name] = _GetInstanceConsole(cluster, inst) else: - raise errors.OpPrereqError("Cannot set the secondary IP to be the" - " same as the primary IP on a multi-homed" - " cluster, unless the --force option is" - " passed, and the target node is the" - " master", errors.ECODE_INVAL) - - assert not (frozenset(affected_instances) - - self.owned_locks(locking.LEVEL_INSTANCE)) - - if node.offline: - if affected_instances: - msg = ("Cannot change secondary IP address: offline node has" - " instances (%s) configured to use it" % - utils.CommaJoin(affected_instances.keys())) - raise errors.OpPrereqError(msg, errors.ECODE_STATE) - else: - # On online nodes, check that no instances are running, and that - # the node has the new ip and we can reach it. - for instance in affected_instances.values(): - _CheckInstanceState(self, instance, INSTANCE_DOWN, - msg="cannot change secondary ip") - - _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True) - if master.name != node.name: - # check reachability from master secondary ip to new secondary ip - if not netutils.TcpPing(self.op.secondary_ip, - constants.DEFAULT_NODED_PORT, - source=master.secondary_ip): - raise errors.OpPrereqError("Node secondary ip not reachable by TCP" - " based ping to node daemon port", - errors.ECODE_ENVIRON) - - if self.op.ndparams: - new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams) - utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) - _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", - "node", "cluster or group") - self.new_ndparams = new_ndparams - - if self.op.hv_state: - self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, - self.node.hv_state_static) - - if self.op.disk_state: - self.new_disk_state = \ - _MergeAndVerifyDiskState(self.op.disk_state, - self.node.disk_state_static) - - def Exec(self, feedback_fn): - """Modifies a node. 
- - """ - node = self.node - old_role = self.old_role - new_role = self.new_role - - result = [] - - if self.op.ndparams: - node.ndparams = self.new_ndparams + consinfo[inst.name] = None + assert set(consinfo.keys()) == set(instance_names) + else: + consinfo = None - if self.op.powered is not None: - node.powered = self.op.powered + if query.IQ_NODES in self.requested_data: + node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"), + instance_list))) + nodes = dict(lu.cfg.GetMultiNodeInfo(node_names)) + groups = dict((uuid, lu.cfg.GetNodeGroup(uuid)) + for uuid in set(map(operator.attrgetter("group"), + nodes.values()))) + else: + nodes = None + groups = None - if self.op.hv_state: - node.hv_state_static = self.new_hv_state + if query.IQ_NETWORKS in self.requested_data: + net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name) + for i in instance_list)) + networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids) + else: + networks = None - if self.op.disk_state: - node.disk_state_static = self.new_disk_state + return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(), + disk_usage, offline_nodes, bad_nodes, + live_data, wrongnode_inst, consinfo, + nodes, groups, networks) - for attr in ["master_capable", "vm_capable"]: - val = getattr(self.op, attr) - if val is not None: - setattr(node, attr, val) - result.append((attr, str(val))) - if new_role != old_role: - # Tell the node to demote itself, if no longer MC and not offline - if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE: - msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg - if msg: - self.LogWarning("Node failed to demote itself: %s", msg) +class LUQuery(NoHooksLU): + """Query for resources/items of a certain kind. - new_flags = self._R2F[new_role] - for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): - if of != nf: - result.append((desc, str(nf))) - (node.master_candidate, node.drained, node.offline) = new_flags + """ + # pylint: disable=W0142 + REQ_BGL = False - # we locked all nodes, we adjust the CP before updating this node - if self.lock_all: - _AdjustCandidatePool(self, [node.name]) + def CheckArguments(self): + qcls = _GetQueryImplementation(self.op.what) - if self.op.secondary_ip: - node.secondary_ip = self.op.secondary_ip - result.append(("secondary_ip", self.op.secondary_ip)) + self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking) - # this will trigger configuration file update, if needed - self.cfg.Update(node, feedback_fn) + def ExpandNames(self): + self.impl.ExpandNames(self) - # this will trigger job queue propagation or cleanup if the mc - # flag changed - if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1: - self.context.ReaddNode(node) + def DeclareLocks(self, level): + self.impl.DeclareLocks(self, level) - return result + def Exec(self, feedback_fn): + return self.impl.NewStyleQuery(self) -class LUNodePowercycle(NoHooksLU): - """Powercycles a node. +class LUQueryFields(NoHooksLU): + """Query for resources/items of a certain kind. """ + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force: - raise errors.OpPrereqError("The node is the master and the force" - " parameter was not set", - errors.ECODE_INVAL) + self.qcls = _GetQueryImplementation(self.op.what) def ExpandNames(self): - """Locking for PowercycleNode. 
- - This is a last-resort option and shouldn't block on other - jobs. Therefore, we grab no locks. - - """ self.needed_locks = {} def Exec(self, feedback_fn): - """Reboots a node. - - """ - result = self.rpc.call_node_powercycle(self.op.node_name, - self.cfg.GetHypervisorType()) - result.Raise("Failed to schedule the reboot") - return result.payload + return query.QueryFields(self.qcls.FIELDS, self.op.fields) class LUInstanceActivateDisks(NoHooksLU): @@ -4414,70 +3087,6 @@ class LUInstanceMove(LogicalUnit): (instance.name, target_node, msg)) -class LUNodeMigrate(LogicalUnit): - """Migrate all instances from a node. - - """ - HPATH = "node-migrate" - HTYPE = constants.HTYPE_NODE - REQ_BGL = False - - def CheckArguments(self): - pass - - def ExpandNames(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - - self.share_locks = _ShareAll() - self.needed_locks = { - locking.LEVEL_NODE: [self.op.node_name], - } - - def BuildHooksEnv(self): - """Build hooks env. - - This runs on the master, the primary and all the secondaries. - - """ - return { - "NODE_NAME": self.op.node_name, - "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, - } - - def BuildHooksNodes(self): - """Build hooks nodes. - - """ - nl = [self.cfg.GetMasterNode()] - return (nl, nl) - - def CheckPrereq(self): - pass - - def Exec(self, feedback_fn): - # Prepare jobs for migration instances - allow_runtime_changes = self.op.allow_runtime_changes - jobs = [ - [opcodes.OpInstanceMigrate(instance_name=inst.name, - mode=self.op.mode, - live=self.op.live, - iallocator=self.op.iallocator, - target_node=self.op.target_node, - allow_runtime_changes=allow_runtime_changes, - ignore_ipolicy=self.op.ignore_ipolicy)] - for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)] - - # TODO: Run iallocator in this opcode and pass correct placement options to - # OpInstanceMigrate. Since other jobs can modify the cluster between - # running the iallocator and the actual migration, a good consistency model - # will have to be found. - - assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == - frozenset([self.op.node_name])) - - return ResultWithJobs(jobs) - - class TLMigrateInstance(Tasklet): """Tasklet class for instance migration. @@ -8142,267 +6751,6 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) -class LURepairNodeStorage(NoHooksLU): - """Repairs the volume group on a node. - - """ - REQ_BGL = False - - def CheckArguments(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - - storage_type = self.op.storage_type - - if (constants.SO_FIX_CONSISTENCY not in - constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): - raise errors.OpPrereqError("Storage units of type '%s' can not be" - " repaired" % storage_type, - errors.ECODE_INVAL) - - def ExpandNames(self): - self.needed_locks = { - locking.LEVEL_NODE: [self.op.node_name], - } - - def _CheckFaultyDisks(self, instance, node_name): - """Ensure faulty disks abort the opcode or at least warn.""" - try: - if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, - node_name, True): - raise errors.OpPrereqError("Instance '%s' has faulty disks on" - " node '%s'" % (instance.name, node_name), - errors.ECODE_STATE) - except errors.OpPrereqError, err: - if self.op.ignore_consistency: - self.LogWarning(str(err.args[0])) - else: - raise - - def CheckPrereq(self): - """Check prerequisites. 
- - """ - # Check whether any instance on this node has faulty disks - for inst in _GetNodeInstances(self.cfg, self.op.node_name): - if inst.admin_state != constants.ADMINST_UP: - continue - check_nodes = set(inst.all_nodes) - check_nodes.discard(self.op.node_name) - for inst_node_name in check_nodes: - self._CheckFaultyDisks(inst, inst_node_name) - - def Exec(self, feedback_fn): - feedback_fn("Repairing storage unit '%s' on %s ..." % - (self.op.name, self.op.node_name)) - - st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) - result = self.rpc.call_storage_execute(self.op.node_name, - self.op.storage_type, st_args, - self.op.name, - constants.SO_FIX_CONSISTENCY) - result.Raise("Failed to repair storage unit '%s' on %s" % - (self.op.name, self.op.node_name)) - - -class LUNodeEvacuate(NoHooksLU): - """Evacuates instances off a list of nodes. - - """ - REQ_BGL = False - - _MODE2IALLOCATOR = { - constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI, - constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC, - constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL, - } - assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES - assert (frozenset(_MODE2IALLOCATOR.values()) == - constants.IALLOCATOR_NEVAC_MODES) - - def CheckArguments(self): - _CheckIAllocatorOrNode(self, "iallocator", "remote_node") - - def ExpandNames(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - - if self.op.remote_node is not None: - self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - assert self.op.remote_node - - if self.op.remote_node == self.op.node_name: - raise errors.OpPrereqError("Can not use evacuated node as a new" - " secondary node", errors.ECODE_INVAL) - - if self.op.mode != constants.NODE_EVAC_SEC: - raise errors.OpPrereqError("Without the use of an iallocator only" - " secondary instances can be evacuated", - errors.ECODE_INVAL) - - # Declare locks - self.share_locks = _ShareAll() - self.needed_locks = { - locking.LEVEL_INSTANCE: [], - locking.LEVEL_NODEGROUP: [], - locking.LEVEL_NODE: [], - } - - # Determine nodes (via group) optimistically, needs verification once locks - # have been acquired - self.lock_nodes = self._DetermineNodes() - - def _DetermineNodes(self): - """Gets the list of nodes to operate on. - - """ - if self.op.remote_node is None: - # Iallocator will choose any node(s) in the same group - group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name]) - else: - group_nodes = frozenset([self.op.remote_node]) - - # Determine nodes to be locked - return set([self.op.node_name]) | group_nodes - - def _DetermineInstances(self): - """Builds list of instances to operate on. 
- - """ - assert self.op.mode in constants.NODE_EVAC_MODES - - if self.op.mode == constants.NODE_EVAC_PRI: - # Primary instances only - inst_fn = _GetNodePrimaryInstances - assert self.op.remote_node is None, \ - "Evacuating primary instances requires iallocator" - elif self.op.mode == constants.NODE_EVAC_SEC: - # Secondary instances only - inst_fn = _GetNodeSecondaryInstances - else: - # All instances - assert self.op.mode == constants.NODE_EVAC_ALL - inst_fn = _GetNodeInstances - # TODO: In 2.6, change the iallocator interface to take an evacuation mode - # per instance - raise errors.OpPrereqError("Due to an issue with the iallocator" - " interface it is not possible to evacuate" - " all instances at once; specify explicitly" - " whether to evacuate primary or secondary" - " instances", - errors.ECODE_INVAL) - - return inst_fn(self.cfg, self.op.node_name) - - def DeclareLocks(self, level): - if level == locking.LEVEL_INSTANCE: - # Lock instances optimistically, needs verification once node and group - # locks have been acquired - self.needed_locks[locking.LEVEL_INSTANCE] = \ - set(i.name for i in self._DetermineInstances()) - - elif level == locking.LEVEL_NODEGROUP: - # Lock node groups for all potential target nodes optimistically, needs - # verification once nodes have been acquired - self.needed_locks[locking.LEVEL_NODEGROUP] = \ - self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) - - elif level == locking.LEVEL_NODE: - self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes - - def CheckPrereq(self): - # Verify locks - owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) - owned_nodes = self.owned_locks(locking.LEVEL_NODE) - owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) - - need_nodes = self._DetermineNodes() - - if not owned_nodes.issuperset(need_nodes): - raise errors.OpPrereqError("Nodes in same group as '%s' changed since" - " locks were acquired, current nodes are" - " are '%s', used to be '%s'; retry the" - " operation" % - (self.op.node_name, - utils.CommaJoin(need_nodes), - utils.CommaJoin(owned_nodes)), - errors.ECODE_STATE) - - wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) - if owned_groups != wanted_groups: - raise errors.OpExecError("Node groups changed since locks were acquired," - " current groups are '%s', used to be '%s';" - " retry the operation" % - (utils.CommaJoin(wanted_groups), - utils.CommaJoin(owned_groups))) - - # Determine affected instances - self.instances = self._DetermineInstances() - self.instance_names = [i.name for i in self.instances] - - if set(self.instance_names) != owned_instances: - raise errors.OpExecError("Instances on node '%s' changed since locks" - " were acquired, current instances are '%s'," - " used to be '%s'; retry the operation" % - (self.op.node_name, - utils.CommaJoin(self.instance_names), - utils.CommaJoin(owned_instances))) - - if self.instance_names: - self.LogInfo("Evacuating instances from node '%s': %s", - self.op.node_name, - utils.CommaJoin(utils.NiceSort(self.instance_names))) - else: - self.LogInfo("No instances to evacuate from node '%s'", - self.op.node_name) - - if self.op.remote_node is not None: - for i in self.instances: - if i.primary_node == self.op.remote_node: - raise errors.OpPrereqError("Node %s is the primary node of" - " instance %s, cannot use it as" - " secondary" % - (self.op.remote_node, i.name), - errors.ECODE_INVAL) - - def Exec(self, feedback_fn): - assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) - - if not self.instance_names: - # No 
instances to evacuate - jobs = [] - - elif self.op.iallocator is not None: - # TODO: Implement relocation to other group - evac_mode = self._MODE2IALLOCATOR[self.op.mode] - req = iallocator.IAReqNodeEvac(evac_mode=evac_mode, - instances=list(self.instance_names)) - ial = iallocator.IAllocator(self.cfg, self.rpc, req) - - ial.Run(self.op.iallocator) - - if not ial.success: - raise errors.OpPrereqError("Can't compute node evacuation using" - " iallocator '%s': %s" % - (self.op.iallocator, ial.info), - errors.ECODE_NORES) - - jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True) - - elif self.op.remote_node is not None: - assert self.op.mode == constants.NODE_EVAC_SEC - jobs = [ - [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, - remote_node=self.op.remote_node, - disks=[], - mode=constants.REPLACE_DISK_CHG, - early_release=self.op.early_release)] - for instance_name in self.instance_names] - - else: - raise errors.ProgrammerError("No iallocator or remote node") - - return ResultWithJobs(jobs) - - def _DiskSizeInBytesToMebibytes(lu, size): """Converts a disk size in bytes to mebibytes. diff --git a/lib/cmdlib/common.py b/lib/cmdlib/common.py index 5d630c6d3..2d675b835 100644 --- a/lib/cmdlib/common.py +++ b/lib/cmdlib/common.py @@ -37,6 +37,17 @@ from ganeti import ssconf from ganeti import utils +# States of instance +INSTANCE_DOWN = [constants.ADMINST_DOWN] +INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] +INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE] + +#: Instance status in which an instance can be marked as offline/online +CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([ + constants.ADMINST_OFFLINE, + ])) + + def _ExpandItemName(fn, name, kind): """Expand an item name. @@ -873,3 +884,131 @@ def _MapInstanceDisksToNodes(instances): for inst in instances for (node, vols) in inst.MapLVsByNode().items() for vol in vols) + + +def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels): + """Make sure that none of the given parameters is global. + + If a global parameter is found, an L{errors.OpPrereqError} exception is + raised. This is used to avoid setting global parameters for individual nodes. + + @type params: dictionary + @param params: Parameters to check + @type glob_pars: dictionary + @param glob_pars: Forbidden parameters + @type kind: string + @param kind: Kind of parameters (e.g. "node") + @type bad_levels: string + @param bad_levels: Level(s) at which the parameters are forbidden (e.g. + "instance") + @type good_levels: strings + @param good_levels: Level(s) at which the parameters are allowed (e.g. + "cluster or group") + + """ + used_globals = glob_pars.intersection(params) + if used_globals: + msg = ("The following %s parameters are global and cannot" + " be customized at %s level, please modify them at" + " %s level: %s" % + (kind, bad_levels, good_levels, utils.CommaJoin(used_globals))) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + + +def _IsExclusiveStorageEnabledNode(cfg, node): + """Whether exclusive_storage is in effect for the given node. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type node: L{objects.Node} + @param node: The node + @rtype: bool + @return: The effective value of exclusive_storage + + """ + return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE] + + +def _CheckInstanceState(lu, instance, req_states, msg=None): + """Ensure that an instance is in one of the required states.
+ + @param lu: the LU on behalf of which we make the check + @param instance: the instance to check + @param msg: if passed, should be a message to replace the default one + @raise errors.OpPrereqError: if the instance is not in the required state + + """ + if msg is None: + msg = ("can't use instance from outside %s states" % + utils.CommaJoin(req_states)) + if instance.admin_state not in req_states: + raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % + (instance.name, instance.admin_state, msg), + errors.ECODE_STATE) + + if constants.ADMINST_UP not in req_states: + pnode = instance.primary_node + if not lu.cfg.GetNodeInfo(pnode).offline: + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s for instance information" % pnode, + prereq=True, ecode=errors.ECODE_ENVIRON) + if instance.name in ins_l.payload: + raise errors.OpPrereqError("Instance %s is running, %s" % + (instance.name, msg), errors.ECODE_STATE) + else: + lu.LogWarning("Primary node offline, ignoring check that instance" + " is down") + + +def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): + """Check the sanity of iallocator and node arguments and use the + cluster-wide iallocator if appropriate. + + Check that at most one of (iallocator, node) is specified. If none is + specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT}, + then the LU's opcode's iallocator slot is filled with the cluster-wide + default iallocator. + + @type iallocator_slot: string + @param iallocator_slot: the name of the opcode iallocator slot + @type node_slot: string + @param node_slot: the name of the opcode target node slot + + """ + node = getattr(lu.op, node_slot, None) + ialloc = getattr(lu.op, iallocator_slot, None) + if node == []: + node = None + + if node is not None and ialloc is not None: + raise errors.OpPrereqError("Do not specify both, iallocator and node", + errors.ECODE_INVAL) + elif ((node is None and ialloc is None) or + ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT): + default_iallocator = lu.cfg.GetDefaultIAllocator() + if default_iallocator: + setattr(lu.op, iallocator_slot, default_iallocator) + else: + raise errors.OpPrereqError("No iallocator or node given and no" + " cluster-wide default iallocator found;" + " please specify either an iallocator or a" + " node, or set a cluster-wide default" + " iallocator", errors.ECODE_INVAL) + + +def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq): + faulty = [] + + for dev in instance.disks: + cfg.SetDiskID(dev, node_name) + + result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks, + instance)) + result.Raise("Failed to get disk status from node %s" % node_name, + prereq=prereq, ecode=errors.ECODE_ENVIRON) + + for idx, bdev_status in enumerate(result.payload): + if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: + faulty.append(idx) + + return faulty diff --git a/lib/cmdlib/node.py b/lib/cmdlib/node.py new file mode 100644 index 000000000..b186059d0 --- /dev/null +++ b/lib/cmdlib/node.py @@ -0,0 +1,1569 @@ +# +# + +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Logical units dealing with nodes.""" + +import logging +import operator + +from ganeti import constants +from ganeti import errors +from ganeti import locking +from ganeti import netutils +from ganeti import objects +from ganeti import opcodes +from ganeti import qlang +from ganeti import query +from ganeti import rpc +from ganeti import utils +from ganeti.masterd import iallocator + +from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, _QueryBase, \ + ResultWithJobs +from ganeti.cmdlib.common import _CheckParamsNotGlobal, \ + _MergeAndVerifyHvState, _MergeAndVerifyDiskState, \ + _IsExclusiveStorageEnabledNode, _CheckNodePVs, \ + _RedistributeAncillaryFiles, _ExpandNodeName, _ShareAll, _SupportsOob, \ + _CheckInstanceState, INSTANCE_DOWN, _GetUpdatedParams, \ + _AdjustCandidatePool, _CheckIAllocatorOrNode, _LoadNodeEvacResult, \ + _GetWantedNodes, _MapInstanceDisksToNodes, _RunPostHook, \ + _FindFaultyInstanceDisks + + +def _DecideSelfPromotion(lu, exceptions=None): + """Decide whether I should promote myself as a master candidate. + + """ + cp_size = lu.cfg.GetClusterInfo().candidate_pool_size + mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) + # the new node will increase mc_max with one, so: + mc_should = min(mc_should + 1, cp_size) + return mc_now < mc_should + + +def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq): + """Ensure that a node has the given secondary ip. + + @type lu: L{LogicalUnit} + @param lu: the LU on behalf of which we make the check + @type node: string + @param node: the node to check + @type secondary_ip: string + @param secondary_ip: the ip to check + @type prereq: boolean + @param prereq: whether to throw a prerequisite or an execute error + @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True + @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False + + """ + result = lu.rpc.call_node_has_ip_address(node, secondary_ip) + result.Raise("Failure checking secondary ip on node %s" % node, + prereq=prereq, ecode=errors.ECODE_ENVIRON) + if not result.payload: + msg = ("Node claims it doesn't have the secondary ip you gave (%s)," + " please fix and re-run this command" % secondary_ip) + if prereq: + raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) + else: + raise errors.OpExecError(msg) + + +class LUNodeAdd(LogicalUnit): + """Logical unit for adding node to the cluster. 
+ + """ + HPATH = "node-add" + HTYPE = constants.HTYPE_NODE + _NFLAGS = ["master_capable", "vm_capable"] + + def CheckArguments(self): + self.primary_ip_family = self.cfg.GetPrimaryIPFamily() + # validate/normalize the node name + self.hostname = netutils.GetHostname(name=self.op.node_name, + family=self.primary_ip_family) + self.op.node_name = self.hostname.name + + if self.op.readd and self.op.node_name == self.cfg.GetMasterNode(): + raise errors.OpPrereqError("Cannot readd the master node", + errors.ECODE_STATE) + + if self.op.readd and self.op.group: + raise errors.OpPrereqError("Cannot pass a node group when a node is" + " being readded", errors.ECODE_INVAL) + + def BuildHooksEnv(self): + """Build hooks env. + + This will run on all nodes before, and on all nodes + the new node after. + + """ + return { + "OP_TARGET": self.op.node_name, + "NODE_NAME": self.op.node_name, + "NODE_PIP": self.op.primary_ip, + "NODE_SIP": self.op.secondary_ip, + "MASTER_CAPABLE": str(self.op.master_capable), + "VM_CAPABLE": str(self.op.vm_capable), + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + # Exclude added node + pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name])) + post_nodes = pre_nodes + [self.op.node_name, ] + + return (pre_nodes, post_nodes) + + def CheckPrereq(self): + """Check prerequisites. + + This checks: + - the new node is not already in the config + - it is resolvable + - its parameters (single/dual homed) matches the cluster + + Any errors are signaled by raising errors.OpPrereqError. + + """ + cfg = self.cfg + hostname = self.hostname + node = hostname.name + primary_ip = self.op.primary_ip = hostname.ip + if self.op.secondary_ip is None: + if self.primary_ip_family == netutils.IP6Address.family: + raise errors.OpPrereqError("When using a IPv6 primary address, a valid" + " IPv4 address must be given as secondary", + errors.ECODE_INVAL) + self.op.secondary_ip = primary_ip + + secondary_ip = self.op.secondary_ip + if not netutils.IP4Address.IsValid(secondary_ip): + raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" + " address" % secondary_ip, errors.ECODE_INVAL) + + node_list = cfg.GetNodeList() + if not self.op.readd and node in node_list: + raise errors.OpPrereqError("Node %s is already in the configuration" % + node, errors.ECODE_EXISTS) + elif self.op.readd and node not in node_list: + raise errors.OpPrereqError("Node %s is not in the configuration" % node, + errors.ECODE_NOENT) + + self.changed_primary_ip = False + + for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list): + if self.op.readd and node == existing_node_name: + if existing_node.secondary_ip != secondary_ip: + raise errors.OpPrereqError("Readded node doesn't have the same IP" + " address configuration as before", + errors.ECODE_INVAL) + if existing_node.primary_ip != primary_ip: + self.changed_primary_ip = True + + continue + + if (existing_node.primary_ip == primary_ip or + existing_node.secondary_ip == primary_ip or + existing_node.primary_ip == secondary_ip or + existing_node.secondary_ip == secondary_ip): + raise errors.OpPrereqError("New node ip address(es) conflict with" + " existing node %s" % existing_node.name, + errors.ECODE_NOTUNIQUE) + + # After this 'if' block, None is no longer a valid value for the + # _capable op attributes + if self.op.readd: + old_node = self.cfg.GetNodeInfo(node) + assert old_node is not None, "Can't retrieve locked node %s" % node + for attr in self._NFLAGS: + if getattr(self.op, attr) is None: + 
setattr(self.op, attr, getattr(old_node, attr)) + else: + for attr in self._NFLAGS: + if getattr(self.op, attr) is None: + setattr(self.op, attr, True) + + if self.op.readd and not self.op.vm_capable: + pri, sec = cfg.GetNodeInstances(node) + if pri or sec: + raise errors.OpPrereqError("Node %s being re-added with vm_capable" + " flag set to false, but it already holds" + " instances" % node, + errors.ECODE_STATE) + + # check that the type of the node (single versus dual homed) is the + # same as for the master + myself = cfg.GetNodeInfo(self.cfg.GetMasterNode()) + master_singlehomed = myself.secondary_ip == myself.primary_ip + newbie_singlehomed = secondary_ip == primary_ip + if master_singlehomed != newbie_singlehomed: + if master_singlehomed: + raise errors.OpPrereqError("The master has no secondary ip but the" + " new node has one", + errors.ECODE_INVAL) + else: + raise errors.OpPrereqError("The master has a secondary ip but the" + " new node doesn't have one", + errors.ECODE_INVAL) + + # checks reachability + if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): + raise errors.OpPrereqError("Node not reachable by ping", + errors.ECODE_ENVIRON) + + if not newbie_singlehomed: + # check reachability from my secondary ip to newbie's secondary ip + if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, + source=myself.secondary_ip): + raise errors.OpPrereqError("Node secondary ip not reachable by TCP" + " based ping to node daemon port", + errors.ECODE_ENVIRON) + + if self.op.readd: + exceptions = [node] + else: + exceptions = [] + + if self.op.master_capable: + self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) + else: + self.master_candidate = False + + if self.op.readd: + self.new_node = old_node + else: + node_group = cfg.LookupNodeGroup(self.op.group) + self.new_node = objects.Node(name=node, + primary_ip=primary_ip, + secondary_ip=secondary_ip, + master_candidate=self.master_candidate, + offline=False, drained=False, + group=node_group, ndparams={}) + + if self.op.ndparams: + utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", + "node", "cluster or group") + + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None) + + if self.op.disk_state: + self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) + + # TODO: If we need to have multiple DnsOnlyRunner we probably should make + # it a property on the base class. + rpcrunner = rpc.DnsOnlyRunner() + result = rpcrunner.call_version([node])[node] + result.Raise("Can't get version information from node %s" % node) + if constants.PROTOCOL_VERSION == result.payload: + logging.info("Communication to node %s fine, sw version %s match", + node, result.payload) + else: + raise errors.OpPrereqError("Version mismatch master version %s," + " node version %s" % + (constants.PROTOCOL_VERSION, result.payload), + errors.ECODE_ENVIRON) + + vg_name = cfg.GetVGName() + if vg_name is not None: + vparams = {constants.NV_PVLIST: [vg_name]} + excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node) + cname = self.cfg.GetClusterName() + result = rpcrunner.call_node_verify_light([node], vparams, cname)[node] + (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor) + if errmsgs: + raise errors.OpPrereqError("Checks on node PVs failed: %s" % + "; ".join(errmsgs), errors.ECODE_ENVIRON) + + def Exec(self, feedback_fn): + """Adds the new node to the cluster. 
+ + """ + new_node = self.new_node + node = new_node.name + + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + + # We are adding a new node, so we assume it's powered + new_node.powered = True + + # for re-adds, reset the offline/drained/master-candidate flags; + # we need to reset here, otherwise offline would prevent RPC calls + # later in the procedure; this also means that if the re-add + # fails, we are left with a non-offlined, broken node + if self.op.readd: + new_node.drained = new_node.offline = False # pylint: disable=W0201 + self.LogInfo("Readding a node, the offline/drained flags were reset") + # if we demote the node, we do cleanup later in the procedure + new_node.master_candidate = self.master_candidate + if self.changed_primary_ip: + new_node.primary_ip = self.op.primary_ip + + # copy the master/vm_capable flags + for attr in self._NFLAGS: + setattr(new_node, attr, getattr(self.op, attr)) + + # notify the user about any possible mc promotion + if new_node.master_candidate: + self.LogInfo("Node will be a master candidate") + + if self.op.ndparams: + new_node.ndparams = self.op.ndparams + else: + new_node.ndparams = {} + + if self.op.hv_state: + new_node.hv_state_static = self.new_hv_state + + if self.op.disk_state: + new_node.disk_state_static = self.new_disk_state + + # Add node to our /etc/hosts, and add key to known_hosts + if self.cfg.GetClusterInfo().modify_etc_hosts: + master_node = self.cfg.GetMasterNode() + result = self.rpc.call_etc_hosts_modify(master_node, + constants.ETC_HOSTS_ADD, + self.hostname.name, + self.hostname.ip) + result.Raise("Can't update hosts file with new host data") + + if new_node.secondary_ip != new_node.primary_ip: + _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip, + False) + + node_verify_list = [self.cfg.GetMasterNode()] + node_verify_param = { + constants.NV_NODELIST: ([node], {}), + # TODO: do a node-net-test as well? + } + + result = self.rpc.call_node_verify(node_verify_list, node_verify_param, + self.cfg.GetClusterName()) + for verifier in node_verify_list: + result[verifier].Raise("Cannot communicate with node %s" % verifier) + nl_payload = result[verifier].payload[constants.NV_NODELIST] + if nl_payload: + for failed in nl_payload: + feedback_fn("ssh/hostname verification failed" + " (checking from %s): %s" % + (verifier, nl_payload[failed])) + raise errors.OpExecError("ssh/hostname verification failed") + + if self.op.readd: + _RedistributeAncillaryFiles(self) + self.context.ReaddNode(new_node) + # make sure we redistribute the config + self.cfg.Update(new_node, feedback_fn) + # and make sure the new node will not have old files around + if not new_node.master_candidate: + result = self.rpc.call_node_demote_from_mc(new_node.name) + msg = result.fail_msg + if msg: + self.LogWarning("Node failed to demote itself from master" + " candidate status: %s" % msg) + else: + _RedistributeAncillaryFiles(self, additional_nodes=[node], + additional_vm=self.op.vm_capable) + self.context.AddNode(new_node, self.proc.GetECId()) + + +class LUNodeSetParams(LogicalUnit): + """Modifies the parameters of a node.
+ + @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) + to the node role (as _ROLE_*) + @cvar _R2F: a dictionary from node role to tuples of flags + @cvar _FLAGS: a list of attribute names corresponding to the flags + + """ + HPATH = "node-modify" + HTYPE = constants.HTYPE_NODE + REQ_BGL = False + (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) + _F2R = { + (True, False, False): _ROLE_CANDIDATE, + (False, True, False): _ROLE_DRAINED, + (False, False, True): _ROLE_OFFLINE, + (False, False, False): _ROLE_REGULAR, + } + _R2F = dict((v, k) for k, v in _F2R.items()) + _FLAGS = ["master_candidate", "drained", "offline"] + + def CheckArguments(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, + self.op.master_capable, self.op.vm_capable, + self.op.secondary_ip, self.op.ndparams, self.op.hv_state, + self.op.disk_state] + if all_mods.count(None) == len(all_mods): + raise errors.OpPrereqError("Please pass at least one modification", + errors.ECODE_INVAL) + if all_mods.count(True) > 1: + raise errors.OpPrereqError("Can't set the node into more than one" + " state at the same time", + errors.ECODE_INVAL) + + # Boolean value that tells us whether we might be demoting from MC + self.might_demote = (self.op.master_candidate is False or + self.op.offline is True or + self.op.drained is True or + self.op.master_capable is False) + + if self.op.secondary_ip: + if not netutils.IP4Address.IsValid(self.op.secondary_ip): + raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" + " address" % self.op.secondary_ip, + errors.ECODE_INVAL) + + self.lock_all = self.op.auto_promote and self.might_demote + self.lock_instances = self.op.secondary_ip is not None + + def _InstanceFilter(self, instance): + """Filter for getting affected instances. + + """ + return (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes) + + def ExpandNames(self): + if self.lock_all: + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + + # Block allocations when all nodes are locked + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } + else: + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } + + # Since modifying a node can have severe effects on currently running + # operations the resource lock is at least acquired in shared mode + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE] + + # Get all locks except nodes in shared mode; they are not used for anything + # but read-only access + self.share_locks = _ShareAll() + self.share_locks[locking.LEVEL_NODE] = 0 + self.share_locks[locking.LEVEL_NODE_RES] = 0 + self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 + + if self.lock_instances: + self.needed_locks[locking.LEVEL_INSTANCE] = \ + frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master node. + + """ + return { + "OP_TARGET": self.op.node_name, + "MASTER_CANDIDATE": str(self.op.master_candidate), + "OFFLINE": str(self.op.offline), + "DRAINED": str(self.op.drained), + "MASTER_CAPABLE": str(self.op.master_capable), + "VM_CAPABLE": str(self.op.vm_capable), + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + nl = [self.cfg.GetMasterNode(), self.op.node_name] + return (nl, nl) + + def CheckPrereq(self): + """Check prerequisites. 
+ + This only checks the instance list against the existing names. + + """ + node = self.node = self.cfg.GetNodeInfo(self.op.node_name) + + if self.lock_instances: + affected_instances = \ + self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) + + # Verify instance locks + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + wanted_instances = frozenset(affected_instances.keys()) + if wanted_instances - owned_instances: + raise errors.OpPrereqError("Instances affected by changing node %s's" + " secondary IP address have changed since" + " locks were acquired, wanted '%s', have" + " '%s'; retry the operation" % + (self.op.node_name, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + else: + affected_instances = None + + if (self.op.master_candidate is not None or + self.op.drained is not None or + self.op.offline is not None): + # we can't change the master's node flags + if self.op.node_name == self.cfg.GetMasterNode(): + raise errors.OpPrereqError("The master role can be changed" + " only via master-failover", + errors.ECODE_INVAL) + + if self.op.master_candidate and not node.master_capable: + raise errors.OpPrereqError("Node %s is not master capable, cannot make" + " it a master candidate" % node.name, + errors.ECODE_STATE) + + if self.op.vm_capable is False: + (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name) + if ipri or isec: + raise errors.OpPrereqError("Node %s hosts instances, cannot unset" + " the vm_capable flag" % node.name, + errors.ECODE_STATE) + + if node.master_candidate and self.might_demote and not self.lock_all: + assert not self.op.auto_promote, "auto_promote set but lock_all not" + # check if after removing the current node, we're missing master + # candidates + (mc_remaining, mc_should, _) = \ + self.cfg.GetMasterCandidateStats(exceptions=[node.name]) + if mc_remaining < mc_should: + raise errors.OpPrereqError("Not enough master candidates, please" + " pass auto promote option to allow" + " promotion (--auto-promote or RAPI" + " auto_promote=True)", errors.ECODE_STATE) + + self.old_flags = old_flags = (node.master_candidate, + node.drained, node.offline) + assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) + self.old_role = old_role = self._F2R[old_flags] + + # Check for ineffective changes + for attr in self._FLAGS: + if (getattr(self.op, attr) is False and getattr(node, attr) is False): + self.LogInfo("Ignoring request to unset flag %s, already unset", attr) + setattr(self.op, attr, None) + + # Past this point, any flag change to False means a transition + # away from the respective state, as only real changes are kept + + # TODO: We might query the real power state if it supports OOB + if _SupportsOob(self.cfg, node): + if self.op.offline is False and not (node.powered or + self.op.powered is True): + raise errors.OpPrereqError(("Node %s needs to be turned on before its" + " offline status can be reset") % + self.op.node_name, errors.ECODE_STATE) + elif self.op.powered is not None: + raise errors.OpPrereqError(("Unable to change powered state for node %s" + " as it does not support out-of-band" + " handling") % self.op.node_name, + errors.ECODE_STATE) + + # If we're being deofflined/drained, we'll MC ourself if needed + if (self.op.drained is False or self.op.offline is False or + (self.op.master_capable and not node.master_capable)): + if _DecideSelfPromotion(self): + self.op.master_candidate = True + self.LogInfo("Auto-promoting node to master candidate") + + # If we're no 
longer master capable, we'll demote ourselves from MC + if self.op.master_capable is False and node.master_candidate: + self.LogInfo("Demoting from master candidate") + self.op.master_candidate = False + + # Compute new role + assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 + if self.op.master_candidate: + new_role = self._ROLE_CANDIDATE + elif self.op.drained: + new_role = self._ROLE_DRAINED + elif self.op.offline: + new_role = self._ROLE_OFFLINE + elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: + # False is still in new flags, which means we're un-setting (the + # only) True flag + new_role = self._ROLE_REGULAR + else: # no new flags, nothing, keep old role + new_role = old_role + + self.new_role = new_role + + if old_role == self._ROLE_OFFLINE and new_role != old_role: + # Trying to transition out of offline status + result = self.rpc.call_version([node.name])[node.name] + if result.fail_msg: + raise errors.OpPrereqError("Node %s is being de-offlined but fails" + " to report its version: %s" % + (node.name, result.fail_msg), + errors.ECODE_STATE) + else: + self.LogWarning("Transitioning node from offline to online state" + " without using re-add. Please make sure the node" + " is healthy!") + + # When changing the secondary ip, verify if this is a single-homed to + # multi-homed transition or vice versa, and apply the relevant + # restrictions. + if self.op.secondary_ip: + # Ok even without locking, because this can't be changed by any LU + master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode()) + master_singlehomed = master.secondary_ip == master.primary_ip + if master_singlehomed and self.op.secondary_ip != node.primary_ip: + if self.op.force and node.name == master.name: + self.LogWarning("Transitioning from single-homed to multi-homed" + " cluster; all nodes will require a secondary IP" + " address") + else: + raise errors.OpPrereqError("Changing the secondary ip on a" + " single-homed cluster requires the" + " --force option to be passed, and the" + " target node to be the master", + errors.ECODE_INVAL) + elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: + if self.op.force and node.name == master.name: + self.LogWarning("Transitioning from multi-homed to single-homed" + " cluster; secondary IP addresses will have to be" + " removed") + else: + raise errors.OpPrereqError("Cannot set the secondary IP to be the" + " same as the primary IP on a multi-homed" + " cluster, unless the --force option is" + " passed, and the target node is the" + " master", errors.ECODE_INVAL) + + assert not (frozenset(affected_instances) - + self.owned_locks(locking.LEVEL_INSTANCE)) + + if node.offline: + if affected_instances: + msg = ("Cannot change secondary IP address: offline node has" + " instances (%s) configured to use it" % + utils.CommaJoin(affected_instances.keys())) + raise errors.OpPrereqError(msg, errors.ECODE_STATE) + else: + # On online nodes, check that no instances are running, and that + # the node has the new ip and we can reach it. 
+ for instance in affected_instances.values(): + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change secondary ip") + + _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True) + if master.name != node.name: + # check reachability from master secondary ip to new secondary ip + if not netutils.TcpPing(self.op.secondary_ip, + constants.DEFAULT_NODED_PORT, + source=master.secondary_ip): + raise errors.OpPrereqError("Node secondary ip not reachable by TCP" + " based ping to node daemon port", + errors.ECODE_ENVIRON) + + if self.op.ndparams: + new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams) + utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) + _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", + "node", "cluster or group") + self.new_ndparams = new_ndparams + + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.node.hv_state_static) + + if self.op.disk_state: + self.new_disk_state = \ + _MergeAndVerifyDiskState(self.op.disk_state, + self.node.disk_state_static) + + def Exec(self, feedback_fn): + """Modifies a node. + + """ + node = self.node + old_role = self.old_role + new_role = self.new_role + + result = [] + + if self.op.ndparams: + node.ndparams = self.new_ndparams + + if self.op.powered is not None: + node.powered = self.op.powered + + if self.op.hv_state: + node.hv_state_static = self.new_hv_state + + if self.op.disk_state: + node.disk_state_static = self.new_disk_state + + for attr in ["master_capable", "vm_capable"]: + val = getattr(self.op, attr) + if val is not None: + setattr(node, attr, val) + result.append((attr, str(val))) + + if new_role != old_role: + # Tell the node to demote itself, if no longer MC and not offline + if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE: + msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg + if msg: + self.LogWarning("Node failed to demote itself: %s", msg) + + new_flags = self._R2F[new_role] + for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): + if of != nf: + result.append((desc, str(nf))) + (node.master_candidate, node.drained, node.offline) = new_flags + + # we locked all nodes, we adjust the CP before updating this node + if self.lock_all: + _AdjustCandidatePool(self, [node.name]) + + if self.op.secondary_ip: + node.secondary_ip = self.op.secondary_ip + result.append(("secondary_ip", self.op.secondary_ip)) + + # this will trigger configuration file update, if needed + self.cfg.Update(node, feedback_fn) + + # this will trigger job queue propagation or cleanup if the mc + # flag changed + if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1: + self.context.ReaddNode(node) + + return result + + +class LUNodePowercycle(NoHooksLU): + """Powercycles a node. + + """ + REQ_BGL = False + + def CheckArguments(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force: + raise errors.OpPrereqError("The node is the master and the force" + " parameter was not set", + errors.ECODE_INVAL) + + def ExpandNames(self): + """Locking for PowercycleNode. + + This is a last-resort option and shouldn't block on other + jobs. Therefore, we grab no locks. + + """ + self.needed_locks = {} + + def Exec(self, feedback_fn): + """Reboots a node. 
+ + """ + result = self.rpc.call_node_powercycle(self.op.node_name, + self.cfg.GetHypervisorType()) + result.Raise("Failed to schedule the reboot") + return result.payload + + +def _GetNodeInstancesInner(cfg, fn): + return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] + + +def _GetNodePrimaryInstances(cfg, node_name): + """Returns primary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name == inst.primary_node) + + +def _GetNodeSecondaryInstances(cfg, node_name): + """Returns secondary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name in inst.secondary_nodes) + + +def _GetNodeInstances(cfg, node_name): + """Returns a list of all primary and secondary instances on a node. + + """ + + return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes) + + +class LUNodeEvacuate(NoHooksLU): + """Evacuates instances off a list of nodes. + + """ + REQ_BGL = False + + _MODE2IALLOCATOR = { + constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI, + constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC, + constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL, + } + assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES + assert (frozenset(_MODE2IALLOCATOR.values()) == + constants.IALLOCATOR_NEVAC_MODES) + + def CheckArguments(self): + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + + def ExpandNames(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) + assert self.op.remote_node + + if self.op.remote_node == self.op.node_name: + raise errors.OpPrereqError("Can not use evacuated node as a new" + " secondary node", errors.ECODE_INVAL) + + if self.op.mode != constants.NODE_EVAC_SEC: + raise errors.OpPrereqError("Without the use of an iallocator only" + " secondary instances can be evacuated", + errors.ECODE_INVAL) + + # Declare locks + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + # Determine nodes (via group) optimistically, needs verification once locks + # have been acquired + self.lock_nodes = self._DetermineNodes() + + def _DetermineNodes(self): + """Gets the list of nodes to operate on. + + """ + if self.op.remote_node is None: + # Iallocator will choose any node(s) in the same group + group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name]) + else: + group_nodes = frozenset([self.op.remote_node]) + + # Determine nodes to be locked + return set([self.op.node_name]) | group_nodes + + def _DetermineInstances(self): + """Builds list of instances to operate on. 
+ + """ + assert self.op.mode in constants.NODE_EVAC_MODES + + if self.op.mode == constants.NODE_EVAC_PRI: + # Primary instances only + inst_fn = _GetNodePrimaryInstances + assert self.op.remote_node is None, \ + "Evacuating primary instances requires iallocator" + elif self.op.mode == constants.NODE_EVAC_SEC: + # Secondary instances only + inst_fn = _GetNodeSecondaryInstances + else: + # All instances + assert self.op.mode == constants.NODE_EVAC_ALL + inst_fn = _GetNodeInstances + # TODO: In 2.6, change the iallocator interface to take an evacuation mode + # per instance + raise errors.OpPrereqError("Due to an issue with the iallocator" + " interface it is not possible to evacuate" + " all instances at once; specify explicitly" + " whether to evacuate primary or secondary" + " instances", + errors.ECODE_INVAL) + + return inst_fn(self.cfg, self.op.node_name) + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + set(i.name for i in self._DetermineInstances()) + + elif level == locking.LEVEL_NODEGROUP: + # Lock node groups for all potential target nodes optimistically, needs + # verification once nodes have been acquired + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) + + elif level == locking.LEVEL_NODE: + self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes + + def CheckPrereq(self): + # Verify locks + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + owned_nodes = self.owned_locks(locking.LEVEL_NODE) + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) + + need_nodes = self._DetermineNodes() + + if not owned_nodes.issuperset(need_nodes): + raise errors.OpPrereqError("Nodes in same group as '%s' changed since" + " locks were acquired, current nodes are" + " '%s', used to be '%s'; retry the" + " operation" % + (self.op.node_name, + utils.CommaJoin(need_nodes), + utils.CommaJoin(owned_nodes)), + errors.ECODE_STATE) + + wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) + if owned_groups != wanted_groups: + raise errors.OpExecError("Node groups changed since locks were acquired," + " current groups are '%s', used to be '%s';" + " retry the operation" % + (utils.CommaJoin(wanted_groups), + utils.CommaJoin(owned_groups))) + + # Determine affected instances + self.instances = self._DetermineInstances() + self.instance_names = [i.name for i in self.instances] + + if set(self.instance_names) != owned_instances: + raise errors.OpExecError("Instances on node '%s' changed since locks" + " were acquired, current instances are '%s'," + " used to be '%s'; retry the operation" % + (self.op.node_name, + utils.CommaJoin(self.instance_names), + utils.CommaJoin(owned_instances))) + + if self.instance_names: + self.LogInfo("Evacuating instances from node '%s': %s", + self.op.node_name, + utils.CommaJoin(utils.NiceSort(self.instance_names))) + else: + self.LogInfo("No instances to evacuate from node '%s'", + self.op.node_name) + + if self.op.remote_node is not None: + for i in self.instances: + if i.primary_node == self.op.remote_node: + raise errors.OpPrereqError("Node %s is the primary node of" + " instance %s, cannot use it as" + " secondary" % + (self.op.remote_node, i.name), + errors.ECODE_INVAL) + + def Exec(self, feedback_fn): + assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) + + if not self.instance_names: + # No
instances to evacuate + jobs = [] + + elif self.op.iallocator is not None: + # TODO: Implement relocation to other group + evac_mode = self._MODE2IALLOCATOR[self.op.mode] + req = iallocator.IAReqNodeEvac(evac_mode=evac_mode, + instances=list(self.instance_names)) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute node evacuation using" + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True) + + elif self.op.remote_node is not None: + assert self.op.mode == constants.NODE_EVAC_SEC + jobs = [ + [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, + remote_node=self.op.remote_node, + disks=[], + mode=constants.REPLACE_DISK_CHG, + early_release=self.op.early_release)] + for instance_name in self.instance_names] + + else: + raise errors.ProgrammerError("No iallocator or remote node") + + return ResultWithJobs(jobs) + + +class LUNodeMigrate(LogicalUnit): + """Migrate all instances from a node. + + """ + HPATH = "node-migrate" + HTYPE = constants.HTYPE_NODE + REQ_BGL = False + + def CheckArguments(self): + pass + + def ExpandNames(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + return { + "NODE_NAME": self.op.node_name, + "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + nl = [self.cfg.GetMasterNode()] + return (nl, nl) + + def CheckPrereq(self): + pass + + def Exec(self, feedback_fn): + # Prepare jobs for migration instances + allow_runtime_changes = self.op.allow_runtime_changes + jobs = [ + [opcodes.OpInstanceMigrate(instance_name=inst.name, + mode=self.op.mode, + live=self.op.live, + iallocator=self.op.iallocator, + target_node=self.op.target_node, + allow_runtime_changes=allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy)] + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)] + + # TODO: Run iallocator in this opcode and pass correct placement options to + # OpInstanceMigrate. Since other jobs can modify the cluster between + # running the iallocator and the actual migration, a good consistency model + # will have to be found. + + assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == + frozenset([self.op.node_name])) + + return ResultWithJobs(jobs) + + +def _GetStorageTypeArgs(cfg, storage_type): + """Returns the arguments for a storage type. + + """ + # Special case for file storage + if storage_type == constants.ST_FILE: + # storage.FileStorage wants a list of storage directories + return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]] + + return [] + + +class LUNodeModifyStorage(NoHooksLU): + """Logical unit for modifying a storage volume on a node. 
+ + """ + REQ_BGL = False + + def CheckArguments(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + + storage_type = self.op.storage_type + + try: + modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] + except KeyError: + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " modified" % storage_type, + errors.ECODE_INVAL) + + diff = set(self.op.changes.keys()) - modifiable + if diff: + raise errors.OpPrereqError("The following fields can not be modified for" + " storage units of type '%s': %r" % + (storage_type, list(diff)), + errors.ECODE_INVAL) + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_modify(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, self.op.changes) + result.Raise("Failed to modify storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) + + +class _NodeQuery(_QueryBase): + FIELDS = query.NODE_FIELDS + + def ExpandNames(self, lu): + lu.needed_locks = {} + lu.share_locks = _ShareAll() + + if self.names: + self.wanted = _GetWantedNodes(lu, self.names) + else: + self.wanted = locking.ALL_SET + + self.do_locking = (self.use_locking and + query.NQ_LIVE in self.requested_data) + + if self.do_locking: + # If any non-static field is requested we need to lock the nodes + lu.needed_locks[locking.LEVEL_NODE] = self.wanted + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. + + """ + all_info = lu.cfg.GetAllNodesInfo() + + nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE) + + # Gather data as requested + if query.NQ_LIVE in self.requested_data: + # filter out non-vm_capable nodes + toquery_nodes = [name for name in nodenames if all_info[name].vm_capable] + + es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes) + node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], + [lu.cfg.GetHypervisorType()], es_flags) + live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload)) + for (name, nresult) in node_data.items() + if not nresult.fail_msg and nresult.payload) + else: + live_data = None + + if query.NQ_INST in self.requested_data: + node_to_primary = dict([(name, set()) for name in nodenames]) + node_to_secondary = dict([(name, set()) for name in nodenames]) + + inst_data = lu.cfg.GetAllInstancesInfo() + + for inst in inst_data.values(): + if inst.primary_node in node_to_primary: + node_to_primary[inst.primary_node].add(inst.name) + for secnode in inst.secondary_nodes: + if secnode in node_to_secondary: + node_to_secondary[secnode].add(inst.name) + else: + node_to_primary = None + node_to_secondary = None + + if query.NQ_OOB in self.requested_data: + oob_support = dict((name, bool(_SupportsOob(lu.cfg, node))) + for name, node in all_info.iteritems()) + else: + oob_support = None + + if query.NQ_GROUP in self.requested_data: + groups = lu.cfg.GetAllNodeGroupsInfo() + else: + groups = {} + + return query.NodeQueryData([all_info[name] for name in nodenames], + live_data, lu.cfg.GetMasterNode(), + node_to_primary, node_to_secondary, groups, + oob_support, lu.cfg.GetClusterInfo()) + + +class LUNodeQuery(NoHooksLU): + """Logical unit for querying nodes. 
+ + """ + # pylint: disable=W0142 + REQ_BGL = False + + def CheckArguments(self): + self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names), + self.op.output_fields, self.op.use_locking) + + def ExpandNames(self): + self.nq.ExpandNames(self) + + def DeclareLocks(self, level): + self.nq.DeclareLocks(self, level) + + def Exec(self, feedback_fn): + return self.nq.OldStyleQuery(self) + + +def _CheckOutputFields(static, dynamic, selected): + """Checks whether all selected fields are valid. + + @type static: L{utils.FieldSet} + @param static: static fields set + @type dynamic: L{utils.FieldSet} + @param dynamic: dynamic fields set + + """ + f = utils.FieldSet() + f.Extend(static) + f.Extend(dynamic) + + delta = f.NonMatching(selected) + if delta: + raise errors.OpPrereqError("Unknown output fields selected: %s" + % ",".join(delta), errors.ECODE_INVAL) + + +class LUNodeQueryvols(NoHooksLU): + """Logical unit for getting volumes on node(s). + + """ + REQ_BGL = False + _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance") + _FIELDS_STATIC = utils.FieldSet("node") + + def CheckArguments(self): + _CheckOutputFields(static=self._FIELDS_STATIC, + dynamic=self._FIELDS_DYNAMIC, + selected=self.op.output_fields) + + def ExpandNames(self): + self.share_locks = _ShareAll() + + if self.op.nodes: + self.needed_locks = { + locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), + } + else: + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + nodenames = self.owned_locks(locking.LEVEL_NODE) + volumes = self.rpc.call_node_volumes(nodenames) + + ilist = self.cfg.GetAllInstancesInfo() + vol2inst = _MapInstanceDisksToNodes(ilist.values()) + + output = [] + for node in nodenames: + nresult = volumes[node] + if nresult.offline: + continue + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't compute volume data on node %s: %s", node, msg) + continue + + node_vols = sorted(nresult.payload, + key=operator.itemgetter("dev")) + + for vol in node_vols: + node_output = [] + for field in self.op.output_fields: + if field == "node": + val = node + elif field == "phys": + val = vol["dev"] + elif field == "vg": + val = vol["vg"] + elif field == "name": + val = vol["name"] + elif field == "size": + val = int(float(vol["size"])) + elif field == "instance": + val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-") + else: + raise errors.ParameterError(field) + node_output.append(str(val)) + + output.append(node_output) + + return output + + +class LUNodeQueryStorage(NoHooksLU): + """Logical unit for getting information on storage units on node(s). + + """ + _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) + REQ_BGL = False + + def CheckArguments(self): + _CheckOutputFields(static=self._FIELDS_STATIC, + dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), + selected=self.op.output_fields) + + def ExpandNames(self): + self.share_locks = _ShareAll() + + if self.op.nodes: + self.needed_locks = { + locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), + } + else: + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. 
+ + """ + self.nodes = self.owned_locks(locking.LEVEL_NODE) + + # Always get name to sort by + if constants.SF_NAME in self.op.output_fields: + fields = self.op.output_fields[:] + else: + fields = [constants.SF_NAME] + self.op.output_fields + + # Never ask for node or type as it's only known to the LU + for extra in [constants.SF_NODE, constants.SF_TYPE]: + while extra in fields: + fields.remove(extra) + + field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) + name_idx = field_idx[constants.SF_NAME] + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + data = self.rpc.call_storage_list(self.nodes, + self.op.storage_type, st_args, + self.op.name, fields) + + result = [] + + for node in utils.NiceSort(self.nodes): + nresult = data[node] + if nresult.offline: + continue + + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't get storage data from node %s: %s", node, msg) + continue + + rows = dict([(row[name_idx], row) for row in nresult.payload]) + + for name in utils.NiceSort(rows.keys()): + row = rows[name] + + out = [] + + for field in self.op.output_fields: + if field == constants.SF_NODE: + val = node + elif field == constants.SF_TYPE: + val = self.op.storage_type + elif field in field_idx: + val = row[field_idx[field]] + else: + raise errors.ParameterError(field) + + out.append(val) + + result.append(out) + + return result + + +class LUNodeRemove(LogicalUnit): + """Logical unit for removing a node. + + """ + HPATH = "node-remove" + HTYPE = constants.HTYPE_NODE + + def BuildHooksEnv(self): + """Build hooks env. + + """ + return { + "OP_TARGET": self.op.node_name, + "NODE_NAME": self.op.node_name, + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + This doesn't run on the target node in the pre phase as a failed + node would then be impossible to remove. + + """ + all_nodes = self.cfg.GetNodeList() + try: + all_nodes.remove(self.op.node_name) + except ValueError: + pass + return (all_nodes, all_nodes) + + def CheckPrereq(self): + """Check prerequisites. + + This checks: + - the node exists in the configuration + - it does not have primary or secondary instances + - it's not the master + + Any errors are signaled by raising errors.OpPrereqError. + + """ + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + node = self.cfg.GetNodeInfo(self.op.node_name) + assert node is not None + + masternode = self.cfg.GetMasterNode() + if node.name == masternode: + raise errors.OpPrereqError("Node is the master node, failover to another" + " node is required", errors.ECODE_INVAL) + + for instance_name, instance in self.cfg.GetAllInstancesInfo().items(): + if node.name in instance.all_nodes: + raise errors.OpPrereqError("Instance %s is still running on the node," + " please remove first" % instance_name, + errors.ECODE_INVAL) + self.op.node_name = node.name + self.node = node + + def Exec(self, feedback_fn): + """Removes the node from the cluster. 
+ + """ + node = self.node + logging.info("Stopping the node daemon and removing configs from node %s", + node.name) + + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + + # Promote nodes to master candidate as needed + _AdjustCandidatePool(self, exceptions=[node.name]) + self.context.RemoveNode(node.name) + + # Run post hooks on the node before it's removed + _RunPostHook(self, node.name) + + result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) + msg = result.fail_msg + if msg: + self.LogWarning("Errors encountered on the remote node while leaving" + " the cluster: %s", msg) + + # Remove node from our /etc/hosts + if self.cfg.GetClusterInfo().modify_etc_hosts: + master_node = self.cfg.GetMasterNode() + result = self.rpc.call_etc_hosts_modify(master_node, + constants.ETC_HOSTS_REMOVE, + node.name, None) + result.Raise("Can't update hosts file with new host data") + _RedistributeAncillaryFiles(self) + + +class LURepairNodeStorage(NoHooksLU): + """Repairs the volume group on a node. + + """ + REQ_BGL = False + + def CheckArguments(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + + storage_type = self.op.storage_type + + if (constants.SO_FIX_CONSISTENCY not in + constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " repaired" % storage_type, + errors.ECODE_INVAL) + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + def _CheckFaultyDisks(self, instance, node_name): + """Ensure faulty disks abort the opcode or at least warn.""" + try: + if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, + node_name, True): + raise errors.OpPrereqError("Instance '%s' has faulty disks on" + " node '%s'" % (instance.name, node_name), + errors.ECODE_STATE) + except errors.OpPrereqError, err: + if self.op.ignore_consistency: + self.LogWarning(str(err.args[0])) + else: + raise + + def CheckPrereq(self): + """Check prerequisites. + + """ + # Check whether any instance on this node has faulty disks + for inst in _GetNodeInstances(self.cfg, self.op.node_name): + if inst.admin_state != constants.ADMINST_UP: + continue + check_nodes = set(inst.all_nodes) + check_nodes.discard(self.op.node_name) + for inst_node_name in check_nodes: + self._CheckFaultyDisks(inst, inst_node_name) + + def Exec(self, feedback_fn): + feedback_fn("Repairing storage unit '%s' on %s ..." % + (self.op.name, self.op.node_name)) + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_execute(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, + constants.SO_FIX_CONSISTENCY) + result.Raise("Failed to repair storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) diff --git a/test/py/ganeti.cmdlib_unittest.py b/test/py/ganeti.cmdlib_unittest.py index e42bd19fd..2b24ef181 100755 --- a/test/py/ganeti.cmdlib_unittest.py +++ b/test/py/ganeti.cmdlib_unittest.py @@ -109,7 +109,7 @@ class TestIAllocatorChecks(testutils.GanetiTestCase): op = OpTest() lu = TestLU(op) - c_i = lambda: cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node") + c_i = lambda: common._CheckIAllocatorOrNode(lu, "iallocator", "node") # Neither node nor iallocator given for n in (None, []): -- GitLab