From 7b5c4a693b48f52db43f835f6740201c5e23a251 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Fri, 10 Sep 2010 17:28:09 +0200
Subject: [PATCH] jqueue: Use priority for worker pool

A small helper function is added to make this easier. Priorities are not
yet used in all necessary places.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/jqueue.py | 27 ++++++++++++++++++++++-----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 5a352914a..c2d1a2b34 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -975,6 +975,8 @@ class JobQueue(object):
     """
     logging.info("Inspecting job queue")
 
+    restartjobs = []
+
     all_job_ids = self._GetJobIDsUnlocked()
     jobs_count = len(all_job_ids)
     lastinfo = time.time()
@@ -995,7 +997,7 @@ class JobQueue(object):
       status = job.CalcStatus()
 
       if status in (constants.JOB_STATUS_QUEUED, ):
-        self._wpool.AddTask((job, ))
+        restartjobs.append(job)
 
       elif status in (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK,
@@ -1005,6 +1007,10 @@ class JobQueue(object):
                               "Unclean master daemon shutdown")
         self.UpdateJobUnlocked(job)
 
+    if restartjobs:
+      logging.info("Restarting %s jobs", len(restartjobs))
+      self._EnqueueJobs(restartjobs)
+
     logging.info("Job queue inspection finished")
 
   @locking.ssynchronized(_LOCK)
@@ -1434,7 +1440,7 @@ class JobQueue(object):
 
     """
     job_id = self._NewSerialsUnlocked(1)[0]
-    self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
+    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
     return job_id
 
   @locking.ssynchronized(_LOCK)
@@ -1446,21 +1452,32 @@ class JobQueue(object):
 
     """
     results = []
-    tasks = []
+    added_jobs = []
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
     for job_id, ops in zip(all_job_ids, jobs):
       try:
-        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
+        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
         status = True
         data = job_id
       except errors.GenericError, err:
         data = str(err)
         status = False
       results.append((status, data))
-    self._wpool.AddManyTasks(tasks)
+
+    self._EnqueueJobs(added_jobs)
 
     return results
 
+  def _EnqueueJobs(self, jobs):
+    """Helper function to add jobs to worker pool's queue.
+
+    @type jobs: list
+    @param jobs: List of all jobs
+
+    """
+    self._wpool.AddManyTasks([(job, ) for job in jobs],
+                             priority=[job.CalcPriority() for job in jobs])
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job, replicate=True):
     """Update a job's on disk storage.
-- 
GitLab