diff --git a/NEWS b/NEWS index e1d5446863290b0dab39ee4eb6e121031a62a1d1..333a89109f657cbf208aee6f3a6999670e0b00b6 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,26 @@ News ==== +Version 2.2.0 rc1 +----------------- + +*(Released Mon, 23 Aug 2010)* + +- Support DRBD versions of the format "a.b.c.d" +- Updated manpages +- Re-introduce support for usage from multiple threads in RAPI client +- Instance renames and modify via RAPI +- Work around race condition between processing and archival in job + queue +- Mark opcodes following failed one as failed, too +- Job field ``lock_status`` was removed due to difficulties making it + work with the changed job queue in Ganeti 2.2; a better way to monitor + locks is expected for a later 2.2.x release +- Fixed dry-run behaviour with many commands +- Support ``ssh-agent`` again when adding nodes +- Many additional bugfixes + + Version 2.2.0 rc0 ----------------- @@ -105,6 +125,22 @@ Version 2.2.0 beta 0 see the ``ganeti-os-interface(7)`` manpage and look for ``EXP_SIZE_FD`` +Version 2.1.7 +------------- + +*(Released Tue, 24 Aug 2010)* + +Bugfixes only: + - Don't ignore secondary node silently on non-mirrored disk templates + (issue 113) + - Fix --master-netdev arg name in gnt-cluster(8) (issue 114) + - Fix usb_mouse parameter breaking with vnc_console (issue 109) + - Properly document the usb_mouse parameter + - Fix path in ganeti-rapi(8) (issue 116) + - Adjust error message when the ganeti user's .ssh directory is + missing + - Add same-node-check when changing the disk template to drbd + Version 2.1.6 ------------- diff --git a/configure.ac b/configure.ac index 541f794aa0e9d7dc929a8e957250f6308f5fcab8..d0f3a82c748b4a7b9837ff8c49099cf2d42e95db 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ m4_define([gnt_version_major], [2]) m4_define([gnt_version_minor], [2]) m4_define([gnt_version_revision], [0]) -m4_define([gnt_version_suffix], [~rc0]) +m4_define([gnt_version_suffix], [~rc1]) m4_define([gnt_version_full], m4_format([%d.%d.%d%s], gnt_version_major, gnt_version_minor, diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 53288623ae89ed49b57ea8f84b6704f2dbc85b02..0791c4c1ffe5580d35b09648c121f9033a7d597c 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -277,6 +277,11 @@ class ClientOps: op = opcodes.OpGetTags(kind=kind, name=name) return self._Query(op) + elif method == luxi.REQ_QUERY_LOCKS: + (fields, sync) = args + logging.info("Received locks query request") + return self.server.context.glm.QueryLocks(fields, sync) + elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG: drain_flag = args logging.info("Received queue drain flag change request to %s", diff --git a/lib/cli.py b/lib/cli.py index 435b4002a65dd903033a1e46f16509646b1b00fa..979db4b7583014482b0bcdb83a2dc7b55c96ae28 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -84,6 +84,7 @@ __all__ = [ "IGNORE_REMOVE_FAILURES_OPT", "IGNORE_SECONDARIES_OPT", "IGNORE_SIZE_OPT", + "INTERVAL_OPT", "MAC_PREFIX_OPT", "MAINTAIN_NODE_HEALTH_OPT", "MASTER_NETDEV_OPT", @@ -196,6 +197,7 @@ __all__ = [ "cli_option", "SplitNodeOption", "CalculateOSNames", + "ParseFields", ] NO_PREFIX = "no_" @@ -929,6 +931,11 @@ SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout", default=constants.DEFAULT_SHUTDOWN_TIMEOUT, help="Maximum time to wait for instance shutdown") +INTERVAL_OPT = cli_option("--interval", dest="interval", type="int", + default=None, + help=("Number of seconds between repetions of the" + " command")) + EARLY_RELEASE_OPT = cli_option("--early-release", dest="early_release", default=False, action="store_true", @@ -1208,6 +1215,24 @@ def CalculateOSNames(os_name, os_variants): return [os_name] +def ParseFields(selected, default): + """Parses the values of "--field"-like options. + + @type selected: string or None + @param selected: User-selected options + @type default: list + @param default: Default fields + + """ + if selected is None: + return default + + if selected.startswith("+"): + return default + selected[1:].split(",") + + return selected.split(",") + + UsesRPC = rpc.RunWithRPC diff --git a/lib/jqueue.py b/lib/jqueue.py index 5c90ef8c61cdcfcaaff0af55b8d2a24298b26057..c1ba07c1edefbdc7fd1badbbf68e8999447e9bcf 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -690,6 +690,8 @@ class _JobQueueWorker(workerpool.BaseWorker): @param job: the job to be processed """ + self.SetTaskName("Job%s" % job.id) + logging.info("Processing job %s", job.id) proc = mcpu.Processor(self.pool.queue.context, job.id) queue = job.queue diff --git a/lib/locking.py b/lib/locking.py index 59ab7db47d19eb015ca506b6f72fa8aee01f3817..2aeb7d980bdd8081d8c911ff7c7ea93437181e1f 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -30,6 +30,8 @@ import select import threading import time import errno +import weakref +import logging from ganeti import errors from ganeti import utils @@ -409,6 +411,7 @@ class SharedLock(object): """ __slots__ = [ + "__weakref__", "__active_shr_c", "__inactive_shr_c", "__deleted", @@ -421,10 +424,12 @@ class SharedLock(object): __condition_class = PipeCondition - def __init__(self, name): + def __init__(self, name, monitor=None): """Construct a new SharedLock. @param name: the name of the lock + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register """ object.__init__(self) @@ -448,6 +453,55 @@ class SharedLock(object): # is this lock in the deleted state? self.__deleted = False + # Register with lock monitor + if monitor: + monitor.RegisterLock(self) + + def GetInfo(self, fields): + """Retrieves information for querying locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + self.__lock.acquire() + try: + info = [] + + # Note: to avoid unintentional race conditions, no references to + # modifiable objects should be returned unless they were created in this + # function. + for fname in fields: + if fname == "name": + info.append(self.name) + elif fname == "mode": + if self.__deleted: + info.append("deleted") + assert not (self.__exc or self.__shr) + elif self.__exc: + info.append("exclusive") + elif self.__shr: + info.append("shared") + else: + info.append(None) + elif fname == "owner": + if self.__exc: + owner = [self.__exc] + else: + owner = self.__shr + + if owner: + assert not self.__deleted + info.append([i.getName() for i in owner]) + else: + info.append(None) + else: + raise errors.OpExecError("Invalid query field '%s'" % fname) + + return info + finally: + self.__lock.release() + def __check_deleted(self): """Raises an exception if the lock has been deleted. @@ -671,6 +725,8 @@ class SharedLock(object): self.__deleted = True self.__exc = None + assert not (self.__exc or self.__shr), "Found owner during deletion" + # Notify all acquires. They'll throw an error. while self.__pending: self.__pending.pop().notifyAll() @@ -713,16 +769,21 @@ class LockSet: @ivar name: the name of the lockset """ - def __init__(self, members, name): + def __init__(self, members, name, monitor=None): """Constructs a new LockSet. @type members: list of strings @param members: initial members of the set + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register member locks """ assert members is not None, "members parameter is not a list" self.name = name + # Lock monitor + self.__monitor = monitor + # Used internally to guarantee coherency. self.__lock = SharedLock(name) @@ -731,7 +792,8 @@ class LockSet: self.__lockdict = {} for mname in members: - self.__lockdict[mname] = SharedLock("%s/%s" % (name, mname)) + self.__lockdict[mname] = SharedLock(self._GetLockName(mname), + monitor=monitor) # The owner dict contains the set of locks each thread owns. For # performance each thread can access its own key without a global lock on @@ -742,6 +804,12 @@ class LockSet: # will be trouble. self.__owners = {} + def _GetLockName(self, mname): + """Returns the name for a member lock. + + """ + return "%s/%s" % (self.name, mname) + def _is_owned(self): """Is the current thread a current level owner?""" return threading.currentThread() in self.__owners @@ -1049,7 +1117,7 @@ class LockSet: (invalid_names, self.name)) for lockname in names: - lock = SharedLock("%s/%s" % (self.name, lockname)) + lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: lock.acquire(shared=shared) @@ -1187,13 +1255,24 @@ class GanetiLockManager: self.__class__._instance = self + self._monitor = LockMonitor() + # The keyring contains all the locks, at their level and in the correct # locking order. self.__keyring = { - LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"), - LEVEL_NODE: LockSet(nodes, "nodes lockset"), - LEVEL_INSTANCE: LockSet(instances, "instances lockset"), - } + LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), + LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), + LEVEL_INSTANCE: LockSet(instances, "instances", + monitor=self._monitor), + } + + def QueryLocks(self, fields, sync): + """Queries information from all locks. + + See L{LockMonitor.QueryLocks}. + + """ + return self._monitor.QueryLocks(fields, sync) def _names(self, level): """List the lock names at the given level. @@ -1346,3 +1425,59 @@ class GanetiLockManager: "Cannot remove locks at a level while not owning it or" " owning some at a greater one") return self.__keyring[level].remove(names) + + +class LockMonitor(object): + _LOCK_ATTR = "_lock" + + def __init__(self): + """Initializes this class. + + """ + self._lock = SharedLock("LockMonitor") + + # Tracked locks. Weak references are used to avoid issues with circular + # references and deletion. + self._locks = weakref.WeakKeyDictionary() + + @ssynchronized(_LOCK_ATTR) + def RegisterLock(self, lock): + """Registers a new lock. + + """ + logging.debug("Registering lock %s", lock.name) + assert lock not in self._locks, "Duplicate lock registration" + assert not compat.any(lock.name == i.name for i in self._locks.keys()), \ + "Found duplicate lock name" + self._locks[lock] = None + + @ssynchronized(_LOCK_ATTR) + def _GetLockInfo(self, fields): + """Get information from all locks while the monitor lock is held. + + """ + result = {} + + for lock in self._locks.keys(): + assert lock.name not in result, "Found duplicate lock name" + result[lock.name] = lock.GetInfo(fields) + + return result + + def QueryLocks(self, fields, sync): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + @type sync: boolean + @param sync: Whether to operate in synchronous mode + + """ + if sync: + raise NotImplementedError("Synchronous queries are not implemented") + + # Get all data without sorting + result = self._GetLockInfo(fields) + + # Sort by name + return [result[name] for name in utils.NiceSort(result.keys())] diff --git a/lib/luxi.py b/lib/luxi.py index cdd5518ff290d675cd3d1e8c9acce93fb744186a..669c3dd58de805b46f32d5b3d14141e09047e9d7 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -59,6 +59,7 @@ REQ_QUERY_EXPORTS = "QueryExports" REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" REQ_QUERY_TAGS = "QueryTags" +REQ_QUERY_LOCKS = "QueryLocks" REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag" REQ_SET_WATCHER_PAUSE = "SetWatcherPause" @@ -490,3 +491,6 @@ class Client(object): def QueryTags(self, kind, name): return self.CallMethod(REQ_QUERY_TAGS, (kind, name)) + + def QueryLocks(self, fields, sync): + return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync)) diff --git a/lib/ssh.py b/lib/ssh.py index 44a1e6ae368c4a922c98585c96e2e6ae453c702b..fdecab40a3203ab8cc582be0c3c45a88dcf0cd72 100644 --- a/lib/ssh.py +++ b/lib/ssh.py @@ -26,12 +26,25 @@ import os import logging +import re from ganeti import utils from ganeti import errors from ganeti import constants +def FormatParamikoFingerprint(fingerprint): + """Formats the fingerprint of L{paramiko.PKey.get_fingerprint()} + + @type fingerprint: str + @param fingerprint: PKey fingerprint + @return The string hex representation of the fingerprint + + """ + assert len(fingerprint) % 2 == 0 + return ":".join(re.findall(r"..", fingerprint.lower())) + + def GetUserFiles(user, mkdir=False): """Return the paths of a user's ssh files. diff --git a/lib/workerpool.py b/lib/workerpool.py index 1838d97b7c3a275f89067c7898f307df494072c2..1f3bcd88bb27d16428ac5576921b7d88066a936f 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -69,8 +69,11 @@ class BaseWorker(threading.Thread, object): """ super(BaseWorker, self).__init__(name=worker_id) self.pool = pool + self._worker_id = worker_id self._current_task = None + assert self.getName() == worker_id + def ShouldTerminate(self): """Returns whether this worker should terminate. @@ -100,6 +103,23 @@ class BaseWorker(threading.Thread, object): finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + def _HasRunningTaskUnlocked(self): """Returns whether this worker is currently running a task. @@ -147,7 +167,11 @@ class BaseWorker(threading.Thread, object): # Run the actual task assert defer is None logging.debug("Starting task %r, priority %s", args, priority) - self.RunTask(*args) # pylint: disable-msg=W0142 + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable-msg=W0142 + finally: + self.SetTaskName(None) logging.debug("Done with task %r, priority %s", args, priority) except DeferTask, err: defer = err @@ -159,7 +183,6 @@ class BaseWorker(threading.Thread, object): logging.debug("Deferring task %r, new priority %s", defer.priority) assert self._HasRunningTaskUnlocked() - except: # pylint: disable-msg=W0702 logging.exception("Caught unhandled exception") diff --git a/man/gnt-debug.sgml b/man/gnt-debug.sgml index a4929398f22c5e220375ac2ae7384d56a0199ce8..2d2e99349f01b5ac2ea68b4e7daefff6a6d32412 100644 --- a/man/gnt-debug.sgml +++ b/man/gnt-debug.sgml @@ -192,6 +192,70 @@ </para> </refsect2> + <refsect2> + <title>LOCKS</title> + <cmdsynopsis> + <command>locks</command> + <arg>--no-headers</arg> + <arg>--separator=<replaceable>SEPARATOR</replaceable></arg> + <sbr> + <arg>-o <replaceable>[+]FIELD,...</replaceable></arg> + <arg>--interval=<replaceable>SECONDS</replaceable></arg> + <sbr> + </cmdsynopsis> + + <para> + Shows a list of locks in the master daemon. + </para> + + <para> + The <option>--no-headers</option> option will skip the initial + header line. The <option>--separator</option> option takes an + argument which denotes what will be used between the output + fields. Both these options are to help scripting. + </para> + + <para> + The <option>-o</option> option takes a comma-separated list of + output fields. The available fields and their meaning are: + <variablelist> + <varlistentry> + <term>name</term> + <listitem> + <simpara>Lock name</simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>mode</term> + <listitem> + <simpara> + Mode in which the lock is currently acquired (exclusive or + shared) + </simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>owner</term> + <listitem> + <simpara>Current lock owner(s)</simpara> + </listitem> + </varlistentry> + </variablelist> + </para> + + <para> + If the value of the option starts with the character + <constant>+</constant>, the new fields will be added to the default + list. This allows to quickly see the default list plus a few other + fields, instead of retyping the entire list of fields. + </para> + + <para> + Use <option>--interval</option> to repeat the listing. A delay + specified by the option value in seconds is inserted. + </para> + + </refsect2> </refsect1> &footer; diff --git a/man/gnt-job.sgml b/man/gnt-job.sgml index d7cfa27e969b7e41cc57882551d946c0607376b0..c55ee889ecfc805c3ba4d3fa2a905e2300a16942 100644 --- a/man/gnt-job.sgml +++ b/man/gnt-job.sgml @@ -227,12 +227,6 @@ <simpara>the list of opcode end times</simpara> </listitem> </varlistentry> - <varlistentry> - <term>lock_status</term> - <listitem> - <simpara>the lock status (useful for debugging)</simpara> - </listitem> - </varlistentry> </variablelist> </para> diff --git a/scripts/gnt-debug b/scripts/gnt-debug index a860b3da019adb2cfe64163e7458f3a07adfd9d1..8fea3a9112005295e44764cd5e3f7bfabe005419 100755 --- a/scripts/gnt-debug +++ b/scripts/gnt-debug @@ -39,6 +39,14 @@ from ganeti import utils from ganeti import errors +#: Default fields for L{ListLocks} +_LIST_LOCKS_DEF_FIELDS = [ + "name", + "mode", + "owner", + ] + + def Delay(opts, args): """Sleeps for a while @@ -398,6 +406,57 @@ def TestJobqueue(opts, _): return 0 +def ListLocks(opts, args): # pylint: disable-msg=W0613 + """List all locks. + + @param opts: the command line options selected by the user + @type args: list + @param args: should be an empty list + @rtype: int + @return: the desired exit code + + """ + selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS) + + if not opts.no_headers: + headers = { + "name": "Name", + "mode": "Mode", + "owner": "Owner", + } + else: + headers = None + + while True: + # Not reusing client as interval might be too long + output = GetClient().QueryLocks(selected_fields, False) + + # change raw values to nicer strings + for row in output: + for idx, field in enumerate(selected_fields): + val = row[idx] + + if field in ("mode", "owner") and val is None: + val = "-" + elif field == "owner": + val = utils.CommaJoin(val) + + row[idx] = str(val) + + data = GenerateTable(separator=opts.separator, headers=headers, + fields=selected_fields, data=output) + for line in data: + ToStdout(line) + + if not opts.interval: + break + + ToStdout("") + time.sleep(opts.interval) + + return 0 + + commands = { 'delay': ( Delay, [ArgUnknown(min=1, max=1)], @@ -454,7 +513,10 @@ commands = { "{opts...} <instance>", "Executes a TestAllocator OpCode"), "test-jobqueue": ( TestJobqueue, ARGS_NONE, [], - "", "Test a few aspects of the job queue") + "", "Test a few aspects of the job queue"), + "locks": ( + ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT], + "[--interval N]", "Show a list of locks in the master daemon"), } diff --git a/scripts/gnt-instance b/scripts/gnt-instance index 4fa2670be372cbadcd8aa21f96c9d7b08ac14b54..8bab26e074f1de0feb6488080883a205ef8e2262 100755 --- a/scripts/gnt-instance +++ b/scripts/gnt-instance @@ -244,12 +244,7 @@ def ListInstances(opts, args): @return: the desired exit code """ - if opts.output is None: - selected_fields = _LIST_DEF_FIELDS - elif opts.output.startswith("+"): - selected_fields = _LIST_DEF_FIELDS + opts.output[1:].split(",") - else: - selected_fields = opts.output.split(",") + selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS) output = GetClient().QueryInstances(args, selected_fields, opts.do_locking) diff --git a/scripts/gnt-job b/scripts/gnt-job index 2e71baea1b59f700df65f72fb2068a6c74e4c399..df2ced97dbe554698a616e77b1ab723238a36a5d 100755 --- a/scripts/gnt-job +++ b/scripts/gnt-job @@ -61,12 +61,7 @@ def ListJobs(opts, args): @return: the desired exit code """ - if opts.output is None: - selected_fields = _LIST_DEF_FIELDS - elif opts.output.startswith("+"): - selected_fields = _LIST_DEF_FIELDS + opts.output[1:].split(",") - else: - selected_fields = opts.output.split(",") + selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS) output = GetClient().QueryJobs(args, selected_fields) if not opts.no_headers: @@ -85,7 +80,6 @@ def ListJobs(opts, args): "start_ts": "Start", "end_ts": "End", "received_ts": "Received", - "lock_status": "LockStatus", } else: headers = None @@ -109,8 +103,6 @@ def ListJobs(opts, args): val = FormatTimestamp(val) elif field in ("opstart", "opexec", "opend"): val = [FormatTimestamp(entry) for entry in val] - elif field == "lock_status" and not val: - val = "-" row[idx] = str(val) diff --git a/scripts/gnt-node b/scripts/gnt-node index a7daa017206ed3837f52243f9dfd616764fefc76..dace8eb6f4ff56dfb788627c54a130ded20c4963 100755 --- a/scripts/gnt-node +++ b/scripts/gnt-node @@ -45,6 +45,10 @@ _LIST_DEF_FIELDS = [ ] +#: Default field list for L{ListVolumes} +_LIST_VOL_DEF_FIELDS = ["node", "phys", "vg", "name", "size", "instance"] + + #: default list of field for L{ListStorage} _LIST_STOR_DEF_FIELDS = [ constants.SF_NODE, @@ -218,12 +222,7 @@ def ListNodes(opts, args): @return: the desired exit code """ - if opts.output is None: - selected_fields = _LIST_DEF_FIELDS - elif opts.output.startswith("+"): - selected_fields = _LIST_DEF_FIELDS + opts.output[1:].split(",") - else: - selected_fields = opts.output.split(",") + selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS) output = GetClient().QueryNodes(args, selected_fields, opts.do_locking) @@ -496,11 +495,7 @@ def ListVolumes(opts, args): @return: the desired exit code """ - if opts.output is None: - selected_fields = ["node", "phys", "vg", - "name", "size", "instance"] - else: - selected_fields = opts.output.split(",") + selected_fields = ParseFields(opts.output, _LIST_VOL_DEF_FIELDS) op = opcodes.OpQueryNodeVolumes(nodes=args, output_fields=selected_fields) output = SubmitOpCode(op, opts=opts) @@ -544,12 +539,7 @@ def ListStorage(opts, args): storage_type = ConvertStorageType(opts.user_storage_type) - if opts.output is None: - selected_fields = _LIST_STOR_DEF_FIELDS - elif opts.output.startswith("+"): - selected_fields = _LIST_STOR_DEF_FIELDS + opts.output[1:].split(",") - else: - selected_fields = opts.output.split(",") + selected_fields = ParseFields(opts.output, _LIST_STOR_DEF_FIELDS) op = opcodes.OpQueryNodeStorage(nodes=args, storage_type=storage_type, diff --git a/test/ganeti.cli_unittest.py b/test/ganeti.cli_unittest.py index a1ae1471a69e4f9467894aac60cbae0e433a8547..77ad4c16e5322d03d7f2ee031004d63aaa19b730 100755 --- a/test/ganeti.cli_unittest.py +++ b/test/ganeti.cli_unittest.py @@ -429,5 +429,18 @@ class TestFormatLogMessage(unittest.TestCase): self.assert_(cli.FormatLogMessage("some other type", (1, 2, 3))) +class TestParseFields(unittest.TestCase): + def test(self): + self.assertEqual(cli.ParseFields(None, []), []) + self.assertEqual(cli.ParseFields("name,foo,hello", []), + ["name", "foo", "hello"]) + self.assertEqual(cli.ParseFields(None, ["def", "ault", "fields", "here"]), + ["def", "ault", "fields", "here"]) + self.assertEqual(cli.ParseFields("name,foo", ["def", "ault"]), + ["name", "foo"]) + self.assertEqual(cli.ParseFields("+name,foo", ["def", "ault"]), + ["def", "ault", "name", "foo"]) + + if __name__ == '__main__': testutils.GanetiTestProgram() diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 3eb23754a82c4dcc2ebb728e8de5ff2bec7e563c..c1ea3130b77ce351f9b324ef91a45b8a93846027 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -27,9 +27,11 @@ import unittest import time import Queue import threading +import random from ganeti import locking from ganeti import errors +from ganeti import utils import testutils @@ -1422,5 +1424,173 @@ class TestGanetiLockManager(_ThreadedTestCase): self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) +class TestLockMonitor(_ThreadedTestCase): + def setUp(self): + _ThreadedTestCase.setUp(self) + self.lm = locking.LockMonitor() + + def testSingleThread(self): + locks = [] + + for i in range(100): + name = "TestLock%s" % i + locks.append(locking.SharedLock(name, monitor=self.lm)) + + self.assertEqual(len(self.lm._locks), len(locks)) + + # Delete all locks + del locks[:] + + # The garbage collector might needs some time + def _CheckLocks(): + if self.lm._locks: + raise utils.RetryAgain() + + utils.Retry(_CheckLocks, 0.1, 30.0) + + self.assertFalse(self.lm._locks) + + def testMultiThread(self): + locks = [] + + def _CreateLock(prev, next, name): + prev.wait() + locks.append(locking.SharedLock(name, monitor=self.lm)) + if next: + next.set() + + expnames = [] + + first = threading.Event() + prev = first + + # Use a deterministic random generator + for i in random.Random(4263).sample(range(100), 33): + name = "MtTestLock%s" % i + expnames.append(name) + + ev = threading.Event() + self._addThread(target=_CreateLock, args=(prev, ev, name)) + prev = ev + + # Add locks + first.set() + self._waitThreads() + + # Check order in which locks were added + self.assertEqual([i.name for i in locks], expnames) + + # Sync queries are not supported + self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True) + + # Check query result + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[name, None, None] for name in utils.NiceSort(expnames)]) + + # Test exclusive acquire + for tlock in locks[::4]: + tlock.acquire(shared=0) + try: + def _GetExpResult(name): + if tlock.name == name: + return [name, "exclusive", [threading.currentThread().getName()]] + return [name, None, None] + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [_GetExpResult(name) + for name in utils.NiceSort(expnames)]) + finally: + tlock.release() + + # Test shared acquire + def _Acquire(lock, shared, ev, notify): + lock.acquire(shared=shared) + try: + notify.set() + ev.wait() + finally: + lock.release() + + for tlock1 in locks[::11]: + for tlock2 in locks[::-15]: + if tlock2 == tlock1: + # Avoid deadlocks + continue + + for tlock3 in locks[::10]: + if tlock3 in (tlock2, tlock1): + # Avoid deadlocks + continue + + releaseev = threading.Event() + + # Acquire locks + acquireev = [] + tthreads1 = [] + for i in range(3): + ev = threading.Event() + tthreads1.append(self._addThread(target=_Acquire, + args=(tlock1, 1, releaseev, ev))) + acquireev.append(ev) + + ev = threading.Event() + tthread2 = self._addThread(target=_Acquire, + args=(tlock2, 1, releaseev, ev)) + acquireev.append(ev) + + ev = threading.Event() + tthread3 = self._addThread(target=_Acquire, + args=(tlock3, 0, releaseev, ev)) + acquireev.append(ev) + + # Wait for all locks to be acquired + for i in acquireev: + i.wait() + + # Check query result + for (name, mode, owner) in self.lm.QueryLocks(["name", "mode", + "owner"], False): + if name == tlock1.name: + self.assertEqual(mode, "shared") + self.assertEqual(set(owner), set(i.getName() for i in tthreads1)) + continue + + if name == tlock2.name: + self.assertEqual(mode, "shared") + self.assertEqual(owner, [tthread2.getName()]) + continue + + if name == tlock3.name: + self.assertEqual(mode, "exclusive") + self.assertEqual(owner, [tthread3.getName()]) + continue + + self.assert_(name in expnames) + self.assert_(mode is None) + self.assert_(owner is None) + + # Release locks again + releaseev.set() + + self._waitThreads() + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[name, None, None] + for name in utils.NiceSort(expnames)]) + + def testDelete(self): + lock = locking.SharedLock("TestLock", monitor=self.lm) + + self.assertEqual(len(self.lm._locks), 1) + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, None, None]]) + + lock.delete() + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, "deleted", None]]) + self.assertEqual(len(self.lm._locks), 1) + + if __name__ == '__main__': testutils.GanetiTestProgram() diff --git a/test/ganeti.ssh_unittest.py b/test/ganeti.ssh_unittest.py index 5afd59052c74278feef480dab55b38a9f678990e..bd6c951efd742518112e37c3b5740f8c33d07b32 100755 --- a/test/ganeti.ssh_unittest.py +++ b/test/ganeti.ssh_unittest.py @@ -47,6 +47,12 @@ class TestKnownHosts(testutils.GanetiTestCase): "%s ssh-rsa %s\n" % (cfg.GetClusterName(), mocks.FAKE_CLUSTER_KEY)) + def testFormatParamikoFingerprintCorrect(self): + self.assertEqual(ssh.FormatParamikoFingerprint("C0Ffee"), "c0:ff:ee") + + def testFormatParamikoFingerprintNotDividableByTwo(self): + self.assertRaises(AssertionError, ssh.FormatParamikoFingerprint, "C0Ffe") + if __name__ == '__main__': testutils.GanetiTestProgram() diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 63868622ef52621eee2fb9a878b55888e845dee7..89b3b1adfd9a5208450cf1e6b0b1b9a863c38e47 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -77,6 +77,13 @@ class ChecksumContext: class ChecksumBaseWorker(workerpool.BaseWorker): def RunTask(self, ctx, number): + name = "number%s" % number + self.SetTaskName(name) + + # This assertion needs to be checked before updating the checksum. A + # failing assertion will then cause the result to be wrong. + assert self.getName() == ("%s/%s" % (self._worker_id, name)) + ctx.lock.acquire() try: ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) diff --git a/tools/setup-ssh b/tools/setup-ssh index e5a512bf1286c28aabc6efcb33f4c20250324d89..2dd2c9ab601617ccb7611be34ae4fe98dd23fb18 100755 --- a/tools/setup-ssh +++ b/tools/setup-ssh @@ -177,6 +177,7 @@ def ParseOptions(): " <node...>"), prog=program) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) + parser.add_option(cli.NOSSH_KEYCHECK_OPT) default_key = ssh.GetUserFiles(constants.GANETI_RUNAS)[0] parser.add_option(optparse.Option("-f", dest="private_key", default=default_key, @@ -296,6 +297,22 @@ def LoginViaKeys(transport, username, keys): return False +def LoadKnownHosts(): + """Loads the known hosts + + @return L{paramiko.util.load_host_keys} dict + + """ + homedir = utils.GetHomeDir(constants.GANETI_RUNAS) + known_hosts = os.path.join(homedir, ".ssh", "known_hosts") + + try: + return paramiko.util.load_host_keys(known_hosts) + except EnvironmentError: + # We didn't found the path, silently ignore and return an empty dict + return {} + + def main(): """Main routine. @@ -309,6 +326,7 @@ def main(): passwd = None username = constants.GANETI_RUNAS ssh_port = netutils.GetDaemonPort("ssh") + host_keys = LoadKnownHosts() # Below, we need to join() the transport objects, as otherwise the # following happens: @@ -322,6 +340,28 @@ def main(): for host in args: transport = paramiko.Transport((host, ssh_port)) transport.start_client() + server_key = transport.get_remote_server_key() + keytype = server_key.get_name() + + our_server_key = host_keys.get(host, {}).get(keytype, None) + if options.ssh_key_check: + if not our_server_key: + hexified_key = ssh.FormatParamikoFingerprint( + server_key.get_fingerprint()) + msg = ("Unable to verify hostkey of host %s: %s. Do you want to accept" + " it?" % (host, hexified_key)) + + if cli.AskUser(msg): + our_server_key = server_key + + if our_server_key != server_key: + logging.error("Unable to verify identity of host. Aborting") + transport.close() + transport.join() + # TODO: Run over all hosts, fetch the keys and let them verify from the + # user beforehand then proceed with actual work later on + raise paramiko.SSHException("Unable to verify identity of host") + try: if LoginViaKeys(transport, username, all_keys): logging.info("Authenticated to %s via public key", host)