Commit d61df03e authored by Iustin Pop's avatar Iustin Pop
Browse files

Allocator framework, 1st part: allocator input generation

In preparation for the introduction of an automatic instance allocator,
this patch adds an allocator simulation opcode that, based on the input
parameters, will return either the input message to the allocator
(implemented) or the result of the allocator run (not yet implemented).

This allows algorithm tests against simulated allocations and the
current cluster state.

The patch adds the following:
  - a function that generates the generic cluster information for the
    allocator
  - a function that generates the 'new instance' information
  - a function that generates the 'replace_secondary' information

These three functions will be used by the allocator framework later to
generate the actual information for the external algorithms. Currently
we just return the json-serialized text.

Reviewed-by: imsnah
parent b62ddbe5
...@@ -30,6 +30,7 @@ import time ...@@ -30,6 +30,7 @@ import time
import tempfile import tempfile
import re import re
import platform import platform
import simplejson
from ganeti import rpc from ganeti import rpc
from ganeti import ssh from ganeti import ssh
...@@ -44,6 +45,14 @@ from ganeti import opcodes ...@@ -44,6 +45,14 @@ from ganeti import opcodes
from ganeti import ssconf from ganeti import ssconf
# Probe once, at import time, whether the installed simplejson accepts the
# "indent" keyword; a TypeError is taken to mean that it does not, in which
# case _JSON_INDENT is None and callers must not pass "indent" at all.
try:
    simplejson.dumps(1, indent=2)
except TypeError:
    _JSON_INDENT = None
else:
    _JSON_INDENT = 2
