Commit 06c876fe authored by Thomas Thrainer's avatar Thomas Thrainer
Browse files

Test parallel job submission performance



Submit 200 delay jobs and verify that the submission rate does not drop
as more jobs are added to the queue. Also verify that a `gnt-cluster
info` is not slowed down by a large number of jobs in the queue.
Signed-off-by: default avatarThomas Thrainer <thomasth@google.com>
Reviewed-by: default avatarHrvoje Ribicic <riba@google.com>
parent 0e594722
......@@ -67,7 +67,7 @@ The following tests are added to the QA:
return within a reasonable low timeout.
* For the maximum amount of instances in the cluster, submit add-,
remove- and list-tags jobs.
* Submit 200 `gnt-debug delay` jobs with a delay of 1 seconds. To
* Submit 200 `gnt-debug delay` jobs with a delay of 0.1 seconds. To
speed up submission, perform multiple job submissions in parallel.
Verify that submitting jobs doesn't significantly slow down during
the process. Verify that querying cluster information over CLI and
......
......@@ -798,6 +798,8 @@ def RunPerformanceTests():
qa_performance.RemoveAllInstances(instances)
RunTest(qa_performance.TestJobQueueSubmissionPerformance)
def RunQa():
"""Main QA body.
......
......@@ -23,8 +23,10 @@
"""
import datetime
import functools
import itertools
import threading
import time
from ganeti import constants
......@@ -33,9 +35,13 @@ import qa_config
import qa_error
from qa_instance_utils import GetGenericAddParameters
import qa_job_utils
import qa_logging
import qa_utils
MAX_JOB_SUBMISSION_DURATION = 15.0
class _JobQueueDriver(object):
"""This class handles polling of jobs and reacting on status changes.
......@@ -66,6 +72,7 @@ class _JobQueueDriver(object):
self._jobs = {}
self._running_notified = set()
self._jobs_per_status = {}
self._lock = threading.RLock()
def AddJob(self, job_id, running_fn=None, success_fn=None):
"""Add a job to the driver.
......@@ -79,11 +86,12 @@ class _JobQueueDriver(object):
@param success_fn: function called for each successful job id
"""
self._jobs[job_id] = _JobQueueDriver._JobEntry(job_id,
running_fn,
success_fn)
# the status will be updated on the next call to _FetchJobStatuses
self._jobs_per_status.setdefault(self._UNKNOWN_STATUS, []).append(job_id)
with self._lock:
self._jobs[job_id] = _JobQueueDriver._JobEntry(job_id,
running_fn,
success_fn)
# the status will be updated on the next call to _FetchJobStatuses
self._jobs_per_status.setdefault(self._UNKNOWN_STATUS, []).append(job_id)
def _FetchJobStatuses(self):
"""Retrieves status information of the given jobs.
......@@ -148,11 +156,12 @@ class _JobQueueDriver(object):
@return: C{True} if there are still jobs which have not succeeded
"""
self._UpdateJobStatuses()
uncompleted_jobs = self._GetJobsInStatuses(
constants.JOB_STATUS_ALL - constants.JOBS_FINALIZED)
unknown_jobs = self._GetJobsInStatuses([self._UNKNOWN_STATUS])
return len(uncompleted_jobs) > 0 or len(unknown_jobs) > 0
with self._lock:
self._UpdateJobStatuses()
uncompleted_jobs = self._GetJobsInStatuses(
constants.JOB_STATUS_ALL - constants.JOBS_FINALIZED)
unknown_jobs = self._GetJobsInStatuses([self._UNKNOWN_STATUS])
return len(uncompleted_jobs) > 0 or len(unknown_jobs) > 0
def WaitForCompletion(self):
"""Wait for the completion of all registered jobs.
......@@ -161,9 +170,10 @@ class _JobQueueDriver(object):
while self._HasPendingJobs():
time.sleep(2)
if self._jobs:
raise qa_error.Error(
"Jobs %s didn't finish in success state!" % self._GetJobIds())
with self._lock:
if self._jobs:
raise qa_error.Error(
"Jobs %s didn't finish in success state!" % self._GetJobIds())
def _AcquireAllInstances():
......@@ -192,6 +202,28 @@ def _AcquireAllNodes():
pass
def _ExecuteJobSubmittingCmd(cmd):
"""Executes a job submitting command and returns the resulting job ID.
This will fail if submitting the job takes longer than
L{MAX_JOB_SUBMISSION_DURATION}.
@type cmd: list of string or string
@param cmd: the job producing command to execute on the cluster
@rtype: int
@return: job-id
"""
start = datetime.datetime.now()
result = qa_job_utils.ExecuteJobProducingCommand(cmd)
duration = qa_utils.TimedeltaToTotalSeconds(datetime.datetime.now() - start)
if duration > MAX_JOB_SUBMISSION_DURATION:
raise qa_error.Error(
"Executing '%s' took %f seconds, only %f are allowed" %
(cmd, duration, MAX_JOB_SUBMISSION_DURATION))
return result
def _SubmitInstanceCreationJob(instance):
"""Submit an instance creation job.
......@@ -211,7 +243,7 @@ def _SubmitInstanceCreationJob(instance):
instance.SetDiskTemplate(disk_template)
return qa_job_utils.ExecuteJobProducingCommand(cmd)
return _ExecuteJobSubmittingCmd(cmd)
except:
instance.Release()
raise
......@@ -230,7 +262,7 @@ def _SubmitInstanceRemoveJob(instance):
cmd = (["gnt-instance", "remove", "--submit", "-f"])
cmd.append(instance.name)
return qa_job_utils.ExecuteJobProducingCommand(cmd)
return _ExecuteJobSubmittingCmd(cmd)
finally:
instance.Release()
......@@ -320,7 +352,7 @@ def TestParallelModify(instances):
cmd = (["gnt-instance", "modify", "--submit",
"-B", "%s=%s" % (constants.BE_MINMEM, new_min_mem)])
cmd.append(instance.name)
job_driver.AddJob(qa_job_utils.ExecuteJobProducingCommand(cmd))
job_driver.AddJob(_ExecuteJobSubmittingCmd(cmd))
cmd = (["gnt-instance", "modify", "--submit",
"-O", "fake_os_param=fake_value"])
......@@ -385,6 +417,33 @@ def TestParallelInstanceOperations(instances):
job_driver.WaitForCompletion()
# TODO(thomasth): move to qa_job_utils.py once stable-2.10 is merged to master
class _QAThreadGroup(object):
"""This class manages a list of QAThreads.
"""
def __init__(self):
self._threads = []
def Start(self, thread):
"""Starts the given thread and adds it to this group.
@type thread: qa_job_utils.QAThread
@param thread: the thread to start and to add to this group.
"""
thread.start()
self._threads.append(thread)
def JoinAndReraise(self):
"""Joins all threads in this group and calls their C{reraise} method.
"""
for thread in self._threads:
thread.join()
thread.reraise()
def TestParallelInstanceQueries(instances):
"""PERFORMANCE: Parallel instance queries.
......@@ -392,18 +451,63 @@ def TestParallelInstanceQueries(instances):
@param instances: list of instances to issue queries against
"""
threads = []
threads = _QAThreadGroup()
for instance in instances:
cmd = ["gnt-instance", "info", instance.name]
info_thread = qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {})
info_thread.start()
threads.append(info_thread)
threads.Start(info_thread)
cmd = ["gnt-instance", "list"]
list_thread = qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {})
list_thread.start()
threads.append(list_thread)
threads.Start(list_thread)
threads.JoinAndReraise()
for thread in threads:
thread.join()
thread.reraise()
def TestJobQueueSubmissionPerformance():
"""PERFORMANCE: Job queue submission performance.
This test exercises the job queue and verifies that the job submission time
does not increase as more jobs are added.
"""
MAX_CLUSTER_INFO_SECONDS = 15.0
job_driver = _JobQueueDriver()
submission_durations = []
def _VerifySubmissionDuration(duration_seconds):
# only start to verify the submission duration once we got data from the
# first 10 job submissions
if len(submission_durations) >= 10:
avg_duration = sum(submission_durations) / len(submission_durations)
max_duration = avg_duration * 1.5
if duration_seconds > max_duration:
print(qa_logging.FormatWarning(
"Submitting a delay job took %f seconds, max %f expected" %
(duration_seconds, max_duration)))
else:
submission_durations.append(duration_seconds)
def _SubmitDelayJob(count):
for _ in range(count):
cmd = ["gnt-debug", "delay", "--submit", "0.1"]
start = datetime.datetime.now()
job_id = _ExecuteJobSubmittingCmd(cmd)
duration_seconds = \
qa_utils.TimedeltaToTotalSeconds(datetime.datetime.now() - start)
_VerifySubmissionDuration(duration_seconds)
job_driver.AddJob(job_id)
threads = _QAThreadGroup()
for i in range(10):
thread = qa_job_utils.QAThread(_SubmitDelayJob, [20], {})
threads.Start(thread)
threads.JoinAndReraise()
qa_utils.AssertCommand(["gnt-cluster", "info"],
max_seconds=MAX_CLUSTER_INFO_SECONDS)
job_driver.WaitForCompletion()
......@@ -24,6 +24,7 @@
"""
import copy
import datetime
import operator
import os
import random
......@@ -140,7 +141,7 @@ def _AssertRetCode(rcode, fail, cmdstr, nodename):
(cmdstr, nodename, rcode))
def AssertCommand(cmd, fail=False, node=None, log_cmd=True):
def AssertCommand(cmd, fail=False, node=None, log_cmd=True, max_seconds=None):
"""Checks that a remote command succeeds.
@param cmd: either a string (the command to execute) or a list (to
......@@ -152,6 +153,9 @@ def AssertCommand(cmd, fail=False, node=None, log_cmd=True):
dict or a string)
@param log_cmd: if False, the command won't be logged (simply passed to
StartSSH)
@type max_seconds: double
@param max_seconds: fail if the command takes more than C{max_seconds}
seconds
@return: the return code of the command
@raise qa_error.Error: if the command fails when it shouldn't or vice versa
......@@ -166,9 +170,17 @@ def AssertCommand(cmd, fail=False, node=None, log_cmd=True):
else:
cmdstr = utils.ShellQuoteArgs(cmd)
start = datetime.datetime.now()
rcode = StartSSH(nodename, cmdstr, log_cmd=log_cmd).wait()
duration_seconds = TimedeltaToTotalSeconds(datetime.datetime.now() - start)
_AssertRetCode(rcode, fail, cmdstr, nodename)
if max_seconds is not None:
if duration_seconds > max_seconds:
raise qa_error.Error(
"Cmd '%s' took %f seconds, maximum of %f was exceeded" %
(cmdstr, duration_seconds, max_seconds))
return rcode
......@@ -870,3 +882,19 @@ def ParseIPolicy(policy):
else:
ret_policy[key] = val
return (ret_policy, ret_specs)
def TimedeltaToTotalSeconds(td):
"""Returns the total seconds in a C{datetime.timedelta} object.
This performs the same task as the C{datetime.timedelta.total_seconds()}
method which is present in Python 2.7 onwards.
@type td: datetime.timedelta
@param td: timedelta object to convert
@rtype float
@return: total seconds in the timedelta object
"""
return ((td.microseconds + (td.seconds + td.days * 24.0 * 3600.0) * 10 ** 6) /
10 ** 6)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment