Commit 16e0b9c9 authored by Michael Hanselmann

ganeti-watcher: Split for node groups


This patch brings a huge change to ganeti-watcher to make it aware of
node groups. Each node group is processed in its own subprocess,
reducing the impact of long-running operations.

The global watcher state file, $datadir/ganeti/watcher.data, is replaced
with a state file per node group ($datadir/ganeti/watcher.${uuid}.data).

Previously a lock on the state file was used to ensure that only one
instance of the watcher was running at a time. Some operations, e.g.
“gnt-cluster renew-crypto”, blocked the watcher by acquiring an
exclusive lock on the state file. Since the watcher processes now use
different files, this method is no longer usable. Locking multiple files
isn't atomic. Instead a dedicated lock file is used and every watcher
process acquires a shared lock on it. If a Ganeti command wants to block
the watcher it acquires the lock in exclusive mode.
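
A minimal sketch of that locking protocol, using plain fcntl rather than
Ganeti's utils.FileLock wrapper (the lock file path and function name
below are illustrative, not the actual implementation):

import errno
import fcntl
import os

WATCHER_LOCK_FILE = "/var/lib/ganeti/watcher.lock"  # illustrative path

def acquire_watcher_lock(exclusive=False, blocking=True):
  """Acquires a lock on the dedicated watcher lock file.

  Watcher processes take a shared lock, so any number of per-group
  watchers can run concurrently; a command that wants to block the
  watcher takes an exclusive lock instead.
  """
  fd = os.open(WATCHER_LOCK_FILE, os.O_RDWR | os.O_CREAT, 0o644)
  flags = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
  if not blocking:
    flags |= fcntl.LOCK_NB
  try:
    fcntl.flock(fd, flags)
  except IOError as err:
    os.close(fd)
    if err.errno in (errno.EAGAIN, errno.EACCES):
      return None  # somebody is currently blocking the watcher
    raise
  return fd  # keep the descriptor open for as long as the lock is held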

Each per-nodegroup watcher process also acquires an exclusive lock on
its state file. This prevents multiple watchers from running for the
same nodegroup.
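
As a further sketch (again with illustrative names, not the real Ganeti
code), the per-group state file path can be derived from the group UUID
and guarded with a non-blocking exclusive lock:

import fcntl
import os

# Illustrative pattern matching the description above
WATCHER_GROUP_STATE_FILE = "/var/lib/ganeti/watcher.%s.data"

def open_group_state_file(group_uuid):
  """Opens the group's state file and takes an exclusive lock on it.

  Returns the open file descriptor, or None if another watcher for the
  same node group already holds the lock.
  """
  path = WATCHER_GROUP_STATE_FILE % group_uuid
  fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
  try:
    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
  except IOError:
    os.close(fd)
    return None
  return fd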

The code is reorganized heavily to clear up dependencies between
functions and to get rid of the global “client” variable. The utility
class “Watcher” is removed in favour of stand-alone utility functions.

Since the parent watcher process won't wait for its children by
default, a new option (--wait-children) was added. It is used, for
example, by QA.
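
To illustrate the overall child-process model (a sketch only; the
function name and option spelling here are assumptions, while the real
logic is in _StartGroupChildren further down in the diff):

import os
import sys
import time

CHILD_PROCESS_DELAY = 1.0  # pause between child starts, as in this patch

def spawn_group_watchers(group_uuids, wait_children=False):
  """Re-executes the watcher once per node group.

  Each child gets its group UUID on the command line; the parent only
  waits for the children when asked to (e.g. by QA via --wait-children).
  """
  children = []
  for idx, uuid in enumerate(group_uuids):
    if idx > 0:
      time.sleep(CHILD_PROCESS_DELAY)  # don't start everything at once
    args = sys.argv + ["--node-group", uuid]  # option name assumed
    pid = os.spawnv(os.P_NOWAIT, args[0], args)
    children.append(pid)
  if wait_children:
    for pid in children:
      os.waitpid(pid, 0)
  return children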

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
Parent: de9c12f7
@@ -2261,7 +2261,7 @@ class _RunWhileClusterStoppedHelper:
"""
# Pause watcher by acquiring an exclusive lock on watcher state file
self.feedback_fn("Blocking watcher")
watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
watcher_block = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
try:
# TODO: Currently, this just blocks. There's no timeout.
# TODO: Should it be a shared lock?
@@ -32,6 +32,7 @@ import os.path
import sys
import time
import logging
import operator
from optparse import OptionParser
from ganeti import utils
@@ -43,6 +44,10 @@ from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
import ganeti.rapi.client # pylint: disable-msg=W0611
@@ -51,10 +56,6 @@ from ganeti.watcher import state
MAXTRIES = 5
# Global LUXI client object
client = None
BAD_STATES = frozenset([
constants.INSTST_ERRORDOWN,
])
@@ -65,6 +66,9 @@ HELPLESS_STATES = frozenset([
NOTICE = "NOTICE"
ERROR = "ERROR"
#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0
class NotMasterError(errors.GenericError):
"""Exception raised when this host is not the master."""
@@ -129,256 +133,184 @@ class Instance(object):
self.autostart = autostart
self.snodes = snodes
def Restart(self):
def Restart(self, cl):
"""Encapsulates the start of an instance.
"""
op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
cli.SubmitOpCode(op, cl=client)
cli.SubmitOpCode(op, cl=cl)
def ActivateDisks(self):
def ActivateDisks(self, cl):
"""Encapsulates the activation of all disks of an instance.
"""
op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
cli.SubmitOpCode(op, cl=client)
cli.SubmitOpCode(op, cl=cl)
def GetClusterData():
"""Get a list of instances on this cluster.
class Node:
"""Data container representing cluster node.
"""
op1_fields = ["name", "status", "admin_state", "snodes"]
op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
use_locking=True)
op2_fields = ["name", "bootid", "offline"]
op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
use_locking=True)
job_id = client.SubmitJob([op1, op2])
def __init__(self, name, bootid, offline, secondaries):
"""Initializes this class.
all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
"""
self.name = name
self.bootid = bootid
self.offline = offline
self.secondaries = secondaries
logging.debug("Got data from cluster, writing instance status file")
result = all_results[0]
smap = {}
def _CheckInstances(cl, notepad, instances):
"""Make a pass over the list of instances, restarting downed ones.
instances = {}
"""
notepad.MaintainInstanceList(instances.keys())
_UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
started = set()
for fields in result:
(name, status, autostart, snodes) = fields
for inst in instances.values():
if inst.status in BAD_STATES:
n = notepad.NumberOfRestartAttempts(inst.name)
# update the secondary node map
for node in snodes:
if node not in smap:
smap[node] = []
smap[node].append(name)
if n > MAXTRIES:
logging.warning("Not restarting instance '%s', retries exhausted",
inst.name)
continue
instances[name] = Instance(name, status, autostart, snodes)
if n == MAXTRIES:
notepad.RecordRestartAttempt(inst.name)
logging.error("Could not restart instance '%s' after %s attempts,"
" giving up", inst.name, MAXTRIES)
continue
nodes = dict([(name, (bootid, offline))
for name, bootid, offline in all_results[1]])
try:
logging.info("Restarting instance '%s' (attempt #%s)",
inst.name, n + 1)
inst.Restart(cl)
except Exception: # pylint: disable-msg=W0703
logging.exception("Error while restarting instance '%s'", inst.name)
else:
started.add(inst.name)
client.ArchiveJob(job_id)
notepad.RecordRestartAttempt(inst.name)
return instances, nodes, smap
else:
if notepad.NumberOfRestartAttempts(inst.name):
notepad.RemoveInstance(inst.name)
if inst.status not in HELPLESS_STATES:
logging.info("Restart of instance '%s' succeeded", inst.name)
return started
class Watcher(object):
"""Encapsulate the logic for restarting erroneously halted virtual machines.
The calling program should periodically instantiate me and call Run().
This will traverse the list of instances, and make up to MAXTRIES attempts
to restart machines that are down.
def _CheckDisks(cl, notepad, nodes, instances, started):
"""Check all nodes for restarted ones.
"""
def __init__(self, opts, notepad):
self.notepad = notepad
master = client.QueryConfigValues(["master_node"])[0]
if master != netutils.Hostname.GetSysName():
raise NotMasterError("This is not the master node")
# first archive old jobs
self.ArchiveJobs(opts.job_age)
# and only then submit new ones
self.instances, self.bootids, self.smap = GetClusterData()
self.started_instances = set()
self.opts = opts
def Run(self):
"""Watcher run sequence.
"""
notepad = self.notepad
self.CheckInstances(notepad)
self.CheckDisks(notepad)
self.VerifyDisks()
@staticmethod
def ArchiveJobs(age):
"""Archive old jobs.
"""
arch_count, left_count = client.AutoArchiveJobs(age)
logging.debug("Archived %s jobs, left %s", arch_count, left_count)
def CheckDisks(self, notepad):
"""Check all nodes for restarted ones.
"""
check_nodes = []
for name, (new_id, offline) in self.bootids.iteritems():
old = notepad.GetNodeBootID(name)
if new_id is None:
# Bad node, not returning a boot id
if not offline:
logging.debug("Node %s missing boot id, skipping secondary checks",
name)
continue
if old != new_id:
# Node's boot ID has changed, proably through a reboot.
check_nodes.append(name)
if check_nodes:
# Activate disks for all instances with any of the checked nodes as a
# secondary node.
for node in check_nodes:
if node not in self.smap:
check_nodes = []
for node in nodes.values():
old = notepad.GetNodeBootID(node.name)
if not node.bootid:
# Bad node, not returning a boot id
if not node.offline:
logging.debug("Node '%s' missing boot ID, skipping secondary checks",
node.name)
continue
if old != node.bootid:
# Node's boot ID has changed, probably through a reboot
check_nodes.append(node)
if check_nodes:
# Activate disks for all instances with any of the checked nodes as a
# secondary node.
for node in check_nodes:
for instance_name in node.secondaries:
try:
inst = instances[instance_name]
except KeyError:
logging.info("Can't find instance '%s', maybe it was ignored",
instance_name)
continue
for instance_name in self.smap[node]:
instance = self.instances[instance_name]
if not instance.autostart:
logging.info(("Skipping disk activation for non-autostart"
" instance %s"), instance.name)
continue
if instance.name in self.started_instances:
# we already tried to start the instance, which should have
# activated its drives (if they can be at all)
logging.debug("Skipping disk activation for instance %s, as"
" it was already started", instance.name)
continue
try:
logging.info("Activating disks for instance %s", instance.name)
instance.ActivateDisks()
except Exception: # pylint: disable-msg=W0703
logging.exception("Error while activating disks for instance %s",
instance.name)
# Keep changed boot IDs
for name in check_nodes:
notepad.SetNodeBootID(name, self.bootids[name][0])
def CheckInstances(self, notepad):
"""Make a pass over the list of instances, restarting downed ones.
"""
notepad.MaintainInstanceList(self.instances.keys())
for instance in self.instances.values():
if instance.status in BAD_STATES:
n = notepad.NumberOfRestartAttempts(instance.name)
if n > MAXTRIES:
logging.warning("Not restarting instance %s, retries exhausted",
instance.name)
if not inst.autostart:
logging.info("Skipping disk activation for non-autostart"
" instance '%s'", inst.name)
continue
elif n < MAXTRIES:
last = " (Attempt #%d)" % (n + 1)
else:
notepad.RecordRestartAttempt(instance.name)
logging.error("Could not restart %s after %d attempts, giving up",
instance.name, MAXTRIES)
if inst.name in started:
# we already tried to start the instance, which should have
# activated its drives (if they can be at all)
logging.debug("Skipping disk activation for instance '%s' as"
" it was already started", inst.name)
continue
try:
logging.info("Restarting %s%s", instance.name, last)
instance.Restart()
self.started_instances.add(instance.name)
logging.info("Activating disks for instance '%s'", inst.name)
inst.ActivateDisks(cl)
except Exception: # pylint: disable-msg=W0703
logging.exception("Error while restarting instance %s",
instance.name)
logging.exception("Error while activating disks for instance '%s'",
inst.name)
notepad.RecordRestartAttempt(instance.name)
elif instance.status in HELPLESS_STATES:
if notepad.NumberOfRestartAttempts(instance.name):
notepad.RemoveInstance(instance.name)
else:
if notepad.NumberOfRestartAttempts(instance.name):
notepad.RemoveInstance(instance.name)
logging.info("Restart of %s succeeded", instance.name)
# Keep changed boot IDs
for node in check_nodes:
notepad.SetNodeBootID(node.name, node.bootid)
def _CheckForOfflineNodes(self, instance):
"""Checks if given instances has any secondary in offline status.
@param instance: The instance object
@return: True if any of the secondary is offline, False otherwise
"""
bootids = []
for node in instance.snodes:
bootids.append(self.bootids[node])
return compat.any(offline for (_, offline) in bootids)
def VerifyDisks(self):
"""Run gnt-cluster verify-disks.
"""
job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
client.ArchiveJob(job_id)
def _CheckForOfflineNodes(nodes, instance):
"""Checks if given instances has any secondary in offline status.
# Keep track of submitted jobs
jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
@param instance: The instance object
@return: True if any of the secondary is offline, False otherwise
archive_jobs = set()
for (status, job_id) in result[constants.JOB_IDS_KEY]:
jex.AddJobId(None, status, job_id)
if status:
archive_jobs.add(job_id)
"""
return compat.any(nodes[node_name].offline for node_name in instance.snodes)
offline_disk_instances = set()
for (status, result) in jex.GetResults():
if not status:
logging.error("Verify-disks job failed: %s", result)
continue
def _VerifyDisks(cl, uuid, nodes, instances):
"""Run a per-group "gnt-cluster verify-disks".
((_, instances, _), ) = result
"""
job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
((_, offline_disk_instances, _), ) = \
cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
cl.ArchiveJob(job_id)
offline_disk_instances.update(instances)
if not offline_disk_instances:
# nothing to do
logging.debug("Verify-disks reported no offline disks, nothing to do")
return
for job_id in archive_jobs:
client.ArchiveJob(job_id)
logging.debug("Will activate disks for instance(s) %s",
utils.CommaJoin(offline_disk_instances))
if not offline_disk_instances:
# nothing to do
logging.debug("verify-disks reported no offline disks, nothing to do")
return
# We submit only one job, and wait for it. Not optimal, but this puts less
# load on the job queue.
job = []
for name in offline_disk_instances:
try:
inst = instances[name]
except KeyError:
logging.info("Can't find instance '%s', maybe it was ignored", name)
continue
logging.debug("Will activate disks for instance(s) %s",
utils.CommaJoin(offline_disk_instances))
if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
logging.info("Skipping instance '%s' because it is in a helpless state or"
" has offline secondaries", name)
continue
# we submit only one job, and wait for it. not optimal, but spams
# less the job queue
job = []
for name in offline_disk_instances:
instance = self.instances[name]
if (instance.status in HELPLESS_STATES or
self._CheckForOfflineNodes(instance)):
logging.info("Skip instance %s because it is in helpless state or has"
" one offline secondary", name)
continue
job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
if job:
job_id = cli.SendJob(job, cl=client)
if job:
job_id = cli.SendJob(job, cl=cl)
try:
cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
except Exception: # pylint: disable-msg=W0703
logging.exception("Error while activating disks")
try:
cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
except Exception: # pylint: disable-msg=W0703
logging.exception("Error while activating disks")
def IsRapiResponding(hostname):
@@ -421,11 +353,14 @@ def ParseOptions():
constants.RELEASE_VERSION)
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.NODEGROUP_OPT)
parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
help="Autoarchive jobs older than this age (default"
" 6 hours)")
parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
action="store_true", help="Ignore cluster pause setting")
parser.add_option("--wait-children", dest="wait_children", default=False,
action="store_true", help="Wait for child processes")
options, args = parser.parse_args()
options.job_age = cli.ParseTimespec(options.job_age)
@@ -454,13 +389,258 @@ def _UpdateInstanceStatus(cl, filename):
for (name, status) in result))
def GetLuxiClient(try_restart):
"""Tries to connect to the master daemon.
@type try_restart: bool
@param try_restart: Whether to attempt to restart the master daemon
"""
try:
return cli.GetClient()
except errors.OpPrereqError, err:
# this is, from cli.GetClient, a not-master case
raise NotMasterError("Not on master node (%s)" % err)
except luxi.NoMasterError, err:
if not try_restart:
raise
logging.warning("Master daemon seems to be down (%s), trying to restart",
err)
if not utils.EnsureDaemon(constants.MASTERD):
raise errors.GenericError("Can't start the master daemon")
# Retry the connection
return cli.GetClient()
def _StartGroupChildren(cl, wait):
"""Starts a new instance of the watcher for every node group.
"""
assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
for arg in sys.argv)
result = cl.QueryGroups([], ["name", "uuid"], False)
children = []
for (idx, (name, uuid)) in enumerate(result):
args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
if idx > 0:
# Let's not kill the system
time.sleep(CHILD_PROCESS_DELAY)
logging.debug("Spawning child for group '%s' (%s), arguments %s",
name, uuid, args)
try:
# TODO: Should utils.StartDaemon be used instead?
pid = os.spawnv(os.P_NOWAIT, args[0], args)
except Exception: # pylint: disable-msg=W0703
logging.exception("Failed to start child for group '%s' (%s)",
name, uuid)
else:
logging.debug("Started with PID %s", pid)
children.append(pid)
if wait:
for pid in children:
logging.debug("Waiting for child PID %s", pid)
try:
result = utils.RetryOnSignal(os.waitpid, pid, 0)
except EnvironmentError, err:
result = str(err)
logging.debug("Child PID %s exited with status %s", pid, result)
def _ArchiveJobs(cl, age):
"""Archives old jobs.
"""
(arch_count, left_count) = cl.AutoArchiveJobs(age)
logging.debug("Archived %s jobs, left %s", arch_count, left_count)
def _CheckMaster(cl):
"""Ensures current host is master node.
"""
(master, ) = cl.QueryConfigValues(["master_node"])
if master != netutils.Hostname.GetSysName():
raise NotMasterError("This is not the master node")
@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
"""Main function for global watcher.
At the end child processes are spawned for every node group.
"""
StartNodeDaemons()
RunWatcherHooks()
# Run node maintenance in all cases, even if master, so that old masters can
# be properly cleaned up
if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
try:
client = GetLuxiClient(True)
except NotMasterError:
# Don't proceed on non-master nodes
return constants.EXIT_SUCCESS
# we are on master now
utils.EnsureDaemon(constants.RAPI)
# If RAPI isn't responding to queries, try one restart
logging.debug("Attempting to talk to remote API on %s",
constants.IP4_ADDRESS_LOCALHOST)
if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
logging.warning("Couldn't get answer from remote API, restaring daemon")
utils.StopDaemon(constants.RAPI)
utils.EnsureDaemon(constants.RAPI)
logging.debug("Second attempt to talk to remote API")
if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
logging.fatal("RAPI is not responding")
logging.debug("Successfully talked to remote API")
_CheckMaster(client)
_ArchiveJobs(client, opts.job_age)
_UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
# Spawn child processes for all node groups
_StartGroupChildren(client, opts.wait_children)
return constants.EXIT_SUCCESS
def _GetGroupData(cl, uuid):
"""Retrieves instances and nodes per node group.
"""
# TODO: Implement locking
job = [
# Get all primary instances in group
opcodes.OpQuery(what=constants.QR_INSTANCE,
fields=["name", "status", "admin_state", "snodes",
"pnode.group.uuid", "snodes.group.uuid"],
filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
# Get all nodes in group
opcodes.OpQuery(what=constants.QR_NODE,
fields=["name", "bootid", "offline"],
filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
]
job_id = cl.SubmitJob(job)
results = map(objects.QueryResponse.FromDict,
cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
cl.ArchiveJob(job_id)
results_data = map(operator.attrgetter("data"), results)
# Ensure results are tuples with two values
assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
# Extract values ignoring result status
(raw_instances, raw_nodes) = [[map(compat.snd, values)
for values in res]
for res in results_data]
secondaries = {}
instances = []
# Load all instances
for (name, status, autostart, snodes, pnode_group_uuid,
snodes_group_uuid) in raw_instances:
if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
logging.error("Ignoring split instance '%s', primary group %s, secondary"
" groups %s", name, pnode_group_uuid,
utils.CommaJoin(snodes_group_uuid))
else:
instances.append(Instance(name, status, autostart, snodes))
for node in snodes:
secondaries.setdefault(node, set()).add(name)
# Load all nodes
nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
for (name, bootid, offline) in raw_nodes]
return (dict((node.name, node) for node in nodes),
dict((inst.name, inst) for inst in instances))
def _KnownGroup(uuid):
"""Checks if a group UUID is known by ssconf.
"""
groups = ssconf.SimpleStore().GetNodegroupList()
return compat.any(line.strip() and line.split()[0] == uuid
for line in groups)
def _GroupWatcher(opts):
"""Main function for per-group watcher process.
"""
group_uuid = opts.nodegroup.lower()
if not utils.UUID_RE.match(group_uuid):
raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
" got '%s'" %
(cli.NODEGROUP_OPT_NAME, group_uuid))
logging.info("Watcher for node group '%s'", group_uuid)
# Check if node group is known
if not _KnownGroup(group_uuid):
raise errors.GenericError("Node group '%s' is not known by ssconf" %
group_uuid)
state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
logging.debug("Using state file %s", state_path)
# Global watcher
statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
if not statefile:
return constants.EXIT_FAILURE
notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
try:
# Connect to master daemon
client = GetLuxiClient(False)
_CheckMaster(client)
(nodes, instances) = _GetGroupData(client, group_uuid)
started = _CheckInstances(client, notepad, instances)
_CheckDisks(client, notepad, nodes, instances, started)
_VerifyDisks(client, group_uuid, nodes, instances)
except Exception, err:
logging.info("Not updating status file due to failure: %s", err)
raise
else:
# Save changes for next run
notepad.Save(state_path)
return constants.EXIT_SUCCESS
def Main():
"""Main function.
"""
global client # pylint: disable-msg=W0603
(options, _) = ParseOptions()
utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
@@ -470,69 +650,24 @@ def Main():
logging.debug("Pause has been set, exiting")
return constants.EXIT_SUCCESS
statefile = \
state.OpenStateFile(constants.WATCHER_STATEFILE)
if not statefile:
return constants.EXIT_FAILURE
update_file = False
# Try to acquire global watcher lock in shared mode
lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
try:
StartNodeDaemons()
RunWatcherHooks()
# run node maintenance in all cases, even if master, so that old
# masters can be properly cleaned up too
if nodemaint.NodeMaintenance.ShouldRun():
nodemaint.NodeMaintenance().Exec()
notepad = state.WatcherState(statefile)
try:
try:
client = cli.GetClient()
except errors.OpPrereqError:
# this is, from cli.GetClient, a not-master case
logging.debug("Not on master, exiting")
update_file = True
return constants.EXIT_SUCCESS
except luxi.NoMasterError, err:
logging.warning("Master seems to be down (%s), trying to restart",
str(err))
if not utils.EnsureDaemon(constants.MASTERD):
logging.critical("Can't start the master, exiting")
return constants.EXIT_FAILURE
# else retry the connection
client = cli.GetClient()
# we are on master now
utils.EnsureDaemon(constants.RAPI)
# If RAPI isn't responding to queries, try one restart.
logging.debug("Attempting to talk with RAPI.")
if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
logging.warning("Couldn't get answer from Ganeti RAPI daemon."
" Restarting Ganeti RAPI.")
utils.StopDaemon(constants.RAPI)
utils.EnsureDaemon(constants.RAPI)
logging.debug("Second attempt to talk with RAPI")
if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
logging.fatal("RAPI is not responding. Please investigate.")
logging.debug("Successfully talked to RAPI.")
lock.Shared(blocking=False)
except (EnvironmentError, errors.LockError), err:
logging.error("Can't acquire lock on %s: %s",
constants.WATCHER_LOCK_FILE, err)
return constants.EXIT_SUCCESS
try:
watcher = Watcher(options, notepad)
except errors.ConfigurationError:
# Just exit if there's no configuration
update_file = True
return constants.EXIT_SUCCESS
watcher.Run()
update_file = True
finally:
if update_file:
notepad.Save(constants.WATCHER_STATEFILE)
else:
logging.debug("Not updating status file due to failure")
except SystemExit:
if options.nodegroup is None:
fn = _GlobalWatcher
else:
# Per-nodegroup watcher
fn = _GroupWatcher
try:
return fn(options)
except (SystemExit, KeyboardInterrupt):
raise
except NotMasterError:
logging.debug("Not master, exiting")
@@ -76,7 +76,7 @@ def _RunWatcherDaemon():
"""Runs the ganeti-watcher daemon on the master node.
"""
AssertCommand(["ganeti-watcher", "-d", "--ignore-pause"])
AssertCommand(["ganeti-watcher", "-d", "--ignore-pause", "--wait-children"])
def TestPauseWatcher():