diff --git a/doc/design-2.3.rst b/doc/design-2.3.rst index afb8a731e7d9094cdeb90db480fa8d36f55a018f..46316e0f6361156e0cc60cc6d106de8eae5e4d5c 100644 --- a/doc/design-2.3.rst +++ b/doc/design-2.3.rst @@ -205,8 +205,8 @@ Opcode priorities are synchronized to disk in order to be restored after a restart or crash of the master daemon. Priorities also need to be considered inside the locking library to -ensure opcodes with higher priorities get locks first, but the design -changes for this will be discussed in a separate section. +ensure opcodes with higher priorities get locks first. See +:ref:`locking priorities <locking-priorities>` for more details. Worker pool +++++++++++ @@ -243,6 +243,59 @@ changing its own priority. This is useful for the following cases: With these changes, the job queue will be able to implement per-job priorities. +.. _locking-priorities: + +Locking ++++++++ + +In order to support priorities in Ganeti's own lock classes, +``locking.SharedLock`` and ``locking.LockSet``, the internal structure +of the former class needs to be changed. The last major change in this +area was done for Ganeti 2.1 and can be found in the respective +:doc:`design document <design-2.1>`. + +The plain list (``[]``) used as a queue is replaced by a heap queue, +similar to the `worker pool`_. The heap or priority queue does automatic +sorting, thereby automatically taking care of priorities. For each +priority there's a plain list with pending acquires, like the single +queue of pending acquires before this change. + +When the lock is released, the code locates the list of pending acquires +for the highest priority waiting. The first condition (index 0) is +notified. Once all waiting threads have received the notification, the +condition is removed from the list. If the list of conditions is empty, +it's removed from the heap queue. + +Like before, shared acquires are grouped and skip ahead of exclusive +acquires if there's already an existing shared acquire for a priority. 
+To accomplish this, a separate dictionary of shared acquires per +priority is maintained. + +To simplify the code and reduce memory consumption, the concept of the +"active" and "inactive" condition for shared acquires is abolished. The +lock can't predict what priorities the next acquires will use and even +keeping a cache can become computationally expensive for arguable +benefit (the underlying POSIX pipe, see ``pipe(2)``, needs to be +re-created for each notification anyway). + +The following diagram shows a possible state of the internal queue from +a high-level view. Conditions are shown as (waiting) threads. Assuming +no modifications are made to the queue (e.g. more acquires or timeouts), +the lock would be acquired by the threads in this order (concurrent +acquires in parentheses): ``threadE1``, ``threadE2``, (``threadS1``, +``threadS2``, ``threadS3``), (``threadS4``, ``threadS5``), ``threadE3``, +``threadS6``, ``threadE4``, ``threadE5``. + +:: + + [ + (0, [exc/threadE1, exc/threadE2, shr/threadS1/threadS2/threadS3]), + (2, [shr/threadS4/threadS5]), + (10, [exc/threadE3]), + (33, [shr/threadS6, exc/threadE4, exc/threadE5]), + ] + + IPv6 support ------------ diff --git a/lib/locking.py b/lib/locking.py index 057cc84f4f58b57dd8256a7d052bb0924f9ce290..ef64fb90950ddc050d9abe766a94a0b46457b170 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -32,6 +32,7 @@ import time import errno import weakref import logging +import heapq from ganeti import errors from ganeti import utils @@ -41,6 +42,8 @@ from ganeti import compat _EXCLUSIVE_TEXT = "exclusive" _SHARED_TEXT = "shared" +_DEFAULT_PRIORITY = 0 + def ssynchronized(mylock, shared=0): """Shared Synchronization decorator. @@ -406,6 +409,19 @@ class PipeCondition(_BaseCondition): return bool(self._waiters) +class _PipeConditionWithMode(PipeCondition): + __slots__ = [ + "shared", + ] + + def __init__(self, lock, shared): + """Initializes this class. 
+ + """ + self.shared = shared + PipeCondition.__init__(self, lock) + + class SharedLock(object): """Implements a shared lock. @@ -413,9 +429,13 @@ class SharedLock(object): acquire_shared(). In order to acquire the lock in an exclusive way threads can call acquire_exclusive(). - The lock prevents starvation but does not guarantee that threads will acquire - the shared lock in the order they queued for it, just that they will - eventually do so. + Notes on data structures: C{__pending} contains a priority queue (heapq) of + all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2), + ...]}. Each per-priority queue contains a normal in-order list of conditions + to be notified when the lock can be acquired. Shared locks are grouped + together by priority and the condition for them is stored in + C{__pending_shared} if it already exists. C{__pending_by_prio} keeps + references for the per-priority queues indexed by priority for faster access. @type name: string @ivar name: the name of the lock @@ -423,17 +443,17 @@ class SharedLock(object): """ __slots__ = [ "__weakref__", - "__active_shr_c", - "__inactive_shr_c", "__deleted", "__exc", "__lock", "__pending", + "__pending_by_prio", + "__pending_shared", "__shr", "name", ] - __condition_class = PipeCondition + __condition_class = _PipeConditionWithMode def __init__(self, name, monitor=None): """Construct a new SharedLock. 
@@ -452,10 +472,8 @@ class SharedLock(object): # Queue containing waiting acquires self.__pending = [] - - # Active and inactive conditions for shared locks - self.__active_shr_c = self.__condition_class(self.__lock) - self.__inactive_shr_c = self.__condition_class(self.__lock) + self.__pending_by_prio = {} + self.__pending_shared = {} # Current lock holders self.__shr = set() @@ -509,16 +527,18 @@ class SharedLock(object): elif fname == "pending": data = [] - for cond in self.__pending: - if cond in (self.__active_shr_c, self.__inactive_shr_c): - mode = _SHARED_TEXT - else: - mode = _EXCLUSIVE_TEXT + # Sorting instead of copying and using heapq functions for simplicity + for (_, prioqueue) in sorted(self.__pending): + for cond in prioqueue: + if cond.shared: + mode = _SHARED_TEXT + else: + mode = _EXCLUSIVE_TEXT - # This function should be fast as it runs with the lock held. Hence - # not using utils.NiceSort. - data.append((mode, sorted([i.getName() - for i in cond.get_waiting()]))) + # This function should be fast as it runs with the lock held. + # Hence not using utils.NiceSort. + data.append((mode, sorted(i.getName() + for i in cond.get_waiting()))) info.append(data) else: @@ -584,7 +604,23 @@ class SharedLock(object): """ self.__lock.acquire() try: - return len(self.__pending) + return sum(len(prioqueue) for (_, prioqueue) in self.__pending) + finally: + self.__lock.release() + + def _check_empty(self): + """Checks whether there are any pending acquires. + + @rtype: bool + + """ + self.__lock.acquire() + try: + # Order is important: __find_first_pending_queue modifies __pending + return not (self.__find_first_pending_queue() or + self.__pending or + self.__pending_by_prio or + self.__pending_shared) finally: self.__lock.release() @@ -606,20 +642,42 @@ class SharedLock(object): else: return len(self.__shr) == 0 and self.__exc is None + def __find_first_pending_queue(self): + """Tries to find the topmost queued entry with pending acquires. 
+ + Removes empty entries while going through the list. + + """ + while self.__pending: + (priority, prioqueue) = self.__pending[0] + + if not prioqueue: + heapq.heappop(self.__pending) + del self.__pending_by_prio[priority] + assert priority not in self.__pending_shared + continue + + if prioqueue: + return prioqueue + + return None + def __is_on_top(self, cond): """Checks whether the passed condition is on top of the queue. The caller must make sure the queue isn't empty. """ - return self.__pending[0] == cond + return cond == self.__find_first_pending_queue()[0] - def __acquire_unlocked(self, shared, timeout): + def __acquire_unlocked(self, shared, timeout, priority): """Acquire a shared lock. @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ self.__check_deleted() @@ -628,26 +686,46 @@ class SharedLock(object): assert not self.__is_owned(), ("double acquire() on a non-recursive lock" " %s" % self.name) + # Remove empty entries from queue + self.__find_first_pending_queue() + # Check whether someone else holds the lock or there are pending acquires. if not self.__pending and self.__can_acquire(shared): # Apparently not, can acquire lock directly. 
self.__do_acquire(shared) return True - if shared: - wait_condition = self.__active_shr_c + prioqueue = self.__pending_by_prio.get(priority, None) - # Check if we're not yet in the queue - if wait_condition not in self.__pending: - self.__pending.append(wait_condition) + if shared: + # Try to re-use condition for shared acquire + wait_condition = self.__pending_shared.get(priority, None) + assert (wait_condition is None or + (wait_condition.shared and wait_condition in prioqueue)) else: - wait_condition = self.__condition_class(self.__lock) - # Always add to queue - self.__pending.append(wait_condition) + wait_condition = None + + if wait_condition is None: + if prioqueue is None: + assert priority not in self.__pending_by_prio + + prioqueue = [] + heapq.heappush(self.__pending, (priority, prioqueue)) + self.__pending_by_prio[priority] = prioqueue + + wait_condition = self.__condition_class(self.__lock, shared) + prioqueue.append(wait_condition) + + if shared: + # Keep reference for further shared acquires on same priority. This is + # better than trying to find it in the list of pending acquires. + assert priority not in self.__pending_shared + self.__pending_shared[priority] = wait_condition try: # Wait until we become the topmost acquire in the queue or the timeout # expires. 
+ # TODO: Decrease timeout with spurious notifications while not (self.__is_on_top(wait_condition) and self.__can_acquire(shared)): # Wait for notification @@ -664,12 +742,15 @@ class SharedLock(object): return True finally: # Remove condition from queue if there are no more waiters - if not wait_condition.has_waiting() and not self.__deleted: - self.__pending.remove(wait_condition) + if not wait_condition.has_waiting(): + prioqueue.remove(wait_condition) + if wait_condition.shared: + del self.__pending_shared[priority] return False - def acquire(self, shared=0, timeout=None, test_notify=None): + def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY, + test_notify=None): """Acquire a shared lock. @type shared: integer (0/1) used as a boolean @@ -677,6 +758,8 @@ class SharedLock(object): exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock @type test_notify: callable or None @param test_notify: Special callback function for unittesting @@ -687,7 +770,7 @@ class SharedLock(object): if __debug__ and callable(test_notify): test_notify() - return self.__acquire_unlocked(shared, timeout) + return self.__acquire_unlocked(shared, timeout, priority) finally: self.__lock.release() @@ -710,18 +793,14 @@ class SharedLock(object): self.__shr.remove(threading.currentThread()) # Notify topmost condition in queue - if self.__pending: - first_condition = self.__pending[0] - first_condition.notifyAll() - - if first_condition == self.__active_shr_c: - self.__active_shr_c = self.__inactive_shr_c - self.__inactive_shr_c = first_condition + prioqueue = self.__find_first_pending_queue() + if prioqueue: + prioqueue[0].notifyAll() finally: self.__lock.release() - def delete(self, timeout=None): + def delete(self, timeout=None, priority=_DEFAULT_PRIORITY): """Delete a Shared Lock. This operation will declare the lock for removal. 
First the lock will be @@ -730,6 +809,8 @@ class SharedLock(object): @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ self.__lock.acquire() @@ -742,7 +823,7 @@ class SharedLock(object): acquired = self.__is_exclusive() if not acquired: - acquired = self.__acquire_unlocked(0, timeout) + acquired = self.__acquire_unlocked(0, timeout, priority) assert self.__is_exclusive() and not self.__is_sharer(), \ "Lock wasn't acquired in exclusive mode" @@ -754,8 +835,11 @@ class SharedLock(object): assert not (self.__exc or self.__shr), "Found owner during deletion" # Notify all acquires. They'll throw an error. - while self.__pending: - self.__pending.pop().notifyAll() + for (_, prioqueue) in self.__pending: + for cond in prioqueue: + cond.notifyAll() + + assert self.__deleted return acquired finally: @@ -908,7 +992,8 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, timeout=None, shared=0, test_notify=None): + def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY, + test_notify=None): """Acquire a set of resource locks. @type names: list of strings (or string) @@ -919,6 +1004,8 @@ class LockSet: exclusive lock will be acquired @type timeout: float or None @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring locks @type test_notify: callable or None @param test_notify: Special callback function for unittesting @@ -945,7 +1032,7 @@ class LockSet: if isinstance(names, basestring): names = [names] - return self.__acquire_inner(names, False, shared, + return self.__acquire_inner(names, False, shared, priority, running_timeout.Remaining, test_notify) else: @@ -954,18 +1041,18 @@ class LockSet: # Some of them may then be deleted later, but we'll cope with this. 
# # We'd like to acquire this lock in a shared way, as it's nice if - # everybody else can use the instances at the same time. If are + # everybody else can use the instances at the same time. If we are # acquiring them exclusively though they won't be able to do this # anyway, though, so we'll get the list lock exclusively as well in # order to be able to do add() on the set while owning it. - if not self.__lock.acquire(shared=shared, + if not self.__lock.acquire(shared=shared, priority=priority, timeout=running_timeout.Remaining()): raise _AcquireTimeout() try: # note we own the set-lock self._add_owned() - return self.__acquire_inner(self.__names(), True, shared, + return self.__acquire_inner(self.__names(), True, shared, priority, running_timeout.Remaining, test_notify) except: # We shouldn't have problems adding the lock to the owners list, but @@ -978,13 +1065,15 @@ class LockSet: except _AcquireTimeout: return None - def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify): + def __acquire_inner(self, names, want_all, shared, priority, + timeout_fn, test_notify): """Inner logic for acquiring a number of locks. @param names: Names of the locks to be acquired @param want_all: Whether all locks in the set should be acquired @param shared: Whether to acquire in shared mode @param timeout_fn: Function returning remaining timeout + @param priority: Priority for acquiring locks @param test_notify: Special callback function for unittesting """ @@ -1028,6 +1117,7 @@ class LockSet: try: # raises LockError if the lock was deleted acq_success = lock.acquire(shared=shared, timeout=timeout, + priority=priority, test_notify=test_notify_fn) except errors.LockError: if want_all: @@ -1146,6 +1236,8 @@ class LockSet: lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: + # No need for priority or timeout here as this lock has just been + # created lock.acquire(shared=shared) # now the lock cannot be deleted, we have it! 
try: diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 273e817258edc7707ee051861b5d1e920c3b5df6..b3edda815f9535e92282544b8a266bde58566b0d 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -28,10 +28,12 @@ import time import Queue import threading import random +import itertools from ganeti import locking from ganeti import errors from ganeti import utils +from ganeti import compat import testutils @@ -701,6 +703,106 @@ class TestSharedLock(_ThreadedTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) + def testPriority(self): + # Acquire in exclusive mode + self.assert_(self.sl.acquire(shared=0)) + + # Queue acquires + def _Acquire(prev, next, shared, priority, result): + prev.wait() + self.sl.acquire(shared=shared, priority=priority, test_notify=next.set) + try: + self.done.put(result) + finally: + self.sl.release() + + counter = itertools.count(0) + priorities = range(-20, 30) + first = threading.Event() + prev = first + + # Data structure: + # { + # priority: + # [(shared/exclusive, set(acquire names), set(pending threads)), + # (shared/exclusive, ...), + # ..., + # ], + # } + perprio = {} + + # References shared acquire per priority in L{perprio}. 
Data structure: + # { + # priority: (shared=1, set(acquire names), set(pending threads)), + # } + prioshared = {} + + for seed in [4979, 9523, 14902, 32440]: + # Use a deterministic random generator + rnd = random.Random(seed) + for priority in [rnd.choice(priorities) for _ in range(30)]: + modes = [0, 1] + rnd.shuffle(modes) + for shared in modes: + # Unique name + acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority) + + ev = threading.Event() + thread = self._addThread(target=_Acquire, + args=(prev, ev, shared, priority, acqname)) + prev = ev + + # Record expected acquire, see above for structure + data = (shared, set([acqname]), set([thread])) + priolist = perprio.setdefault(priority, []) + if shared: + priosh = prioshared.get(priority, None) + if priosh: + # Shared acquires are merged + for i, j in zip(priosh[1:], data[1:]): + i.update(j) + assert data[0] == priosh[0] + else: + prioshared[priority] = data + priolist.append(data) + else: + priolist.append(data) + + # Start all acquires and wait for them + first.set() + prev.wait() + + # Check lock information + self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name]) + self.assertEqual(self.sl.GetInfo(["mode", "owner"]), + ["exclusive", [threading.currentThread().getName()]]) + self.assertEqual(self.sl.GetInfo(["name", "pending"]), + [self.sl.name, + [(["exclusive", "shared"][int(bool(shared))], + sorted([t.getName() for t in threads])) + for acquires in [perprio[i] + for i in sorted(perprio.keys())] + for (shared, _, threads) in acquires]]) + + # Let threads acquire the lock + self.sl.release() + + # Wait for everything to finish + self._waitThreads() + + self.assert_(self.sl._check_empty()) + + # Check acquires by priority + for acquires in [perprio[i] for i in sorted(perprio.keys())]: + for (_, names, _) in acquires: + # For shared acquires, the set will contain 1..n entries. For exclusive + # acquires only one. 
+ while names: + names.remove(self.done.get_nowait()) + self.assertFalse(compat.any(names for (_, names, _) in acquires)) + + self.assertRaises(Queue.Empty, self.done.get_nowait) + class TestSharedLockInCondition(_ThreadedTestCase): """SharedLock as a condition lock tests""" @@ -1259,6 +1361,57 @@ class TestLockSet(_ThreadedTestCase): self.assertEqual(self.done.get_nowait(), 'DONE') self._setUpLS() + def testPriority(self): + def _Acquire(prev, next, name, priority, success_fn): + prev.wait() + self.assert_(self.ls.acquire(name, shared=0, + priority=priority, + test_notify=lambda _: next.set())) + try: + success_fn() + finally: + self.ls.release() + + # Get all in exclusive mode + self.assert_(self.ls.acquire(locking.ALL_SET, shared=0)) + + done_two = Queue.Queue(0) + + first = threading.Event() + prev = first + + acquires = [("one", prio, self.done) for prio in range(1, 33)] + acquires.extend([("two", prio, done_two) for prio in range(1, 33)]) + + # Use a deterministic random generator + random.Random(741).shuffle(acquires) + + for (name, prio, done) in acquires: + ev = threading.Event() + self._addThread(target=_Acquire, + args=(prev, ev, name, prio, + compat.partial(done.put, "Prio%s" % prio))) + prev = ev + + # Start acquires + first.set() + + # Wait for last acquire to start + prev.wait() + + # Let threads acquire locks + self.ls.release() + + # Wait for threads to finish + self._waitThreads() + + for i in range(1, 33): + self.assertEqual(self.done.get_nowait(), "Prio%s" % i) + self.assertEqual(done_two.get_nowait(), "Prio%s" % i) + + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.assertRaises(Queue.Empty, done_two.get_nowait) + class TestGanetiLockManager(_ThreadedTestCase):