From 989a8bee53165073b89f42b4c31cbc87d4fcb531 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Wed, 14 Jul 2010 17:29:56 +0200
Subject: [PATCH] jqueue: Factorize code waiting for job changes

By splitting the _WaitForJobChangesHelper class into multiple smaller
classes, we gain in several places:

- Simpler code, less interaction between functions and variables
- Easy to unittest (close to 100% coverage)
- Waiting for job changes has no direct knowledge of queue anymore (it
  doesn't references queue functions anymore, especially not private ones)
- Activate inotify only if there was no change at the beginning (and
  checking again right away to avoid race conditions)

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
---
 Makefile.am                    |   1 +
 lib/constants.py               |   5 +
 lib/jqueue.py                  | 264 +++++++++++++++++++++------------
 test/ganeti.jqueue_unittest.py | 224 ++++++++++++++++++++++++++++
 4 files changed, 403 insertions(+), 91 deletions(-)
 create mode 100755 test/ganeti.jqueue_unittest.py

diff --git a/Makefile.am b/Makefile.am
index 0bb93d2a5..5cc72d8c9 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -377,6 +377,7 @@ python_tests = \
 	test/ganeti.hooks_unittest.py \
 	test/ganeti.http_unittest.py \
 	test/ganeti.impexpd_unittest.py \
+	test/ganeti.jqueue_unittest.py \
 	test/ganeti.locking_unittest.py \
 	test/ganeti.luxi_unittest.py \
 	test/ganeti.masterd.instance_unittest.py \
diff --git a/lib/constants.py b/lib/constants.py
index 5ff50fc46..9dfa92ed1 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -785,6 +785,11 @@ JOB_STATUS_RUNNING = "running"
 JOB_STATUS_CANCELED = "canceled"
 JOB_STATUS_SUCCESS = "success"
 JOB_STATUS_ERROR = "error"
+JOBS_FINALIZED = frozenset([
+  JOB_STATUS_CANCELED,
+  JOB_STATUS_SUCCESS,
+  JOB_STATUS_ERROR,
+  ])
 
 # OpCode status
 # not yet finalized
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 6f19ba427..1a9d781a5 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -54,6 +54,7 @@ from ganeti import utils
 from ganeti import jstore
 from ganeti import rpc
 from ganeti import netutils
+from ganeti import compat
 
 
 JOBQUEUE_THREADS = 25
@@ -481,65 +482,33 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._job.lock_status = msg
 
 
-class _WaitForJobChangesHelper(object):
-  """Helper class using initofy to wait for changes in a job file.
-
-  This class takes a previous job status and serial, and alerts the client when
-  the current job status has changed.
+class _JobChangesChecker(object):
+  def __init__(self, fields, prev_job_info, prev_log_serial):
+    """Initializes this class.
 
-  @type job_id: string
-  @ivar job_id: id of the job we're watching
-  @type prev_job_info: string
-  @ivar prev_job_info: previous job info, as passed by the luxi client
-  @type prev_log_serial: string
-  @ivar prev_log_serial: previous job serial, as passed by the luxi client
-  @type queue: L{JobQueue}
-  @ivar queue: job queue (used for a few utility functions)
-  @type job_path: string
-  @ivar job_path: absolute path of the job file
-  @type wm: pyinotify.WatchManager (or None)
-  @ivar wm: inotify watch manager to watch for changes
-  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
-  @ivar inotify_handler: single file event handler, used for watching
-  @type notifier: pyinotify.Notifier
-  @ivar notifier: inotify single-threaded notifier, used for watching
+    @type fields: list of strings
+    @param fields: Fields requested by LUXI client
+    @type prev_job_info: string
+    @param prev_job_info: previous job info, as passed by the LUXI client
+    @type prev_log_serial: string
+    @param prev_log_serial: previous job serial, as passed by the LUXI client
 
-  """
-  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
-    self.job_id = job_id
-    self.fields = fields
-    self.prev_job_info = prev_job_info
-    self.prev_log_serial = prev_log_serial
-    self.queue = queue
-    # pylint: disable-msg=W0212
-    self.job_path = self.queue._GetJobPath(self.job_id)
-    self.wm = None
-    self.inotify_handler = None
-    self.notifier = None
+    """
+    self._fields = fields
+    self._prev_job_info = prev_job_info
+    self._prev_log_serial = prev_log_serial
 
-  def _SetupInotify(self):
-    """Create the inotify
+  def __call__(self, job):
+    """Checks whether job has changed.
 
-    @raises errors.InotifyError: if the notifier cannot be setup
+    @type job: L{_QueuedJob}
+    @param job: Job object
 
     """
-    if self.wm:
-      return
-    self.wm = pyinotify.WatchManager()
-    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
-                                                                self.OnInotify,
-                                                                self.job_path)
-    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
-    self.inotify_handler.enable()
-
-  def _LoadDiskStatus(self):
-    job = self.queue.SafeLoadJobFromDisk(self.job_id)
-    if not job:
-      raise errors.JobLost()
-    self.job_status = job.CalcStatus()
+    status = job.CalcStatus()
+    job_info = job.GetInfo(self._fields)
+    log_entries = job.GetLogEntries(self._prev_log_serial)
 
-    job_info = job.GetInfo(self.fields)
-    log_entries = job.GetLogEntries(self.prev_log_serial)
     # Serializing and deserializing data can cause type changes (e.g. from
     # tuple to list) or precision loss. We're doing it here so that we get
     # the same modifications as the data received from the client. Without
@@ -547,49 +516,164 @@ class _WaitForJobChangesHelper(object):
     # significantly different.
     # TODO: we just deserialized from disk, investigate how to make sure that
     # the job info and log entries are compatible to avoid this further step.
-    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+    # TODO: Doing something like in testutils.py:UnifyValueType might be more
+    # efficient, though floats will be tricky
+    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-  def _CheckForChanges(self):
-    self._LoadDiskStatus()
     # Don't even try to wait if the job is no longer running, there will be
     # no changes.
-    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
-                                constants.JOB_STATUS_RUNNING,
-                                constants.JOB_STATUS_WAITLOCK) or
-        self.prev_job_info != self.job_info or
-        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
-      logging.debug("Job %s changed", self.job_id)
-      return (self.job_info, self.log_entries)
+    if (status not in (constants.JOB_STATUS_QUEUED,
+                       constants.JOB_STATUS_RUNNING,
+                       constants.JOB_STATUS_WAITLOCK) or
+        job_info != self._prev_job_info or
+        (log_entries and self._prev_log_serial != log_entries[0][0])):
+      logging.debug("Job %s changed", job.id)
+      return (job_info, log_entries)
 
-    raise utils.RetryAgain()
+    return None
+
+
+class _JobFileChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+    @raises errors.InotifyError: if the notifier cannot be setup
 
-  def OnInotify(self, notifier_enabled):
+    """
+    self._wm = pyinotify.WatchManager()
+    self._inotify_handler = \
+      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
+    self._notifier = \
+      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
+    try:
+      self._inotify_handler.enable()
+    except Exception:
+      # pyinotify doesn't close file descriptors automatically
+      self._notifier.stop()
+      raise
+
+  def _OnInotify(self, notifier_enabled):
+    """Callback for inotify.
+
+    """
     if not notifier_enabled:
-      self.inotify_handler.enable()
+      self._inotify_handler.enable()
+
+  def Wait(self, timeout):
+    """Waits for the job file to change.
+
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    assert timeout >= 0
+    have_events = self._notifier.check_events(timeout * 1000)
+    if have_events:
+      self._notifier.read_events()
+    self._notifier.process_events()
+    return have_events
+
+  def Close(self):
+    """Closes underlying notifier and its file descriptor.
+
+    """
+    self._notifier.stop()
+
+
+class _JobChangesWaiter(object):
+  def __init__(self, filename):
+    """Initializes this class.
+
+    @type filename: string
+    @param filename: Path to job file
+
+    """
+    self._filewaiter = None
+    self._filename = filename
 
-  def WaitFn(self, timeout):
-    self._SetupInotify()
-    if self.notifier.check_events(timeout*1000):
-      self.notifier.read_events()
-    self.notifier.process_events()
+  def Wait(self, timeout):
+    """Waits for a job to change.
 
-  def WaitForChanges(self, timeout):
-    self._SetupInotify()
+    @type timeout: float
+    @param timeout: Timeout in seconds
+    @return: Whether there have been events
+
+    """
+    if self._filewaiter:
+      return self._filewaiter.Wait(timeout)
+
+    # Lazy setup: Avoid inotify setup cost when job file has already changed.
+    # If this point is reached, return immediately and let caller check the job
+    # file again in case there were changes since the last check. This avoids a
+    # race condition.
+    self._filewaiter = _JobFileChangesWaiter(self._filename)
+
+    return True
+
+  def Close(self):
+    """Closes underlying waiter.
+
+    """
+    if self._filewaiter:
+      self._filewaiter.Close()
+
+
+class _WaitForJobChangesHelper(object):
+  """Helper class using inotify to wait for changes in a job file.
+
+  This class takes a previous job status and serial, and alerts the client when
+  the current job status has changed.
+
+  """
+  @staticmethod
+  def _CheckForChanges(job_load_fn, check_fn):
+    job = job_load_fn()
+    if not job:
+      raise errors.JobLost()
+
+    result = check_fn(job)
+    if result is None:
+      raise utils.RetryAgain()
+
+    return result
+
+  def __call__(self, filename, job_load_fn,
+               fields, prev_job_info, prev_log_serial, timeout):
+    """Waits for changes on a job.
+
+    @type filename: string
+    @param filename: File on which to wait for changes
+    @type job_load_fn: callable
+    @param job_load_fn: Function to load job
+    @type fields: list of strings
+    @param fields: Which fields to check for changes
+    @type prev_job_info: list or None
+    @param prev_job_info: Last job information returned
+    @type prev_log_serial: int
+    @param prev_log_serial: Last job message serial number
+    @type timeout: float
+    @param timeout: maximum time to wait in seconds
+
+    """
     try:
-      return utils.Retry(self._CheckForChanges,
-                         utils.RETRY_REMAINING_TIME,
-                         timeout,
-                         wait_fn=self.WaitFn)
+      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
+      waiter = _JobChangesWaiter(filename)
+      try:
+        return utils.Retry(compat.partial(self._CheckForChanges,
+                                          job_load_fn, check_fn),
+                           utils.RETRY_REMAINING_TIME, timeout,
+                           wait_fn=waiter.Wait)
+      finally:
+        waiter.Close()
     except (errors.InotifyError, errors.JobLost):
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
 
-  def Close(self):
-    if self.wm:
-      self.notifier.stop()
-
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -1314,7 +1398,7 @@ class JobQueue(object):
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
     @type timeout: float
-    @param timeout: maximum time to wait
+    @param timeout: maximum time to wait in seconds
     @rtype: tuple (job info, log entries)
     @return: a tuple of the job information as required via
         the fields parameter, and the log entries as a list
@@ -1325,12 +1409,12 @@ class JobQueue(object):
         as such by the clients
 
     """
-    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
-                                      prev_log_serial, self)
-    try:
-      return helper.WaitForChanges(timeout)
-    finally:
-      helper.Close()
+    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
+
+    helper = _WaitForJobChangesHelper()
+
+    return helper(self._GetJobPath(job_id), load_fn,
+                  fields, prev_job_info, prev_log_serial, timeout)
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
@@ -1380,9 +1464,7 @@ class JobQueue(object):
     archive_jobs = []
     rename_files = []
     for job in jobs:
-      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
-                                  constants.JOB_STATUS_SUCCESS,
-                                  constants.JOB_STATUS_ERROR):
+      if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
         continue
 
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
new file mode 100755
index 000000000..c1bdca1c3
--- /dev/null
+++ b/test/ganeti.jqueue_unittest.py
@@ -0,0 +1,224 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2010 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Script for testing ganeti.jqueue"""
+
+import os
+import sys
+import unittest
+import tempfile
+import shutil
+import errno
+
+from ganeti import constants
+from ganeti import utils
+from ganeti import errors
+from ganeti import jqueue
+
+import testutils
+
+
+class _FakeJob:
+  def __init__(self, job_id, status):
+    self.id = job_id
+    self._status = status
+    self._log = []
+
+  def SetStatus(self, status):
+    self._status = status
+
+  def AddLogEntry(self, msg):
+    self._log.append((len(self._log), msg))
+
+  def CalcStatus(self):
+    return self._status
+
+  def GetInfo(self, fields):
+    result = []
+
+    for name in fields:
+      if name == "status":
+        result.append(self._status)
+      else:
+        raise Exception("Unknown field")
+
+    return result
+
+  def GetLogEntries(self, newer_than):
+    assert newer_than is None or newer_than >= 0
+
+    if newer_than is None:
+      return self._log
+
+    return self._log[newer_than:]
+
+
+class TestJobChangesChecker(unittest.TestCase):
+  def testStatus(self):
+    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
+    checker = jqueue._JobChangesChecker(["status"], None, None)
+    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
+
+    job.SetStatus(constants.JOB_STATUS_RUNNING)
+    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
+
+    job.SetStatus(constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
+
+    # job.id is used by checker
+    self.assertEqual(job.id, 9094)
+
+  def testStatusWithPrev(self):
+    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
+    checker = jqueue._JobChangesChecker(["status"],
+                                        [constants.JOB_STATUS_QUEUED], None)
+    self.assert_(checker(job) is None)
+
+    job.SetStatus(constants.JOB_STATUS_RUNNING)
+    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
+
+  def testFinalStatus(self):
+    for status in constants.JOBS_FINALIZED:
+      job = _FakeJob(2178711, status)
+      checker = jqueue._JobChangesChecker(["status"], [status], None)
+      # There won't be any changes in this status, hence it should signal
+      # a change immediately
+      self.assertEqual(checker(job), ([status], []))
+
+  def testLog(self):
+    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
+    checker = jqueue._JobChangesChecker(["status"], None, None)
+    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
+
+    job.AddLogEntry("Hello World")
+    (job_info, log_entries) = checker(job)
+    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
+    self.assertEqual(log_entries, [[0, "Hello World"]])
+
+    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
+    self.assert_(checker2(job) is None)
+
+    job.AddLogEntry("Foo Bar")
+    job.SetStatus(constants.JOB_STATUS_ERROR)
+
+    (job_info, log_entries) = checker2(job)
+    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
+    self.assertEqual(log_entries, [[1, "Foo Bar"]])
+
+    checker3 = jqueue._JobChangesChecker(["status"], None, None)
+    (job_info, log_entries) = checker3(job)
+    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
+    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
+
+
+class TestJobChangesWaiter(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.filename = utils.PathJoin(self.tmpdir, "job-1")
+    utils.WriteFile(self.filename, data="")
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def _EnsureNotifierClosed(self, notifier):
+    try:
+      os.fstat(notifier._fd)
+    except EnvironmentError, err:
+      self.assertEqual(err.errno, errno.EBADF)
+    else:
+      self.fail("File descriptor wasn't closed")
+
+  def testClose(self):
+    for wait in [False, True]:
+      waiter = jqueue._JobFileChangesWaiter(self.filename)
+      try:
+        if wait:
+          waiter.Wait(0.001)
+      finally:
+        waiter.Close()
+
+      # Ensure file descriptor was closed
+      self._EnsureNotifierClosed(waiter._notifier)
+
+  def testChangingFile(self):
+    waiter = jqueue._JobFileChangesWaiter(self.filename)
+    try:
+      self.assertFalse(waiter.Wait(0.1))
+      utils.WriteFile(self.filename, data="changed")
+      self.assert_(waiter.Wait(60))
+    finally:
+      waiter.Close()
+
+    self._EnsureNotifierClosed(waiter._notifier)
+
+  def testChangingFile2(self):
+    waiter = jqueue._JobChangesWaiter(self.filename)
+    try:
+      self.assertFalse(waiter._filewaiter)
+      self.assert_(waiter.Wait(0.1))
+      self.assert_(waiter._filewaiter)
+
+      # File waiter is now used, but there have been no changes
+      self.assertFalse(waiter.Wait(0.1))
+      utils.WriteFile(self.filename, data="changed")
+      self.assert_(waiter.Wait(60))
+    finally:
+      waiter.Close()
+
+    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
+
+
+class TestWaitForJobChangesHelper(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
+    utils.WriteFile(self.filename, data="")
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def _LoadWaitingJob(self):
+    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
+
+  def _LoadLostJob(self):
+    return None
+
+  def testNoChanges(self):
+    wfjc = jqueue._WaitForJobChangesHelper()
+
+    # No change
+    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
+                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
+                     constants.JOB_NOTCHANGED)
+
+    # No previous information
+    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
+                          ["status"], None, None, 1.0),
+                     ([constants.JOB_STATUS_WAITLOCK], []))
+
+  def testLostJob(self):
+    wfjc = jqueue._WaitForJobChangesHelper()
+    self.assert_(wfjc(self.filename, self._LoadLostJob,
+                      ["status"], None, None, 1.0) is None)
+
+
+if __name__ == "__main__":
+  testutils.GanetiTestProgram()
-- 
GitLab