Commit ebb80afa authored by Guido Trotter's avatar Guido Trotter
Browse files

jqueue: remove the _big_jqueue_lock module global

By using ssynchronized in the new way, we can remove the module-global
_big_jqueue_lock and revert back to an internal _lock inside the jqueue.
Signed-off-by: default avatarGuido Trotter <>
Reviewed-by: default avatarIustin Pop <>
parent dbb11e8b
......@@ -58,11 +58,9 @@ from ganeti import rpc
# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in
# shared mode to ensure exclusion with legacy code, which acquires it
# exclusively. It can not be acquired at all only after concurrency with all
# new and legacy code is ensured.
_big_jqueue_lock = locking.SharedLock()
# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
class CancelJob(Exception):
......@@ -446,7 +444,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
@locking.ssynchronized(_big_jqueue_lock, shared=1)
@locking.ssynchronized(_QUEUE, shared=1)
def _AppendFeedback(self, timestamp, log_type, log_msg):
"""Internal feedback append function, with locks
......@@ -728,7 +726,7 @@ def _RequireOpenQueue(fn):
@warning: Use this decorator only after locking.ssynchronized
def Example(self):
......@@ -766,8 +764,15 @@ class JobQueue(object):
self._memcache = weakref.WeakValueDictionary()
self._my_hostname = utils.HostInfo().name
self.acquire = _big_jqueue_lock.acquire
self.release = _big_jqueue_lock.release
# The Big JobQueue lock. If a code block or method acquires it in shared
# mode safe it must guarantee concurrency with all the code acquiring it in
# shared mode, including itself. In order not to acquire it at all
# concurrency must be guaranteed with all code acquiring it in shared mode
# and all code acquiring it exclusively.
self._lock = locking.SharedLock()
self.acquire = self._lock.acquire
self.release = self._lock.release
# Initialize the queue, and acquire the filelock.
# This ensures no other process is working on the job queue.
......@@ -837,7 +842,7 @@ class JobQueue(object):
def AddNode(self, node):
"""Register a new node with the queue.
......@@ -882,7 +887,7 @@ class JobQueue(object):
self._nodes[node_name] = node.primary_ip
def RemoveNode(self, node_name):
"""Callback called when removing nodes from the cluster.
......@@ -1184,7 +1189,7 @@ class JobQueue(object):
self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
def SetDrainFlag(self, drain_flag):
"""Sets the drain flag for the queue.
......@@ -1239,7 +1244,7 @@ class JobQueue(object):
return job
def SubmitJob(self, ops):
"""Create and store a new job.
......@@ -1251,7 +1256,7 @@ class JobQueue(object):
self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
return job_id
def SubmitManyJobs(self, jobs):
"""Create and store multiple jobs.
......@@ -1325,7 +1330,7 @@ class JobQueue(object):
def CancelJob(self, job_id):
"""Cancels a job.
......@@ -1398,7 +1403,7 @@ class JobQueue(object):
return len(archive_jobs)
def ArchiveJob(self, job_id):
"""Archives a job.
......@@ -1420,7 +1425,7 @@ class JobQueue(object):
return self._ArchiveJobsUnlocked([job]) == 1
def AutoArchiveJobs(self, age, timeout):
"""Archives all jobs based on age.
......@@ -1505,7 +1510,7 @@ class JobQueue(object):
return jobs
def Shutdown(self):
"""Stops the job queue.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment