Commit e7a25b08 authored by Guido Trotter's avatar Guido Trotter
Browse files

Merge branch 'devel-2.1'

* devel-2.1:
  burnin: only remove instances we actually added
  burnin.ExecOrQueue: add post-process function
  burnin.ExecOrQueue: remove variable argument list
  Fix new pylint errors
  Rename the confd_client unittest (to confd.client)
  Make watcher request the max coverage
  ConfdClient.SendRequest: allow max coverage
  Document the watcher node maintenance feature
  Watcher: automatic shutdown of orphan resources
  Export the maintain_node_health option in ssconf
  Add a new cluster parameter maintain_node_health
  Add a new confd callback (StoreResultCallback)
  ConfdClient: add synchronous wait for replies mode
  ConfdClient: unify some internal variables

	  change moved to
Signed-off-by: default avatarGuido Trotter <>
Reviewed-by: default avatarIustin Pop <>
parents cf6fee17 1e82a86b
......@@ -327,7 +327,7 @@ python_tests = \
test/ \
test/ \
test/ \
test/ \
test/ \
test/ \
test/ \
test/ \
......@@ -44,6 +44,10 @@ from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti.confd import client as confd_client
......@@ -109,6 +113,117 @@ def RunWatcherHooks():
class NodeMaintenance(object):
"""Talks to confd daemons and possible shutdown instances/drbd devices.
def __init__(self):
  """Set up the confd client machinery used for node maintenance.

  The replies are kept by a L{confd_client.StoreResultCallback},
  wrapped in a L{confd_client.ConfdFilterCallback}; the filter's
  C{consistent} map is later consulted by C{Exec} to decide whether
  the replies agree.

  """
  self.store_cb = confd_client.StoreResultCallback()
  self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
  self.confd_client = confd_client.GetConfdClient(self.filter_cb)
def ShouldRun():
"""Checks whether node maintenance should run.
return ssconf.SimpleStore().GetMaintainNodeHealth()
except errors.ConfigurationError, err:
logging.error("Configuration error, not activating node maintenance: %s",
return False
def GetRunningInstances():
"""Compute list of hypervisor/running instances.
hyp_list = ssconf.SimpleStore().GetHypervisorList()
results = []
for hv_name in hyp_list:
hv = hypervisor.GetHypervisor(hv_name)
ilist = hv.ListInstances()
results.extend([(iname, hv_name) for iname in ilist])
except: # pylint: disable-msg=W0702
logging.error("Error while listing instances for hypervisor %s",
hv_name, exc_info=True)
return results
def GetUsedDRBDs():
  """Get list of used DRBD minors.

  @rtype: list
  @return: the keys of L{bdev.DRBD8.GetUsedDevs}, i.e. the DRBD minor
    numbers currently in use on this node

  """
  # NOTE(review): assumes GetUsedDevs() returns a dict keyed by minor
  # number -- confirm against lib/bdev.py
  return bdev.DRBD8.GetUsedDevs().keys()
def DoMaintenance(cls, role):
"""Maintain the instance list.
if role == constants.CONFD_NODE_ROLE_OFFLINE:
inst_running = cls.GetRunningInstances()
drbd_running = cls.GetUsedDRBDs()
logging.debug("Not doing anything for role %s", role)
def ShutdownInstances(inst_running):
"""Shutdown running instances.
names_running = set([i[0] for i in inst_running])
if names_running:"Following instances should not be running,"
" shutting them down: %s", utils.CommaJoin(names_running))
# this dictionary will collapse duplicate instance names (only
# xen pvm/vhm) into a single key, which is fine
i2h = dict(inst_running)
for name in names_running:
hv_name = i2h[name]
hv = hypervisor.GetHypervisor(hv_name)
hv.StopInstance(None, force=True, name=name)
def ShutdownDRBD(drbd_running):
"""Shutdown active DRBD devices.
if drbd_running:"Following DRBD minors should not be active,"
" shutting them down: %s", utils.CommaJoin(drbd_running))
for minor in drbd_running:
# pylint: disable-msg=W0212
# using the private method as is, pending enhancements to the DRBD
# interface
def Exec(self):
"""Check node status versus cluster desired state.
my_name = utils.HostInfo().name
req = confd_client.ConfdClientRequest(type=
self.confd_client.SendRequest(req, async=False, coverage=-1)
timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
if not timed_out:
# should have a valid response
status, result = self.store_cb.GetResponse(req.rsalt)
assert status, "Missing result but received replies"
if not self.filter_cb.consistent[req.rsalt]:
logging.warning("Inconsistent replies, not doing anything")
logging.warning("Confd query timed out, cannot do maintenance actions")
class WatcherState(object):
"""Interface to a state file recording restart attempts.
......@@ -527,6 +642,10 @@ def main():
# run node maintenance in all cases, even if master, so that old
# masters can be properly cleaned up too
if NodeMaintenance.ShouldRun():
notepad = WatcherState(statefile)
......@@ -1058,6 +1058,20 @@ in the manpage.
.. note:: this command only stores a local flag file, and if you
failover the master, it will not have effect on the new master.
Node auto-maintenance
If the cluster parameter ``maintain_node_health`` is enabled (see the
manpage for :command:`gnt-cluster`, the init and modify subcommands),
then the following will happen automatically:
- the watcher will shutdown any instances running on offline nodes
- the watcher will deactivate any DRBD devices on offline nodes
In the future, more actions are planned, so only enable this parameter
if the nodes are completely dedicated to Ganeti; otherwise it might be
possible to lose data due to auto-maintenance actions.
Removing a cluster entirely
......@@ -178,7 +178,8 @@ def InitCluster(cluster_name, mac_prefix,
master_netdev, file_storage_dir, candidate_pool_size,
secondary_ip=None, vg_name=None, beparams=None,
nicparams=None, hvparams=None, enabled_hypervisors=None,
modify_etc_hosts=True, modify_ssh_setup=True):
modify_etc_hosts=True, modify_ssh_setup=True,
"""Initialise the cluster.
@type candidate_pool_size: int
......@@ -321,6 +322,7 @@ def InitCluster(cluster_name, mac_prefix,
master_node_config = objects.Node(,
......@@ -78,6 +78,7 @@ __all__ = [
......@@ -940,6 +941,13 @@ USE_REPL_NET_OPT = cli_option("--use-replication-network",
" for talking to the nodes",
action="store_true", default=False)
cli_option("--maintain-node-health", dest="maintain_node_health",
metavar=_YORNO, default=None, type="bool",
help="Configure the cluster to automatically maintain node"
" health, by shutting down unknown instances, shutting down"
" unknown DRBD devices, etc.")
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
......@@ -2240,6 +2240,7 @@ class LUSetClusterParams(LogicalUnit):
if self.op.candidate_pool_size < 1:
raise errors.OpPrereqError("At least one master candidate needed",
_CheckBooleanOpField(self.op, "maintain_node_health")
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
......@@ -2432,6 +2433,9 @@ class LUSetClusterParams(LogicalUnit):
# we need to update the pool size here, otherwise the save will fail
_AdjustCandidatePool(self, [])
if self.op.maintain_node_health is not None:
self.cluster.maintain_node_health = self.op.maintain_node_health
self.cfg.Update(self.cluster, feedback_fn)
......@@ -3691,6 +3695,7 @@ class LUQueryClusterInfo(NoHooksLU):
"master_netdev": cluster.master_netdev,
"volume_group_name": cluster.volume_group_name,
"file_storage_dir": cluster.file_storage_dir,
"maintain_node_health": cluster.maintain_node_health,
"ctime": cluster.ctime,
"mtime": cluster.mtime,
"uuid": cluster.uuid,
......@@ -85,6 +85,24 @@ class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
self.client.HandleResponse(payload, ip, port)
class _Request(object):
"""Request status structure.
@ivar request: the request data
@ivar args: any extra arguments for the callback
@ivar expiry: the expiry timestamp of the request
@ivar sent: the set of contacted peers
@ivar rcvd: the set of peers who replied
def __init__(self, request, args, expiry, sent):
self.request = request
self.args = args
self.expiry = expiry
self.sent = frozenset(sent)
self.rcvd = set()
class ConfdClient:
"""Send queries to confd, and get back answers.
......@@ -92,6 +110,11 @@ class ConfdClient:
getting back answers, this is an asynchronous library. It can either work
through asyncore or with your own handling.
@type _requests: dict
@ivar _requests: dictionary indexes by salt, which contains data
about the outstanding requests; the values are objects of type
def __init__(self, hmac_key, peers, callback, port=None, logger=None):
"""Constructor for ConfdClient
......@@ -118,7 +141,6 @@ class ConfdClient:
self._confd_port = port
self._logger = logger
self._requests = {}
self._expire_requests = []
if self._confd_port is None:
self._confd_port = utils.GetDaemonPort(constants.CONFD)
......@@ -161,23 +183,18 @@ class ConfdClient:
now = time.time()
while self._expire_requests:
expire_time, rsalt = self._expire_requests[0]
if now >= expire_time:
(request, args) = self._requests[rsalt]
for rsalt, rq in self._requests.items():
if now >= rq.expiry:
del self._requests[rsalt]
client_reply = ConfdUpcallPayload(salt=rsalt,
def SendRequest(self, request, args=None, coverage=None, async=True):
def SendRequest(self, request, args=None, coverage=0, async=True):
"""Send a confd request to some MCs
@type request: L{objects.ConfdRequest}
......@@ -185,13 +202,19 @@ class ConfdClient:
@type args: tuple
@param args: additional callback arguments
@type coverage: integer
@param coverage: number of remote nodes to contact
@param coverage: number of remote nodes to contact; if default
(0), it will use a reasonable default
(L{ganeti.constants.CONFD_DEFAULT_REQ_COVERAGE}), if -1 is
passed, it will use the maximum number of peers, otherwise the
number passed in will be used
@type async: boolean
@param async: handle the write asynchronously
if coverage is None:
if coverage == 0:
coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
elif coverage == -1:
coverage = len(self._peers)
if coverage > len(self._peers):
raise errors.ConfdClientError("Not enough MCs known to provide the"
......@@ -219,9 +242,9 @@ class ConfdClient:
except errors.UdpDataSizeError:
raise errors.ConfdClientError("Request too big")
self._requests[request.rsalt] = (request, args)
expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
self._expire_requests.append((expire_time, request.rsalt))
self._requests[request.rsalt] = _Request(request, args, expire_time,
if not async:
......@@ -241,19 +264,21 @@ class ConfdClient:
(request, args) = self._requests[salt]
rq = self._requests[salt]
except KeyError:
if self._logger:
self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
client_reply = ConfdUpcallPayload(salt=salt,
......@@ -281,6 +306,83 @@ class ConfdClient:
return self._socket.process_next_packet(timeout=timeout)
def _NeededReplies(peer_cnt):
"""Compute the minimum safe number of replies for a query.
The algorithm is designed to work well for both small and big
number of peers:
- for less than three, we require all responses
- for less than five, we allow one miss
- otherwise, half the number plus one
This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
4->2, 5->3, 6->3, 7->4, etc.
@type peer_cnt: int
@param peer_cnt: the number of peers contacted
@rtype: int
@return: the number of replies which should give a safe coverage
if peer_cnt < 3:
return peer_cnt
elif peer_cnt < 5:
return peer_cnt - 1
return int(peer_cnt/2) + 1
def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
"""Wait for replies to a given request.
This method will wait until either the timeout expires or a
minimum number (computed using L{_NeededReplies}) of replies are
received for the given salt. It is useful when doing synchronous
calls to this library.
@param salt: the salt of the request we want responses for
@param timeout: the maximum timeout (should be less or equal to
@rtype: tuple
@return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
request is unknown, timed_out will be true and the counters
will be zero
def _CheckResponse():
if salt not in self._requests:
# expired?
if self._logger:
self._logger.debug("Discarding unknown/expired request: %s" % salt)
return MISSING
rq = self._requests[salt]
if len(rq.rcvd) >= expected:
# already got all replies
return (False, len(rq.sent), len(rq.rcvd))
# else wait, using default timeout
raise utils.RetryAgain()
MISSING = (True, 0, 0)
if salt not in self._requests:
return MISSING
# extend the expire time with the current timeout, so that we
# don't get the request expired from under us
rq = self._requests[salt]
rq.expiry += timeout
sent = len(rq.sent)
expected = self._NeededReplies(sent)
return utils.Retry(_CheckResponse, 0, timeout)
except utils.RetryTimeout:
if salt in self._requests:
rq = self._requests[salt]
return (True, len(rq.sent), len(rq.rcvd))
return MISSING
# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
......@@ -510,6 +612,54 @@ class ConfdCountingCallback:
class StoreResultCallback:
"""Callback that simply stores the most recent answer.
@ivar _answers: dict of salt to (have_answer, reply)
_NO_KEY = (False, None)
def __init__(self):
"""Constructor for StoreResultCallback
# answers contains a dict of salt -> best result
self._answers = {}
def GetResponse(self, salt):
"""Return the best match for a salt
return self._answers.get(salt, self._NO_KEY)
def _HandleExpire(self, up):
"""Expiration handler.
if up.salt in self._answers and self._answers[up.salt] == self._NO_KEY:
del self._answers[up.salt]
def _HandleReply(self, up):
"""Handle a single confd reply, and decide whether to filter it.
self._answers[up.salt] = (True, up)
def __call__(self, up):
  """Filtering callback

  @type up: L{ConfdUpcallPayload}
  @param up: upper callback

  """
  # NOTE(review): the bodies of these two branches are missing from
  # this excerpt (diff truncation); presumably they invoke the reply
  # and expire handlers respectively -- confirm against the complete
  # source file before relying on this fragment
  if up.type == UPCALL_REPLY:
  elif up.type == UPCALL_EXPIRE:
def GetConfdClient(callback):
"""Return a client configured using the given callback.
......@@ -1389,6 +1389,7 @@ class ConfigWriter:
constants.SS_INSTANCE_LIST: instance_data,
constants.SS_HYPERVISOR_LIST: hypervisor_list,
constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
@locking.ssynchronized(_config_lock, shared=1)
......@@ -675,6 +675,7 @@ SS_ONLINE_NODES = "online_nodes"
SS_INSTANCE_LIST = "instance_list"
SS_RELEASE_VERSION = "release_version"
SS_HYPERVISOR_LIST = "hypervisor_list"
SS_MAINTAIN_NODE_HEALTH = "maintain_node_health"
# cluster wide default parameters
......@@ -265,8 +265,9 @@ def ParseRequest(msg):
logging.error("LUXI request not a dict: %r", msg)
raise ProtocolError("Invalid LUXI request (not a dict)")
method = request.get(KEY_METHOD, None)
args = request.get(KEY_ARGS, None)
method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
if method is None or args is None:
logging.error("LUXI request missing method or arguments: %r", msg)
raise ProtocolError(("Invalid LUXI request (no method or arguments"
......@@ -865,6 +865,7 @@ class Cluster(TaggableObject):
def UpgradeConfig(self):
......@@ -911,6 +912,10 @@ class Cluster(TaggableObject):
if hvname != self.default_hypervisor])
self.default_hypervisor = None
# maintain_node_health added after 2.1.1
if self.maintain_node_health is None:
self.maintain_node_health = False
def ToDict(self):
"""Custom function for cluster.
......@@ -305,6 +305,7 @@ class OpSetClusterParams(OpCode):
......@@ -152,6 +152,7 @@ def LoadSignedJson(txt, key):
raise errors.SignatureError('Invalid external message')
if callable(key):
# pylint: disable-msg=E1103
key_selector = signed_dict.get("key_selector", None)
hmac_key = key(key_selector)
if not hmac_key:
......@@ -283,6 +283,7 @@ class SimpleStore(object):
_MAX_SIZE = 131072
......@@ -432,6 +433,14 @@ class SimpleStore(object):
nl = data.splitlines(False)
return nl
def GetMaintainNodeHealth(self):
  """Return the value of the maintain_node_health option.

  @rtype: boolean
  @return: True iff the ssconf file contains exactly "True"; any
    other content yields False

  """
  # NOTE(review): assumes _ReadFile returns the stripped file
  # contents -- confirm in the surrounding SimpleStore code
  data = self._ReadFile(constants.SS_MAINTAIN_NODE_HEALTH)
  # we rely on the bool serialization here: the writer side stores
  # str(cluster.maintain_node_health), so only "True" means enabled
  return data == "True"
def GetMasterAndMyself(ss=None):
"""Get the master node and my own hostname.
......@@ -48,31 +48,68 @@
The <command>&dhpackage;</command> is a periodically run script
which is responsible for keeping the instances in the correct
status. It has two separate functions, one for the master node
and another one that runs on every node.
Its primary function is to try to keep running all instances
which are marked as <emphasis>up</emphasis> in the configuration
file, by trying to start them a limited number of times.
<title>Master operations</title>
Its other function is to <quote>repair</quote> DRBD links by
reactivating the block devices of instances which have
secondaries on nodes that have been rebooted.
Its primary function is to try to keep running all instances
which are marked as <emphasis>up</emphasis> in the configuration
file, by trying to start them a limited number of times.