From de9d02c7b032f53633ed8d8933bf38af4274742b Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Wed, 8 Sep 2010 19:25:07 +0200
Subject: [PATCH] jqueue: Move queue inspection into separate function
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This makes the __init__ function a lot smaller without changing any
functionality.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: René Nussbaumer <rn@google.com>
---
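Note for reviewers (not part of the commit message): the explicit
acquire()/try/finally/release() pair is replaced by the existing
locking.ssynchronized(_LOCK) decorator. A rough, hypothetical sketch of this
decorator-based locking pattern in plain Python (generic names and a plain
threading.Lock; not Ganeti's actual implementation):

  import functools
  import threading

  _LOCK = threading.Lock()

  def ssynchronized(lock):
    """Return a decorator that runs the wrapped function while holding lock.

    The wrapped function no longer needs its own try/finally around
    acquire()/release(); the decorator takes care of it.

    """
    def decorate(fn):
      @functools.wraps(fn)
      def sync_function(*args, **kwargs):
        lock.acquire()
        try:
          return fn(*args, **kwargs)
        finally:
          lock.release()
      return sync_function
    return decorate

  @ssynchronized(_LOCK)
  def inspect_queue():
    # Everything in here runs with _LOCK held, like _InspectQueue below.
    pass
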
 lib/jqueue.py | 77 +++++++++++++++++++++++++++------------------------
 1 file changed, 41 insertions(+), 36 deletions(-)
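
Also for reference: the inspection loop logs progress only every 1000 jobs,
every 10 seconds, or on the last job, rather than once per job. A
self-contained sketch of that throttling idiom (illustrative only, the item
names are made up):

  import logging
  import time

  def process_all(items):
    total = len(items)
    lastinfo = time.time()
    for idx, item in enumerate(items):
      # Log at most every 1000 items, every 10 seconds, or on the last item
      if (idx % 1000 == 0 or time.time() >= lastinfo + 10.0 or
          idx == total - 1):
        logging.info("Progress: %d/%d (%0.1f %%)",
                     idx, total - 1, 100.0 * (idx + 1) / total)
        lastinfo = time.time()
      # ... actual per-item work would happen here ...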

diff --git a/lib/jqueue.py b/lib/jqueue.py
index fb269f595..bd4d98625 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -910,48 +910,53 @@ class JobQueue(object):
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
-      # We need to lock here because WorkerPool.AddTask() may start a job while
-      # we're still doing our work.
-      self.acquire()
-      try:
-        logging.info("Inspecting job queue")
+      self._InspectQueue()
+    except:
+      self._wpool.TerminateWorkers()
+      raise
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def _InspectQueue(self):
+    """Loads the whole job queue and resumes unfinished jobs.
+
+    This function needs the lock here because WorkerPool.AddTask() may start a
+    job while we're still doing our work.
+
+    """
+    logging.info("Inspecting job queue")
 
-        all_job_ids = self._GetJobIDsUnlocked()
-        jobs_count = len(all_job_ids)
+    all_job_ids = self._GetJobIDsUnlocked()
+    jobs_count = len(all_job_ids)
+    lastinfo = time.time()
+    for idx, job_id in enumerate(all_job_ids):
+      # Give an update every 1000 jobs or 10 seconds
+      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+          idx == (jobs_count - 1)):
+        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
         lastinfo = time.time()
-        for idx, job_id in enumerate(all_job_ids):
-          # Give an update every 1000 jobs or 10 seconds
-          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
-              idx == (jobs_count - 1)):
-            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
-            lastinfo = time.time()
-
-          job = self._LoadJobUnlocked(job_id)
-
-          # a failure in loading the job can cause 'None' to be returned
-          if job is None:
-            continue
 
-          status = job.CalcStatus()
+      job = self._LoadJobUnlocked(job_id)
 
-          if status in (constants.JOB_STATUS_QUEUED, ):
-            self._wpool.AddTask((job, ))
+      # a failure in loading the job can cause 'None' to be returned
+      if job is None:
+        continue
 
-          elif status in (constants.JOB_STATUS_RUNNING,
-                          constants.JOB_STATUS_WAITLOCK,
-                          constants.JOB_STATUS_CANCELING):
-            logging.warning("Unfinished job %s found: %s", job.id, job)
-            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                  "Unclean master daemon shutdown")
-            self.UpdateJobUnlocked(job)
+      status = job.CalcStatus()
 
-        logging.info("Job queue inspection finished")
-      finally:
-        self.release()
-    except:
-      self._wpool.TerminateWorkers()
-      raise
+      if status in (constants.JOB_STATUS_QUEUED, ):
+        self._wpool.AddTask((job, ))
+
+      elif status in (constants.JOB_STATUS_RUNNING,
+                      constants.JOB_STATUS_WAITLOCK,
+                      constants.JOB_STATUS_CANCELING):
+        logging.warning("Unfinished job %s found: %s", job.id, job)
+        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                              "Unclean master daemon shutdown")
+        self.UpdateJobUnlocked(job)
+
+    logging.info("Job queue inspection finished")
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
-- 
GitLab