diff --git a/lib/cmdlib.py b/lib/cmdlib.py index a14621d5d56d18c09aeaf008d89f5aa6414db9bb..37fcca58d430366b6d149c697a28adad2e477c09 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -36,6 +36,9 @@ import platform import logging import copy import OpenSSL +import socket +import tempfile +import shutil from ganeti import ssh from ganeti import utils @@ -9848,6 +9851,145 @@ class LUTestDelay(NoHooksLU): self._TestDelay() +class LUTestJobqueue(NoHooksLU): + """Utility LU to test some aspects of the job queue. + + """ + _OP_PARAMS = [ + ("notify_waitlock", False, _TBool), + ("notify_exec", False, _TBool), + ("log_messages", _EmptyList, _TListOf(_TString)), + ("fail", False, _TBool), + ] + REQ_BGL = False + + # Must be lower than default timeout for WaitForJobChange to see whether it + # notices changed jobs + _CLIENT_CONNECT_TIMEOUT = 20.0 + _CLIENT_CONFIRM_TIMEOUT = 60.0 + + @classmethod + def _NotifyUsingSocket(cls, cb, errcls): + """Opens a Unix socket and waits for another program to connect. 
+ + @type cb: callable + @param cb: Callback to send socket name to client + @type errcls: class + @param errcls: Exception class to use for errors + + """ + # Using a temporary directory as there's no easy way to create temporary + # sockets without writing a custom loop around tempfile.mktemp and + # socket.bind + tmpdir = tempfile.mkdtemp() + try: + tmpsock = utils.PathJoin(tmpdir, "sock") + + logging.debug("Creating temporary socket at %s", tmpsock) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.bind(tmpsock) + sock.listen(1) + + # Send details to client + cb(tmpsock) + + # Wait for client to connect before continuing + sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT) + try: + (conn, _) = sock.accept() + except socket.error, err: + raise errcls("Client didn't connect in time (%s)" % err) + finally: + sock.close() + finally: + # Remove as soon as client is connected + shutil.rmtree(tmpdir) + + # Wait for client to close + try: + try: + conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT) + conn.recv(1) + except socket.error, err: + raise errcls("Client failed to confirm notification (%s)" % err) + finally: + conn.close() + + def _SendNotification(self, test, arg, sockname): + """Sends a notification to the client. + + @type test: string + @param test: Test name + @param arg: Test argument (depends on test) + @type sockname: string + @param sockname: Socket path + + """ + self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg)) + + def _Notify(self, prereq, test, arg): + """Notifies the client of a test. 
+ + @type prereq: bool + @param prereq: Whether this is a prereq-phase test + @type test: string + @param test: Test name + @param arg: Test argument (depends on test) + + """ + if prereq: + errcls = errors.OpPrereqError + else: + errcls = errors.OpExecError + + return self._NotifyUsingSocket(compat.partial(self._SendNotification, + test, arg), + errcls) + + def CheckArguments(self): + self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1 + self.expandnames_calls = 0 + + def ExpandNames(self): + checkargs_calls = getattr(self, "checkargs_calls", 0) + if checkargs_calls < 1: + raise errors.ProgrammerError("CheckArguments was not called") + + self.expandnames_calls += 1 + + if self.op.notify_waitlock: + self._Notify(True, constants.JQT_EXPANDNAMES, None) + + self.LogInfo("Expanding names") + + # Get lock on master node (just to get a lock, not for a particular reason) + self.needed_locks = { + locking.LEVEL_NODE: self.cfg.GetMasterNode(), + } + + def Exec(self, feedback_fn): + if self.expandnames_calls < 1: + raise errors.ProgrammerError("ExpandNames was not called") + + if self.op.notify_exec: + self._Notify(False, constants.JQT_EXEC, None) + + self.LogInfo("Executing") + + if self.op.log_messages: + for idx, msg in enumerate(self.op.log_messages): + self.LogInfo("Sending log message %s", idx + 1) + feedback_fn(constants.JQT_MSGPREFIX + msg) + # Report how many test messages have been sent + self._Notify(False, constants.JQT_LOGMSG, idx + 1) + + if self.op.fail: + raise errors.OpExecError("Opcode failure was requested") + + return True + + class IAllocator(object): """IAllocator framework. 
# Types of entries in the job log
ELOG_MESSAGE = "message"
ELOG_PROGRESS = "progress"
ELOG_REMOTE_IMPORT = "remote-import"
ELOG_JQUEUE_TEST = "jqueue-test"

# Job queue test: prefix put in front of test log messages
JQT_MSGPREFIX = "TESTMSG="

# Job queue test: names of the individual tests
JQT_EXPANDNAMES = "expandnames"
JQT_EXEC = "exec"
JQT_LOGMSG = "logmsg"

# Job queue test: all valid test names
JQT_ALL = frozenset((
  JQT_EXPANDNAMES,
  JQT_EXEC,
  JQT_LOGMSG,
  ))