class LogicalUnit(object): class LogicalUnit(object):
"""Logical Unit base class. """Logical Unit base class.
...@@ -4639,3 +4648,194 @@ class LUTestDelay(NoHooksLU): ...@@ -4639,3 +4648,194 @@ class LUTestDelay(NoHooksLU):
if not node_result: if not node_result:
raise errors.OpExecError("Failure during rpc call to node %s," raise errors.OpExecError("Failure during rpc call to node %s,"
" result: %s" % (node, node_result)) " result: %s" % (node, node_result))
def _AllocatorGetClusterData(cfg, sstore):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation: a small
    cluster header plus one entry per node and one entry per instance.

    Args:
      cfg: the configuration object; it is queried for the cluster info,
        the node and instance lists, the per-node and per-instance
        objects and the volume group name
      sstore: the simple store object; only GetClusterName() is used

    Returns:
      a dict with the keys "version", "cluster_name", "cluster_tags",
      "nodes" (node name -> node dict) and "instances" (instance name ->
      instance dict)

    Raises:
      errors.OpExecError: if a node returned no usable data, lacks one of
        the required attributes, or reported a value for it that cannot
        be converted to an integer

    """
    # cluster data
    data = {
        "version": 1,
        "cluster_name": sstore.GetClusterName(),
        "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
        # we don't have job IDs
    }

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
        ninfo = cfg.GetNodeInfo(nname)
        if nname not in node_data or not isinstance(node_data[nname], dict):
            raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = node_data[nname]

        # Validate and convert in a single step, so that the values placed
        # in the result are exactly the ones that passed validation.
        numeric = {}
        for attr in ("memory_total", "memory_free", "vg_size", "vg_free"):
            if attr not in remote_info:
                raise errors.OpExecError("Node '%s' didn't return attribute"
                                         " '%s'" % (nname, attr))
            try:
                numeric[attr] = int(remote_info[attr])
            except (ValueError, TypeError) as err:
                # TypeError covers values such as None, which int() rejects
                # with a different exception than malformed strings
                raise errors.OpExecError("Node '%s' returned invalid value"
                                         " for '%s': %s" %
                                         (nname, attr, str(err)))

        node_results[nname] = {
            "tags": list(ninfo.GetTags()),
            "total_memory": numeric["memory_total"],
            "free_memory": numeric["memory_free"],
            "total_disk": numeric["vg_size"],
            "free_disk": numeric["vg_free"],
            "primary_ip": ninfo.primary_ip,
            "secondary_ip": ninfo.secondary_ip,
        }
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iname in cfg.GetInstanceList():
        iinfo = cfg.GetInstanceInfo(iname)
        nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                    for n in iinfo.nics]
        instance_data[iname] = {
            "tags": list(iinfo.GetTags()),
            "should_run": iinfo.status == "up",
            "vcpus": iinfo.vcpus,
            "memory": iinfo.memory,
            "os": iinfo.os,
            # primary node first, followed by the secondaries
            "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
            "nics": nic_data,
            # every disk is reported with mode "w"
            "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
            "disk_template": iinfo.disk_template,
        }
    data["instances"] = instance_data

    return data
def _AllocatorAddNewInstance(data, op):
    """Attach an 'allocate' request to the allocator structure.

    Combined with the output of _AllocatorGetClusterData this yields the
    complete structure needed as input for the allocator.

    The opcode must already have been checked for completeness; no
    validation is done here. The "request" key of data is set in place
    and nothing is returned.

    """
    # (request key, opcode attribute) pairs; only the memory size is
    # named differently on the opcode
    field_map = (
        ("name", "name"),
        ("disk_template", "disk_template"),
        ("tags", "tags"),
        ("os", "os"),
        ("vcpus", "vcpus"),
        ("memory", "mem_size"),
        ("disks", "disks"),
        ("nics", "nics"),
    )
    request = dict((key, getattr(op, attr)) for key, attr in field_map)
    request["type"] = "allocate"
    data["request"] = request
def _AllocatorAddRelocateInstance(data, op):
    """Attach a 'replace_secondary' request to the allocator structure.

    Combined with the output of _AllocatorGetClusterData this yields the
    complete structure needed as input for the allocator.

    The opcode must already have been checked for completeness; only its
    name is used. The "request" key of data is set in place and nothing
    is returned.

    """
    data["request"] = dict(type="replace_secondary", name=op.name)
class LUTestAllocator(NoHooksLU):
    """Run allocator tests.

    This LU runs the allocator tests; at the moment it only generates and
    returns the text that serves as allocator input.

    """
    _OP_REQP = ["direction", "mode", "name"]

    def CheckPrereq(self):
        """Check prerequisites.

        This checks the opcode parameters depending on the direction and
        mode of the test. For the relocate mode the instance name on the
        opcode is replaced by its expanded form.

        """
        op = self.op
        mode = op.mode

        if mode == constants.ALF_MODE_ALLOC:
            required = ("name", "mem_size", "disks", "disk_template",
                        "os", "tags", "nics", "vcpus")
            for attr in required:
                if not hasattr(op, attr):
                    raise errors.OpPrereqError("Missing attribute '%s' on"
                                               " opcode input" % attr)

            # a new instance must not clash with an existing one
            iname = self.cfg.ExpandInstanceName(op.name)
            if iname is not None:
                raise errors.OpPrereqError("Instance '%s' already in the"
                                           " cluster" % iname)

            if not isinstance(op.nics, list):
                raise errors.OpPrereqError("Invalid parameter 'nics'")
            for row in op.nics:
                nic_ok = (isinstance(row, dict) and
                          "mac" in row and
                          "ip" in row and
                          "bridge" in row)
                if not nic_ok:
                    raise errors.OpPrereqError("Invalid contents of the"
                                               " 'nics' parameter")

            if not isinstance(op.disks, list):
                raise errors.OpPrereqError("Invalid parameter 'disks'")
            for row in op.disks:
                disk_ok = (isinstance(row, dict) and
                           "size" in row and
                           isinstance(row["size"], int) and
                           "mode" in row and
                           row["mode"] in ['r', 'w'])
                if not disk_ok:
                    raise errors.OpPrereqError("Invalid contents of the"
                                               " 'disks' parameter")
        elif mode == constants.ALF_MODE_RELOC:
            if not hasattr(op, "name"):
                raise errors.OpPrereqError("Missing attribute 'name' on"
                                           " opcode input")
            fname = self.cfg.ExpandInstanceName(op.name)
            if fname is None:
                raise errors.OpPrereqError("Instance '%s' not found for"
                                           " relocation" % op.name)
            op.name = fname
        else:
            raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                       mode)

        direction = op.direction
        if direction == constants.ALF_DIR_OUT:
            if not hasattr(op, "allocator"):
                raise errors.OpPrereqError("Missing allocator name")
            # running an allocator is not implemented, so this direction
            # is always rejected
            raise errors.OpPrereqError("Allocator out mode not supported yet")
        if direction != constants.ALF_DIR_IN:
            raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                       direction)

    def Exec(self, feedback_fn):
        """Run the allocator test.

        Builds the cluster data, adds the request matching the opcode
        mode and returns the result serialized as JSON text.

        """
        data = _AllocatorGetClusterData(self.cfg, self.sstore)

        if self.op.mode == constants.ALF_MODE_ALLOC:
            add_request = _AllocatorAddNewInstance
        else:
            add_request = _AllocatorAddRelocateInstance
        add_request(data, self.op)

        # only pass "indent" when the simplejson in use supports it
        if _JSON_INDENT is None:
            return simplejson.dumps(data)
        return simplejson.dumps(data, indent=_JSON_INDENT)
...@@ -184,3 +184,8 @@ VNC_PASSWORD_FILE = _autoconf.SYSCONFDIR + "/ganeti/vnc-cluster-password" ...@@ -184,3 +184,8 @@ VNC_PASSWORD_FILE = _autoconf.SYSCONFDIR + "/ganeti/vnc-cluster-password"
VERIFY_NPLUSONE_MEM = 'nplusone_mem' VERIFY_NPLUSONE_MEM = 'nplusone_mem'
VERIFY_OPTIONAL_CHECKS = frozenset([VERIFY_NPLUSONE_MEM]) VERIFY_OPTIONAL_CHECKS = frozenset([VERIFY_NPLUSONE_MEM])
# Allocator framework constants
# Test direction: "in" shows the generated allocator input, "out" is meant
# to show the allocator results
ALF_DIR_IN = "in"
ALF_DIR_OUT = "out"
# Request mode: allocate a new instance or relocate an existing one
ALF_MODE_ALLOC = "allocate"
ALF_MODE_RELOC = "relocate"
...@@ -87,6 +87,7 @@ class Processor(object): ...@@ -87,6 +87,7 @@ class Processor(object):
opcodes.OpDelTags: cmdlib.LUDelTags, opcodes.OpDelTags: cmdlib.LUDelTags,
# test lu # test lu
opcodes.OpTestDelay: cmdlib.LUTestDelay, opcodes.OpTestDelay: cmdlib.LUTestDelay,
opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
} }
def __init__(self, feedback=None): def __init__(self, feedback=None):
......
...@@ -444,3 +444,22 @@ class OpTestDelay(OpCode): ...@@ -444,3 +444,22 @@ class OpTestDelay(OpCode):
""" """
OP_ID = "OP_TEST_DELAY" OP_ID = "OP_TEST_DELAY"
__slots__ = ["duration", "on_master", "on_nodes"] __slots__ = ["duration", "on_master", "on_nodes"]
class OpTestAllocator(OpCode):
    """Allocator framework testing.

    This opcode works in one of two directions:
      - 'in': gather and return the allocator input for a given mode
        (allocate new or replace secondary) and a given instance
        definition
      - 'out': run a selected allocator for a given operation (as above)
        and return the allocator output

    """
    OP_ID = "OP_TEST_ALLOCATOR"
    __slots__ = [
        "direction",
        "mode",
        "allocator",
        "name",
        "mem_size",
        "disks",
        "disk_template",
        "os",
        "tags",
        "nics",
        "vcpus",
    ]
...@@ -97,6 +97,47 @@ def GenericOpCodes(opts, args): ...@@ -97,6 +97,47 @@ def GenericOpCodes(opts, args):
return 0 return 0
def TestAllocator(opts, args):
    """Runs the test allocator opcode.

    The disk, nic and tag options are parsed from their comma separated
    command line form, an OpTestAllocator opcode is built for the
    instance named by the first argument, and the opcode result is
    printed.

    Returns 1 if the disk sizes cannot be parsed, 0 otherwise.

    """
    try:
        disks = [{"size": utils.ParseUnit(val), "mode": 'w'}
                 for val in opts.disks.split(",")]
    except errors.UnitParseError as err:
        sys.stderr.write("Invalid disks parameter '%s': %s\n" %
                         (opts.disks, err))
        return 1

    # each nic is given as mac/ip/bridge; fields that are missing or
    # empty become None
    nic_dict = []
    for spec in opts.nics.split(","):
        fields = spec.split("/")
        fields.extend([None] * (3 - len(fields)))
        mac, ip, bridge = [field or None for field in fields[:3]]
        nic_dict.append({"mac": mac, "ip": ip, "bridge": bridge})

    if opts.tags is not None:
        opts.tags = opts.tags.split(",")
    else:
        opts.tags = []

    op = opcodes.OpTestAllocator(mode=opts.mode,
                                 name=args[0],
                                 mem_size=opts.mem,
                                 disks=disks,
                                 disk_template=opts.disk_template,
                                 nics=nic_dict,
                                 os=opts.os_type,
                                 vcpus=opts.vcpus,
                                 tags=opts.tags,
                                 direction=opts.direction,
                                 allocator=opts.allocator,
                                 )
    result = SubmitOpCode(op)
    print(result)
    return 0
commands = { commands = {
'delay': (Delay, ARGS_ONE, 'delay': (Delay, ARGS_ONE,
[DEBUG_OPT, [DEBUG_OPT,
...@@ -113,6 +154,37 @@ commands = { ...@@ -113,6 +154,37 @@ commands = {
], ],
"<op_list_file>", "Submits a job built from a json-file" "<op_list_file>", "Submits a job built from a json-file"
" with a list of serialized opcodes"), " with a list of serialized opcodes"),
'allocator': (TestAllocator, ARGS_ONE,
[DEBUG_OPT,
make_option("--dir", dest="direction",
default="in", choices=["in", "out"],
help="Show allocator input (in) or allocator"
" results (out)"),
make_option("--algorithm", dest="allocator",
default=None,
help="Allocator algorithm name"),
make_option("-m", "--mode", default="relocate",
choices=["relocate", "allocate"],
help="Request mode, either allocate or"
"relocate"),
cli_option("--mem", default=128, type="unit",
help="Memory size for the instance (MiB)"),
make_option("--disks", default="4096,4096",
help="Comma separated list of disk sizes (MiB)"),
make_option("-t", "--disk-template", default="drbd",
help="Select the disk template"),
make_option("--nics", default="00:11:22:33:44:55",
help="Comma separated list of nics, each nic"
" definition is of form mac/ip/bridge, if"
" missing values are replace by None"),
make_option("-o", "--os-type", default=None,
help="Select os for the instance"),
make_option("-p", "--vcpus", default=1, type="int",
help="Select number of VCPUs for the instance"),
make_option("--tags", default=None,
help="Comma separated list of tags"),
],
"{opts...} <instance>", "Executes a TestAllocator OpCode"),
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment