Commit 3b86a503 authored by Thomas Thrainer's avatar Thomas Thrainer
Browse files

Merge branch 'stable-2.10' into stable-2.11



* stable-2.10
  Fix passing of ispecs in cluster init during QA
  Move QAThreadGroup to qa_job_utils.py
  Extract GetJobStatuses and use an unified version
  Run disk template specific tests only if possible
Signed-off-by: default avatarThomas Thrainer <thomasth@google.com>
Reviewed-by: default avatarHrvoje Ribicic <riba@google.com>
parents 866e1f76 6383059d
...@@ -863,8 +863,10 @@ def RunPerformanceTests(): ...@@ -863,8 +863,10 @@ def RunPerformanceTests():
RunTest(qa_performance.TestJobQueueSubmissionPerformance) RunTest(qa_performance.TestJobQueueSubmissionPerformance)
if qa_config.TestEnabled("parallel-performance"): if qa_config.TestEnabled("parallel-performance"):
RunTest(qa_performance.TestParallelDRBDInstanceCreationPerformance) if qa_config.IsTemplateSupported(constants.DT_DRBD8):
RunTest(qa_performance.TestParallelPlainInstanceCreationPerformance) RunTest(qa_performance.TestParallelDRBDInstanceCreationPerformance)
if qa_config.IsTemplateSupported(constants.DT_PLAIN):
RunTest(qa_performance.TestParallelPlainInstanceCreationPerformance)
if qa_config.IsTemplateSupported(constants.DT_DRBD8): if qa_config.IsTemplateSupported(constants.DT_DRBD8):
inodes = qa_config.AcquireManyNodes(2) inodes = qa_config.AcquireManyNodes(2)
......
...@@ -207,11 +207,14 @@ def TestClusterInit(rapi_user, rapi_secret): ...@@ -207,11 +207,14 @@ def TestClusterInit(rapi_user, rapi_secret):
for spec_type in ("mem-size", "disk-size", "disk-count", "cpu-count", for spec_type in ("mem-size", "disk-size", "disk-count", "cpu-count",
"nic-count"): "nic-count"):
spec_values = []
for spec_val in ("min", "max", "std"): for spec_val in ("min", "max", "std"):
spec = qa_config.get("ispec_%s_%s" % spec = qa_config.get("ispec_%s_%s" %
(spec_type.replace("-", "_"), spec_val), None) (spec_type.replace("-", "_"), spec_val), None)
if spec is not None: if spec is not None:
cmd.append("--specs-%s=%s=%d" % (spec_type, spec_val, spec)) spec_values.append("%s=%d" % (spec_val, spec))
if spec_values:
cmd.append("--specs-%s=%s" % (spec_type, ",".join(spec_values)))
if master.secondary: if master.secondary:
cmd.append("--secondary-ip=%s" % master.secondary) cmd.append("--secondary-ip=%s" % master.secondary)
......
...@@ -23,17 +23,16 @@ ...@@ -23,17 +23,16 @@
""" """
from ganeti.utils import retry
from ganeti import constants
from ganeti import query
import functools import functools
import re import re
from ganeti.utils import retry
from ganeti import constants
from ganeti import query
import qa_config import qa_config
import qa_error import qa_error
import qa_job_utils
import qa_utils import qa_utils
from qa_utils import AssertCommand, GetCommandOutput from qa_utils import AssertCommand, GetCommandOutput
...@@ -48,20 +47,6 @@ def TestJobListFields(): ...@@ -48,20 +47,6 @@ def TestJobListFields():
qa_utils.GenericQueryFieldsTest("gnt-job", query.JOB_FIELDS.keys()) qa_utils.GenericQueryFieldsTest("gnt-job", query.JOB_FIELDS.keys())
def _GetJobStatuses():
""" Invokes gnt-job list and extracts an id to status dictionary.
@rtype: dict of string to string
@return: A dictionary mapping job ids to matching statuses
"""
master = qa_config.GetMasterNode()
list_output = GetCommandOutput(
master.primary, "gnt-job list --no-headers --output=id,status"
)
return dict(map(lambda s: s.split(), list_output.splitlines()))
def _GetJobStatus(job_id): def _GetJobStatus(job_id):
""" Retrieves the status of a job. """ Retrieves the status of a job.
...@@ -72,7 +57,7 @@ def _GetJobStatus(job_id): ...@@ -72,7 +57,7 @@ def _GetJobStatus(job_id):
@return: The job status, or None if not present. @return: The job status, or None if not present.
""" """
return _GetJobStatuses().get(job_id, None) return qa_job_utils.GetJobStatuses([job_id]).get(job_id, None)
def _RetryingFetchJobStatus(retry_status, job_id): def _RetryingFetchJobStatus(retry_status, job_id):
......
...@@ -82,6 +82,24 @@ def ExecuteJobProducingCommand(cmd): ...@@ -82,6 +82,24 @@ def ExecuteJobProducingCommand(cmd):
return int(possible_job_ids[0]) return int(possible_job_ids[0])
def GetJobStatuses(job_ids=None):
""" Invokes gnt-job list and extracts an id to status dictionary.
@type job_ids: list
@param job_ids: list of job ids to query the status for; if C{None}, the
status of all current jobs is returned
@rtype: dict of string to string
@return: A dictionary mapping job ids to matching statuses
"""
cmd = ["gnt-job", "list", "--no-headers", "--output=id,status"]
if job_ids is not None:
cmd.extend(map(str, job_ids))
list_output = GetOutputFromMaster(cmd)
return dict(map(lambda s: s.split(), list_output.splitlines()))
def _RetrieveTerminationInfo(job_id): def _RetrieveTerminationInfo(job_id):
""" Retrieves the termination info from a job caused by gnt-debug delay. """ Retrieves the termination info from a job caused by gnt-debug delay.
...@@ -262,6 +280,32 @@ class QAThread(threading.Thread): ...@@ -262,6 +280,32 @@ class QAThread(threading.Thread):
raise self._exc_info[0], self._exc_info[1], self._exc_info[2] raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
class QAThreadGroup(object):
"""This class manages a list of QAThreads.
"""
def __init__(self):
self._threads = []
def Start(self, thread):
"""Starts the given thread and adds it to this group.
@type thread: qa_job_utils.QAThread
@param thread: the thread to start and to add to this group.
"""
thread.start()
self._threads.append(thread)
def JoinAndReraise(self):
"""Joins all threads in this group and calls their C{reraise} method.
"""
for thread in self._threads:
thread.join()
thread.reraise()
# TODO: Can this be done as a decorator? Implement as needed. # TODO: Can this be done as a decorator? Implement as needed.
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs): def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
""" Runs the given function, acquiring a set of locks beforehand. """ Runs the given function, acquiring a set of locks beforehand.
......
...@@ -96,15 +96,11 @@ class _JobQueueDriver(object): ...@@ -96,15 +96,11 @@ class _JobQueueDriver(object):
def _FetchJobStatuses(self): def _FetchJobStatuses(self):
"""Retrieves status information of the given jobs. """Retrieves status information of the given jobs.
@rtype: dict of string to list of L{_JobEntry)
""" """
cmd = (["gnt-job", "list", "--no-headers", "-o", "id,status"]) job_statuses = qa_job_utils.GetJobStatuses(self._GetJobIds())
cmd.extend(map(str, self._GetJobIds()))
job_statuses = [line.split() for line in
qa_job_utils.GetOutputFromMaster(cmd).splitlines()]
new_statuses = {} new_statuses = {}
for job_id, status in job_statuses: for job_id, status in job_statuses.items():
new_statuses.setdefault(status, []).append(self._jobs[int(job_id)]) new_statuses.setdefault(status, []).append(self._jobs[int(job_id)])
self._jobs_per_status = new_statuses self._jobs_per_status = new_statuses
...@@ -433,33 +429,6 @@ def TestParallelInstanceOSOperations(instances): ...@@ -433,33 +429,6 @@ def TestParallelInstanceOSOperations(instances):
job_driver.WaitForCompletion() job_driver.WaitForCompletion()
# TODO(thomasth): move to qa_job_utils.py once stable-2.10 is merged to master
class _QAThreadGroup(object):
"""This class manages a list of QAThreads.
"""
def __init__(self):
self._threads = []
def Start(self, thread):
"""Starts the given thread and adds it to this group.
@type thread: qa_job_utils.QAThread
@param thread: the thread to start and to add to this group.
"""
thread.start()
self._threads.append(thread)
def JoinAndReraise(self):
"""Joins all threads in this group and calls their C{reraise} method.
"""
for thread in self._threads:
thread.join()
thread.reraise()
def TestParallelInstanceQueries(instances): def TestParallelInstanceQueries(instances):
"""PERFORMANCE: Parallel instance queries. """PERFORMANCE: Parallel instance queries.
...@@ -467,7 +436,7 @@ def TestParallelInstanceQueries(instances): ...@@ -467,7 +436,7 @@ def TestParallelInstanceQueries(instances):
@param instances: list of instances to issue queries against @param instances: list of instances to issue queries against
""" """
threads = _QAThreadGroup() threads = qa_job_utils.QAThreadGroup()
for instance in instances: for instance in instances:
cmd = ["gnt-instance", "info", instance.name] cmd = ["gnt-instance", "info", instance.name]
info_thread = qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {}) info_thread = qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {})
...@@ -516,7 +485,7 @@ def TestJobQueueSubmissionPerformance(): ...@@ -516,7 +485,7 @@ def TestJobQueueSubmissionPerformance():
job_driver.AddJob(job_id) job_driver.AddJob(job_id)
threads = _QAThreadGroup() threads = qa_job_utils.QAThreadGroup()
for i in range(10): for i in range(10):
thread = qa_job_utils.QAThread(_SubmitDelayJob, [20], {}) thread = qa_job_utils.QAThread(_SubmitDelayJob, [20], {})
threads.Start(thread) threads.Start(thread)
...@@ -533,8 +502,7 @@ def TestParallelDRBDInstanceCreationPerformance(): ...@@ -533,8 +502,7 @@ def TestParallelDRBDInstanceCreationPerformance():
"""PERFORMANCE: Parallel DRBD backed instance creation. """PERFORMANCE: Parallel DRBD backed instance creation.
""" """
if not qa_config.IsTemplateSupported(constants.DT_DRBD8): assert qa_config.IsTemplateSupported(constants.DT_DRBD8)
print(qa_logging.FormatInfo("DRBD disk template not supported, skipping"))
nodes = list(_AcquireAllNodes()) nodes = list(_AcquireAllNodes())
_TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2, _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2,
...@@ -546,8 +514,7 @@ def TestParallelPlainInstanceCreationPerformance(): ...@@ -546,8 +514,7 @@ def TestParallelPlainInstanceCreationPerformance():
"""PERFORMANCE: Parallel plain backed instance creation. """PERFORMANCE: Parallel plain backed instance creation.
""" """
if not qa_config.IsTemplateSupported(constants.DT_PLAIN): assert qa_config.IsTemplateSupported(constants.DT_PLAIN)
print(qa_logging.FormatInfo("Plain disk template not supported, skipping"))
nodes = list(_AcquireAllNodes()) nodes = list(_AcquireAllNodes())
_TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2, _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2,
...@@ -571,9 +538,7 @@ def _TestInstanceOperationInParallelToInstanceCreation(*cmds): ...@@ -571,9 +538,7 @@ def _TestInstanceOperationInParallelToInstanceCreation(*cmds):
job_driver.AddJob( job_driver.AddJob(
job_id, running_fn=functools.partial(_SubmitNextCommand, cmd_idx + 1)) job_id, running_fn=functools.partial(_SubmitNextCommand, cmd_idx + 1))
if not qa_config.IsTemplateSupported(constants.DT_DRBD8): assert qa_config.IsTemplateSupported(constants.DT_DRBD8)
print(qa_logging.FormatInfo("DRBD disk template not supported, skipping"))
assert len(cmds) > 0 assert len(cmds) > 0
job_driver = _JobQueueDriver() job_driver = _JobQueueDriver()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment