From ebb80afa1cd807a2edcf5943790851d2f832eea6 Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Fri, 25 Jun 2010 18:35:01 +0200 Subject: [PATCH] jqueue: remove the _big_jqueue_lock module global By using ssynchronized in the new way, we can remove the module-global _big_jqueue_lock and revert back to an internal _lock inside the jqueue. Signed-off-by: Guido Trotter <ultrotter@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/jqueue.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index 870559d72..ac5fc32ac 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -58,11 +58,9 @@ from ganeti import rpc JOBQUEUE_THREADS = 25 JOBS_PER_ARCHIVE_DIRECTORY = 10000 -# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in -# shared mode to ensure exclusion with legacy code, which acquires it -# exclusively. It can not be acquired at all only after concurrency with all -# new and legacy code is ensured. -_big_jqueue_lock = locking.SharedLock() +# member lock names to be passed to @ssynchronized decorator +_LOCK = "_lock" +_QUEUE = "_queue" class CancelJob(Exception): @@ -446,7 +444,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): finally: self._queue.release() - @locking.ssynchronized(_big_jqueue_lock, shared=1) + @locking.ssynchronized(_QUEUE, shared=1) def _AppendFeedback(self, timestamp, log_type, log_msg): """Internal feedback append function, with locks @@ -728,7 +726,7 @@ def _RequireOpenQueue(fn): @warning: Use this decorator only after locking.ssynchronized Example:: - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Example(self): pass @@ -766,8 +764,15 @@ class JobQueue(object): self._memcache = weakref.WeakValueDictionary() self._my_hostname = utils.HostInfo().name - self.acquire = _big_jqueue_lock.acquire - self.release = _big_jqueue_lock.release + # The Big JobQueue lock. If a code block or method acquires it in shared + # mode safe it must guarantee concurrency with all the code acquiring it in + # shared mode, including itself. In order not to acquire it at all + # concurrency must be guaranteed with all code acquiring it in shared mode + # and all code acquiring it exclusively. + self._lock = locking.SharedLock() + + self.acquire = self._lock.acquire + self.release = self._lock.release # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. @@ -837,7 +842,7 @@ class JobQueue(object): self._wpool.TerminateWorkers() raise - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AddNode(self, node): """Register a new node with the queue. @@ -882,7 +887,7 @@ class JobQueue(object): self._nodes[node_name] = node.primary_ip - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def RemoveNode(self, node_name): """Callback called when removing nodes from the cluster. @@ -1184,7 +1189,7 @@ class JobQueue(object): """ self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def SetDrainFlag(self, drain_flag): """Sets the drain flag for the queue. @@ -1239,7 +1244,7 @@ class JobQueue(object): return job - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def SubmitJob(self, ops): """Create and store a new job. @@ -1251,7 +1256,7 @@ class JobQueue(object): self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops)) return job_id - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def SubmitManyJobs(self, jobs): """Create and store multiple jobs. @@ -1325,7 +1330,7 @@ class JobQueue(object): finally: helper.Close() - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def CancelJob(self, job_id): """Cancels a job. @@ -1398,7 +1403,7 @@ class JobQueue(object): self._UpdateQueueSizeUnlocked() return len(archive_jobs) - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def ArchiveJob(self, job_id): """Archives a job. @@ -1420,7 +1425,7 @@ class JobQueue(object): return self._ArchiveJobsUnlocked([job]) == 1 - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AutoArchiveJobs(self, age, timeout): """Archives all jobs based on age. @@ -1505,7 +1510,7 @@ class JobQueue(object): return jobs - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Shutdown(self): """Stops the job queue. -- GitLab