-
Guido Trotter authored
* next: Create a new --no-voting option for masterfailover ganeti-masterd: allow non-interactive --no-voting Fix pylint warnings Add custom pylintrc bootstrap: Don't leak file descriptor when generating SSL certificate Fix problem with EAGAIN on socket connection in clients Fix some typos Increase maximum accepted size for a DRBD meta dev Cleanup config data when draining nodes Fix node readd issues backend.DemoteFromMC: don't fail for missing files Allow GetMasterCandidateStats to ignore some nodes Fix error message for extra files on non MC nodes Conflicts: lib/backend.py Most of the conflicts were in the new rpcs VS pylint fixes and usually the new rpcs fixed the pylint problems as well lib/bootstrap.py Small conflict between masterfailover --no-voting and new rpcs lib/cmdlib.py Net parameters conflicted here, kept that version lib/objects.py Same problem fixed in two different ways. 'next' version kept Signed-off-by:
Guido Trotter <ultrotter@google.com>
2f7140ba
mcpu.py 13.48 KiB
#
#
# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the logic behind the cluster operations
This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
- logical units, which know how to deal with their specific opcode only
- the processor, which dispatches the opcodes to their logical units
"""
import logging
from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
class Processor(object):
  """Object which runs OpCodes.

  An opcode is mapped to its logical unit (LU) class through
  L{DISPATCH_TABLE}; the LU is then instantiated, the locks it declares
  are acquired level by level and finally the LU is executed together
  with its pre/post hooks.

  """
  # Maps each supported opcode class to the LU class implementing it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: object giving access to the cluster configuration
        (C{context.cfg}, also handed to the RPC runner) and to the lock
        manager (C{context.glm})

    """
    self.context = context
    # Both callbacks are (re)set by each ExecOpCode call; they are
    # initialised here so that the attributes always exist.
    self._feedback_fn = None
    self._run_notifier = None
    self.exclusive_BGL = False
    self.rpc = rpc.RpcRunner(context.cfg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    The sequence is: prerequisite check, pre-phase hooks, the LU's
    C{Exec} and finally the post-phase hooks. If the configuration's
    write counter changed while executing, the config update hook is
    run as well, even when the execution raised an exception.

    @param lu: the logical unit to execute
    @return: the result of the LU's post-phase C{HooksCallBack}, or
        C{lu.dry_run_result} if the opcode asked for a dry run

    """
    # Remember the counter so we can detect configuration writes below
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @param level: the locking level to handle in this call; once it is no
        longer one of C{locking.LEVELS} the LU itself is executed
    @return: the result of L{_ExecLU}
    @raise NotImplementedError: if the LU wants to both add and acquire
        locks at the same level
    @raise errors.OpPrereqError: if the locks to be added can't be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # All levels have been processed: notify the caller (if it asked
      # for that) and run the LU
      if callable(self._run_notifier):
        self._run_notifier()
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        # Locks added here are registered for removal on the way out
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          # Remove the locks registered for removal at this level
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        # Release whatever is still owned at this level
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # Nothing to lock at this level, go on with the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn, run_notifier):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type feedback_fn: a function that takes a single argument
    @param feedback_fn: this function will be used as feedback from the LU
                        code to the end-user
    @type run_notifier: callable (no arguments) or None
    @param run_notifier: this function (if callable) will be called when
                         we are about to call the lu's Exec() method, that
                         is, after we have acquired all locks
    @return: the result of executing the LU matching the opcode
    @raise errors.ProgrammerError: if C{op} is not an OpCode instance
    @raise errors.OpCodeUnknown: if no LU is registered for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    self._run_notifier = run_notifier
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU).
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: the number of the current step
    @param total: the total number of steps
    @param message: the description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: the warning; if positional arguments are given it is
        used as a format string for them

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._feedback_fn(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._feedback_fn(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message; if positional arguments are given it is
        used as a format string for them

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._feedback_fn(" - INFO: %s" % message)
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, proc, lu):
    """Store the collaborators and precompute the hooks environment.

    @param callfn: the function doing the remote call; it is invoked as
        C{callfn(node_list, hpath, phase, env)}
    @param proc: the object whose C{LogWarning} is used to report
        per-node failures
    @param lu: the logical unit for which the hooks are run

    """
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    # Target nodes, indexed by hooks phase
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: a tuple of (environment dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes)

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are exported with a GANETI_ prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      # An LU without a hooks path has no target nodes
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the nodes on which the hooks should run
    @param hpath: the hooks path, exported as GANETI_HOOKS_PATH
    @param phase: the hooks phase, exported as GANETI_HOOKS_PHASE
    @return: whatever C{self.callfn} returns

    """
    # Work on a copy, as self.env is shared between the phases
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # Keys and values are all converted to strings; items() (instead of
    # the Python 2-only iteritems()) keeps this portable
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @return: the processed results of the hooks multi-node rpc call, or
        None if there are no target nodes for this phase
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: if any pre-phase script failed

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return None

    hpath = self.lu.HPATH
    results = self._RunWrapper(self.node_list[phase], hpath, phase)

    # Only the results of the pre phase are checked for failures
    if phase == constants.HOOKS_PHASE_PRE:
      errs = []
      if not results:
        raise errors.HooksFailure("Communication failure")
      for node_name in results:
        res = results[node_name]
        if res.offline:
          continue
        msg = res.RemoteFailMsg()
        if msg:
          # A node we can't talk to only gets a warning, not an abort
          self.proc.LogWarning("Communication failure to node %s: %s",
                               node_name, msg)
          continue
        for script, hkr, output in res.payload:
          if hkr == constants.HKR_FAIL:
            errs.append((node_name, script, output))
      if errs:
        raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)