Commit 3648dad3 authored by Klaus Aehlig

Merge branch 'stable-2.11' into stable-2.12



* stable-2.11
  Add hsqueeze to NEWS
  kvm: use a dedicated QMP socket for kvmd
  Add design-node-security.rst to docinput

* stable-2.10
  Test parallel instance ops and plain instances
  Test parallel creation of DRBD instances
  Test parallel job submission performance
  Test parallel instance query operations
  Test parallel instance operations
  Test parallel instance modification
  Test parallel node-count instance creation
  Test parallel instance creation and removal
  Fail in replace-disks if attaching disks fails
  Add a basic test for --restricted-migration
  Describe the --restricted-migration option
  Support restricted migration
  Add an option for restricted migration
  Add an example for node evacuation
  Add a test for parsing version strings
  Set correct Ganeti version on setup commands
  Add a utility to combine shell commands
  Add design doc for performance tests

* stable-2.9
  KVM: set IFF_ONE_QUEUE on created tap interfaces
  Add configure option to pass GHC flags

Conflicts:
	doc/design-draft.rst
	lib/hypervisor/hv_kvm/__init__.py
	qa/qa_job_utils.py
Resolution:
	trivial, following code moves
Signed-off-by: Klaus Aehlig <aehlig@google.com>
Reviewed-by: Petr Pudlak <pudlak@google.com>
parents 00828204 866e1f76
......@@ -598,6 +598,7 @@ docinput = \
doc/design-multi-version-tests.rst \
doc/design-network.rst \
doc/design-node-add.rst \
doc/design-node-security.rst \
doc/design-oob.rst \
doc/design-openvswitch.rst \
doc/design-opportunistic-locking.rst \
......@@ -605,6 +606,8 @@ docinput = \
doc/design-os.rst \
doc/design-ovf-support.rst \
doc/design-partitioned.rst \
doc/design-performance-tests.rst \
doc/design-query-splitting.rst \
doc/design-query2.rst \
doc/design-query-splitting.rst \
doc/design-reason-trail.rst \
......@@ -713,6 +716,8 @@ HTEST_FLAGS = $(HFLAGS) -fhpc -itest/hs \
HEXTRA =
# internal extra flags (used for test/hs/htest mainly)
HEXTRA_INT =
# combination of HEXTRA and HEXTRA_CONFIGURE
HEXTRA_COMBINED = $(HEXTRA) $(HEXTRA_CONFIGURE)
# exclude options for coverage reports
HPCEXCL = --exclude Main \
--exclude Ganeti.Constants \
......@@ -1126,26 +1131,26 @@ endif
Makefile.ghc: $(HS_MAKEFILE_GHC_SRCS) Makefile \
| $(built_base_sources) $(HS_BUILT_SRCS)
$(GHC) -M -dep-makefile $@ -dep-suffix $(HTEST_SUFFIX) $(HFLAGS) -itest/hs \
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA) $(HS_MAKEFILE_GHC_SRCS)
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA_COMBINED) $(HS_MAKEFILE_GHC_SRCS)
@include_makefile_ghc@
%.o:
@echo '[GHC]: $@ <- $^'
@$(GHC) -c $(HFLAGS) \
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA) $(@:%.o=%.hs)
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA_COMBINED) $(@:%.o=%.hs)
%.$(HTEST_SUFFIX)_o:
@echo '[GHC]: $@ <- $^'
@$(GHC) -c $(HTEST_FLAGS) \
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA) $(@:%.$(HTEST_SUFFIX)_o=%.hs)
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA_COMBINED) $(@:%.$(HTEST_SUFFIX)_o=%.hs)
%.hi: %.o ;
%.$(HTEST_SUFFIX)_hi: %.$(HTEST_SUFFIX)_o ;
$(HS_SRC_PROGS): %: %.o | stamp-directories
$(GHC) $(HFLAGS) \
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA) --make $(@:%=%.hs)
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA_COMBINED) --make $(@:%=%.hs)
@rm -f $(notdir $@).tix
@touch "$@"
......@@ -1157,7 +1162,7 @@ $(HS_TEST_PROGS): %: %.$(HTEST_SUFFIX)_o \
exit 1; \
fi
$(GHC) $(HTEST_FLAGS) \
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA) --make $(@:%=%.hs)
$(HS_PARALLEL3) $(HS_REGEX_PCRE) $(HEXTRA_COMBINED) --make $(@:%=%.hs)
@rm -f $(notdir $@).tix
@touch "$@"
......@@ -1379,6 +1384,7 @@ TEST_FILES = \
test/data/htools/hail-reloc-drbd.json \
test/data/htools/hbal-cpu-speed.data \
test/data/htools/hbal-dyn.data \
test/data/htools/hbal-evac.data \
test/data/htools/hbal-excl-tags.data \
test/data/htools/hbal-split-insts.data \
test/data/htools/hspace-tiered-dualspec-exclusive.data \
......@@ -1411,6 +1417,7 @@ TEST_FILES = \
test/hs/shelltests/htools-dynutil.test \
test/hs/shelltests/htools-excl.test \
test/hs/shelltests/htools-hail.test \
test/hs/shelltests/htools-hbal-evac.test \
test/hs/shelltests/htools-hroller.test \
test/hs/shelltests/htools-hspace.test \
test/hs/shelltests/htools-hsqueeze.test \
......
......@@ -83,6 +83,9 @@ New features
``--gluster-storage-dir`` switch.
- Job scheduling is now handled by luxid, and the maximal number of jobs running
in parallel is a run-time parameter of the cluster.
- A new tool for planning dynamic power management, called ``hsqueeze``, has
been added. It suggests nodes to power up or down and corresponding instance
moves.
New dependencies
~~~~~~~~~~~~~~~~
......
......@@ -108,6 +108,15 @@ AC_ARG_ENABLE([haskell-tests],
AC_SUBST(HTEST, $HTEST)
AM_CONDITIONAL([HTEST], [test "$HTEST" = yes])
# --with-haskell-flags=
AC_ARG_WITH([haskell-flags],
[AS_HELP_STRING([--with-haskell-flags=FLAGS],
[Extra flags to pass to GHC]
)],
[hextra_configure="$withval"],
[hextra_configure=""])
AC_SUBST(HEXTRA_CONFIGURE, $hextra_configure)
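# example usage (illustrative): ./configure --with-haskell-flags="-O2"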
# --with-ssh-initscript=...
AC_ARG_WITH([ssh-initscript],
[AS_HELP_STRING([--with-ssh-initscript=SCRIPT],
......
......@@ -23,6 +23,7 @@ Design document drafts
design-node-security.rst
design-systemd.rst
design-cpu-speed.rst
design-performance-tests.rst
.. vim: set textwidth=72 :
.. Local Variables:
......
========================
Performance tests for QA
========================
.. contents:: :depth: 4
This design document describes performance tests to be added to QA in
order to measure performance changes over time.
Current state and shortcomings
==============================
Currently, only functional QA tests are performed. Those tests verify
the correct behaviour of Ganeti in various configurations, but are not
designed to continuously monitor the performance of Ganeti.
The current QA tests don't execute multiple tasks/jobs in parallel.
Therefore, the locking code of Ganeti hardly receives any testing,
neither functionally nor performance-wise.
On the plus side, Ganeti's QA code does already measure the runtime of
individual tests, which is leveraged in this design.
Proposed changes
================
The tests to be added in the context of this design document focus on
two areas:
* Job queue performance. How does Ganeti handle a lot of submitted
jobs?
* Parallel job execution performance. How well does Ganeti
parallelize jobs?
Jobs are submitted to the job queue in sequential order, but the
execution of the jobs runs in parallel. All job submissions must
complete within a reasonable timeout.
In order to make it easier to recognize performance related tests, all
tests added in the context of this design get a description with a
"PERFORMANCE: " prefix.
Job queue performance
---------------------
Tests targeting the job queue should eliminate external factors (like
network/disk performance or hypervisor delays) as much as possible, so
they are designed to run in a vcluster QA environment.
The following tests are added to the QA:
* Submit the maximum number of instance create jobs in parallel. As
soon as a creation job succeeds, submit a removal job for this
instance.
* Submit as many instance create jobs as there are nodes in the
cluster in parallel (for non-redundant instances). Removal jobs
as above.
* For the maximum number of instances in the cluster, submit modify
jobs (modify hypervisor and backend parameters) in parallel.
* For the maximum number of instances in the cluster, submit stop,
start, reboot and reinstall jobs in parallel.
* For the maximum number of instances in the cluster, submit multiple
list and info jobs in parallel.
* For the maximum number of instances in the cluster, submit move
jobs in parallel. While the move operations are running, get
instance information using info jobs. Those jobs are required to
return within a reasonably low timeout.
* For the maximum number of instances in the cluster, submit add-,
remove- and list-tags jobs.
* Submit 200 ``gnt-debug delay`` jobs with a delay of 0.1 seconds. To
speed up submission, perform multiple job submissions in parallel.
Verify that submitting jobs doesn't significantly slow down during
the process. Verify that querying cluster information over CLI and
RAPI succeeds in a timely fashion with the delay jobs
running/queued.
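The last test above could be sketched roughly as follows (a minimal
illustration using only the standard library; the real QA code uses its
own helpers, and ``--submit`` makes ``gnt-debug delay`` return right
after queueing the job)::

  import subprocess
  import threading
  import time

  durations = []
  lock = threading.Lock()

  def submit_delay_job():
    start = time.time()
    # queue a 0.1s delay job without waiting for its execution
    subprocess.check_call(["gnt-debug", "delay", "--submit", "0.1"])
    with lock:
      durations.append(time.time() - start)

  threads = [threading.Thread(target=submit_delay_job) for _ in range(200)]
  for thread in threads:
    thread.start()
  for thread in threads:
    thread.join()

  # submission time should not grow significantly while the queue fills up
  assert max(durations) < 15.0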
Parallel job execution performance
----------------------------------
Tests targeting the performance of parallel execution of "real" jobs
in close-to-production clusters should actually perform all operations,
such as creating disks and starting instances. This way, real world
locking or waiting issues can be reproduced. Performing all those
operations does require quite some time though, so only a smaller
number of instances and parallel jobs can be tested realistically.
The following tests are added to the QA (see the sketch after this
list):
* Submitting twice as many instance creation requests as there are
nodes in the cluster, using DRBD as disk template. As soon as a
creation job succeeds, submit a removal job for this instance.
* Submitting twice as many instance creation requests as there are
nodes in the cluster, using plain as disk template. As soon as a
creation job succeeds, submit a removal job for this instance.
This test can make better use of parallelism because only one
node must be locked for an instance creation.
* Create an instance using DRBD. Fail it over, migrate it, change
its secondary node, reboot it and reinstall it while creating an
additional instance in parallel to each of those operations.
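The create-then-remove pattern of the first two tests could be sketched
as follows (a minimal illustration; instance parameters and the node
count are assumptions depending on the cluster configuration)::

  import subprocess
  import threading

  def create_then_remove(name):
    # create the instance, waiting for the creation job to finish ...
    subprocess.check_call(["gnt-instance", "add", "-t", "drbd", "-I", "hail",
                           "-o", "debootstrap", "--disk", "0:size=1G", name])
    # ... then immediately submit its removal
    subprocess.check_call(["gnt-instance", "remove", "-f", name])

  # twice as many creations as there are nodes (here: 4 nodes assumed)
  threads = [threading.Thread(target=create_then_remove,
                              args=("perf-inst-%d" % i,))
             for i in range(2 * 4)]
  for thread in threads:
    thread.start()
  for thread in threads:
    thread.join()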
Future work
===========
Based on the results of the tests listed above, additional tests can
be added to cover more real-world use-cases. Also, specially crafted
performance tests modeling workloads reported by users can be added.
Additionally, the correlation between job submission time and job
queue size could be investigated. To that end, a snapshot of the job
queue could be taken before each job submission, so that submission
time can be related to the jobs already in the queue.
.. vim: set textwidth=72 :
.. Local Variables:
.. mode: rst
.. fill-column: 72
.. End:
......@@ -308,6 +308,30 @@ def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
if verbose:
cmd.append("--verbose")
logging.debug("Node setup command: %s", cmd)
version = constants.DIR_VERSION
all_cmds = [["test", "-d", os.path.join(pathutils.PKGLIBDIR, version)]]
if constants.HAS_GNU_LN:
all_cmds.extend([["ln", "-s", "-f", "-T",
os.path.join(pathutils.PKGLIBDIR, version),
os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
["ln", "-s", "-f", "-T",
os.path.join(pathutils.SHAREDIR, version),
os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
else:
all_cmds.extend([["rm", "-f",
os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
["ln", "-s", "-f",
os.path.join(pathutils.PKGLIBDIR, version),
os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
["rm", "-f",
os.path.join(pathutils.SYSCONFDIR, "ganeti/share")],
["ln", "-s", "-f",
os.path.join(pathutils.SHAREDIR, version),
os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
all_cmds.append(cmd)
if port is None:
port = netutils.GetDaemonPort(constants.SSH)
......@@ -315,7 +339,8 @@ def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
srun = ssh.SshRunner(cluster_name,
ipv6=(family == netutils.IP6Address.family))
scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
utils.ShellQuoteArgs(cmd),
utils.ShellQuoteArgs(
utils.ShellCombineCommands(all_cmds)),
batch=False, ask_key=ask_key, quiet=False,
strict_host_check=strict_host_check,
use_cluster_key=use_cluster_key,
......
......@@ -2686,10 +2686,10 @@ class TLReplaceDisks(Tasklet):
for to_node, to_result in result.items():
msg = to_result.fail_msg
if msg:
self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
self.cfg.GetNodeName(to_node), msg,
hint=("please do a gnt-instance info to see the"
" status of disks"))
raise errors.OpExecError(
"Can't attach drbd disks on node %s: %s (please do a gnt-instance "
"info to see the status of disks)" %
(self.cfg.GetNodeName(to_node), msg))
cstep = itertools.count(5)
......
......@@ -552,6 +552,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
"""
return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name)
@classmethod
def _InstanceKvmdMonitor(cls, instance_name):
"""Returns the instance kvm daemon socket name
"""
return utils.PathJoin(cls._CTRL_DIR, "%s.kvmd" % instance_name)
@classmethod
def _InstanceShutdownMonitor(cls, instance_name):
"""Returns the instance QMP output filename
......@@ -1593,6 +1600,9 @@ class KVMHypervisor(hv_base.BaseHypervisor):
logging.debug("Enabling QMP")
kvm_cmd.extend(["-qmp", "unix:%s,server,nowait" %
self._InstanceQmpMonitor(instance.name)])
# Add a second monitor for kvmd
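# (a dedicated socket keeps kvmd from competing with other QMP users)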
kvm_cmd.extend(["-qmp", "unix:%s,server,nowait" %
self._InstanceKvmdMonitor(instance.name)])
# Configure the network now for starting instances and bridged interfaces,
# during FinalizeMigration for incoming instances' routed interfaces
......
......@@ -39,6 +39,7 @@ TUNGETIFF = 0x800454d2
TUNGETFEATURES = 0x800454cf
IFF_TAP = 0x0002
IFF_NO_PI = 0x1000
IFF_ONE_QUEUE = 0x2000
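# IFF_ONE_QUEUE asks the kernel to rely on the tap device's own single
# packet queue; recent kernels are believed to ignore this flag, but
# setting it matches the behaviour expected from older ones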
IFF_VNET_HDR = 0x4000
......@@ -112,7 +113,7 @@ def OpenTap(vnet_hdr=True, name=""):
except EnvironmentError:
raise errors.HypervisorError("Failed to open /dev/net/tun")
flags = IFF_TAP | IFF_NO_PI
flags = IFF_TAP | IFF_NO_PI | IFF_ONE_QUEUE
if vnet_hdr and _ProbeTapVnetHdr(tapfd):
flags |= IFF_VNET_HDR
......
......@@ -242,6 +242,13 @@ def ShellQuoteArgs(args):
return " ".join([ShellQuote(i) for i in args])
def ShellCombineCommands(cmdlist):
"""Out of a list of shell comands construct a single one.
"""
return ["/bin/sh", "-c", " && ".join(ShellQuoteArgs(c) for c in cmdlist)]
class ShellWriter:
"""Helper class to write scripts with indentation.
......
......@@ -33,6 +33,7 @@ Algorithm options:
**[ \--ignore-dynu ]**
**[ \--mond *yes|no* ]**
**[ \--evac-mode ]**
**[ \--restricted-migration ]**
**[ \--select-instances *inst...* ]**
**[ \--exclude-instances *inst...* ]**
......@@ -307,6 +308,15 @@ The options that can be passed to the program are as follows:
(bulk) replacement for Ganeti's own *gnt-node evacuate*, with the
note that it doesn't guarantee full evacuation.
\--restricted-migration
This parameter disallows any replace-primary moves (frf), as well as
those replace-and-failover moves (rf) where the primary node of the
instance is not drained. If used together with the ``--evac-mode``
option, the only migrations that hbal will do are migrations of
instances off a drained node. This can be useful if, during a reinstall
of the base operating system, migration is only possible from the old
OS to the new OS.
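For example, the following call (illustrative; ``-L`` selects the Luxi
backend) only migrates instances away from drained nodes::

    hbal -L --evac-mode --restricted-migration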
\--select-instances=*instances*
This parameter marks the given instances (as a comma-separated list)
as the only ones being moved during the rebalance.
......
......@@ -44,6 +44,7 @@ import qa_monitoring
import qa_network
import qa_node
import qa_os
import qa_performance
import qa_job
import qa_rapi
import qa_tags
......@@ -851,6 +852,43 @@ def RunMonitoringTests():
RunTest(qa_monitoring.TestInstStatusCollector)
def RunPerformanceTests():
if qa_config.TestEnabled("jobqueue-performance"):
RunTest(qa_performance.TestParallelMaxInstanceCreationPerformance)
RunTest(qa_performance.TestParallelNodeCountInstanceCreationPerformance)
instances = qa_performance.CreateAllInstances()
RunTest(qa_performance.TestParallelModify, instances)
RunTest(qa_performance.TestParallelInstanceOSOperations, instances)
RunTest(qa_performance.TestParallelInstanceQueries, instances)
qa_performance.RemoveAllInstances(instances)
RunTest(qa_performance.TestJobQueueSubmissionPerformance)
if qa_config.TestEnabled("parallel-performance"):
RunTest(qa_performance.TestParallelDRBDInstanceCreationPerformance)
RunTest(qa_performance.TestParallelPlainInstanceCreationPerformance)
if qa_config.IsTemplateSupported(constants.DT_DRBD8):
inodes = qa_config.AcquireManyNodes(2)
try:
instance = qa_instance.TestInstanceAddWithDrbdDisk(inodes)
try:
RunTest(qa_performance.TestParallelInstanceFailover, instance)
RunTest(qa_performance.TestParallelInstanceMigration, instance)
RunTest(qa_performance.TestParallelInstanceReplaceDisks, instance)
RunTest(qa_performance.TestParallelInstanceReboot, instance)
RunTest(qa_performance.TestParallelInstanceReinstall, instance)
RunTest(qa_performance.TestParallelInstanceRename, instance)
finally:
qa_instance.TestInstanceRemove(instance)
instance.Release()
finally:
qa_config.ReleaseManyNodes(inodes)
def RunQa():
"""Main QA body.
......@@ -990,6 +1028,8 @@ def RunQa():
RunTestBlock(RunMonitoringTests)
RunPerformanceTests()
RunTestIf("create-cluster", qa_node.TestNodeRemoveAll)
RunTestIf("cluster-destroy", qa_cluster.TestClusterDestroy)
......
......@@ -66,9 +66,12 @@ def ExecuteJobProducingCommand(cmd):
@param cmd: The command to execute, broken into constituent components.
"""
job_id_output = _GetOutputFromMaster(cmd)
job_id_output = GetOutputFromMaster(cmd)
possible_job_ids = re.findall("JobID: ([0-9]+)", job_id_output)
# Usually, the output contains "JobID: <job_id>", but for instance related
# commands, the output is of the form "<job_id>: <instance_name>"
possible_job_ids = re.findall("JobID: ([0-9]+)", job_id_output) or \
re.findall("([0-9]+): .+", job_id_output)
if len(possible_job_ids) != 1:
raise qa_error.Error("Cannot parse command output to find job id: output "
"is %s" % job_id_output)
......@@ -150,7 +153,7 @@ def _GetNodeUUIDMap(nodes):
"""
cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
cmd.extend(nodes)
output = _GetOutputFromMaster(cmd)
output = GetOutputFromMaster(cmd)
return dict(map(lambda x: x.split(), output.splitlines()))
......@@ -200,7 +203,7 @@ def _GetBlockingLocks():
# Due to mysterious issues when an SSH multiplexer is being used by two
# threads, we turn it off, and block most of the logging to improve the
# visibility of the other thread's output
locks_output = _GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
locks_output = GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
log_cmd=False)
# The first non-empty line is the header, which we do not need
......
#
#
# Copyright (C) 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Performance testing QA tests.
"""
import datetime
import functools
import itertools
import threading
import time
from ganeti import constants
import qa_config
import qa_error
from qa_instance_utils import GetGenericAddParameters
import qa_job_utils
import qa_logging
import qa_utils
MAX_JOB_SUBMISSION_DURATION = 15.0
class _JobQueueDriver(object):
"""This class handles polling of jobs and reacting on status changes.
Jobs are added via the L{AddJob} method, and can have callback functions
assigned to them. Those are called as soon as the job enters the appropriate
state. Callback functions can add new jobs to the driver as needed.
A call to L{WaitForCompletion} finally polls Ganeti until all jobs have
succeeded.
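
  Typical usage (illustrative)::

    driver = _JobQueueDriver()
    driver.AddJob(job_id)
    driver.WaitForCompletion()
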
"""
_UNKNOWN_STATUS = "unknown"
class _JobEntry(object):
"""Internal class representing a job entry.
"""
def __init__(self, job_id, running_fn, success_fn):
self.job_id = job_id
self.running_fn = running_fn
self.success_fn = success_fn
def __str__(self):
return str(self.job_id)
def __init__(self):
self._jobs = {}
self._running_notified = set()
self._jobs_per_status = {}
self._lock = threading.RLock()
def AddJob(self, job_id, running_fn=None, success_fn=None):
"""Add a job to the driver.
@type job_id: int
@param job_id: job id to add to the driver
@type running_fn: function taking a L{_JobQueueDriver} and an int
@param running_fn: function called once when a job changes to running state
(or success state, if the running state was too short)
@type success_fn: function taking a L{_JobQueueDriver} and an int
@param success_fn: function called for each successful job id
"""
with self._lock:
self._jobs[job_id] = _JobQueueDriver._JobEntry(job_id,
running_fn,
success_fn)
# the status will be updated on the next call to _FetchJobStatuses
self._jobs_per_status.setdefault(self._UNKNOWN_STATUS, []).append(job_id)
def _FetchJobStatuses(self):
"""Retrieves status information of the given jobs.
@rtype: dict of string to list of L{_JobEntry)
"""
cmd = (["gnt-job", "list", "--no-headers", "-o", "id,status"])
cmd.extend(map(str, self._GetJobIds()))
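# each output line has the form "<job_id> <status>"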
job_statuses = [line.split() for line in
qa_job_utils.GetOutputFromMaster(cmd).splitlines()]
new_statuses = {}
for job_id, status in job_statuses:
new_statuses.setdefault(status, []).append(self._jobs[int(job_id)])
self._jobs_per_status = new_statuses
def _GetJobIds(self):
return list(self._jobs.keys())
def _GetJobsInStatuses(self, statuses):
"""Returns a list of L{_JobEntry} of all jobs in the given statuses.
@type statuses: iterable of strings
@param statuses: jobs in those statuses are returned
@rtype: list of L{_JobEntry}
@return: list of job entries in the requested statuses
"""
ret = []
for state in statuses:
ret.extend(self._jobs_per_status.get(state, []))
return ret
def _UpdateJobStatuses(self):
"""Retrieves job statuses from the cluster and updates internal state.
"""
self._FetchJobStatuses()
error_jobs = self._GetJobsInStatuses([constants.JOB_STATUS_ERROR])
if error_jobs:
raise qa_error.Error(
"Jobs %s are in error state!" % [job.job_id for job in error_jobs])
for job in self._GetJobsInStatuses([constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_SUCCESS]):
if job.job_id not in self._running_notified:
if job.running_fn is not None:
job.running_fn(self, job.job_id)
self._running_notified.add(job.job_id)
for job in self._GetJobsInStatuses([constants.JOB_STATUS_SUCCESS]):
if job.success_fn is not None:
job.success_fn(self, job.job_id)
# we're done with this job
del self._jobs[job.job_id]
def _HasPendingJobs(self):
"""Checks if there are still jobs pending.
@rtype: bool
@return: C{True} if there are still jobs which have not succeeded
"""
with self._lock: