From e58f87a958c6f56c4e4ff185606ef2beeaff0649 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <>
Date: Thu, 15 Jul 2010 18:23:17 +0200
Subject: [PATCH] Add test for some aspects of job queue

This new opcode and gnt-debug sub-command test some aspects of the
job queue, including the status of a job. The bug fixed in commit
2034c70d507 was identified using this test. A future patch will
run this test automatically from the QA scripts.

Signed-off-by: Michael Hanselmann <>
Reviewed-by: Iustin Pop <>
 lib/                  | 142 ++++++++++++++++++++++++++++++
 lib/               |  12 +++
 lib/                    |   1 +
 lib/                 |  13 +++
 man/gnt-debug.sgml             |  13 +++
 scripts/gnt-debug              | 154 +++++++++++++++++++++++++++++++++
 test/ |   9 ++
 7 files changed, 344 insertions(+)

diff --git a/lib/ b/lib/
index a14621d5d..37fcca58d 100644
--- a/lib/
+++ b/lib/
@@ -36,6 +36,9 @@ import platform
 import logging
 import copy
 import OpenSSL
+import socket
+import tempfile
+import shutil
 from ganeti import ssh
 from ganeti import utils
@@ -9848,6 +9851,145 @@ class LUTestDelay(NoHooksLU):
+class LUTestJobqueue(NoHooksLU):
+  """Utility LU to test some aspects of the job queue.
+  """
+  _OP_PARAMS = [
+    ("notify_waitlock", False, _TBool),
+    ("notify_exec", False, _TBool),
+    ("log_messages", _EmptyList, _TListOf(_TString)),
+    ("fail", False, _TBool),
+    ]
+  REQ_BGL = False
+  # Must be lower than default timeout for WaitForJobChange to see whether it
+  # notices changed jobs
+  @classmethod
+  def _NotifyUsingSocket(cls, cb, errcls):
+    """Opens a Unix socket and waits for another program to connect.
+    @type cb: callable
+    @param cb: Callback to send socket name to client
+    @type errcls: class
+    @param errcls: Exception class to use for errors
+    """
+    # Using a temporary directory as there's no easy way to create temporary
+    # sockets without writing a custom loop around tempfile.mktemp and
+    # socket.bind
+    tmpdir = tempfile.mkdtemp()
+    try:
+      tmpsock = utils.PathJoin(tmpdir, "sock")
+      logging.debug("Creating temporary socket at %s", tmpsock)
+      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+      try:
+        sock.bind(tmpsock)
+        sock.listen(1)
+        # Send details to client
+        cb(tmpsock)
+        # Wait for client to connect before continuing
+        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
+        try:
+          (conn, _) = sock.accept()
+        except socket.error, err:
+          raise errcls("Client didn't connect in time (%s)" % err)
+      finally:
+        sock.close()
+    finally:
+      # Remove as soon as client is connected
+      shutil.rmtree(tmpdir)
+    # Wait for client to close
+    try:
+      try:
+        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
+        conn.recv(1)
+      except socket.error, err:
+        raise errcls("Client failed to confirm notification (%s)" % err)
+    finally:
+      conn.close()
+  def _SendNotification(self, test, arg, sockname):
+    """Sends a notification to the client.
+    @type test: string
+    @param test: Test name
+    @param arg: Test argument (depends on test)
+    @type sockname: string
+    @param sockname: Socket path
+    """
+    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
+  def _Notify(self, prereq, test, arg):
+    """Notifies the client of a test.
+    @type prereq: bool
+    @param prereq: Whether this is a prereq-phase test
+    @type test: string
+    @param test: Test name
+    @param arg: Test argument (depends on test)
+    """
+    if prereq:
+      errcls = errors.OpPrereqError
+    else:
+      errcls = errors.OpExecError
+    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
+                                                  test, arg),
+                                   errcls)
+  def CheckArguments(self):
+    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
+    self.expandnames_calls = 0
+  def ExpandNames(self):
+    checkargs_calls = getattr(self, "checkargs_calls", 0)
+    if checkargs_calls < 1:
+      raise errors.ProgrammerError("CheckArguments was not called")
+    self.expandnames_calls += 1
+    if self.op.notify_waitlock:
+      self._Notify(True, constants.JQT_EXPANDNAMES, None)
+    self.LogInfo("Expanding names")
+    # Get lock on master node (just to get a lock, not for a particular reason)
+    self.needed_locks = {
+      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
+      }
+  def Exec(self, feedback_fn):
+    if self.expandnames_calls < 1:
+      raise errors.ProgrammerError("ExpandNames was not called")
+    if self.op.notify_exec:
+      self._Notify(False, constants.JQT_EXEC, None)
+    self.LogInfo("Executing")
+    if self.op.log_messages:
+      for idx, msg in enumerate(self.op.log_messages):
+        self.LogInfo("Sending log message %s", idx + 1)
+        feedback_fn(constants.JQT_MSGPREFIX + msg)
+        # Report how many test messages have been sent
+        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
+    if
+      raise errors.OpExecError("Opcode failure was requested")
+    return True
 class IAllocator(object):
   """IAllocator framework.
diff --git a/lib/ b/lib/
index 9dfa92ed1..0a4687b8c 100644
--- a/lib/
+++ b/lib/
@@ -809,6 +809,18 @@ OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
 ELOG_MESSAGE = "message"
 ELOG_PROGRESS = "progress"
 ELOG_REMOTE_IMPORT = "remote-import"
+ELOG_JQUEUE_TEST = "jqueue-test"
+# Job queue test
+JQT_EXPANDNAMES = "expandnames"
+JQT_EXEC = "exec"
+JQT_LOGMSG = "logmsg"
+JQT_ALL = frozenset([
+  ])
 # max dynamic devices
diff --git a/lib/ b/lib/
index e5c7922a4..4039d75da 100644
--- a/lib/
+++ b/lib/
@@ -220,6 +220,7 @@ class Processor(object):
     # test lu
     opcodes.OpTestDelay: cmdlib.LUTestDelay,
     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
+    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
   def __init__(self, context, ec_id):
diff --git a/lib/ b/lib/
index 0caf9fbf5..aacbbddfa 100644
--- a/lib/
+++ b/lib/
@@ -784,6 +784,19 @@ class OpTestAllocator(OpCode):
+class OpTestJobqueue(OpCode):
+  """Utility opcode to test some aspects of the job queue.
+  """
+  __slots__ = [
+    "notify_waitlock",
+    "notify_exec",
+    "log_messages",
+    "fail",
+    ]
 OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values()
                    if (isinstance(v, type) and issubclass(v, OpCode) and
                        hasattr(v, "OP_ID"))])
diff --git a/man/gnt-debug.sgml b/man/gnt-debug.sgml
index 3f75b629a..a4929398f 100644
--- a/man/gnt-debug.sgml
+++ b/man/gnt-debug.sgml
@@ -179,6 +179,19 @@
+    <refsect2>
+      <title>TEST-JOBQUEUE</title>
+      <cmdsynopsis>
+        <command>test-jobqueue</command>
+      </cmdsynopsis>
+      <para>
+        Executes a few tests on the job queue. This command might generate
+        failed jobs deliberately.
+      </para>
+    </refsect2>
diff --git a/scripts/gnt-debug b/scripts/gnt-debug
index bf9d98d3e..addf349f9 100755
--- a/scripts/gnt-debug
+++ b/scripts/gnt-debug
@@ -28,9 +28,12 @@
 import sys
 import simplejson
 import time
+import socket
+import logging
 from ganeti.cli import *
 from ganeti import cli
+from ganeti import constants
 from ganeti import opcodes
 from ganeti import utils
 from ganeti import errors
@@ -155,6 +158,154 @@ def TestAllocator(opts, args):
   return 0
+class _JobQueueTestReporter(cli.StdioJobPollReportCb):
+  def __init__(self):
+    """Initializes this class.
+    """
+    cli.StdioJobPollReportCb.__init__(self)
+    self._testmsgs = []
+    self._job_id = None
+  def GetTestMessages(self):
+    """Returns all test log messages received so far.
+    """
+    return self._testmsgs
+  def GetJobId(self):
+    """Returns the job ID.
+    """
+    return self._job_id
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+    """
+    if self._job_id is None:
+      self._job_id = job_id
+    elif self._job_id != job_id:
+      raise errors.ProgrammerError("The same reporter instance was used for"
+                                   " more than one job")
+    if log_type == constants.ELOG_JQUEUE_TEST:
+      (sockname, test, arg) = log_msg
+      return self._ProcessTestMessage(job_id, sockname, test, arg)
+    elif (log_type == constants.ELOG_MESSAGE and
+          log_msg.startswith(constants.JQT_MSGPREFIX)):
+      self._testmsgs.append(log_msg[len(constants.JQT_MSGPREFIX):])
+      return
+    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
+                                                     timestamp, log_type,
+                                                     log_msg)
+  def _ProcessTestMessage(self, job_id, sockname, test, arg):
+    """Handles a job queue test message.
+    """
+    if test not in constants.JQT_ALL:
+      raise errors.OpExecError("Received invalid test message %s" % test)
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    try:
+      sock.settimeout(30.0)
+      logging.debug("Connecting to %s", sockname)
+      sock.connect(sockname)
+      logging.debug("Checking status")
+      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
+      if not jobdetails:
+        raise errors.OpExecError("Can't find job %s" % job_id)
+      status = jobdetails[0]
+      logging.debug("Status of job %s is %s", job_id, status)
+      if test == constants.JQT_EXPANDNAMES:
+        if status != constants.JOB_STATUS_WAITLOCK:
+          raise errors.OpExecError("Job status while expanding names is '%s',"
+                                   " not '%s' as expected" %
+                                   (status, constants.JOB_STATUS_WAITLOCK))
+      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
+        if status != constants.JOB_STATUS_RUNNING:
+          raise errors.OpExecError("Job status while executing opcode is '%s',"
+                                   " not '%s' as expected" %
+                                   (status, constants.JOB_STATUS_RUNNING))
+      if test == constants.JQT_LOGMSG:
+        if len(self._testmsgs) != arg:
+          raise errors.OpExecError("Received %s test messages when %s are"
+                                   " expected" % (len(self._testmsgs), arg))
+    finally:
+      logging.debug("Closing socket")
+      sock.close()
+def TestJobqueue(opts, _):
+  """Runs a few tests on the job queue.
+  """
+  test_messages = [
+    "Hello World",
+    "A",
+    "",
+    "B"
+    "Foo|bar|baz",
+    utils.TimestampForFilename(),
+    ]
+  for fail in [False, True]:
+    if fail:
+      ToStdout("Testing job failure")
+    else:
+      ToStdout("Testing job success")
+    op = opcodes.OpTestJobqueue(notify_waitlock=True,
+                                notify_exec=True,
+                                log_messages=test_messages,
+                                fail=fail)
+    reporter = _JobQueueTestReporter()
+    try:
+      SubmitOpCode(op, reporter=reporter, opts=opts)
+    except errors.OpExecError, err:
+      if not fail:
+        raise
+      # Ignore error
+    else:
+      if fail:
+        raise errors.OpExecError("Job didn't fail when it should")
+    # Check received log messages
+    if reporter.GetTestMessages() != test_messages:
+      raise errors.OpExecError("Received test messages don't match input"
+                               " (input %r, received %r)" %
+                               (test_messages, reporter.GetTestMessages()))
+    # Check final status
+    job_id = reporter.GetJobId()
+    jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
+    if not jobdetails:
+      raise errors.OpExecError("Can't find job %s" % job_id)
+    if fail:
+      exp_status = constants.JOB_STATUS_ERROR
+    else:
+      exp_status = constants.JOB_STATUS_SUCCESS
+    final_status = jobdetails[0]
+    if final_status != exp_status:
+      raise errors.OpExecError("Final job status is %s, not %s as expected" %
+                               (final_status, exp_status))
+  return 0
 commands = {
   'delay': (
     Delay, [ArgUnknown(min=1, max=1)],
@@ -206,6 +357,9 @@ commands = {
                 help="Comma separated list of tags"),
     "{opts...} <instance>", "Executes a TestAllocator OpCode"),
+  "test-jobqueue": (
+    TestJobqueue, ARGS_NONE, [],
+    "", "Test a few aspects of the job queue")
diff --git a/test/ b/test/
index 067eba31b..2dc5b5fa1 100755
--- a/test/
+++ b/test/
@@ -33,6 +33,7 @@ from ganeti import cmdlib
 from ganeti import opcodes
 from ganeti import errors
 from ganeti import utils
+from ganeti import luxi
 import testutils
 import mocks
@@ -147,5 +148,13 @@ class TestIAllocatorChecks(testutils.GanetiTestCase):
     self.assertRaises(errors.OpPrereqError, c_i)
+class TestLUTestJobqueue(unittest.TestCase):
+  def test(self):
+    self.assert_(cmdlib.LUTestJobqueue._CLIENT_CONNECT_TIMEOUT <
+                 (luxi.WFJC_TIMEOUT * 0.75),
+                 msg=("Client timeout too high, might not notice bugs"
+                      " in WaitForJobChange"))
 if __name__ == "__main__":