Commit d1c2dd75 authored by Iustin Pop's avatar Iustin Pop
Browse files

Move all iallocator functions into a class

This patch moves all the iallocator function into a separate class that
is then somewhat easier to use. It doesn't bring any new functionality.

The patch also changes the way the iallocator is called - the
OpTestAllocator opcode is no longer needed, and all its parameters
should be passed directly to the IAllocator constructor.

Reviewed-by: ultrotter
parent a424ce50
...@@ -3138,47 +3138,42 @@ class LUCreateInstance(LogicalUnit): ...@@ -3138,47 +3138,42 @@ class LUCreateInstance(LogicalUnit):
"""Run the allocator based on input opcode. """Run the allocator based on input opcode.
""" """
al_data = _IAllocatorGetClusterData(self.cfg, self.sstore)
disks = [{"size": self.op.disk_size, "mode": "w"}, disks = [{"size": self.op.disk_size, "mode": "w"},
{"size": self.op.swap_size, "mode": "w"}] {"size": self.op.swap_size, "mode": "w"}]
nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None), nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
"bridge": self.op.bridge}] "bridge": self.op.bridge}]
op = opcodes.OpTestAllocator(name=self.op.instance_name, ial = IAllocator(self.cfg, self.sstore,
disk_template=self.op.disk_template, name=self.op.instance_name,
tags=[], disk_template=self.op.disk_template,
os=self.op.os_type, tags=[],
vcpus=self.op.vcpus, os=self.op.os_type,
mem_size=self.op.mem_size, vcpus=self.op.vcpus,
disks=disks, mem_size=self.op.mem_size,
nics=nics) disks=disks,
nics=nics,
_IAllocatorAddNewInstance(al_data, op) mode=constants.IALLOCATOR_MODE_ALLOC)
text = serializer.Dump(al_data) ial.Run(self.op.iallocator)
result = _IAllocatorRun(self.op.iallocator, text) if not ial.success:
result = _IAllocatorValidateResult(result)
if not result["success"]:
raise errors.OpPrereqError("Can't compute nodes using" raise errors.OpPrereqError("Can't compute nodes using"
" iallocator '%s': %s" % (self.op.iallocator, " iallocator '%s': %s" % (self.op.iallocator,
result["info"])) ial.info))
req_nodes = 1 req_nodes = 1
if self.op.disk_template in constants.DTS_NET_MIRROR: if self.op.disk_template in constants.DTS_NET_MIRROR:
req_nodes += 1 req_nodes += 1
if len(result["nodes"]) != req_nodes: if len(ial.nodes) != req_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number" raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" % " of nodes (%s), required %s" %
(len(result["nodes"]), req_nodes)) (len(ial.nodes), req_nodes))
self.op.pnode = result["nodes"][0] self.op.pnode = ial.nodes[0]
logger.ToStdout("Selected nodes for the instance: %s" % logger.ToStdout("Selected nodes for the instance: %s" %
(", ".join(result["nodes"]),)) (", ".join(ial.nodes),))
logger.Info("Selected nodes for instance %s via iallocator %s: %s" % logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
(self.op.instance_name, self.op.iallocator, result["nodes"])) (self.op.instance_name, self.op.iallocator, ial.nodes))
if req_nodes == 2: if req_nodes == 2:
self.op.snode = result["nodes"][1] self.op.snode = ial.nodes[1]
def BuildHooksEnv(self): def BuildHooksEnv(self):
"""Build hooks env. """Build hooks env.
...@@ -4719,168 +4714,229 @@ class LUTestDelay(NoHooksLU): ...@@ -4719,168 +4714,229 @@ class LUTestDelay(NoHooksLU):
" result: %s" % (node, node_result)) " result: %s" % (node, node_result))
def _IAllocatorGetClusterData(cfg, sstore): class IAllocator(object):
"""Compute the generic allocator input data. """IAllocator framework.
This is the data that is independent of the actual operation. An IAllocator instance has three sets of attributes:
- cfg/sstore that are needed to query the cluster
- input data (all members of the _KEYS class attribute are required)
- four buffer attributes (in|out_data|text), that represent the
input (to the external script) in text and data structure format,
and the output from it, again in two formats
- the result variables from the script (success, info, nodes) for
easy usage
""" """
# cluster data _KEYS = [
data = { "mode", "name",
"version": 1, "mem_size", "disks", "disk_template",
"cluster_name": sstore.GetClusterName(), "os", "tags", "nics", "vcpus",
"cluster_tags": list(cfg.GetClusterInfo().GetTags()), ]
# we don't have job IDs
} def __init__(self, cfg, sstore, **kwargs):
self.cfg = cfg
# node data self.sstore = sstore
node_results = {} # init buffer variables
node_list = cfg.GetNodeList() self.in_text = self.out_text = self.in_data = self.out_data = None
node_data = rpc.call_node_info(node_list, cfg.GetVGName()) # init all input fields so that pylint is happy
for nname in node_list: self.mode = self.name = None
ninfo = cfg.GetNodeInfo(nname) self.mem_size = self.disks = self.disk_template = None
if nname not in node_data or not isinstance(node_data[nname], dict): self.os = self.tags = self.nics = self.vcpus = None
raise errors.OpExecError("Can't get data for node %s" % nname) # init result fields
remote_info = node_data[nname] self.success = self.info = self.nodes = None
for attr in ['memory_total', 'memory_free', for key in kwargs:
'vg_size', 'vg_free']: if key not in self._KEYS:
if attr not in remote_info: raise errors.ProgrammerError("Invalid input parameter '%s' to"
raise errors.OpExecError("Node '%s' didn't return attribute '%s'" % " IAllocator" % key)
(nname, attr)) setattr(self, key, kwargs[key])
try: for key in self._KEYS:
int(remote_info[attr]) if key not in kwargs:
except ValueError, err: raise errors.ProgrammerError("Missing input parameter '%s' to"
raise errors.OpExecError("Node '%s' returned invalid value for '%s':" " IAllocator" % key)
" %s" % (nname, attr, str(err))) self._BuildInputData()
pnr = {
"tags": list(ninfo.GetTags()), def _ComputeClusterData(self):
"total_memory": utils.TryConvert(int, remote_info['memory_total']), """Compute the generic allocator input data.
"free_memory": utils.TryConvert(int, remote_info['memory_free']),
"total_disk": utils.TryConvert(int, remote_info['vg_size']), This is the data that is independent of the actual operation.
"free_disk": utils.TryConvert(int, remote_info['vg_free']),
"primary_ip": ninfo.primary_ip, """
"secondary_ip": ninfo.secondary_ip, cfg = self.cfg
} # cluster data
node_results[nname] = pnr data = {
data["nodes"] = node_results "version": 1,
"cluster_name": self.sstore.GetClusterName(),
# instance data "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
instance_data = {} # we don't have job IDs
i_list = cfg.GetInstanceList()
for iname in i_list:
iinfo = cfg.GetInstanceInfo(iname)
nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
for n in iinfo.nics]
pir = {
"tags": list(iinfo.GetTags()),
"should_run": iinfo.status == "up",
"vcpus": iinfo.vcpus,
"memory": iinfo.memory,
"os": iinfo.os,
"nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
"nics": nic_data,
"disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
"disk_template": iinfo.disk_template,
} }
instance_data[iname] = pir
data["instances"] = instance_data # node data
node_results = {}
node_list = cfg.GetNodeList()
node_data = rpc.call_node_info(node_list, cfg.GetVGName())
for nname in node_list:
ninfo = cfg.GetNodeInfo(nname)
if nname not in node_data or not isinstance(node_data[nname], dict):
raise errors.OpExecError("Can't get data for node %s" % nname)
remote_info = node_data[nname]
for attr in ['memory_total', 'memory_free',
'vg_size', 'vg_free']:
if attr not in remote_info:
raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
(nname, attr))
try:
int(remote_info[attr])
except ValueError, err:
raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
" %s" % (nname, attr, str(err)))
pnr = {
"tags": list(ninfo.GetTags()),
"total_memory": utils.TryConvert(int, remote_info['memory_total']),
"free_memory": utils.TryConvert(int, remote_info['memory_free']),
"total_disk": utils.TryConvert(int, remote_info['vg_size']),
"free_disk": utils.TryConvert(int, remote_info['vg_free']),
"primary_ip": ninfo.primary_ip,
"secondary_ip": ninfo.secondary_ip,
}
node_results[nname] = pnr
data["nodes"] = node_results
# instance data
instance_data = {}
i_list = cfg.GetInstanceList()
for iname in i_list:
iinfo = cfg.GetInstanceInfo(iname)
nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
for n in iinfo.nics]
pir = {
"tags": list(iinfo.GetTags()),
"should_run": iinfo.status == "up",
"vcpus": iinfo.vcpus,
"memory": iinfo.memory,
"os": iinfo.os,
"nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
"nics": nic_data,
"disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
"disk_template": iinfo.disk_template,
}
instance_data[iname] = pir
return data data["instances"] = instance_data
self.in_data = data
def _IAllocatorAddNewInstance(data, op): def _AddNewInstance(self):
"""Add new instance data to allocator structure. """Add new instance data to allocator structure.
This in combination with _AllocatorGetClusterData will create the This in combination with _AllocatorGetClusterData will create the
correct structure needed as input for the allocator. correct structure needed as input for the allocator.
The checks for the completeness of the opcode must have already been The checks for the completeness of the opcode must have already been
done. done.
""" """
if len(op.disks) != 2: data = self.in_data
raise errors.OpExecError("Only two-disk configurations supported") if len(self.disks) != 2:
raise errors.OpExecError("Only two-disk configurations supported")
disk_space = _ComputeDiskSize(self.disk_template,
self.disks[0]["size"], self.disks[1]["size"])
request = {
"type": "allocate",
"name": self.name,
"disk_template": self.disk_template,
"tags": self.tags,
"os": self.os,
"vcpus": self.vcpus,
"memory": self.mem_size,
"disks": self.disks,
"disk_space_total": disk_space,
"nics": self.nics,
}
data["request"] = request
disk_space = _ComputeDiskSize(op.disk_template, def _AddRelocateInstance(self):
op.disks[0]["size"], op.disks[1]["size"]) """Add relocate instance data to allocator structure.
request = { This in combination with _IAllocatorGetClusterData will create the
"type": "allocate", correct structure needed as input for the allocator.
"name": op.name,
"disk_template": op.disk_template,
"tags": op.tags,
"os": op.os,
"vcpus": op.vcpus,
"memory": op.mem_size,
"disks": op.disks,
"disk_space_total": disk_space,
"nics": op.nics,
}
data["request"] = request
The checks for the completeness of the opcode must have already been
done.
def _IAllocatorAddRelocateInstance(data, op): """
"""Add relocate instance data to allocator structure. data = self.in_data
request = {
"type": "replace_secondary",
"name": self.name,
}
data["request"] = request
This in combination with _IAllocatorGetClusterData will create the def _BuildInputData(self):
correct structure needed as input for the allocator. """Build input data structures.
The checks for the completeness of the opcode must have already been """
done. self._ComputeClusterData()
""" if self.mode == constants.IALLOCATOR_MODE_ALLOC:
request = { self._AddNewInstance()
"type": "replace_secondary", else:
"name": op.name, self._AddRelocateInstance()
}
data["request"] = request
self.in_text = serializer.Dump(self.in_data)
def _IAllocatorRun(name, data): def Run(self, name, validate=True):
"""Run an instance allocator and return the results. """Run an instance allocator and return the results.
""" """
alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH, data = self.in_text
os.path.isfile)
if alloc_script is None:
raise errors.OpExecError("Can't find allocator '%s'" % name)
fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.") alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
try: os.path.isfile)
os.write(fd, data) if alloc_script is None:
os.close(fd) raise errors.OpExecError("Can't find allocator '%s'" % name)
result = utils.RunCmd([alloc_script, fin_name])
if result.failed:
raise errors.OpExecError("Instance allocator call failed: %s,"
" output: %s" %
(result.fail_reason, result.stdout))
finally:
os.unlink(fin_name)
return result.stdout
fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
try:
os.write(fd, data)
os.close(fd)
result = utils.RunCmd([alloc_script, fin_name])
if result.failed:
raise errors.OpExecError("Instance allocator call failed: %s,"
" output: %s" %
(result.fail_reason, result.stdout))
finally:
os.unlink(fin_name)
self.out_text = result.stdout
if validate:
self._ValidateResult()
def _IAllocatorValidateResult(data): def _ValidateResult(self):
"""Process the allocator results. """Process the allocator results.
""" This will process and if successful save the result in
try: self.out_data and the other parameters.
rdict = serializer.Load(data)
except Exception, err:
raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
if not isinstance(rdict, dict): """
raise errors.OpExecError("Can't parse iallocator results: not a dict") try:
rdict = serializer.Load(self.out_text)
except Exception, err:
raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
if not isinstance(rdict, dict):
raise errors.OpExecError("Can't parse iallocator results: not a dict")
for key in "success", "info", "nodes": for key in "success", "info", "nodes":
if key not in rdict: if key not in rdict:
raise errors.OpExecError("Can't parse iallocator results:" raise errors.OpExecError("Can't parse iallocator results:"
" missing key '%s'" % key) " missing key '%s'" % key)
setattr(self, key, rdict[key])
if not isinstance(rdict["nodes"], list): if not isinstance(rdict["nodes"], list):
raise errors.OpExecError("Can't parse iallocator results: 'nodes' key" raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
" is not a list") " is not a list")
return rdict self.out_data = rdict
class LUTestAllocator(NoHooksLU): class LUTestAllocator(NoHooksLU):
...@@ -4951,15 +5007,21 @@ class LUTestAllocator(NoHooksLU): ...@@ -4951,15 +5007,21 @@ class LUTestAllocator(NoHooksLU):
"""Run the allocator test. """Run the allocator test.
""" """
data = _IAllocatorGetClusterData(self.cfg, self.sstore) ial = IAllocator(self.cfg, self.sstore,
if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: mode=self.op.mode,
_IAllocatorAddNewInstance(data, self.op) name=self.op.name,
else: mem_size=self.op.mem_size,
_IAllocatorAddRelocateInstance(data, self.op) disks=self.op.disks,
disk_template=self.op.disk_template,
os=self.op.os,
tags=self.op.tags,
nics=self.op.nics,
vcpus=self.op.vcpus,
)
text = serializer.Dump(data)
if self.op.direction == constants.IALLOCATOR_DIR_IN: if self.op.direction == constants.IALLOCATOR_DIR_IN:
result = text result = ial.in_text
else: else:
result = _IAllocatorRun(self.op.allocator, text) ial.Run(self.op.allocator, validate=False)
result = ial.out_text
return result return result
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment