Commit 34c5ec6c authored by Hrvoje Ribicic

Track if a QA test was blocked by locks

This patch adds threading to the RunWithLocks function, allowing one
thread to execute the QA test while the other monitors whether the test is
being blocked by locks set up for it. If it is, the blocking job is
terminated and the QA is allowed to continue, with the test failure
reported at the very end.
Signed-off-by: Hrvoje Ribicic <riba@google.com>
Reviewed-by: Petr Pudlak <pudlak@google.com>
parent ffafdcf6
......@@ -24,6 +24,8 @@
"""
import re
import threading
import time
from ganeti import constants
from ganeti import locking
......@@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo
AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
def _GetOutputFromMaster(cmd):
def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
""" Gets the output of a command executed on master.
"""
......@@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd):
# buildbot
cmdstr += " 2>&1"
return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr)
return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
use_multiplexer=use_multiplexer, log_cmd=log_cmd)
def ExecuteJobProducingCommand(cmd):
......@@ -107,6 +110,87 @@ def _TerminateDelayFunction(termination_socket):
AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket)
def _GetNodeUUIDMap(nodes):
  """ Builds a node name to node UUID dictionary for the requested nodes.

  The information is taken from the output of "gnt-node list", obtained
  through _GetOutputFromMaster.

  @type nodes: list of string
  @param nodes: The names of the nodes to look up. An empty list requests
                information for all the nodes.
  @rtype: dict of string to string
  @return: The node name to node UUID map.

  """
  list_cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
  list_cmd += list(nodes)

  listing = _GetOutputFromMaster(list_cmd)

  # Every output line is expected to hold exactly two whitespace-separated
  # fields, the name and the UUID; dict() rejects anything else
  return dict(line.split() for line in listing.splitlines())
def _FindLockNames(locks):
  """ Finds the ids and descriptions of locks that given locks can block.

  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
  blocked even though they were not listed explicitly. This function has to
  take care and list all locks that can be blocked by the locks given as
  parameters.

  @type locks: dict of locking level to list
  @param locks: The locks that gnt-debug delay is holding.
  @rtype: dict of string to string
  @return: The lock name to entity name map.

  """
  lock_map = {}

  if locking.LEVEL_NODE in locks:
    node_locks = locks[locking.LEVEL_NODE]
    # locking.ALL_SET is None, so it has to be told apart from an empty list
    all_nodes = node_locks == locking.ALL_SET

    if all_nodes:
      # Empty list retrieves all info
      name_uuid_map = _GetNodeUUIDMap([])
    elif node_locks:
      name_uuid_map = _GetNodeUUIDMap(node_locks)
    else:
      # An explicitly empty list means no node locks are held. Querying the
      # nodes with it would return every node, and all of their locks would
      # wrongly be treated as held by the delay operation.
      name_uuid_map = {}

    for name, node_uuid in name_uuid_map.items():
      lock_map["node/%s" % node_uuid] = name

    # If ALL_SET was requested explicitly, or there is at least one lock, the
    # lock of the whole node lockset can be blocked as well
    if all_nodes or node_locks:
      lock_map["node/[lockset]"] = "joint node lock"

  # TODO: add other lock types here when support for these is added
  return lock_map
def _GetBlockingLocks():
  """ Lists the locks that jobs are waiting on, using "gnt-debug locks".

  @rtype: list of string
  @return: The names of the locks currently blocking any job.
  @raise qa_error.Error: If a line of the command output does not consist of
                         exactly four whitespace-separated fields.

  """
  # Due to mysterious issues when a SSH multiplexer is being used by two
  # threads, we turn it off, and block most of the logging to improve the
  # visibility of the other thread's output
  raw_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
                                    log_cmd=False)

  blocked = []
  # The first line is the header, which we do not need
  for row in raw_output.splitlines()[1:]:
    fields = row.split()
    if len(fields) != 4:
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
                           "line at fault is: %s" % row)

    # First field is the lock name, last one the pending jobs; a dash in the
    # latter means that no job is waiting for the lock
    if fields[3] != '-':
      blocked.append(fields[0])

  return blocked
# TODO: Can this be done as a decorator? Implement as needed.
def RunWithLocks(fn, locks, timeout, *args, **kwargs):
""" Runs the given function, acquiring a set of locks beforehand.
......@@ -123,10 +207,11 @@ def RunWithLocks(fn, locks, timeout, *args, **kwargs):
test, to try and see if the function can run in parallel with other
operations.
The current version simply creates the locks, which expire after a given
timeout, and attempts to invoke the provided function.
This will probably block the QA, and future versions will address this.
Locks are acquired by invoking a gnt-debug delay operation which can be
interrupted as needed. The QA test is then run in a separate thread, with the
current thread observing jobs waiting for locks. When a job is spotted waiting
for a lock held by the started delay operation, this is noted, and the delay
is interrupted, allowing the QA test to continue.
A default timeout is not provided by design - the test creator must make a
good conservative estimate.
......@@ -139,11 +224,44 @@ def RunWithLocks(fn, locks, timeout, *args, **kwargs):
# The watcher may interfere by issuing its own jobs - therefore pause it
AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
termination_socket = _StartDelayFunction(locks, timeout)
# Find out the lock names prior to starting the delay function
lock_name_map = _FindLockNames(locks)
fn(*args, **kwargs)
termination_socket = _StartDelayFunction(locks, timeout)
_TerminateDelayFunction(termination_socket)
qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
qa_thread.start()
blocking_owned_locks = []
test_blocked = False
try:
while qa_thread.isAlive():
blocking_locks = _GetBlockingLocks()
blocking_owned_locks = \
set(blocking_locks).intersection(set(lock_name_map))
if blocking_owned_locks:
test_blocked = True
_TerminateDelayFunction(termination_socket)
break
# The sleeping time has been set arbitrarily
time.sleep(5)
except:
# If anything goes wrong here, we should be responsible and terminate the
# delay job
_TerminateDelayFunction(termination_socket)
raise
qa_thread.join()
if test_blocked:
blocking_lock_names = map(lock_name_map.get, blocking_owned_locks)
raise qa_error.Error("QA test succeded, but was blocked by the locks: %s" %
", ".join(blocking_lock_names))
else:
_TerminateDelayFunction(termination_socket)
# Revive the watcher
AssertCommand(["gnt-cluster", "watcher", "continue"])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment