Commit 0fcd0cad authored by René Nussbaumer

Refactor IAllocator code

The IAllocator class was handling all the requests on its own, with all
parameters passed in at the top level. This works, but it is hard to
maintain and not flexible.

With the upcoming MultiAllocate change to the IAllocator we can't use
the top-level parameters anymore. Therefore, we refactor the code into
separate request classes, each responsible for its own request type,
working similarly to the Opcode containers (a short sketch follows the
commit header below).
Signed-off-by: René Nussbaumer <rn@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>
parent 32683096
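
For illustration, a minimal sketch of the request-container pattern this
commit introduces, assuming the IARequestBase, ht, constants and
_STRING_LIST definitions from the new lib/masterd/iallocator.py shown in
the diff below; the example subclass name and its field values are
hypothetical, chosen to mirror the old RELOC keydata:

class _IAReqRelocateSketch(IARequestBase):
  # MODE names the request type, REQ_PARAMS declares and validates the
  # per-request input fields, REQ_RESULT checks the iallocator's reply.
  MODE = constants.IALLOCATOR_MODE_RELOC
  REQ_PARAMS = [
    ("name", ht.TString),
    ("relocate_from", _STRING_LIST),
    ]
  REQ_RESULT = ht.TList

# The base class constructor takes only keyword arguments, sets them as
# attributes and runs Validate() against REQ_PARAMS.
req = _IAReqRelocateSketch(name="instance1.example.com",
                           relocate_from=["node2.example.com"])

In this scheme the per-mode keydata lists and _Add* methods removed from
cmdlib.py below move into such per-request classes instead of living as
top-level parameters of IAllocator itself.
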
......@@ -283,6 +283,7 @@ confd_PYTHON = \
masterd_PYTHON = \
lib/masterd/__init__.py \
lib/masterd/iallocator.py \
lib/masterd/instance.py
impexpd_PYTHON = \
......
......@@ -48,7 +48,6 @@ from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
......@@ -14811,519 +14810,6 @@ class LUTestJqueue(NoHooksLU):
return True
class IAllocator(object):
"""IAllocator framework.
An IAllocator instance has three sets of attributes:
- cfg that is needed to query the cluster
- input data (all members of the _KEYS class attribute are required)
- four buffer attributes (in|out_data|text), that represent the
input (to the external script) in text and data structure format,
and the output from it, again in two formats
- the result variables from the script (success, info, nodes) for
easy usage
"""
# pylint: disable=R0902
# lots of instance attributes
def __init__(self, cfg, rpc_runner, mode, **kwargs):
self.cfg = cfg
self.rpc = rpc_runner
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
self.mode = mode
self.memory = self.disks = self.disk_template = self.spindle_use = None
self.os = self.tags = self.nics = self.vcpus = None
self.hypervisor = None
self.relocate_from = None
self.name = None
self.instances = None
self.evac_mode = None
self.target_groups = []
# computed fields
self.required_nodes = None
# init result fields
self.success = self.info = self.result = None
try:
(fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
except KeyError:
raise errors.ProgrammerError("Unknown mode '%s' passed to the"
" IAllocator" % self.mode)
keyset = [n for (n, _) in keydata]
for key in kwargs:
if key not in keyset:
raise errors.ProgrammerError("Invalid input parameter '%s' to"
" IAllocator" % key)
setattr(self, key, kwargs[key])
for key in keyset:
if key not in kwargs:
raise errors.ProgrammerError("Missing input parameter '%s' to"
" IAllocator" % key)
self._BuildInputData(compat.partial(fn, self), keydata)
def _ComputeClusterData(self):
"""Compute the generic allocator input data.
This is the data that is independent of the actual operation.
"""
cfg = self.cfg
cluster_info = cfg.GetClusterInfo()
# cluster data
data = {
"version": constants.IALLOCATOR_VERSION,
"cluster_name": cfg.GetClusterName(),
"cluster_tags": list(cluster_info.GetTags()),
"enabled_hypervisors": list(cluster_info.enabled_hypervisors),
"ipolicy": cluster_info.ipolicy,
}
ninfo = cfg.GetAllNodesInfo()
iinfo = cfg.GetAllInstancesInfo().values()
i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
# node data
node_list = [n.name for n in ninfo.values() if n.vm_capable]
if self.mode == constants.IALLOCATOR_MODE_ALLOC:
hypervisor_name = self.hypervisor
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
else:
hypervisor_name = cluster_info.primary_hypervisor
node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
[hypervisor_name])
node_iinfo = \
self.rpc.call_all_instances_info(node_list,
cluster_info.enabled_hypervisors)
data["nodegroups"] = self._ComputeNodeGroupData(cfg)
config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
i_list, config_ndata)
assert len(data["nodes"]) == len(ninfo), \
"Incomplete node data computed"
data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
self.in_data = data
@staticmethod
def _ComputeNodeGroupData(cfg):
"""Compute node groups data.
"""
cluster = cfg.GetClusterInfo()
ng = dict((guuid, {
"name": gdata.name,
"alloc_policy": gdata.alloc_policy,
"ipolicy": _CalculateGroupIPolicy(cluster, gdata),
})
for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
return ng
@staticmethod
def _ComputeBasicNodeData(cfg, node_cfg):
"""Compute global node data.
@rtype: dict
@returns: a dict of name: (node dict, node config)
"""
# fill in static (config-based) values
node_results = dict((ninfo.name, {
"tags": list(ninfo.GetTags()),
"primary_ip": ninfo.primary_ip,
"secondary_ip": ninfo.secondary_ip,
"offline": ninfo.offline,
"drained": ninfo.drained,
"master_candidate": ninfo.master_candidate,
"group": ninfo.group,
"master_capable": ninfo.master_capable,
"vm_capable": ninfo.vm_capable,
"ndparams": cfg.GetNdParams(ninfo),
})
for ninfo in node_cfg.values())
return node_results
@staticmethod
def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
node_results):
"""Compute global node data.
@param node_results: the basic node structures as filled from the config
"""
#TODO(dynmem): compute the right data on MAX and MIN memory
# make a copy of the current dict
node_results = dict(node_results)
for nname, nresult in node_data.items():
assert nname in node_results, "Missing basic data for node %s" % nname
ninfo = node_cfg[nname]
if not (ninfo.offline or ninfo.drained):
nresult.Raise("Can't get data for node %s" % nname)
node_iinfo[nname].Raise("Can't get node instance info from node %s" %
nname)
remote_info = _MakeLegacyNodeInfo(nresult.payload)
for attr in ["memory_total", "memory_free", "memory_dom0",
"vg_size", "vg_free", "cpu_total"]:
if attr not in remote_info:
raise errors.OpExecError("Node '%s' didn't return attribute"
" '%s'" % (nname, attr))
if not isinstance(remote_info[attr], int):
raise errors.OpExecError("Node '%s' returned invalid value"
" for '%s': %s" %
(nname, attr, remote_info[attr]))
# compute memory used by primary instances
i_p_mem = i_p_up_mem = 0
for iinfo, beinfo in i_list:
if iinfo.primary_node == nname:
i_p_mem += beinfo[constants.BE_MAXMEM]
if iinfo.name not in node_iinfo[nname].payload:
i_used_mem = 0
else:
i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
remote_info["memory_free"] -= max(0, i_mem_diff)
if iinfo.admin_state == constants.ADMINST_UP:
i_p_up_mem += beinfo[constants.BE_MAXMEM]
# compute memory used by instances
pnr_dyn = {
"total_memory": remote_info["memory_total"],
"reserved_memory": remote_info["memory_dom0"],
"free_memory": remote_info["memory_free"],
"total_disk": remote_info["vg_size"],
"free_disk": remote_info["vg_free"],
"total_cpus": remote_info["cpu_total"],
"i_pri_memory": i_p_mem,
"i_pri_up_memory": i_p_up_mem,
}
pnr_dyn.update(node_results[nname])
node_results[nname] = pnr_dyn
return node_results
@staticmethod
def _ComputeInstanceData(cluster_info, i_list):
"""Compute global instance data.
"""
instance_data = {}
for iinfo, beinfo in i_list:
nic_data = []
for nic in iinfo.nics:
filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
nic_dict = {
"mac": nic.mac,
"ip": nic.ip,
"mode": filled_params[constants.NIC_MODE],
"link": filled_params[constants.NIC_LINK],
}
if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
nic_dict["bridge"] = filled_params[constants.NIC_LINK]
nic_data.append(nic_dict)
pir = {
"tags": list(iinfo.GetTags()),
"admin_state": iinfo.admin_state,
"vcpus": beinfo[constants.BE_VCPUS],
"memory": beinfo[constants.BE_MAXMEM],
"spindle_use": beinfo[constants.BE_SPINDLE_USE],
"os": iinfo.os,
"nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
"nics": nic_data,
"disks": [{constants.IDISK_SIZE: dsk.size,
constants.IDISK_MODE: dsk.mode}
for dsk in iinfo.disks],
"disk_template": iinfo.disk_template,
"hypervisor": iinfo.hypervisor,
}
pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
pir["disks"])
instance_data[iinfo.name] = pir
return instance_data
def _AddNewInstance(self):
"""Add new instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
correct structure needed as input for the allocator.
The checks for the completeness of the opcode must have already been
done.
"""
disk_space = _ComputeDiskSize(self.disk_template, self.disks)
if self.disk_template in constants.DTS_INT_MIRROR:
self.required_nodes = 2
else:
self.required_nodes = 1
request = {
"name": self.name,
"disk_template": self.disk_template,
"tags": self.tags,
"os": self.os,
"vcpus": self.vcpus,
"memory": self.memory,
"spindle_use": self.spindle_use,
"disks": self.disks,
"disk_space_total": disk_space,
"nics": self.nics,
"required_nodes": self.required_nodes,
"hypervisor": self.hypervisor,
}
return request
def _AddRelocateInstance(self):
"""Add relocate instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
correct structure needed as input for the allocator.
The checks for the completeness of the opcode must have already been
done.
"""
instance = self.cfg.GetInstanceInfo(self.name)
if instance is None:
raise errors.ProgrammerError("Unknown instance '%s' passed to"
" IAllocator" % self.name)
if instance.disk_template not in constants.DTS_MIRRORED:
raise errors.OpPrereqError("Can't relocate non-mirrored instances",
errors.ECODE_INVAL)
if instance.disk_template in constants.DTS_INT_MIRROR and \
len(instance.secondary_nodes) != 1:
raise errors.OpPrereqError("Instance has not exactly one secondary node",
errors.ECODE_STATE)
self.required_nodes = 1
disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
request = {
"name": self.name,
"disk_space_total": disk_space,
"required_nodes": self.required_nodes,
"relocate_from": self.relocate_from,
}
return request
def _AddNodeEvacuate(self):
"""Get data for node-evacuate requests.
"""
return {
"instances": self.instances,
"evac_mode": self.evac_mode,
}
def _AddChangeGroup(self):
"""Get data for node-evacuate requests.
"""
return {
"instances": self.instances,
"target_groups": self.target_groups,
}
def _BuildInputData(self, fn, keydata):
"""Build input data structures.
"""
self._ComputeClusterData()
request = fn()
request["type"] = self.mode
for keyname, keytype in keydata:
if keyname not in request:
raise errors.ProgrammerError("Request parameter %s is missing" %
keyname)
val = request[keyname]
if not keytype(val):
raise errors.ProgrammerError("Request parameter %s doesn't pass"
" validation, value %s, expected"
" type %s" % (keyname, val, keytype))
self.in_data["request"] = request
self.in_text = serializer.Dump(self.in_data)
_STRING_LIST = ht.TListOf(ht.TString)
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
# pylint: disable=E1101
# Class '...' has no 'OP_ID' member
"OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
opcodes.OpInstanceMigrate.OP_ID,
opcodes.OpInstanceReplaceDisks.OP_ID])
})))
_NEVAC_MOVED = \
ht.TListOf(ht.TAnd(ht.TIsLength(3),
ht.TItems([ht.TNonEmptyString,
ht.TNonEmptyString,
ht.TListOf(ht.TNonEmptyString),
])))
_NEVAC_FAILED = \
ht.TListOf(ht.TAnd(ht.TIsLength(2),
ht.TItems([ht.TNonEmptyString,
ht.TMaybeString,
])))
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
_MODE_DATA = {
constants.IALLOCATOR_MODE_ALLOC:
(_AddNewInstance,
[
("name", ht.TString),
("memory", ht.TInt),
("spindle_use", ht.TInt),
("disks", ht.TListOf(ht.TDict)),
("disk_template", ht.TString),
("os", ht.TString),
("tags", _STRING_LIST),
("nics", ht.TListOf(ht.TDict)),
("vcpus", ht.TInt),
("hypervisor", ht.TString),
], ht.TList),
constants.IALLOCATOR_MODE_RELOC:
(_AddRelocateInstance,
[("name", ht.TString), ("relocate_from", _STRING_LIST)],
ht.TList),
constants.IALLOCATOR_MODE_NODE_EVAC:
(_AddNodeEvacuate, [
("instances", _STRING_LIST),
("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
], _NEVAC_RESULT),
constants.IALLOCATOR_MODE_CHG_GROUP:
(_AddChangeGroup, [
("instances", _STRING_LIST),
("target_groups", _STRING_LIST),
], _NEVAC_RESULT),
}
def Run(self, name, validate=True, call_fn=None):
"""Run an instance allocator and return the results.
"""
if call_fn is None:
call_fn = self.rpc.call_iallocator_runner
result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
result.Raise("Failure while running the iallocator script")
self.out_text = result.payload
if validate:
self._ValidateResult()
def _ValidateResult(self):
"""Process the allocator results.
This will process and if successful save the result in
self.out_data and the other parameters.
"""
try:
rdict = serializer.Load(self.out_text)
except Exception, err:
raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
if not isinstance(rdict, dict):
raise errors.OpExecError("Can't parse iallocator results: not a dict")
    # TODO: remove backwards compatibility in later versions
if "nodes" in rdict and "result" not in rdict:
rdict["result"] = rdict["nodes"]
del rdict["nodes"]
for key in "success", "info", "result":
if key not in rdict:
raise errors.OpExecError("Can't parse iallocator results:"
" missing key '%s'" % key)
setattr(self, key, rdict[key])
if not self._result_check(self.result):
raise errors.OpExecError("Iallocator returned invalid result,"
" expected %s, got %s" %
(self._result_check, self.result),
errors.ECODE_INVAL)
if self.mode == constants.IALLOCATOR_MODE_RELOC:
assert self.relocate_from is not None
assert self.required_nodes == 1
node2group = dict((name, ndata["group"])
for (name, ndata) in self.in_data["nodes"].items())
fn = compat.partial(self._NodesToGroups, node2group,
self.in_data["nodegroups"])
instance = self.cfg.GetInstanceInfo(self.name)
request_groups = fn(self.relocate_from + [instance.primary_node])
result_groups = fn(rdict["result"] + [instance.primary_node])
if self.success and not set(result_groups).issubset(request_groups):
raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
" differ from original groups (%s)" %
(utils.CommaJoin(result_groups),
utils.CommaJoin(request_groups)))
elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
self.out_data = rdict
@staticmethod
def _NodesToGroups(node2group, groups, nodes):
"""Returns a list of unique group names for a list of nodes.
@type node2group: dict
@param node2group: Map from node name to group UUID
@type groups: dict
@param groups: Group information
@type nodes: list
@param nodes: Node names
"""
result = set()
for node in nodes:
try:
group_uuid = node2group[node]
except KeyError:
# Ignore unknown node
pass
else:
try:
group = groups[group_uuid]
except KeyError:
# Can't find group, let's use UUID
group_name = group_uuid
else:
group_name = group["name"]
result.add(group_name)
return sorted(result)
class LUTestAllocator(NoHooksLU):
"""Run allocator tests.
......
#
#
# Copyright (C) 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the iallocator code."""
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import objectutils
from ganeti import opcodes
from ganeti import rpc
from ganeti import serializer
from ganeti import utils
import ganeti.masterd.instance as gmi
_STRING_LIST = ht.TListOf(ht.TString)
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
# pylint: disable=E1101
# Class '...' has no 'OP_ID' member
"OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
opcodes.OpInstanceMigrate.OP_ID,
opcodes.OpInstanceReplaceDisks.OP_ID])
})))
_NEVAC_MOVED = \
ht.TListOf(ht.TAnd(ht.TIsLength(3),
ht.TItems([ht.TNonEmptyString,
ht.TNonEmptyString,
ht.TListOf(ht.TNonEmptyString),
])))
_NEVAC_FAILED = \
ht.TListOf(ht.TAnd(ht.TIsLength(2),
ht.TItems([ht.TNonEmptyString,
ht.TMaybeString,
])))
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
class _AutoReqParam(objectutils.AutoSlots):
"""Meta class for request definitions.
"""
@classmethod
def _GetSlots(mcs, attrs):
"""Extract the slots out of REQ_PARAMS.
"""
params = attrs.setdefault("REQ_PARAMS", [])
return [slot for (slot, _) in params]
class IARequestBase(objectutils.ValidatedSlots):
"""A generic IAllocator request object.
"""
__metaclass__ = _AutoReqParam
MODE = NotImplemented
REQ_PARAMS = [
("required_nodes", ht.TPositiveInt)
]
REQ_RESULT = NotImplemented
def __init__(self, **kwargs):
"""Constructor for IARequestBase.
The constructor takes only keyword arguments and will set
attributes on this object based on the passed arguments. As such,
it means that you should not pass arguments which are not in the
REQ_PARAMS attribute for this class.
"""
self.required_nodes = 0
objectutils.ValidatedSlots.__init__(self, **kwargs)
self.Validate()
def Validate(self):
"""Validates all parameters of the request.
"""
assert self.MODE in constants.VALID_IALLOCATOR_MODES
for (param, validator) in self.REQ_PARAMS:
if not hasattr(self, param):
raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
errors.ECODE_INVAL)
value = getattr(self, param)
if not validator(value):
raise errors.OpPrereqError(("Request parameter '%s' has invalid"