Commit 942817f2 authored by Petr Pudlak's avatar Petr Pudlak
Browse files

Remove the use of queue lock in Python job queue



Since now each process only creates a 1-job queue, trying to use file
locks only causes job deadlock.

Also reduce the number of threads running in a job queue to 1.

Later the job queue will be removed completely.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent 4cfa01f4
...@@ -67,7 +67,7 @@ from ganeti import pathutils ...@@ -67,7 +67,7 @@ from ganeti import pathutils
from ganeti import vcluster from ganeti import vcluster
JOBQUEUE_THREADS = 25 JOBQUEUE_THREADS = 1
# member lock names to be passed to @ssynchronized decorator # member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock" _LOCK = "_lock"
...@@ -1631,31 +1631,6 @@ class _JobDependencyManager: ...@@ -1631,31 +1631,6 @@ class _JobDependencyManager:
self._enqueue_fn(jobs) self._enqueue_fn(jobs)
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
This function should be used for all 'public' functions. That is,
functions usually called from other classes. Note that this should
be applied only to methods (not plain functions), since it expects
that the decorated function is called with a first argument that has
a '_queue_filelock' argument.
@warning: Use this decorator only after locking.ssynchronized
Example::
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def Example(self):
pass
"""
def wrapper(self, *args, **kwargs):
# pylint: disable=W0212
assert self._queue_filelock is not None, "Queue should be open"
return fn(self, *args, **kwargs)
return wrapper
def _RequireNonDrainedQueue(fn): def _RequireNonDrainedQueue(fn):
"""Decorator checking for a non-drained queue. """Decorator checking for a non-drained queue.
...@@ -1715,10 +1690,6 @@ class JobQueue(object): ...@@ -1715,10 +1690,6 @@ class JobQueue(object):
# Accept jobs by default # Accept jobs by default
self._accepting_jobs = True self._accepting_jobs = True
# Initialize the queue, and acquire the filelock.
# This ensures no other process is working on the job queue.
self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
# Read serial file # Read serial file
self._last_serial = jstore.ReadSerial() self._last_serial = jstore.ReadSerial()
assert self._last_serial is not None, ("Serial file was modified between" assert self._last_serial is not None, ("Serial file was modified between"
...@@ -1796,7 +1767,6 @@ class JobQueue(object): ...@@ -1796,7 +1767,6 @@ class JobQueue(object):
return rpc.JobQueueRunner(self.context, address_list) return rpc.JobQueueRunner(self.context, address_list)
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AddNode(self, node): def AddNode(self, node):
"""Register a new node with the queue. """Register a new node with the queue.
...@@ -1852,7 +1822,6 @@ class JobQueue(object): ...@@ -1852,7 +1822,6 @@ class JobQueue(object):
self._nodes[node_name] = node.primary_ip self._nodes[node_name] = node.primary_ip
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def RemoveNode(self, node_name): def RemoveNode(self, node_name):
"""Callback called when removing nodes from the cluster. """Callback called when removing nodes from the cluster.
...@@ -2137,7 +2106,6 @@ class JobQueue(object): ...@@ -2137,7 +2106,6 @@ class JobQueue(object):
self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def SetDrainFlag(self, drain_flag): def SetDrainFlag(self, drain_flag):
"""Sets the drain flag for the queue. """Sets the drain flag for the queue.
...@@ -2264,7 +2232,6 @@ class JobQueue(object): ...@@ -2264,7 +2232,6 @@ class JobQueue(object):
raise errors.JobLost("Job %s not found" % job_id) raise errors.JobLost("Job %s not found" % job_id)
@_RequireOpenQueue
def UpdateJobUnlocked(self, job, replicate=True): def UpdateJobUnlocked(self, job, replicate=True):
"""Update a job's on disk storage. """Update a job's on disk storage.
...@@ -2339,7 +2306,6 @@ class JobQueue(object): ...@@ -2339,7 +2306,6 @@ class JobQueue(object):
return None return None
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def CancelJob(self, job_id): def CancelJob(self, job_id):
"""Cancels a job. """Cancels a job.
...@@ -2354,7 +2320,6 @@ class JobQueue(object): ...@@ -2354,7 +2320,6 @@ class JobQueue(object):
return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel()) return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ChangeJobPriority(self, job_id, priority): def ChangeJobPriority(self, job_id, priority):
"""Changes a job's priority. """Changes a job's priority.
...@@ -2411,7 +2376,6 @@ class JobQueue(object): ...@@ -2411,7 +2376,6 @@ class JobQueue(object):
return (success, msg) return (success, msg)
@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs): def _ArchiveJobsUnlocked(self, jobs):
"""Archives jobs. """Archives jobs.
...@@ -2451,7 +2415,6 @@ class JobQueue(object): ...@@ -2451,7 +2415,6 @@ class JobQueue(object):
return len(archive_jobs) return len(archive_jobs)
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ArchiveJob(self, job_id): def ArchiveJob(self, job_id):
"""Archives a job. """Archives a job.
...@@ -2473,7 +2436,6 @@ class JobQueue(object): ...@@ -2473,7 +2436,6 @@ class JobQueue(object):
return self._ArchiveJobsUnlocked([job]) == 1 return self._ArchiveJobsUnlocked([job]) == 1
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AutoArchiveJobs(self, age, timeout): def AutoArchiveJobs(self, age, timeout):
"""Archives all jobs based on age. """Archives all jobs based on age.
...@@ -2625,7 +2587,6 @@ class JobQueue(object): ...@@ -2625,7 +2587,6 @@ class JobQueue(object):
return self._accepting_jobs return self._accepting_jobs
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def Shutdown(self): def Shutdown(self):
"""Stops the job queue. """Stops the job queue.
...@@ -2633,6 +2594,3 @@ class JobQueue(object): ...@@ -2633,6 +2594,3 @@ class JobQueue(object):
""" """
self._wpool.TerminateWorkers() self._wpool.TerminateWorkers()
self._queue_filelock.Close()
self._queue_filelock = None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment