diff --git a/lib/jqueue.py b/lib/jqueue.py index a0402a05f8dc4d5ece41b94a8c60716bb4c22b03..2838272807f7334ea51e1189636d8f4d58d624d6 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -31,7 +31,6 @@ used by all other classes in this module. import os import logging -import threading import errno import re import time @@ -47,6 +46,7 @@ from ganeti import asyncnotifier from ganeti import constants from ganeti import serializer from ganeti import workerpool +from ganeti import locking from ganeti import opcodes from ganeti import errors from ganeti import mcpu @@ -58,6 +58,12 @@ from ganeti import rpc JOBQUEUE_THREADS = 25 JOBS_PER_ARCHIVE_DIRECTORY = 10000 +# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in +# shared mode to ensure exclusion with legacy code, which acquires it +# exclusively. It can not be acquired at all only after concurrency with all +# new and legacy code is ensured. +_big_jqueue_lock = locking.SharedLock() + class CancelJob(Exception): """Special exception to cancel a job. @@ -717,10 +723,10 @@ def _RequireOpenQueue(fn): that the decorated function is called with a first argument that has a '_queue_filelock' argument. - @warning: Use this decorator only after utils.LockedMethod! + @warning: Use this decorator only after locking.ssynchronized Example:: - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def Example(self): pass @@ -758,10 +764,8 @@ class JobQueue(object): self._memcache = weakref.WeakValueDictionary() self._my_hostname = utils.HostInfo().name - # Locking - self._lock = threading.Lock() - self.acquire = self._lock.acquire - self.release = self._lock.release + self.acquire = _big_jqueue_lock.acquire + self.release = _big_jqueue_lock.release # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. @@ -831,7 +835,7 @@ class JobQueue(object): self._wpool.TerminateWorkers() raise - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def AddNode(self, node): """Register a new node with the queue. @@ -876,7 +880,7 @@ class JobQueue(object): self._nodes[node_name] = node.primary_ip - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def RemoveNode(self, node_name): """Callback called when removing nodes from the cluster. @@ -1178,7 +1182,7 @@ class JobQueue(object): """ self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def SetDrainFlag(self, drain_flag): """Sets the drain flag for the queue. @@ -1233,7 +1237,7 @@ class JobQueue(object): return job - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def SubmitJob(self, ops): """Create and store a new job. @@ -1245,7 +1249,7 @@ class JobQueue(object): self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops)) return job_id - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def SubmitManyJobs(self, jobs): """Create and store multiple jobs. @@ -1319,7 +1323,7 @@ class JobQueue(object): finally: helper.Close() - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def CancelJob(self, job_id): """Cancels a job. @@ -1392,7 +1396,7 @@ class JobQueue(object): self._UpdateQueueSizeUnlocked() return len(archive_jobs) - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def ArchiveJob(self, job_id): """Archives a job. @@ -1414,7 +1418,7 @@ class JobQueue(object): return self._ArchiveJobsUnlocked([job]) == 1 - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def AutoArchiveJobs(self, age, timeout): """Archives all jobs based on age. @@ -1499,7 +1503,7 @@ class JobQueue(object): return jobs - @utils.LockedMethod + @locking.ssynchronized(_big_jqueue_lock) @_RequireOpenQueue def Shutdown(self): """Stops the job queue.