Commit 99bd4f0a authored by Guido Trotter's avatar Guido Trotter
Browse files

jqueue: convert to a SharedLock()



Remove the jqueue _lock member and convert to a _big_jqueue_lock
sharedlock. This allows smooth transition from the old single lock to a
more granular approach.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 39ed3a98
......@@ -31,7 +31,6 @@ used by all other classes in this module.
import os
import logging
import threading
import errno
import re
import time
......@@ -47,6 +46,7 @@ from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
......@@ -58,6 +58,12 @@ from ganeti import rpc
JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000
# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in
# shared mode to ensure exclusion with legacy code, which acquires it
# exclusively. It can not be acquired at all only after concurrency with all
# new and legacy code is ensured.
_big_jqueue_lock = locking.SharedLock()
class CancelJob(Exception):
"""Special exception to cancel a job.
......@@ -717,10 +723,10 @@ def _RequireOpenQueue(fn):
that the decorated function is called with a first argument that has
a '_queue_filelock' argument.
@warning: Use this decorator only after utils.LockedMethod!
@warning: Use this decorator only after locking.ssynchronized
Example::
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def Example(self):
pass
......@@ -758,10 +764,8 @@ class JobQueue(object):
self._memcache = weakref.WeakValueDictionary()
self._my_hostname = utils.HostInfo().name
# Locking
self._lock = threading.Lock()
self.acquire = self._lock.acquire
self.release = self._lock.release
self.acquire = _big_jqueue_lock.acquire
self.release = _big_jqueue_lock.release
# Initialize the queue, and acquire the filelock.
# This ensures no other process is working on the job queue.
......@@ -831,7 +835,7 @@ class JobQueue(object):
self._wpool.TerminateWorkers()
raise
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def AddNode(self, node):
"""Register a new node with the queue.
......@@ -876,7 +880,7 @@ class JobQueue(object):
self._nodes[node_name] = node.primary_ip
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def RemoveNode(self, node_name):
"""Callback called when removing nodes from the cluster.
......@@ -1178,7 +1182,7 @@ class JobQueue(object):
"""
self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def SetDrainFlag(self, drain_flag):
"""Sets the drain flag for the queue.
......@@ -1233,7 +1237,7 @@ class JobQueue(object):
return job
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def SubmitJob(self, ops):
"""Create and store a new job.
......@@ -1245,7 +1249,7 @@ class JobQueue(object):
self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
return job_id
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def SubmitManyJobs(self, jobs):
"""Create and store multiple jobs.
......@@ -1319,7 +1323,7 @@ class JobQueue(object):
finally:
helper.Close()
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def CancelJob(self, job_id):
"""Cancels a job.
......@@ -1392,7 +1396,7 @@ class JobQueue(object):
self._UpdateQueueSizeUnlocked()
return len(archive_jobs)
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def ArchiveJob(self, job_id):
"""Archives a job.
......@@ -1414,7 +1418,7 @@ class JobQueue(object):
return self._ArchiveJobsUnlocked([job]) == 1
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def AutoArchiveJobs(self, age, timeout):
"""Archives all jobs based on age.
......@@ -1499,7 +1503,7 @@ class JobQueue(object):
return jobs
@utils.LockedMethod
@locking.ssynchronized(_big_jqueue_lock)
@_RequireOpenQueue
def Shutdown(self):
"""Stops the job queue.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment