Commit f9d0783b authored by Iustin Pop's avatar Iustin Pop
Browse files

Merge branch 'stable-2.1' into devel-2.1

* stable-2.1: (22 commits)
  Fix long-standing race condition bug in locking unittest
  confd client: copy the peers in UpdatePeerList
  testutils: Print name of test program before running it
  Don't use hardcoded name for pylint
  Partially revert "Makefile.am: Run pylint on all Python code"
  build-bash-completion: Take care of pylint warnings
  Makefile.am: Run pylint on all Python code
  Small improvements for release script
  check-python-code: Use “set -e” to abort on errors
  build-bash-completion: Fix a few pylint warnings
  Generate hmac file with a newline at the end
  jqueue: Don't return negative number for unchecked jobs when archiving
  cli.GenerateTable: Don't write EOL spaces
  Improve logging for workerpool tasks by providing __repr__
  workerpool: Simplify log messages
  workerpool: Use worker name as thread name
  workerpool: Make worker ID alphanumeric
  locking: Fix race condition in LockSet
  mcpu: Log lock status with sorted names
  locking: Append to list outside error handling block
  ...
parents 1b3a7656 51e3bb92
......@@ -253,6 +253,8 @@ EXTRA_DIST = \
doc/examples/ganeti.cron.in \
doc/examples/gnt-config-backup.in \
doc/examples/dumb-allocator \
doc/examples/ganeti.default \
doc/examples/ganeti.default-debug \
doc/examples/hooks/ethers \
doc/examples/hooks/ipsec.in \
test/testutils.py \
......@@ -351,9 +353,15 @@ srclink_files = \
$(all_python_code)
check_python_code = \
autotools/build-bash-completion \
$(BUILD_BASH_COMPLETION) \
$(all_python_code)
lint_python_code = \
ganeti \
$(dist_sbin_SCRIPTS) \
$(dist_tools_SCRIPTS) \
$(BUILD_BASH_COMPLETION)
devel/upload: devel/upload.in $(REPLACE_VARS_SED)
sed -f $(REPLACE_VARS_SED) < $< > $@
chmod u+x $@
......@@ -475,7 +483,8 @@ check-local:
.PHONY: lint
lint: ganeti
pylint $(LINT_OPTS) ganeti $(dist_sbin_SCRIPTS) $(dist_tools_SCRIPTS)
@test -n "$(PYLINT)" || { echo 'pylint' not found during configure; exit 1; }
$(PYLINT) $(LINT_OPTS) $(lint_python_code)
# a dist hook rule for catching revision control directories
distcheck-hook:
......
......@@ -19,9 +19,14 @@
# 02110-1301, USA.
import optparse
"""Script to generate bash_completion script for Ganeti.
"""
# pylint: disable-msg=C0103
# [C0103] Invalid name build-bash-completion
import os
import sys
import re
from cStringIO import StringIO
......@@ -263,6 +268,8 @@ class CompletionWriter:
self.args = args
for opt in opts:
# While documented, these variables aren't seen as public attributes by
# pylint. pylint: disable-msg=W0212
opt.all_names = sorted(opt._short_opts + opt._long_opts)
def _FindFirstArgument(self, sw):
......@@ -591,7 +598,7 @@ def GetCommands(filename, module):
"""
try:
commands = getattr(module, "commands")
except AttributeError, err:
except AttributeError:
raise Exception("Script %s doesn't have 'commands' attribute" %
filename)
......
......@@ -18,7 +18,11 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
let problems=0
set -e
# "[...] If the last ARG evaluates to 0, let returns 1; 0 is returned
# otherwise.", hence ignoring the return value.
let problems=0 || :
for script; do
if grep -n -H -F $'\t' "$script"; then
......
......@@ -153,6 +153,14 @@ then
AC_MSG_WARN([dot (from the graphviz suite) not found, documentation rebuild not possible])
fi
# Check for pylint
AC_ARG_VAR(PYLINT, [pylint path])
AC_PATH_PROG(PYLINT, [pylint], [])
if test -z "$PYLINT"
then
AC_MSG_WARN([pylint not found, checking code will not be possible])
fi
# Check for socat
AC_ARG_VAR(SOCAT, [socat path])
AC_PATH_PROG(SOCAT, [socat], [])
......
......@@ -101,7 +101,8 @@ class IOServer(SocketServer.UnixStreamServer):
def setup_queue(self):
self.context = GanetiContext()
self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
self.request_workers = workerpool.WorkerPool("ClientReq",
CLIENT_REQUEST_WORKERS,
ClientRequestWorker)
def process_request(self, request, client_address):
......
......@@ -32,18 +32,35 @@ set -e
: ${URL:=git://git.ganeti.org/ganeti.git}
TAG="$1"
TMPDIR=`mktemp -d`
if [[ -z "$TAG" ]]; then
echo "Usage: $0 <tree-ish>" >&2
exit 1
fi
echo "Using Git repository $URL"
TMPDIR=$(mktemp -d -t gntrelease.XXXXXXXXXX)
cd $TMPDIR
echo "Cloning the repository under $TMPDIR ..."
git clone -q "$URL" dist
cd dist
git checkout $TAG
./autogen.sh
./configure
VERSION=$(sed -n -e '/^PACKAGE_VERSION =/ s/^PACKAGE_VERSION = // p' Makefile)
make distcheck
fakeroot make dist
tar tzvf ganeti-$VERSION.tar.gz
echo
echo 'MD5:'
md5sum ganeti-$VERSION.tar.gz
echo
echo 'SHA1:'
sha1sum ganeti-$VERSION.tar.gz
echo
echo "The archive is at $PWD/ganeti-$VERSION.tar.gz"
echo "Please copy it and remove the temporary directory when done."
......@@ -78,6 +78,10 @@ make $make_args install DESTDIR="$TXD"
install -D --mode=0755 doc/examples/ganeti.initd \
"$TXD/$SYSCONFDIR/init.d/ganeti"
[ -f doc/examples/ganeti.default-debug ] && \
install -D --mode=0644 doc/examples/ganeti.default-debug \
"$TXD/$SYSCONFDIR/default/ganeti"
[ -f doc/examples/bash_completion ] && \
install -D --mode=0644 doc/examples/bash_completion \
"$TXD/$SYSCONFDIR/bash_completion.d/ganeti"
......
# Default arguments for Ganeti daemons
NODED_ARGS=""
MASTERD_ARGS=""
RAPI_ARGS=""
CONFD_ARGS=""
# Default arguments for Ganeti daemons (debug mode)
NODED_ARGS="-d"
MASTERD_ARGS="-d"
RAPI_ARGS="-d"
CONFD_ARGS="-d"
......@@ -107,7 +107,7 @@ def GenerateHmacKey(file_name):
@param file_name: Path to output file
"""
utils.WriteFile(file_name, data=utils.GenerateSecret(), mode=0400)
utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400)
def _InitGanetiServerSetup(master_name):
......
......@@ -1575,7 +1575,10 @@ def GenerateTable(headers, fields, separator, data,
for idx, name in enumerate(fields):
hdr = headers[name]
if separator is None:
mlens[idx] = max(mlens[idx], len(hdr))
if idx == len(fields) - 1 and not numfields.Matches(name):
mlens[idx] = 0
else:
mlens[idx] = max(mlens[idx], len(hdr))
args.append(mlens[idx])
args.append(hdr)
result.append(format % tuple(args))
......
......@@ -133,7 +133,8 @@ class ConfdClient:
# pylint: disable-msg=W0201
if not isinstance(peers, list):
raise errors.ProgrammerError("peers must be a list")
self._peers = peers
# make a copy of peers, since we're going to shuffle the list, later
self._peers = list(peers)
def _PackRequest(self, request, now=None):
"""Prepare a request to be sent on the wire.
......
......@@ -95,6 +95,14 @@ class HttpClientRequest(object):
self.resp_headers = None
self.resp_body = None
def __repr__(self):
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
"%s:%s" % (self.host, self.port),
self.method,
self.path]
return "<%s at %#x>" % (" ".join(status), id(self))
class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
pass
......@@ -328,6 +336,12 @@ class _HttpClientPendingRequest(object):
# Thread synchronization
self.done = threading.Event()
def __repr__(self):
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
"req=%r" % self.request]
return "<%s at %#x>" % (" ".join(status), id(self))
class HttpClientWorker(workerpool.BaseWorker):
"""HTTP client worker class.
......@@ -342,7 +356,8 @@ class HttpClientWorker(workerpool.BaseWorker):
class HttpClientWorkerPool(workerpool.WorkerPool):
def __init__(self, manager):
workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
workerpool.WorkerPool.__init__(self, "HttpClient",
HTTP_CLIENT_THREADS,
HttpClientWorker)
self.manager = manager
......
......@@ -190,6 +190,13 @@ class _QueuedJob(object):
# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
def __repr__(self):
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
"id=%s" % self.id,
"ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
return "<%s at %#x>" % (" ".join(status), id(self))
@classmethod
def Restore(cls, queue, state):
"""Restore a _QueuedJob from serialized state:
......@@ -430,8 +437,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
@param job: the job to be processed
"""
logging.info("Worker %s processing job %s",
self.worker_id, job.id)
logging.info("Processing job %s", job.id)
proc = mcpu.Processor(self.pool.queue.context, job.id)
queue = job.queue
try:
......@@ -527,8 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
finally:
queue.release()
logging.info("Worker %s finished job %s, status = %s",
self.worker_id, job_id, status)
logging.info("Finished job %s, status = %s", job_id, status)
class _JobQueueWorkerPool(workerpool.WorkerPool):
......@@ -536,7 +541,8 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
"""
def __init__(self, queue):
super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
super(_JobQueueWorkerPool, self).__init__("JobQueue",
JOBQUEUE_THREADS,
_JobQueueWorker)
self.queue = queue
......@@ -1315,7 +1321,7 @@ class JobQueue(object):
all_job_ids = self._GetJobIDsUnlocked(archived=False)
pending = []
for idx, job_id in enumerate(all_job_ids):
last_touched = idx
last_touched = idx + 1
# Not optimal because jobs could be pending
# TODO: Measure average duration for job archival and take number of
......@@ -1345,7 +1351,7 @@ class JobQueue(object):
if pending:
archived_count += self._ArchiveJobsUnlocked(pending)
return (archived_count, len(all_job_ids) - last_touched - 1)
return (archived_count, len(all_job_ids) - last_touched)
@staticmethod
def _GetJobInfoUnlocked(job, fields):
......
......@@ -775,7 +775,9 @@ class LockSet:
def _release_and_delete_owned(self):
"""Release and delete all resources owned by the current thread"""
for lname in self._list_owned():
self.__lockdict[lname].release()
lock = self.__lockdict[lname]
if lock._is_owned():
lock.release()
self._del_owned(name=lname)
def __names(self):
......@@ -839,8 +841,6 @@ class LockSet:
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
else:
names = sorted(names)
return self.__acquire_inner(names, False, shared,
running_timeout.Remaining, test_notify)
......@@ -889,11 +889,11 @@ class LockSet:
# First we look the locks up on __lockdict. We have no way of being sure
# they will still be there after, but this makes it a lot faster should
# just one of them be the already wrong
for lname in utils.UniqueSequence(names):
# just one of them be the already wrong. Using a sorted sequence to prevent
# deadlocks.
for lname in sorted(utils.UniqueSequence(names)):
try:
lock = self.__lockdict[lname] # raises KeyError if lock is not there
acquire_list.append((lname, lock))
except KeyError:
if want_all:
# We are acquiring all the set, it doesn't matter if this particular
......@@ -902,6 +902,8 @@ class LockSet:
raise errors.LockError("Non-existing lock in set (%s)" % lname)
acquire_list.append((lname, lock))
# This will hold the locknames we effectively acquired.
acquired = set()
......
......@@ -275,7 +275,7 @@ class Processor(object):
elif isinstance(names, basestring):
parts.append(names)
else:
parts.append(",".join(names))
parts.append(",".join(sorted(names)))
if shared:
parts.append("shared")
......
......@@ -42,9 +42,8 @@ class BaseWorker(threading.Thread, object):
@param worker_id: identifier for this worker
"""
super(BaseWorker, self).__init__()
super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
self.worker_id = worker_id
self._current_task = None
def ShouldTerminate(self):
......@@ -89,12 +88,12 @@ class BaseWorker(threading.Thread, object):
# We only wait if there's no task for us.
if not pool._tasks:
logging.debug("Worker %s: waiting for tasks", self.worker_id)
logging.debug("Waiting for tasks")
# wait() releases the lock and sleeps until notified
pool._pool_to_worker.wait()
logging.debug("Worker %s: notified while waiting", self.worker_id)
logging.debug("Notified while waiting")
# Were we woken up in order to terminate?
if pool._ShouldWorkerTerminateUnlocked(self):
......@@ -114,14 +113,11 @@ class BaseWorker(threading.Thread, object):
# Run the actual task
try:
logging.debug("Worker %s: starting task %r",
self.worker_id, self._current_task)
logging.debug("Starting task %r", self._current_task)
self.RunTask(*self._current_task)
logging.debug("Worker %s: done with task %r",
self.worker_id, self._current_task)
logging.debug("Done with task %r", self._current_task)
except: # pylint: disable-msg=W0702
logging.error("Worker %s: Caught unhandled exception",
self.worker_id, exc_info=True)
logging.exception("Caught unhandled exception")
finally:
# Notify pool
pool._lock.acquire()
......@@ -132,7 +128,7 @@ class BaseWorker(threading.Thread, object):
finally:
pool._lock.release()
logging.debug("Worker %s: terminates", self.worker_id)
logging.debug("Terminates")
def RunTask(self, *args):
"""Function called to start a task.
......@@ -153,7 +149,7 @@ class WorkerPool(object):
guaranteed to finish in the same order.
"""
def __init__(self, num_workers, worker_class):
def __init__(self, name, num_workers, worker_class):
"""Constructor for worker pool.
@param num_workers: number of workers to be started
......@@ -168,6 +164,7 @@ class WorkerPool(object):
self._pool_to_worker = threading.Condition(self._lock)
self._worker_to_pool = threading.Condition(self._lock)
self._worker_class = worker_class
self._name = name
self._last_worker_id = 0
self._workers = []
self._quiescing = False
......@@ -253,7 +250,8 @@ class WorkerPool(object):
"""
self._last_worker_id += 1
return self._last_worker_id
return "%s%d" % (self._name, self._last_worker_id)
def _ResizeUnlocked(self, num_workers):
"""Changes the number of workers.
......
......@@ -604,7 +604,7 @@ class TestSharedLock(_ThreadedTestCase):
@_Repeat
def testMixedAcquireTimeout(self):
sync = threading.Condition()
sync = threading.Event()
def _AcquireShared(ev):
if not self.sl.acquire(shared=1, timeout=None):
......@@ -615,12 +615,8 @@ class TestSharedLock(_ThreadedTestCase):
# Notify main thread
ev.set()
# Wait for notification
sync.acquire()
try:
sync.wait()
finally:
sync.release()
# Wait for notification from main thread
sync.wait()
# Release lock
self.sl.release()
......@@ -641,7 +637,7 @@ class TestSharedLock(_ThreadedTestCase):
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Acquire exclusive without timeout
exclsync = threading.Condition()
exclsync = threading.Event()
exclev = threading.Event()
def _AcquireExclusive():
......@@ -653,11 +649,8 @@ class TestSharedLock(_ThreadedTestCase):
# Notify main thread
exclev.set()
exclsync.acquire()
try:
exclsync.wait()
finally:
exclsync.release()
# Wait for notification from main thread
exclsync.wait()
self.sl.release()
......@@ -667,11 +660,7 @@ class TestSharedLock(_ThreadedTestCase):
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Make all shared holders release their locks
sync.acquire()
try:
sync.notifyAll()
finally:
sync.release()
sync.set()
# Wait for exclusive acquire to succeed
exclev.wait()
......@@ -690,11 +679,7 @@ class TestSharedLock(_ThreadedTestCase):
self._addThread(target=_AcquireSharedSimple)
# Tell exclusive lock to release
exclsync.acquire()
try:
exclsync.notifyAll()
finally:
exclsync.release()
exclsync.set()
# Wait for everything to finish
self._waitThreads()
......
......@@ -62,7 +62,7 @@ class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
def testDummy(self):
wp = workerpool.WorkerPool(3, DummyBaseWorker)
wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
......@@ -75,7 +75,7 @@ class TestWorkerpool(unittest.TestCase):
self._CheckWorkerCount(wp, 0)
def testNoTasks(self):
wp = workerpool.WorkerPool(3, DummyBaseWorker)
wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
......@@ -84,7 +84,7 @@ class TestWorkerpool(unittest.TestCase):
self._CheckWorkerCount(wp, 0)
def testNoTasksQuiesce(self):
wp = workerpool.WorkerPool(3, DummyBaseWorker)
wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
......@@ -97,7 +97,7 @@ class TestWorkerpool(unittest.TestCase):
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.
wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)
......
......@@ -22,6 +22,7 @@
"""Utilities for unit testing"""
import os
import sys
import stat
import tempfile
import unittest
......@@ -40,6 +41,10 @@ class GanetiTestProgram(unittest.TestProgram):
"""
logging.basicConfig(filename=os.devnull)
sys.stderr.write("Running %s\n" % self.progName)
sys.stderr.flush()
return unittest.TestProgram.runTests(self)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment