From 99bd4f0ae43d874ea965c6cec5532cd3276813a6 Mon Sep 17 00:00:00 2001
From: Guido Trotter <ultrotter@google.com>
Date: Fri, 4 Jun 2010 17:33:42 +0100
Subject: [PATCH] jqueue: convert to a SharedLock()

Remove the jqueue _lock member and convert to a _big_jqueue_lock
sharedlock. This allows smooth transition from the old single lock to a
more granular approach.

Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/jqueue.py | 36 ++++++++++++++++++++----------------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index a0402a05f..283827280 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -31,7 +31,6 @@ used by all other classes in this module.
 
 import os
 import logging
-import threading
 import errno
 import re
 import time
@@ -47,6 +46,7 @@ from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
+from ganeti import locking
 from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
@@ -58,6 +58,12 @@ from ganeti import rpc
 JOBQUEUE_THREADS = 25
 JOBS_PER_ARCHIVE_DIRECTORY = 10000
 
+# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in
+# shared mode to ensure exclusion with legacy code, which acquires it
+# exclusively. It can not be acquired at all only after concurrency with all
+# new and legacy code is ensured.
+_big_jqueue_lock = locking.SharedLock()
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -717,10 +723,10 @@ def _RequireOpenQueue(fn):
   that the decorated function is called with a first argument that has
   a '_queue_filelock' argument.
 
-  @warning: Use this decorator only after utils.LockedMethod!
+  @warning: Use this decorator only after locking.ssynchronized
 
   Example::
-    @utils.LockedMethod
+    @locking.ssynchronized(_big_jqueue_lock)
     @_RequireOpenQueue
     def Example(self):
       pass
@@ -758,10 +764,8 @@ class JobQueue(object):
     self._memcache = weakref.WeakValueDictionary()
     self._my_hostname = utils.HostInfo().name
 
-    # Locking
-    self._lock = threading.Lock()
-    self.acquire = self._lock.acquire
-    self.release = self._lock.release
+    self.acquire = _big_jqueue_lock.acquire
+    self.release = _big_jqueue_lock.release
 
     # Initialize the queue, and acquire the filelock.
     # This ensures no other process is working on the job queue.
@@ -831,7 +835,7 @@ class JobQueue(object):
       self._wpool.TerminateWorkers()
       raise
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def AddNode(self, node):
     """Register a new node with the queue.
@@ -876,7 +880,7 @@ class JobQueue(object):
 
     self._nodes[node_name] = node.primary_ip
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
     """Callback called when removing nodes from the cluster.
@@ -1178,7 +1182,7 @@ class JobQueue(object):
     """
     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def SetDrainFlag(self, drain_flag):
     """Sets the drain flag for the queue.
@@ -1233,7 +1237,7 @@ class JobQueue(object):
 
     return job
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def SubmitJob(self, ops):
     """Create and store a new job.
@@ -1245,7 +1249,7 @@ class JobQueue(object):
     self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
     return job_id
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def SubmitManyJobs(self, jobs):
     """Create and store multiple jobs.
@@ -1319,7 +1323,7 @@ class JobQueue(object):
     finally:
       helper.Close()
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
@@ -1392,7 +1396,7 @@ class JobQueue(object):
     self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def ArchiveJob(self, job_id):
     """Archives a job.
@@ -1414,7 +1418,7 @@ class JobQueue(object):
 
     return self._ArchiveJobsUnlocked([job]) == 1
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def AutoArchiveJobs(self, age, timeout):
     """Archives all jobs based on age.
@@ -1499,7 +1503,7 @@ class JobQueue(object):
 
     return jobs
 
-  @utils.LockedMethod
+  @locking.ssynchronized(_big_jqueue_lock)
   @_RequireOpenQueue
   def Shutdown(self):
     """Stops the job queue.
-- 
GitLab