Commit 887c7aa6 authored by Michael Hanselmann's avatar Michael Hanselmann

locking: Implement priorities in SharedLock and LockSet

For proper support of job priorities, jobs' locks need to respect
priorities.  Otherwise it could happen that a job with a lower priority
could get a lock before a job with a higher priority (depending on
timeouts and when they start acquiring).

This patch adds support for priorities in SharedLock and LockSet and
provides (unfortunately non-trivial) unittests. Outdated comments are also
adjusted and improved.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent cbccd9ca
......@@ -205,8 +205,8 @@ Opcode priorities are synchronized to disk in order to be restored after
a restart or crash of the master daemon.
Priorities also need to be considered inside the locking library to
ensure opcodes with higher priorities get locks first, but the design
changes for this will be discussed in a separate section.
ensure opcodes with higher priorities get locks first. See
:ref:`locking priorities <locking-priorities>` for more details.
Worker pool
+++++++++++
......@@ -243,6 +243,59 @@ changing its own priority. This is useful for the following cases:
With these changes, the job queue will be able to implement per-job
priorities.
.. _locking-priorities:
Locking
+++++++
In order to support priorities in Ganeti's own lock classes,
``locking.SharedLock`` and ``locking.LockSet``, the internal structure
of the former class needs to be changed. The last major change in this
area was done for Ganeti 2.1 and can be found in the respective
:doc:`design document <design-2.1>`.
The plain list (``[]``) used as a queue is replaced by a heap queue,
similar to the `worker pool`_. The heap or priority queue does automatic
sorting, thereby automatically taking care of priorities. For each
priority there's a plain list with pending acquires, like the single
queue of pending acquires before this change.
When the lock is released, the code locates the list of pending acquires
for the highest priority waiting. The first condition (index 0) is
notified. Once all waiting threads received the notification, the
condition is removed from the list. If the list of conditions is empty
it's removed from the heap queue.
Like before, shared acquires are grouped and skip ahead of exclusive
acquires if there's already an existing shared acquire for a priority.
To accomplish this, a separate dictionary of shared acquires per
priority is maintained.
To simplify the code and reduce memory consumption, the concept of the
"active" and "inactive" condition for shared acquires is abolished. The
lock can't predict what priorities the next acquires will use and even
keeping a cache can become computationally expensive for arguable
benefit (the underlying POSIX pipe, see ``pipe(2)``, needs to be
re-created for each notification anyway).
The following diagram shows a possible state of the internal queue from
a high-level view. Conditions are shown as (waiting) threads. Assuming
no modifications are made to the queue (e.g. more acquires or timeouts),
the lock would be acquired by the threads in this order (concurrent
acquires in parentheses): ``threadE1``, ``threadE2``, (``threadS1``,
``threadS2``, ``threadS3``), (``threadS4``, ``threadS5``), ``threadE3``,
``threadS6``, ``threadE4``, ``threadE5``.
::
[
(0, [exc/threadE1, exc/threadE2, shr/threadS1/threadS2/threadS3]),
(2, [shr/threadS4/threadS5]),
(10, [exc/threadE3]),
(33, [shr/threadS6, exc/threadE4, exc/threadE5]),
]
IPv6 support
------------
......
......@@ -32,6 +32,7 @@ import time
import errno
import weakref
import logging
import heapq
from ganeti import errors
from ganeti import utils
......@@ -41,6 +42,8 @@ from ganeti import compat
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DEFAULT_PRIORITY = 0
def ssynchronized(mylock, shared=0):
"""Shared Synchronization decorator.
......@@ -406,6 +409,19 @@ class PipeCondition(_BaseCondition):
return bool(self._waiters)
class _PipeConditionWithMode(PipeCondition):
__slots__ = [
"shared",
]
def __init__(self, lock, shared):
"""Initializes this class.
"""
self.shared = shared
PipeCondition.__init__(self, lock)
class SharedLock(object):
"""Implements a shared lock.
......@@ -413,9 +429,13 @@ class SharedLock(object):
acquire_shared(). In order to acquire the lock in an exclusive way threads
can call acquire_exclusive().
The lock prevents starvation but does not guarantee that threads will acquire
the shared lock in the order they queued for it, just that they will
eventually do so.
Notes on data structures: C{__pending} contains a priority queue (heapq) of
all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
...]}. Each per-priority queue contains a normal in-order list of conditions
to be notified when the lock can be acquired. Shared locks are grouped
together by priority and the condition for them is stored in
C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
references for the per-priority queues indexed by priority for faster access.
@type name: string
@ivar name: the name of the lock
......@@ -423,17 +443,17 @@ class SharedLock(object):
"""
__slots__ = [
"__weakref__",
"__active_shr_c",
"__inactive_shr_c",
"__deleted",
"__exc",
"__lock",
"__pending",
"__pending_by_prio",
"__pending_shared",
"__shr",
"name",
]
__condition_class = PipeCondition
__condition_class = _PipeConditionWithMode
def __init__(self, name, monitor=None):
"""Construct a new SharedLock.
......@@ -452,10 +472,8 @@ class SharedLock(object):
# Queue containing waiting acquires
self.__pending = []
# Active and inactive conditions for shared locks
self.__active_shr_c = self.__condition_class(self.__lock)
self.__inactive_shr_c = self.__condition_class(self.__lock)
self.__pending_by_prio = {}
self.__pending_shared = {}
# Current lock holders
self.__shr = set()
......@@ -509,16 +527,18 @@ class SharedLock(object):
elif fname == "pending":
data = []
for cond in self.__pending:
if cond in (self.__active_shr_c, self.__inactive_shr_c):
mode = _SHARED_TEXT
else:
mode = _EXCLUSIVE_TEXT
# Sorting instead of copying and using heaq functions for simplicity
for (_, prioqueue) in sorted(self.__pending):
for cond in prioqueue:
if cond.shared:
mode = _SHARED_TEXT
else:
mode = _EXCLUSIVE_TEXT
# This function should be fast as it runs with the lock held. Hence
# not using utils.NiceSort.
data.append((mode, sorted([i.getName()
for i in cond.get_waiting()])))
# This function should be fast as it runs with the lock held.
# Hence not using utils.NiceSort.
data.append((mode, sorted(i.getName()
for i in cond.get_waiting())))
info.append(data)
else:
......@@ -584,7 +604,23 @@ class SharedLock(object):
"""
self.__lock.acquire()
try:
return len(self.__pending)
return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
finally:
self.__lock.release()
def _check_empty(self):
"""Checks whether there are any pending acquires.
@rtype: bool
"""
self.__lock.acquire()
try:
# Order is important: __find_first_pending_queue modifies __pending
return not (self.__find_first_pending_queue() or
self.__pending or
self.__pending_by_prio or
self.__pending_shared)
finally:
self.__lock.release()
......@@ -606,20 +642,42 @@ class SharedLock(object):
else:
return len(self.__shr) == 0 and self.__exc is None
def __find_first_pending_queue(self):
"""Tries to find the topmost queued entry with pending acquires.
Removes empty entries while going through the list.
"""
while self.__pending:
(priority, prioqueue) = self.__pending[0]
if not prioqueue:
heapq.heappop(self.__pending)
del self.__pending_by_prio[priority]
assert priority not in self.__pending_shared
continue
if prioqueue:
return prioqueue
return None
def __is_on_top(self, cond):
"""Checks whether the passed condition is on top of the queue.
The caller must make sure the queue isn't empty.
"""
return self.__pending[0] == cond
return cond == self.__find_first_pending_queue()[0]
def __acquire_unlocked(self, shared, timeout):
def __acquire_unlocked(self, shared, timeout, priority):
"""Acquire a shared lock.
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@param timeout: maximum waiting time before giving up
@type priority: integer
@param priority: Priority for acquiring lock
"""
self.__check_deleted()
......@@ -628,26 +686,46 @@ class SharedLock(object):
assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
" %s" % self.name)
# Remove empty entries from queue
self.__find_first_pending_queue()
# Check whether someone else holds the lock or there are pending acquires.
if not self.__pending and self.__can_acquire(shared):
# Apparently not, can acquire lock directly.
self.__do_acquire(shared)
return True
if shared:
wait_condition = self.__active_shr_c
prioqueue = self.__pending_by_prio.get(priority, None)
# Check if we're not yet in the queue
if wait_condition not in self.__pending:
self.__pending.append(wait_condition)
if shared:
# Try to re-use condition for shared acquire
wait_condition = self.__pending_shared.get(priority, None)
assert (wait_condition is None or
(wait_condition.shared and wait_condition in prioqueue))
else:
wait_condition = self.__condition_class(self.__lock)
# Always add to queue
self.__pending.append(wait_condition)
wait_condition = None
if wait_condition is None:
if prioqueue is None:
assert priority not in self.__pending_by_prio
prioqueue = []
heapq.heappush(self.__pending, (priority, prioqueue))
self.__pending_by_prio[priority] = prioqueue
wait_condition = self.__condition_class(self.__lock, shared)
prioqueue.append(wait_condition)
if shared:
# Keep reference for further shared acquires on same priority. This is
# better than trying to find it in the list of pending acquires.
assert priority not in self.__pending_shared
self.__pending_shared[priority] = wait_condition
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
# TODO: Decrease timeout with spurious notifications
while not (self.__is_on_top(wait_condition) and
self.__can_acquire(shared)):
# Wait for notification
......@@ -664,12 +742,15 @@ class SharedLock(object):
return True
finally:
# Remove condition from queue if there are no more waiters
if not wait_condition.has_waiting() and not self.__deleted:
self.__pending.remove(wait_condition)
if not wait_condition.has_waiting():
prioqueue.remove(wait_condition)
if wait_condition.shared:
del self.__pending_shared[priority]
return False
def acquire(self, shared=0, timeout=None, test_notify=None):
def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY,
test_notify=None):
"""Acquire a shared lock.
@type shared: integer (0/1) used as a boolean
......@@ -677,6 +758,8 @@ class SharedLock(object):
exclusive lock will be acquired
@type timeout: float
@param timeout: maximum waiting time before giving up
@type priority: integer
@param priority: Priority for acquiring lock
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
......@@ -687,7 +770,7 @@ class SharedLock(object):
if __debug__ and callable(test_notify):
test_notify()
return self.__acquire_unlocked(shared, timeout)
return self.__acquire_unlocked(shared, timeout, priority)
finally:
self.__lock.release()
......@@ -710,18 +793,14 @@ class SharedLock(object):
self.__shr.remove(threading.currentThread())
# Notify topmost condition in queue
if self.__pending:
first_condition = self.__pending[0]
first_condition.notifyAll()
if first_condition == self.__active_shr_c:
self.__active_shr_c = self.__inactive_shr_c
self.__inactive_shr_c = first_condition
prioqueue = self.__find_first_pending_queue()
if prioqueue:
prioqueue[0].notifyAll()
finally:
self.__lock.release()
def delete(self, timeout=None):
def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
"""Delete a Shared Lock.
This operation will declare the lock for removal. First the lock will be
......@@ -730,6 +809,8 @@ class SharedLock(object):
@type timeout: float
@param timeout: maximum waiting time before giving up
@type priority: integer
@param priority: Priority for acquiring lock
"""
self.__lock.acquire()
......@@ -742,7 +823,7 @@ class SharedLock(object):
acquired = self.__is_exclusive()
if not acquired:
acquired = self.__acquire_unlocked(0, timeout)
acquired = self.__acquire_unlocked(0, timeout, priority)
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
......@@ -754,8 +835,11 @@ class SharedLock(object):
assert not (self.__exc or self.__shr), "Found owner during deletion"
# Notify all acquires. They'll throw an error.
while self.__pending:
self.__pending.pop().notifyAll()
for (_, prioqueue) in self.__pending:
for cond in prioqueue:
cond.notifyAll()
assert self.__deleted
return acquired
finally:
......@@ -908,7 +992,8 @@ class LockSet:
self.__lock.release()
return set(result)
def acquire(self, names, timeout=None, shared=0, test_notify=None):
def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY,
test_notify=None):
"""Acquire a set of resource locks.
@type names: list of strings (or string)
......@@ -919,6 +1004,8 @@ class LockSet:
exclusive lock will be acquired
@type timeout: float or None
@param timeout: Maximum time to acquire all locks
@type priority: integer
@param priority: Priority for acquiring locks
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
......@@ -945,7 +1032,7 @@ class LockSet:
if isinstance(names, basestring):
names = [names]
return self.__acquire_inner(names, False, shared,
return self.__acquire_inner(names, False, shared, priority,
running_timeout.Remaining, test_notify)
else:
......@@ -954,18 +1041,18 @@ class LockSet:
# Some of them may then be deleted later, but we'll cope with this.
#
# We'd like to acquire this lock in a shared way, as it's nice if
# everybody else can use the instances at the same time. If are
# everybody else can use the instances at the same time. If we are
# acquiring them exclusively though they won't be able to do this
# anyway, though, so we'll get the list lock exclusively as well in
# order to be able to do add() on the set while owning it.
if not self.__lock.acquire(shared=shared,
if not self.__lock.acquire(shared=shared, priority=priority,
timeout=running_timeout.Remaining()):
raise _AcquireTimeout()
try:
# note we own the set-lock
self._add_owned()
return self.__acquire_inner(self.__names(), True, shared,
return self.__acquire_inner(self.__names(), True, shared, priority,
running_timeout.Remaining, test_notify)
except:
# We shouldn't have problems adding the lock to the owners list, but
......@@ -978,13 +1065,15 @@ class LockSet:
except _AcquireTimeout:
return None
def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
def __acquire_inner(self, names, want_all, shared, priority,
timeout_fn, test_notify):
"""Inner logic for acquiring a number of locks.
@param names: Names of the locks to be acquired
@param want_all: Whether all locks in the set should be acquired
@param shared: Whether to acquire in shared mode
@param timeout_fn: Function returning remaining timeout
@param priority: Priority for acquiring locks
@param test_notify: Special callback function for unittesting
"""
......@@ -1028,6 +1117,7 @@ class LockSet:
try:
# raises LockError if the lock was deleted
acq_success = lock.acquire(shared=shared, timeout=timeout,
priority=priority,
test_notify=test_notify_fn)
except errors.LockError:
if want_all:
......@@ -1146,6 +1236,8 @@ class LockSet:
lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
if acquired:
# No need for priority or timeout here as this lock has just been
# created
lock.acquire(shared=shared)
# now the lock cannot be deleted, we have it!
try:
......
......@@ -28,10 +28,12 @@ import time
import Queue
import threading
import random
import itertools
from ganeti import locking
from ganeti import errors
from ganeti import utils
from ganeti import compat
import testutils
......@@ -701,6 +703,106 @@ class TestSharedLock(_ThreadedTestCase):
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testPriority(self):
# Acquire in exclusive mode
self.assert_(self.sl.acquire(shared=0))
# Queue acquires
def _Acquire(prev, next, shared, priority, result):
prev.wait()
self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
try:
self.done.put(result)
finally:
self.sl.release()
counter = itertools.count(0)
priorities = range(-20, 30)
first = threading.Event()
prev = first
# Data structure:
# {
# priority:
# [(shared/exclusive, set(acquire names), set(pending threads)),
# (shared/exclusive, ...),
# ...,
# ],
# }
perprio = {}
# References shared acquire per priority in L{perprio}. Data structure:
# {
# priority: (shared=1, set(acquire names), set(pending threads)),
# }
prioshared = {}
for seed in [4979, 9523, 14902, 32440]:
# Use a deterministic random generator
rnd = random.Random(seed)
for priority in [rnd.choice(priorities) for _ in range(30)]:
modes = [0, 1]
rnd.shuffle(modes)
for shared in modes:
# Unique name
acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
ev = threading.Event()
thread = self._addThread(target=_Acquire,
args=(prev, ev, shared, priority, acqname))
prev = ev
# Record expected aqcuire, see above for structure
data = (shared, set([acqname]), set([thread]))
priolist = perprio.setdefault(priority, [])
if shared:
priosh = prioshared.get(priority, None)
if priosh:
# Shared acquires are merged
for i, j in zip(priosh[1:], data[1:]):
i.update(j)
assert data[0] == priosh[0]
else:
prioshared[priority] = data
priolist.append(data)
else:
priolist.append(data)
# Start all acquires and wait for them
first.set()
prev.wait()
# Check lock information
self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
["exclusive", [threading.currentThread().getName()]])
self.assertEqual(self.sl.GetInfo(["name", "pending"]),
[self.sl.name,
[(["exclusive", "shared"][int(bool(shared))],
sorted([t.getName() for t in threads]))
for acquires in [perprio[i]
for i in sorted(perprio.keys())]
for (shared, _, threads) in acquires]])
# Let threads acquire the lock
self.sl.release()
# Wait for everything to finish
self._waitThreads()
self.assert_(self.sl._check_empty())
# Check acquires by priority
for acquires in [perprio[i] for i in sorted(perprio.keys())]:
for (_, names, _) in acquires:
# For shared acquires, the set will contain 1..n entries. For exclusive
# acquires only one.
while names:
names.remove(self.done.get_nowait())
self.assertFalse(compat.any(names for (_, names, _) in acquires))
self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""
......@@ -1259,6 +1361,57 @@ class TestLockSet(_ThreadedTestCase):
self.assertEqual(self.done.get_nowait(), 'DONE')
self._setUpLS()
def testPriority(self):
def _Acquire(prev, next, name, priority, success_fn):
prev.wait()
self.assert_(self.ls.acquire(name, shared=0,
priority=priority,
test_notify=lambda _: next.set()))
try:
success_fn()
finally:
self.ls.release()
# Get all in exclusive mode
self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
done_two = Queue.Queue(0)
first = threading.Event()
prev = first
acquires = [("one", prio, self.done) for prio in range(1, 33)]
acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
# Use a deterministic random generator
random.Random(741).shuffle(acquires)
for (name, prio, done) in acquires:
ev = threading.Event()
self._addThread(target=_Acquire,
args=(prev, ev, name, prio,
compat.partial(done.put, "Prio%s" % prio)))
prev = ev
# Start acquires
first.set()
# Wait for last acquire to start
prev.wait()
# Let threads acquire locks
self.ls.release()
# Wait for threads to finish
self._waitThreads()
for i in range(1, 33):
self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertRaises(Queue.Empty, done_two.get_nowait)
class TestGanetiLockManager(_ThreadedTestCase):
......
Markdown is supported
0% or