Commit 1d870e0d authored by Thomas Thrainer's avatar Thomas Thrainer

Extract test logical units from cmdlib

LUTest* are moved to test.py.
Signed-off-by: default avatarThomas Thrainer <thomasth@google.com>
Reviewed-by: default avatarBernardo Dal Seno <bdalseno@google.com>
parent 37dc17e3
......@@ -312,7 +312,8 @@ cmdlib_PYTHON = \
lib/cmdlib/common.py \
lib/cmdlib/base.py \
lib/cmdlib/tags.py \
lib/cmdlib/network.py
lib/cmdlib/network.py \
lib/cmdlib/test.py
hypervisor_PYTHON = \
lib/hypervisor/__init__.py \
......
......@@ -34,9 +34,6 @@ import re
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
import itertools
import operator
......@@ -66,11 +63,13 @@ from ganeti.masterd import iallocator
from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
Tasklet, _QueryBase
from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
_ExpandNodeName, _ShareAll, _CheckNodeGroupInstances
_ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
_GetWantedInstances
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
LUNetworkDisconnect
from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
import ganeti.masterd.instance # pylint: disable=W0611
......@@ -214,44 +213,6 @@ def _CopyLockList(names):
return names[:]
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type nodes: list
@param nodes: list of node names or None for all nodes
@rtype: list
@return: the list of nodes, sorted
@raise errors.ProgrammerError: if the nodes parameter is wrong type
"""
if nodes:
return [_ExpandNodeName(lu.cfg, name) for name in nodes]
return utils.NiceSort(lu.cfg.GetNodeList())
def _GetWantedInstances(lu, instances):
"""Returns list of checked and expanded instance names.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type instances: list
@param instances: list of instance names or None for all instances
@rtype: list
@return: the list of instances, sorted
@raise errors.OpPrereqError: if the instances parameter is wrong type
@raise errors.OpPrereqError: if any of the passed instances is not found
"""
if instances:
wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
else:
wanted = utils.NiceSort(lu.cfg.GetInstanceList())
return wanted
def _GetUpdatedParams(old_params, update_dict,
use_default=True, use_none=False):
"""Return the new version of a parameter dictionary.
......@@ -15309,54 +15270,6 @@ class LUGroupEvacuate(LogicalUnit):
return ResultWithJobs(jobs)
class LUTestDelay(NoHooksLU):
"""Sleep for a specified amount of time.
This LU sleeps on the master and/or nodes for a specified amount of
time.
"""
REQ_BGL = False
def ExpandNames(self):
"""Expand names and set required locks.
This expands the node list, if any.
"""
self.needed_locks = {}
if self.op.on_nodes:
# _GetWantedNodes can be used here, but is not always appropriate to use
# this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
# more information.
self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
def _TestDelay(self):
"""Do the actual sleep.
"""
if self.op.on_master:
if not utils.TestDelay(self.op.duration):
raise errors.OpExecError("Error during master delay test")
if self.op.on_nodes:
result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
for node, node_result in result.items():
node_result.Raise("Failure during rpc call to node %s" % node)
def Exec(self, feedback_fn):
"""Execute the test delay opcode, with the wanted repetitions.
"""
if self.op.repeat == 0:
self._TestDelay()
else:
top_value = self.op.repeat - 1
for i in range(self.op.repeat):
self.LogInfo("Test delay iteration %d/%d", i, top_value)
self._TestDelay()
class LURestrictedCommand(NoHooksLU):
"""Logical unit for executing restricted commands.
......@@ -15404,255 +15317,6 @@ class LURestrictedCommand(NoHooksLU):
return result
class LUTestJqueue(NoHooksLU):
"""Utility LU to test some aspects of the job queue.
"""
REQ_BGL = False
# Must be lower than default timeout for WaitForJobChange to see whether it
# notices changed jobs
_CLIENT_CONNECT_TIMEOUT = 20.0
_CLIENT_CONFIRM_TIMEOUT = 60.0
@classmethod
def _NotifyUsingSocket(cls, cb, errcls):
"""Opens a Unix socket and waits for another program to connect.
@type cb: callable
@param cb: Callback to send socket name to client
@type errcls: class
@param errcls: Exception class to use for errors
"""
# Using a temporary directory as there's no easy way to create temporary
# sockets without writing a custom loop around tempfile.mktemp and
# socket.bind
tmpdir = tempfile.mkdtemp()
try:
tmpsock = utils.PathJoin(tmpdir, "sock")
logging.debug("Creating temporary socket at %s", tmpsock)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.bind(tmpsock)
sock.listen(1)
# Send details to client
cb(tmpsock)
# Wait for client to connect before continuing
sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
try:
(conn, _) = sock.accept()
except socket.error, err:
raise errcls("Client didn't connect in time (%s)" % err)
finally:
sock.close()
finally:
# Remove as soon as client is connected
shutil.rmtree(tmpdir)
# Wait for client to close
try:
try:
# pylint: disable=E1101
# Instance of '_socketobject' has no ... member
conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
conn.recv(1)
except socket.error, err:
raise errcls("Client failed to confirm notification (%s)" % err)
finally:
conn.close()
def _SendNotification(self, test, arg, sockname):
"""Sends a notification to the client.
@type test: string
@param test: Test name
@param arg: Test argument (depends on test)
@type sockname: string
@param sockname: Socket path
"""
self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
def _Notify(self, prereq, test, arg):
"""Notifies the client of a test.
@type prereq: bool
@param prereq: Whether this is a prereq-phase test
@type test: string
@param test: Test name
@param arg: Test argument (depends on test)
"""
if prereq:
errcls = errors.OpPrereqError
else:
errcls = errors.OpExecError
return self._NotifyUsingSocket(compat.partial(self._SendNotification,
test, arg),
errcls)
def CheckArguments(self):
self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
self.expandnames_calls = 0
def ExpandNames(self):
checkargs_calls = getattr(self, "checkargs_calls", 0)
if checkargs_calls < 1:
raise errors.ProgrammerError("CheckArguments was not called")
self.expandnames_calls += 1
if self.op.notify_waitlock:
self._Notify(True, constants.JQT_EXPANDNAMES, None)
self.LogInfo("Expanding names")
# Get lock on master node (just to get a lock, not for a particular reason)
self.needed_locks = {
locking.LEVEL_NODE: self.cfg.GetMasterNode(),
}
def Exec(self, feedback_fn):
if self.expandnames_calls < 1:
raise errors.ProgrammerError("ExpandNames was not called")
if self.op.notify_exec:
self._Notify(False, constants.JQT_EXEC, None)
self.LogInfo("Executing")
if self.op.log_messages:
self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
for idx, msg in enumerate(self.op.log_messages):
self.LogInfo("Sending log message %s", idx + 1)
feedback_fn(constants.JQT_MSGPREFIX + msg)
# Report how many test messages have been sent
self._Notify(False, constants.JQT_LOGMSG, idx + 1)
if self.op.fail:
raise errors.OpExecError("Opcode failure was requested")
return True
class LUTestAllocator(NoHooksLU):
"""Run allocator tests.
This LU runs the allocator tests
"""
def CheckPrereq(self):
"""Check prerequisites.
This checks the opcode parameters depending on the director and mode test.
"""
if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
constants.IALLOCATOR_MODE_MULTI_ALLOC):
for attr in ["memory", "disks", "disk_template",
"os", "tags", "nics", "vcpus"]:
if not hasattr(self.op, attr):
raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
attr, errors.ECODE_INVAL)
iname = self.cfg.ExpandInstanceName(self.op.name)
if iname is not None:
raise errors.OpPrereqError("Instance '%s' already in the cluster" %
iname, errors.ECODE_EXISTS)
if not isinstance(self.op.nics, list):
raise errors.OpPrereqError("Invalid parameter 'nics'",
errors.ECODE_INVAL)
if not isinstance(self.op.disks, list):
raise errors.OpPrereqError("Invalid parameter 'disks'",
errors.ECODE_INVAL)
for row in self.op.disks:
if (not isinstance(row, dict) or
constants.IDISK_SIZE not in row or
not isinstance(row[constants.IDISK_SIZE], int) or
constants.IDISK_MODE not in row or
row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
raise errors.OpPrereqError("Invalid contents of the 'disks'"
" parameter", errors.ECODE_INVAL)
if self.op.hypervisor is None:
self.op.hypervisor = self.cfg.GetHypervisorType()
elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
fname = _ExpandInstanceName(self.cfg, self.op.name)
self.op.name = fname
self.relocate_from = \
list(self.cfg.GetInstanceInfo(fname).secondary_nodes)
elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
constants.IALLOCATOR_MODE_NODE_EVAC):
if not self.op.instances:
raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
self.op.instances = _GetWantedInstances(self, self.op.instances)
else:
raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
if self.op.direction == constants.IALLOCATOR_DIR_OUT:
if self.op.iallocator is None:
raise errors.OpPrereqError("Missing allocator name",
errors.ECODE_INVAL)
elif self.op.direction != constants.IALLOCATOR_DIR_IN:
raise errors.OpPrereqError("Wrong allocator test '%s'" %
self.op.direction, errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Run the allocator test.
"""
if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
req = iallocator.IAReqInstanceAlloc(name=self.op.name,
memory=self.op.memory,
disks=self.op.disks,
disk_template=self.op.disk_template,
os=self.op.os,
tags=self.op.tags,
nics=self.op.nics,
vcpus=self.op.vcpus,
spindle_use=self.op.spindle_use,
hypervisor=self.op.hypervisor,
node_whitelist=None)
elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
req = iallocator.IAReqRelocate(name=self.op.name,
relocate_from=list(self.relocate_from))
elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
req = iallocator.IAReqGroupChange(instances=self.op.instances,
target_groups=self.op.target_groups)
elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
req = iallocator.IAReqNodeEvac(instances=self.op.instances,
evac_mode=self.op.evac_mode)
elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
disk_template = self.op.disk_template
insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
memory=self.op.memory,
disks=self.op.disks,
disk_template=disk_template,
os=self.op.os,
tags=self.op.tags,
nics=self.op.nics,
vcpus=self.op.vcpus,
spindle_use=self.op.spindle_use,
hypervisor=self.op.hypervisor)
for idx in range(self.op.count)]
req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
else:
raise errors.ProgrammerError("Uncatched mode %s in"
" LUTestAllocator.Exec", self.op.mode)
ial = iallocator.IAllocator(self.cfg, self.rpc, req)
if self.op.direction == constants.IALLOCATOR_DIR_IN:
result = ial.in_text
else:
ial.Run(self.op.iallocator, validate=False)
result = ial.out_text
return result
#: Query type implementations
_QUERY_IMPL = {
constants.QR_CLUSTER: _ClusterQuery,
......
......@@ -82,3 +82,41 @@ def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
errors.ECODE_STATE)
return wanted_instances
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type nodes: list
@param nodes: list of node names or None for all nodes
@rtype: list
@return: the list of nodes, sorted
@raise errors.ProgrammerError: if the nodes parameter is wrong type
"""
if nodes:
return [_ExpandNodeName(lu.cfg, name) for name in nodes]
return utils.NiceSort(lu.cfg.GetNodeList())
def _GetWantedInstances(lu, instances):
"""Returns list of checked and expanded instance names.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type instances: list
@param instances: list of instance names or None for all instances
@rtype: list
@return: the list of instances, sorted
@raise errors.OpPrereqError: if the instances parameter is wrong type
@raise errors.OpPrereqError: if any of the passed instances is not found
"""
if instances:
wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
else:
wanted = utils.NiceSort(lu.cfg.GetInstanceList())
return wanted
#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Test logical units."""
import logging
import shutil
import socket
import tempfile
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import NoHooksLU
from ganeti.cmdlib.common import _ExpandInstanceName, _GetWantedNodes, \
_GetWantedInstances
class LUTestDelay(NoHooksLU):
"""Sleep for a specified amount of time.
This LU sleeps on the master and/or nodes for a specified amount of
time.
"""
REQ_BGL = False
def ExpandNames(self):
"""Expand names and set required locks.
This expands the node list, if any.
"""
self.needed_locks = {}
if self.op.on_nodes:
# _GetWantedNodes can be used here, but is not always appropriate to use
# this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
# more information.
self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
def _TestDelay(self):
"""Do the actual sleep.
"""
if self.op.on_master:
if not utils.TestDelay(self.op.duration):
raise errors.OpExecError("Error during master delay test")
if self.op.on_nodes:
result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
for node, node_result in result.items():
node_result.Raise("Failure during rpc call to node %s" % node)
def Exec(self, feedback_fn):
"""Execute the test delay opcode, with the wanted repetitions.
"""
if self.op.repeat == 0:
self._TestDelay()
else:
top_value = self.op.repeat - 1
for i in range(self.op.repeat):
self.LogInfo("Test delay iteration %d/%d", i, top_value)
self._TestDelay()
class LUTestJqueue(NoHooksLU):
"""Utility LU to test some aspects of the job queue.
"""
REQ_BGL = False
# Must be lower than default timeout for WaitForJobChange to see whether it
# notices changed jobs
_CLIENT_CONNECT_TIMEOUT = 20.0
_CLIENT_CONFIRM_TIMEOUT = 60.0
@classmethod
def _NotifyUsingSocket(cls, cb, errcls):
"""Opens a Unix socket and waits for another program to connect.
@type cb: callable
@param cb: Callback to send socket name to client
@type errcls: class
@param errcls: Exception class to use for errors
"""
# Using a temporary directory as there's no easy way to create temporary
# sockets without writing a custom loop around tempfile.mktemp and
# socket.bind
tmpdir = tempfile.mkdtemp()
try:
tmpsock = utils.PathJoin(tmpdir, "sock")
logging.debug("Creating temporary socket at %s", tmpsock)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.bind(tmpsock)
sock.listen(1)
# Send details to client
cb(tmpsock)
# Wait for client to connect before continuing
sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
try:
(conn, _) = sock.accept()
except socket.error, err:
raise errcls("Client didn't connect in time (%s)" % err)
finally:
sock.close()
finally:
# Remove as soon as client is connected
shutil.rmtree(tmpdir)
# Wait for client to close
try:
try:
# pylint: disable=E1101
# Instance of '_socketobject' has no ... member
conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
conn.recv(1)
except socket.error, err:
raise errcls("Client failed to confirm notification (%s)" % err)
finally:
conn.close()
def _SendNotification(self, test, arg, sockname):
"""Sends a notification to the client.
@type test: string
@param test: Test name
@param arg: Test argument (depends on test)
@type sockname: string
@param sockname: Socket path
"""
self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
def _Notify(self, prereq, test, arg):
"""Notifies the client of a test.
@type prereq: bool
@param prereq: Whether this is a prereq-phase test
@type test: string
@param test: Test name
@param arg: Test argument (depends on test)
"""
if prereq:
errcls = errors.OpPrereqError
else:
errcls = errors.OpExecError
return self._NotifyUsingSocket(compat.partial(self._SendNotification,
test, arg),
errcls)
def CheckArguments(self):
self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
self.expandnames_calls = 0
def ExpandNames(self):
checkargs_calls = getattr(self, "checkargs_calls", 0)
if checkargs_calls < 1:
raise errors.ProgrammerError("CheckArguments was not called")
self.expandnames_calls += 1
if self.op.notify_waitlock:
self._Notify(True, constants.JQT_EXPANDNAMES, None)
self.LogInfo("Expanding names")
# Get lock on master node (just to get a lock, not for a particular reason)
self.needed_locks = {
locking.LEVEL_NODE: self.cfg.GetMasterNode(),
}
def Exec(self, feedback_fn):
if self.expandnames_calls < 1:
raise errors.ProgrammerError("ExpandNames was not called")
if self.op.notify_exec:
self._Notify(False, constants.JQT_EXEC, None)
self.LogInfo("Executing")
if self.op.log_messages:
self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))