Commit e58f87a9 authored by Michael Hanselmann

Add test for some aspects of job queue

This new opcode and gnt-debug sub-command test some aspects of the
job queue, including the status of a job. The bug fixed in commit
2034c70d was identified using this test. A future patch will
run this test automatically from the QA scripts.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent 9dd6889b
......@@ -36,6 +36,9 @@ import platform
import logging
import copy
import OpenSSL
import socket
import tempfile
import shutil
from ganeti import ssh
from ganeti import utils
......@@ -9848,6 +9851,145 @@ class LUTestDelay(NoHooksLU):
self._TestDelay()
class LUTestJobqueue(NoHooksLU):
"""Utility LU to test some aspects of the job queue.
"""
_OP_PARAMS = [
("notify_waitlock", False, _TBool),
("notify_exec", False, _TBool),
("log_messages", _EmptyList, _TListOf(_TString)),
("fail", False, _TBool),
]
REQ_BGL = False
# Must be lower than default timeout for WaitForJobChange to see whether it
# notices changed jobs
_CLIENT_CONNECT_TIMEOUT = 20.0
_CLIENT_CONFIRM_TIMEOUT = 60.0
@classmethod
def _NotifyUsingSocket(cls, cb, errcls):
"""Opens a Unix socket and waits for another program to connect.
@type cb: callable
@param cb: Callback to send socket name to client
@type errcls: class
@param errcls: Exception class to use for errors
"""
# Using a temporary directory as there's no easy way to create temporary
# sockets without writing a custom loop around tempfile.mktemp and
# socket.bind
tmpdir = tempfile.mkdtemp()
try:
tmpsock = utils.PathJoin(tmpdir, "sock")
logging.debug("Creating temporary socket at %s", tmpsock)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.bind(tmpsock)
sock.listen(1)
# Send details to client
cb(tmpsock)
# Wait for client to connect before continuing
sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
try:
(conn, _) = sock.accept()
except socket.error, err:
raise errcls("Client didn't connect in time (%s)" % err)
finally:
sock.close()
finally:
# Remove as soon as client is connected
shutil.rmtree(tmpdir)
# Wait for client to close
try:
try:
conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
conn.recv(1)
except socket.error, err:
raise errcls("Client failed to confirm notification (%s)" % err)
finally:
conn.close()
def _SendNotification(self, test, arg, sockname):
"""Sends a notification to the client.
@type test: string
@param test: Test name
@param arg: Test argument (depends on test)
@type sockname: string
@param sockname: Socket path
"""
self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
def _Notify(self, prereq, test, arg):
"""Notifies the client of a test.
@type prereq: bool
@param prereq: Whether this is a prereq-phase test
@type test: string
@param test: Test name
@param arg: Test argument (depends on test)
"""
if prereq:
errcls = errors.OpPrereqError
else:
errcls = errors.OpExecError
return self._NotifyUsingSocket(compat.partial(self._SendNotification,
test, arg),
errcls)
def CheckArguments(self):
self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
self.expandnames_calls = 0
def ExpandNames(self):
checkargs_calls = getattr(self, "checkargs_calls", 0)
if checkargs_calls < 1:
raise errors.ProgrammerError("CheckArguments was not called")
self.expandnames_calls += 1
if self.op.notify_waitlock:
self._Notify(True, constants.JQT_EXPANDNAMES, None)
self.LogInfo("Expanding names")
# Get lock on master node (just to get a lock, not for a particular reason)
self.needed_locks = {
locking.LEVEL_NODE: self.cfg.GetMasterNode(),
}
def Exec(self, feedback_fn):
if self.expandnames_calls < 1:
raise errors.ProgrammerError("ExpandNames was not called")
if self.op.notify_exec:
self._Notify(False, constants.JQT_EXEC, None)
self.LogInfo("Executing")
if self.op.log_messages:
for idx, msg in enumerate(self.op.log_messages):
self.LogInfo("Sending log message %s", idx + 1)
feedback_fn(constants.JQT_MSGPREFIX + msg)
# Report how many test messages have been sent
self._Notify(False, constants.JQT_LOGMSG, idx + 1)
if self.op.fail:
raise errors.OpExecError("Opcode failure was requested")
return True
class IAllocator(object):
"""IAllocator framework.
......
......@@ -809,6 +809,18 @@ OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
# Types of job log entries
ELOG_MESSAGE = "message"
ELOG_PROGRESS = "progress"
ELOG_REMOTE_IMPORT = "remote-import"
ELOG_JQUEUE_TEST = "jqueue-test"

# Job queue test: prefix marking test log messages, followed by the names
# of the individual tests and the set of all valid test names
JQT_MSGPREFIX = "TESTMSG="
JQT_EXPANDNAMES = "expandnames"
JQT_EXEC = "exec"
JQT_LOGMSG = "logmsg"
JQT_ALL = frozenset((JQT_EXPANDNAMES, JQT_EXEC, JQT_LOGMSG))

# max dynamic devices
MAX_NICS = 8
......
......@@ -220,6 +220,7 @@ class Processor(object):
# test lu
opcodes.OpTestDelay: cmdlib.LUTestDelay,
opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
}
def __init__(self, context, ec_id):
......
......@@ -784,6 +784,19 @@ class OpTestAllocator(OpCode):
]
class OpTestJobqueue(OpCode):
  """Opcode for testing some aspects of the job queue.

  @ivar notify_waitlock: whether the client is notified while names are
    expanded
  @ivar notify_exec: whether the client is notified when execution starts
  @ivar log_messages: test messages to be sent as log messages
  @ivar fail: whether the opcode should fail on purpose

  """
  OP_ID = "OP_TEST_JQUEUE"
  __slots__ = ["notify_waitlock", "notify_exec", "log_messages", "fail"]
# Maps opcode IDs to opcode classes; built from every class found in this
# module's global namespace that derives from OpCode and has an OP_ID
OP_MAPPING = dict([(v.OP_ID, v)
                   for v in globals().values()
                   if isinstance(v, type)
                   if issubclass(v, OpCode)
                   if hasattr(v, "OP_ID")])
......@@ -179,6 +179,19 @@
</refsect2>
<refsect2>
<title>TEST-JOBQUEUE</title>
<cmdsynopsis>
<command>test-jobqueue</command>
</cmdsynopsis>
<para>
Executes a few tests on the job queue. This command might generate
failed jobs deliberately.
</para>
</refsect2>
</refsect1>
&footer;
......
......@@ -28,9 +28,12 @@
import sys
import simplejson
import time
import socket
import logging
from ganeti.cli import *
from ganeti import cli
from ganeti import constants
from ganeti import opcodes
from ganeti import utils
from ganeti import errors
......@@ -155,6 +158,154 @@ def TestAllocator(opts, args):
return 0
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
  """Job poll reporter checking job queue test notifications.

  Test log messages are collected, job queue test entries are verified
  against the current job status and everything else is passed on to
  L{cli.StdioJobPollReportCb}.

  """
  def __init__(self):
    """Initializes this class.

    """
    cli.StdioJobPollReportCb.__init__(self)
    self._job_id = None
    self._testmsgs = []

  def GetTestMessages(self):
    """Returns the test log messages collected up to now.

    """
    return self._testmsgs

  def GetJobId(self):
    """Returns the ID of the job seen by this reporter (None if none yet).

    """
    return self._job_id

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Dispatches a single log message.

    @raise errors.ProgrammerError: if messages of more than one job are
      reported to the same instance

    """
    # A reporter is bound to the first job it sees
    if self._job_id is None:
      self._job_id = job_id
    elif self._job_id != job_id:
      raise errors.ProgrammerError("The same reporter instance was used for"
                                   " more than one job")

    # Notification requesting a status check
    if log_type == constants.ELOG_JQUEUE_TEST:
      (sockname, test, arg) = log_msg
      return self._ProcessTestMessage(job_id, sockname, test, arg)

    # Test message: remember it without the prefix, don't print it
    if (log_type == constants.ELOG_MESSAGE and
        log_msg.startswith(constants.JQT_MSGPREFIX)):
      prefix_len = len(constants.JQT_MSGPREFIX)
      self._testmsgs.append(log_msg[prefix_len:])
      return None

    # Anything else is handled by the parent class
    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                     timestamp, log_type,
                                                     log_msg)

  def _ProcessTestMessage(self, job_id, sockname, test, arg):
    """Checks the job's status in response to a job queue test message.

    The socket at C{sockname} stays connected while the status is queried
    and is closed afterwards, whether or not the checks passed.

    @param job_id: ID of the job being tested
    @param sockname: Path of the Unix socket to connect to
    @param test: Test name, must be one of L{constants.JQT_ALL}
    @param arg: Test argument; for log message tests the number of test
      messages expected so far
    @raise errors.OpExecError: if the test name is invalid, the job can't
      be found or its state doesn't match the expectations

    """
    if test not in constants.JQT_ALL:
      raise errors.OpExecError("Received invalid test message %s" % test)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.settimeout(30.0)

      logging.debug("Connecting to %s", sockname)
      sock.connect(sockname)

      logging.debug("Checking status")
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
      if not jobdetails:
        raise errors.OpExecError("Can't find job %s" % job_id)

      status = jobdetails[0]

      logging.debug("Status of job %s is %s", job_id, status)

      # Determine which status the job must have for this test
      if test == constants.JQT_EXPANDNAMES:
        wanted = constants.JOB_STATUS_WAITLOCK
        errfmt = ("Job status while expanding names is '%s',"
                  " not '%s' as expected")
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
        wanted = constants.JOB_STATUS_RUNNING
        errfmt = ("Job status while executing opcode is '%s',"
                  " not '%s' as expected")
      else:
        wanted = None
        errfmt = None

      if errfmt is not None and status != wanted:
        raise errors.OpExecError(errfmt % (status, wanted))

      # For log message tests the number of collected messages must match
      if test == constants.JQT_LOGMSG and len(self._testmsgs) != arg:
        raise errors.OpExecError("Received %s test messages when %s are"
                                 " expected" % (len(self._testmsgs), arg))
    finally:
      logging.debug("Closing socket")
      sock.close()