# max dynamic devices
MAX_NICS = 8
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
  """Job poll reporter verifying job queue test notifications.

  Test log messages are collected instead of being printed, and
  C{ELOG_JQUEUE_TEST} entries trigger a check of the job's current status.
  One instance must only be used for a single job.

  """
  def __init__(self):
    """Initializes this class.

    """
    cli.StdioJobPollReportCb.__init__(self)
    self._testmsgs = []
    self._job_id = None

  def GetTestMessages(self):
    """Returns all test log messages received so far.

    @rtype: list
    @return: Received messages with the test prefix stripped

    """
    return self._testmsgs

  def GetJobId(self):
    """Returns the job ID.

    @return: ID of the job seen by this reporter, C{None} if no log message
      was received yet

    """
    return self._job_id

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Job queue test entries are verified, prefixed test messages are stored
    and everything else is passed on to the base class.

    @raise errors.ProgrammerError: If messages of more than one job are
      reported to the same instance

    """
    if self._job_id is None:
      self._job_id = job_id
    elif self._job_id != job_id:
      raise errors.ProgrammerError("The same reporter instance was used for"
                                   " more than one job")

    if log_type == constants.ELOG_JQUEUE_TEST:
      (sockname, test, arg) = log_msg
      return self._ProcessTestMessage(job_id, sockname, test, arg)

    elif (log_type == constants.ELOG_MESSAGE and
          log_msg.startswith(constants.JQT_MSGPREFIX)):
      # Only the part after the prefix is the actual test message
      self._testmsgs.append(log_msg[len(constants.JQT_MSGPREFIX):])
      return

    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                     timestamp, log_type,
                                                     log_msg)

  def _ProcessTestMessage(self, job_id, sockname, test, arg):
    """Handles a job queue test message.

    Connects to the socket announced by the job, queries the job's status
    and compares it with the status expected for the test.

    @param job_id: Job ID
    @type sockname: string
    @param sockname: Path of the Unix socket to connect to
    @type test: string
    @param test: Test name, must be one of C{constants.JQT_ALL}
    @param arg: Test argument; for C{JQT_LOGMSG} the number of test messages
      which must have been received so far
    @raise errors.OpExecError: If the test is unknown, the job can't be found
      or its state doesn't match the expectation

    """
    if test not in constants.JQT_ALL:
      raise errors.OpExecError("Received invalid test message %s" % test)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.settimeout(30.0)

      logging.debug("Connecting to %s", sockname)
      sock.connect(sockname)

      # The status is queried while the connection is held open
      logging.debug("Checking status")
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
      if not jobdetails:
        raise errors.OpExecError("Can't find job %s" % job_id)

      status = jobdetails[0]

      logging.debug("Status of job %s is %s", job_id, status)

      if test == constants.JQT_EXPANDNAMES:
        if status != constants.JOB_STATUS_WAITLOCK:
          raise errors.OpExecError("Job status while expanding names is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_WAITLOCK))
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
        if status != constants.JOB_STATUS_RUNNING:
          raise errors.OpExecError("Job status while executing opcode is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_RUNNING))

      if test == constants.JQT_LOGMSG:
        if len(self._testmsgs) != arg:
          raise errors.OpExecError("Received %s test messages when %s are"
                                   " expected" % (len(self._testmsgs), arg))
    finally:
      # Closed in all cases, including failed checks
      logging.debug("Closing socket")
      sock.close()


def TestJobqueue(opts, _):
  """Runs a few tests on the job queue.

  An C{OpTestJobqueue} opcode is submitted twice, once expected to succeed
  and once expected to fail. Both times the received test messages and the
  final job status are verified.

  @param opts: Command line options, passed on to C{SubmitOpCode}
  @rtype: int
  @return: Exit code, always 0 (failed checks raise an exception)
  @raise errors.OpExecError: If a check fails

  """
  # Every entry is a separate message; a missing comma between two string
  # literals would silently merge them into one
  test_messages = [
    "Hello World",
    "A",
    "",
    "B",
    "Foo|bar|baz",
    utils.TimestampForFilename(),
    ]

  for fail in [False, True]:
    if fail:
      ToStdout("Testing job failure")
    else:
      ToStdout("Testing job success")

    op = opcodes.OpTestJobqueue(notify_waitlock=True,
                                notify_exec=True,
                                log_messages=test_messages,
                                fail=fail)

    # A reporter must only be used for one job, hence a new one per run
    reporter = _JobQueueTestReporter()
    try:
      SubmitOpCode(op, reporter=reporter, opts=opts)
    except errors.OpExecError:
      if not fail:
        raise
      # Ignore error
    else:
      if fail:
        raise errors.OpExecError("Job didn't fail when it should")

    # Check received log messages
    if reporter.GetTestMessages() != test_messages:
      raise errors.OpExecError("Received test messages don't match input"
                               " (input %r, received %r)" %
                               (test_messages, reporter.GetTestMessages()))

    # Check final status
    job_id = reporter.GetJobId()

    jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
    if not jobdetails:
      raise errors.OpExecError("Can't find job %s" % job_id)

    if fail:
      exp_status = constants.JOB_STATUS_ERROR
    else:
      exp_status = constants.JOB_STATUS_SUCCESS

    final_status = jobdetails[0]
    if final_status != exp_status:
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
                               (final_status, exp_status))

  return 0
class TestLUTestJobqueue(unittest.TestCase):
  """Checks the timeouts used by L{cmdlib.LUTestJobqueue}.

  """
  def test(self):
    """Verifies the client connect timeout against C{luxi.WFJC_TIMEOUT}.

    """
    # The connect timeout must stay below 75% of the WaitForJobChange
    # timeout; "assertTrue" is used as "assert_" is a deprecated alias
    self.assertTrue(cmdlib.LUTestJobqueue._CLIENT_CONNECT_TIMEOUT <
                    (luxi.WFJC_TIMEOUT * 0.75),
                    msg=("Client timeout too high, might not notice bugs"
                         " in WaitForJobChange"))