Commit c30421e0 authored by René Nussbaumer's avatar René Nussbaumer

Merge branch 'devel-2.2'

hansmi helped me with merging the conflict. Thanks

Signed-off-by: default avatarRené Nussbaumer <>
Reviewed-by: default avatarIustin Pop <>
parents 14bde528 310a8944
Version 2.2.0 rc1
*(Released Mon, 23 Aug 2010)*
- Support DRBD versions of the format "a.b.c.d"
- Updated manpages
- Re-introduce support for usage from multiple threads in RAPI client
- Instance renames and modify via RAPI
- Work around race condition between processing and archival in job
- Mark opcodes following failed one as failed, too
- Job field ``lock_status`` was removed due to difficulties making it
work with the changed job queue in Ganeti 2.2; a better way to monitor
locks is expected for a later 2.2.x release
- Fixed dry-run behaviour with many commands
- Support ``ssh-agent`` again when adding nodes
- Many additional bugfixes
Version 2.2.0 rc0
......@@ -105,6 +125,22 @@ Version 2.2.0 beta 0
see the ``ganeti-os-interface(7)`` manpage and look for
Version 2.1.7
*(Released Tue, 24 Aug 2010)*
Bugfixes only:
- Don't ignore secondary node silently on non-mirrored disk templates
(issue 113)
- Fix --master-netdev arg name in gnt-cluster(8) (issue 114)
- Fix usb_mouse parameter breaking with vnc_console (issue 109)
- Properly document the usb_mouse parameter
- Fix path in ganeti-rapi(8) (issue 116)
- Adjust error message when the ganeti user's .ssh directory is
- Add same-node-check when changing the disk template to drbd
Version 2.1.6
......@@ -2,7 +2,7 @@
m4_define([gnt_version_major], [2])
m4_define([gnt_version_minor], [2])
m4_define([gnt_version_revision], [0])
m4_define([gnt_version_suffix], [~rc0])
m4_define([gnt_version_suffix], [~rc1])
gnt_version_major, gnt_version_minor,
......@@ -277,6 +277,11 @@ class ClientOps:
op = opcodes.OpGetTags(kind=kind, name=name)
return self._Query(op)
elif method == luxi.REQ_QUERY_LOCKS:
(fields, sync) = args"Received locks query request")
return self.server.context.glm.QueryLocks(fields, sync)
elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
drain_flag = args"Received queue drain flag change request to %s",
......@@ -84,6 +84,7 @@ __all__ = [
......@@ -196,6 +197,7 @@ __all__ = [
NO_PREFIX = "no_"
......@@ -929,6 +931,11 @@ SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
help="Maximum time to wait for instance shutdown")
INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
help=("Number of seconds between repetions of the"
" command"))
EARLY_RELEASE_OPT = cli_option("--early-release",
dest="early_release", default=False,
......@@ -1208,6 +1215,24 @@ def CalculateOSNames(os_name, os_variants):
return [os_name]
def ParseFields(selected, default):
"""Parses the values of "--field"-like options.
@type selected: string or None
@param selected: User-selected options
@type default: list
@param default: Default fields
if selected is None:
return default
if selected.startswith("+"):
return default + selected[1:].split(",")
return selected.split(",")
UsesRPC = rpc.RunWithRPC
......@@ -690,6 +690,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
@param job: the job to be processed
self.SetTaskName("Job%s" %"Processing job %s",
proc = mcpu.Processor(self.pool.queue.context,
queue = job.queue
......@@ -30,6 +30,8 @@ import select
import threading
import time
import errno
import weakref
import logging
from ganeti import errors
from ganeti import utils
......@@ -409,6 +411,7 @@ class SharedLock(object):
__slots__ = [
......@@ -421,10 +424,12 @@ class SharedLock(object):
__condition_class = PipeCondition
def __init__(self, name):
def __init__(self, name, monitor=None):
"""Construct a new SharedLock.
@param name: the name of the lock
@type monitor: L{LockMonitor}
@param monitor: Lock monitor with which to register
......@@ -448,6 +453,55 @@ class SharedLock(object):
# is this lock in the deleted state?
self.__deleted = False
# Register with lock monitor
if monitor:
def GetInfo(self, fields):
"""Retrieves information for querying locks.
@type fields: list of strings
@param fields: List of fields to return
info = []
# Note: to avoid unintentional race conditions, no references to
# modifiable objects should be returned unless they were created in this
# function.
for fname in fields:
if fname == "name":
elif fname == "mode":
if self.__deleted:
assert not (self.__exc or self.__shr)
elif self.__exc:
elif self.__shr:
elif fname == "owner":
if self.__exc:
owner = [self.__exc]
owner = self.__shr
if owner:
assert not self.__deleted
info.append([i.getName() for i in owner])
raise errors.OpExecError("Invalid query field '%s'" % fname)
return info
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
......@@ -671,6 +725,8 @@ class SharedLock(object):
self.__deleted = True
self.__exc = None
assert not (self.__exc or self.__shr), "Found owner during deletion"
# Notify all acquires. They'll throw an error.
while self.__pending:
......@@ -713,16 +769,21 @@ class LockSet:
@ivar name: the name of the lockset
def __init__(self, members, name):
def __init__(self, members, name, monitor=None):
"""Constructs a new LockSet.
@type members: list of strings
@param members: initial members of the set
@type monitor: L{LockMonitor}
@param monitor: Lock monitor with which to register member locks
assert members is not None, "members parameter is not a list" = name
# Lock monitor
self.__monitor = monitor
# Used internally to guarantee coherency.
self.__lock = SharedLock(name)
......@@ -731,7 +792,8 @@ class LockSet:
self.__lockdict = {}
for mname in members:
self.__lockdict[mname] = SharedLock("%s/%s" % (name, mname))
self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
# The owner dict contains the set of locks each thread owns. For
# performance each thread can access its own key without a global lock on
......@@ -742,6 +804,12 @@ class LockSet:
# will be trouble.
self.__owners = {}
def _GetLockName(self, mname):
"""Returns the name for a member lock.
return "%s/%s" % (, mname)
def _is_owned(self):
"""Is the current thread a current level owner?"""
return threading.currentThread() in self.__owners
......@@ -1049,7 +1117,7 @@ class LockSet:
for lockname in names:
lock = SharedLock("%s/%s" % (, lockname))
lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
if acquired:
......@@ -1187,13 +1255,24 @@ class GanetiLockManager:
self.__class__._instance = self
self._monitor = LockMonitor()
# The keyring contains all the locks, at their level and in the correct
# locking order.
self.__keyring = {
LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
LEVEL_NODE: LockSet(nodes, "nodes lockset"),
LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
LEVEL_INSTANCE: LockSet(instances, "instances",
def QueryLocks(self, fields, sync):
"""Queries information from all locks.
See L{LockMonitor.QueryLocks}.
return self._monitor.QueryLocks(fields, sync)
def _names(self, level):
"""List the lock names at the given level.
......@@ -1346,3 +1425,59 @@ class GanetiLockManager:
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
class LockMonitor(object):
_LOCK_ATTR = "_lock"
def __init__(self):
"""Initializes this class.
self._lock = SharedLock("LockMonitor")
# Tracked locks. Weak references are used to avoid issues with circular
# references and deletion.
self._locks = weakref.WeakKeyDictionary()
def RegisterLock(self, lock):
"""Registers a new lock.
logging.debug("Registering lock %s",
assert lock not in self._locks, "Duplicate lock registration"
assert not compat.any( == for i in self._locks.keys()), \
"Found duplicate lock name"
self._locks[lock] = None
def _GetLockInfo(self, fields):
"""Get information from all locks while the monitor lock is held.
result = {}
for lock in self._locks.keys():
assert not in result, "Found duplicate lock name"
result[] = lock.GetInfo(fields)
return result
def QueryLocks(self, fields, sync):
"""Queries information from all locks.
@type fields: list of strings
@param fields: List of fields to return
@type sync: boolean
@param sync: Whether to operate in synchronous mode
if sync:
raise NotImplementedError("Synchronous queries are not implemented")
# Get all data without sorting
result = self._GetLockInfo(fields)
# Sort by name
return [result[name] for name in utils.NiceSort(result.keys())]
......@@ -59,6 +59,7 @@ REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
......@@ -490,3 +491,6 @@ class Client(object):
def QueryTags(self, kind, name):
return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
def QueryLocks(self, fields, sync):
return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
......@@ -26,12 +26,25 @@
import os
import logging
import re
from ganeti import utils
from ganeti import errors
from ganeti import constants
def FormatParamikoFingerprint(fingerprint):
"""Formats the fingerprint of L{paramiko.PKey.get_fingerprint()}
@type fingerprint: str
@param fingerprint: PKey fingerprint
@return The string hex representation of the fingerprint
assert len(fingerprint) % 2 == 0
return ":".join(re.findall(r"..", fingerprint.lower()))
def GetUserFiles(user, mkdir=False):
"""Return the paths of a user's ssh files.
......@@ -69,8 +69,11 @@ class BaseWorker(threading.Thread, object):
super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
self._worker_id = worker_id
self._current_task = None
assert self.getName() == worker_id
def ShouldTerminate(self):
"""Returns whether this worker should terminate.
......@@ -100,6 +103,23 @@ class BaseWorker(threading.Thread, object):
def SetTaskName(self, taskname):
"""Sets the name of the current task.
Should only be called from within L{RunTask}.
@type taskname: string
@param taskname: Task's name
if taskname:
name = "%s/%s" % (self._worker_id, taskname)
name = self._worker_id
# Set thread name
def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
......@@ -147,7 +167,11 @@ class BaseWorker(threading.Thread, object):
# Run the actual task
assert defer is None
logging.debug("Starting task %r, priority %s", args, priority)
self.RunTask(*args) # pylint: disable-msg=W0142
assert self.getName() == self._worker_id
self.RunTask(*args) # pylint: disable-msg=W0142
logging.debug("Done with task %r, priority %s", args, priority)
except DeferTask, err:
defer = err
......@@ -159,7 +183,6 @@ class BaseWorker(threading.Thread, object):
logging.debug("Deferring task %r, new priority %s", defer.priority)
assert self._HasRunningTaskUnlocked()
except: # pylint: disable-msg=W0702
logging.exception("Caught unhandled exception")
......@@ -192,6 +192,70 @@
<arg>-o <replaceable>[+]FIELD,...</replaceable></arg>
Shows a list of locks in the master daemon.
The <option>--no-headers</option> option will skip the initial
header line. The <option>--separator</option> option takes an
argument which denotes what will be used between the output
fields. Both these options are to help scripting.
The <option>-o</option> option takes a comma-separated list of
output fields. The available fields and their meaning are:
<simpara>Lock name</simpara>
Mode in which the lock is currently acquired (exclusive or
<simpara>Current lock owner(s)</simpara>
If the value of the option starts with the character
<constant>+</constant>, the new fields will be added to the default
list. This allows to quickly see the default list plus a few other
fields, instead of retyping the entire list of fields.
Use <option>--interval</option> to repeat the listing. A delay
specified by the option value in seconds is inserted.
......@@ -227,12 +227,6 @@
<simpara>the list of opcode end times</simpara>
<simpara>the lock status (useful for debugging)</simpara>
......@@ -39,6 +39,14 @@ from ganeti import utils
from ganeti import errors
#: Default fields for L{ListLocks}
def Delay(opts, args):
"""Sleeps for a while
......@@ -398,6 +406,57 @@ def TestJobqueue(opts, _):
return 0
def ListLocks(opts, args): # pylint: disable-msg=W0613
"""List all locks.
@param opts: the command line options selected by the user
@type args: list
@param args: should be an empty list
@rtype: int
@return: the desired exit code
selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
if not opts.no_headers:
headers = {
"name": "Name",
"mode": "Mode",
"owner": "Owner",
headers = None
while True:
# Not reusing client as interval might be too long
output = GetClient().QueryLocks(selected_fields, False)
# change raw values to nicer strings
for row in output:
for idx, field in enumerate(selected_fields):
val = row[idx]
if field in ("mode", "owner") and val is None:
val = "-"
elif field == "owner":
val = utils.CommaJoin(val)
row[idx] = str(val)
data = GenerateTable(separator=opts.separator, headers=headers,
fields=selected_fields, data=output)
for line in data:
if not opts.interval:
return 0
commands = {
'delay': (
Delay, [ArgUnknown(min=1, max=1)],
......@@ -454,7 +513,10 @@ commands = {
"{opts...} <instance>", "Executes a TestAllocator OpCode"),
"test-jobqueue": (
TestJobqueue, ARGS_NONE, [],
"", "Test a few aspects of the job queue")
"", "Test a few aspects of the job queue"),
"locks": (
"[--interval N]", "Show a list of locks in the master daemon"),