def TestJobqueue(opts, _):
  """Runs a few tests on the job queue.

  A test opcode is submitted twice, first expecting success and then
  expecting failure. After each run the received test log messages and the
  final job status are compared with the expected values.

  @param opts: Command line options, passed on to L{SubmitOpCode}
  @param _: Unused positional arguments
  @rtype: int
  @return: 0 if all checks passed
  @raise errors.OpExecError: if a job unexpectedly fails or succeeds, if
    the received messages differ from the ones sent, if the job can't be
    found or if its final status is wrong

  """
  # Every entry needs its trailing comma; without it adjacent string
  # literals are silently concatenated into a single message
  test_messages = [
    "Hello World",
    "A",
    "",
    "B",
    "Foo|bar|baz",
    utils.TimestampForFilename(),
    ]

  for fail in [False, True]:
    if fail:
      ToStdout("Testing job failure")
    else:
      ToStdout("Testing job success")

    op = opcodes.OpTestJobqueue(notify_waitlock=True,
                                notify_exec=True,
                                log_messages=test_messages,
                                fail=fail)

    reporter = _JobQueueTestReporter()
    try:
      SubmitOpCode(op, reporter=reporter, opts=opts)
    except errors.OpExecError:
      if not fail:
        raise
      # The failure was requested, hence the error is expected and ignored
    else:
      if fail:
        raise errors.OpExecError("Job didn't fail when it should")

    # Check received log messages
    if reporter.GetTestMessages() != test_messages:
      raise errors.OpExecError("Received test messages don't match input"
                               " (input %r, received %r)" %
                               (test_messages, reporter.GetTestMessages()))

    # Check final status
    job_id = reporter.GetJobId()

    jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
    if not jobdetails:
      raise errors.OpExecError("Can't find job %s" % job_id)

    if fail:
      exp_status = constants.JOB_STATUS_ERROR
    else:
      exp_status = constants.JOB_STATUS_SUCCESS

    final_status = jobdetails[0]
    if final_status != exp_status:
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
                               (final_status, exp_status))

  return 0
commands = {
'delay': (
Delay, [ArgUnknown(min=1, max=1)],
......@@ -206,6 +357,9 @@ commands = {
help="Comma separated list of tags"),
],
"{opts...} <instance>", "Executes a TestAllocator OpCode"),
"test-jobqueue": (
TestJobqueue, ARGS_NONE, [],
"", "Test a few aspects of the job queue")
}
......
......@@ -33,6 +33,7 @@ from ganeti import cmdlib
from ganeti import opcodes
from ganeti import errors
from ganeti import utils
from ganeti import luxi
import testutils
import mocks
......@@ -147,5 +148,13 @@ class TestIAllocatorChecks(testutils.GanetiTestCase):
self.assertRaises(errors.OpPrereqError, c_i)
class TestLUTestJobqueue(unittest.TestCase):
  """Checks constants of L{cmdlib.LUTestJobqueue}.

  """
  def test(self):
    """Client connect timeout must be below 75% of C{luxi.WFJC_TIMEOUT}.

    """
    connect_timeout = cmdlib.LUTestJobqueue._CLIENT_CONNECT_TIMEOUT
    upper_bound = luxi.WFJC_TIMEOUT * 0.75

    self.assertTrue(connect_timeout < upper_bound,
                    msg=("Client timeout too high, might not notice bugs"
                         " in WaitForJobChange"))
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment