From 16e0b9c9761dfde78324e9bd2ad76bb26e425dfe Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Fri, 29 Jul 2011 15:49:20 +0200 Subject: [PATCH] ganeti-watcher: Split for node groups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch brings a huge change to ganeti-watcher to make it aware of node groups. Each node group is processed in its own subprocess, reducing the impact of long-running operations. The global watcher state file, $datadir/ganeti/watcher.data, is replaced with a state file per node group ($datadir/ganeti/watcher.${uuid}.data). Previously a lock on the state file was used to ensure only one instance of watcher was running at the same time. Some operations, e.g. βgnt-cluster renew-cryptoβ, blocked the watcher by acquiring an exclusive lock on the state file. Since the watcher processes now use different files, this method is no longer usable. Locking multiple files isn't atomic. Instead a dedicated lock file is used and every watcher process acquires a shared lock on it. If a Ganeti command wants to block the watcher it acquires the lock in exclusive mode. Each per-nodegroup watcher process also acquires an exclusive lock on its state file. This prevents multiple watchers from running for the same nodegroup. The code is reorganized heavily to clear up dependencies between functions and to get rid of the global βclientβ variable. The utility class βWatcherβ is removed in favour of stand-alone utility functions. Since the parent watcher process won't wait for its children by default, a new option (--wait-children) was added. It is used, for example, by QA. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/cli.py | 2 +- lib/watcher/__init__.py | 669 ++++++++++++++++++++++++---------------- qa/qa_daemon.py | 2 +- 3 files changed, 404 insertions(+), 269 deletions(-) diff --git a/lib/cli.py b/lib/cli.py index e156e44cd..52d5780ca 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -2261,7 +2261,7 @@ class _RunWhileClusterStoppedHelper: """ # Pause watcher by acquiring an exclusive lock on watcher state file self.feedback_fn("Blocking watcher") - watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE) + watcher_block = utils.FileLock.Open(constants.WATCHER_LOCK_FILE) try: # TODO: Currently, this just blocks. There's no timeout. # TODO: Should it be a shared lock? diff --git a/lib/watcher/__init__.py b/lib/watcher/__init__.py index 7dee79ec2..b99bf3384 100644 --- a/lib/watcher/__init__.py +++ b/lib/watcher/__init__.py @@ -32,6 +32,7 @@ import os.path import sys import time import logging +import operator from optparse import OptionParser from ganeti import utils @@ -43,6 +44,10 @@ from ganeti import cli from ganeti import luxi from ganeti import rapi from ganeti import netutils +from ganeti import qlang +from ganeti import objects +from ganeti import ssconf +from ganeti import ht import ganeti.rapi.client # pylint: disable-msg=W0611 @@ -51,10 +56,6 @@ from ganeti.watcher import state MAXTRIES = 5 - - -# Global LUXI client object -client = None BAD_STATES = frozenset([ constants.INSTST_ERRORDOWN, ]) @@ -65,6 +66,9 @@ HELPLESS_STATES = frozenset([ NOTICE = "NOTICE" ERROR = "ERROR" +#: Number of seconds to wait between starting child processes for node groups +CHILD_PROCESS_DELAY = 1.0 + class NotMasterError(errors.GenericError): """Exception raised when this host is not the master.""" @@ -129,256 +133,184 @@ class Instance(object): self.autostart = autostart self.snodes = snodes - def Restart(self): + def Restart(self, cl): """Encapsulates the start of an instance. """ op = opcodes.OpInstanceStartup(instance_name=self.name, force=False) - cli.SubmitOpCode(op, cl=client) + cli.SubmitOpCode(op, cl=cl) - def ActivateDisks(self): + def ActivateDisks(self, cl): """Encapsulates the activation of all disks of an instance. """ op = opcodes.OpInstanceActivateDisks(instance_name=self.name) - cli.SubmitOpCode(op, cl=client) + cli.SubmitOpCode(op, cl=cl) -def GetClusterData(): - """Get a list of instances on this cluster. +class Node: + """Data container representing cluster node. """ - op1_fields = ["name", "status", "admin_state", "snodes"] - op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[], - use_locking=True) - op2_fields = ["name", "bootid", "offline"] - op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[], - use_locking=True) - - job_id = client.SubmitJob([op1, op2]) + def __init__(self, name, bootid, offline, secondaries): + """Initializes this class. - all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) + """ + self.name = name + self.bootid = bootid + self.offline = offline + self.secondaries = secondaries - logging.debug("Got data from cluster, writing instance status file") - result = all_results[0] - smap = {} +def _CheckInstances(cl, notepad, instances): + """Make a pass over the list of instances, restarting downed ones. - instances = {} + """ + notepad.MaintainInstanceList(instances.keys()) - _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE) + started = set() - for fields in result: - (name, status, autostart, snodes) = fields + for inst in instances.values(): + if inst.status in BAD_STATES: + n = notepad.NumberOfRestartAttempts(inst.name) - # update the secondary node map - for node in snodes: - if node not in smap: - smap[node] = [] - smap[node].append(name) + if n > MAXTRIES: + logging.warning("Not restarting instance '%s', retries exhausted", + inst.name) + continue - instances[name] = Instance(name, status, autostart, snodes) + if n == MAXTRIES: + notepad.RecordRestartAttempt(inst.name) + logging.error("Could not restart instance '%s' after %s attempts," + " giving up", inst.name, MAXTRIES) + continue - nodes = dict([(name, (bootid, offline)) - for name, bootid, offline in all_results[1]]) + try: + logging.info("Restarting instance '%s' (attempt #%s)", + inst.name, n + 1) + inst.Restart(cl) + except Exception: # pylint: disable-msg=W0703 + logging.exception("Error while restarting instance '%s'", inst.name) + else: + started.add(inst.name) - client.ArchiveJob(job_id) + notepad.RecordRestartAttempt(inst.name) - return instances, nodes, smap + else: + if notepad.NumberOfRestartAttempts(inst.name): + notepad.RemoveInstance(inst.name) + if inst.status not in HELPLESS_STATES: + logging.info("Restart of instance '%s' succeeded", inst.name) + return started -class Watcher(object): - """Encapsulate the logic for restarting erroneously halted virtual machines. - The calling program should periodically instantiate me and call Run(). - This will traverse the list of instances, and make up to MAXTRIES attempts - to restart machines that are down. +def _CheckDisks(cl, notepad, nodes, instances, started): + """Check all nodes for restarted ones. """ - def __init__(self, opts, notepad): - self.notepad = notepad - master = client.QueryConfigValues(["master_node"])[0] - if master != netutils.Hostname.GetSysName(): - raise NotMasterError("This is not the master node") - # first archive old jobs - self.ArchiveJobs(opts.job_age) - # and only then submit new ones - self.instances, self.bootids, self.smap = GetClusterData() - self.started_instances = set() - self.opts = opts - - def Run(self): - """Watcher run sequence. - - """ - notepad = self.notepad - self.CheckInstances(notepad) - self.CheckDisks(notepad) - self.VerifyDisks() - - @staticmethod - def ArchiveJobs(age): - """Archive old jobs. - - """ - arch_count, left_count = client.AutoArchiveJobs(age) - logging.debug("Archived %s jobs, left %s", arch_count, left_count) - - def CheckDisks(self, notepad): - """Check all nodes for restarted ones. - - """ - check_nodes = [] - for name, (new_id, offline) in self.bootids.iteritems(): - old = notepad.GetNodeBootID(name) - if new_id is None: - # Bad node, not returning a boot id - if not offline: - logging.debug("Node %s missing boot id, skipping secondary checks", - name) - continue - if old != new_id: - # Node's boot ID has changed, proably through a reboot. - check_nodes.append(name) - - if check_nodes: - # Activate disks for all instances with any of the checked nodes as a - # secondary node. - for node in check_nodes: - if node not in self.smap: + check_nodes = [] + + for node in nodes.values(): + old = notepad.GetNodeBootID(node.name) + if not node.bootid: + # Bad node, not returning a boot id + if not node.offline: + logging.debug("Node '%s' missing boot ID, skipping secondary checks", + node.name) + continue + + if old != node.bootid: + # Node's boot ID has changed, probably through a reboot + check_nodes.append(node) + + if check_nodes: + # Activate disks for all instances with any of the checked nodes as a + # secondary node. + for node in check_nodes: + for instance_name in node.secondaries: + try: + inst = instances[instance_name] + except KeyError: + logging.info("Can't find instance '%s', maybe it was ignored", + instance_name) continue - for instance_name in self.smap[node]: - instance = self.instances[instance_name] - if not instance.autostart: - logging.info(("Skipping disk activation for non-autostart" - " instance %s"), instance.name) - continue - if instance.name in self.started_instances: - # we already tried to start the instance, which should have - # activated its drives (if they can be at all) - logging.debug("Skipping disk activation for instance %s, as" - " it was already started", instance.name) - continue - try: - logging.info("Activating disks for instance %s", instance.name) - instance.ActivateDisks() - except Exception: # pylint: disable-msg=W0703 - logging.exception("Error while activating disks for instance %s", - instance.name) - - # Keep changed boot IDs - for name in check_nodes: - notepad.SetNodeBootID(name, self.bootids[name][0]) - - def CheckInstances(self, notepad): - """Make a pass over the list of instances, restarting downed ones. - - """ - notepad.MaintainInstanceList(self.instances.keys()) - - for instance in self.instances.values(): - if instance.status in BAD_STATES: - n = notepad.NumberOfRestartAttempts(instance.name) - if n > MAXTRIES: - logging.warning("Not restarting instance %s, retries exhausted", - instance.name) + if not inst.autostart: + logging.info("Skipping disk activation for non-autostart" + " instance '%s'", inst.name) continue - elif n < MAXTRIES: - last = " (Attempt #%d)" % (n + 1) - else: - notepad.RecordRestartAttempt(instance.name) - logging.error("Could not restart %s after %d attempts, giving up", - instance.name, MAXTRIES) + + if inst.name in started: + # we already tried to start the instance, which should have + # activated its drives (if they can be at all) + logging.debug("Skipping disk activation for instance '%s' as" + " it was already started", inst.name) continue + try: - logging.info("Restarting %s%s", instance.name, last) - instance.Restart() - self.started_instances.add(instance.name) + logging.info("Activating disks for instance '%s'", inst.name) + inst.ActivateDisks(cl) except Exception: # pylint: disable-msg=W0703 - logging.exception("Error while restarting instance %s", - instance.name) + logging.exception("Error while activating disks for instance '%s'", + inst.name) - notepad.RecordRestartAttempt(instance.name) - elif instance.status in HELPLESS_STATES: - if notepad.NumberOfRestartAttempts(instance.name): - notepad.RemoveInstance(instance.name) - else: - if notepad.NumberOfRestartAttempts(instance.name): - notepad.RemoveInstance(instance.name) - logging.info("Restart of %s succeeded", instance.name) + # Keep changed boot IDs + for node in check_nodes: + notepad.SetNodeBootID(node.name, node.bootid) - def _CheckForOfflineNodes(self, instance): - """Checks if given instances has any secondary in offline status. - @param instance: The instance object - @return: True if any of the secondary is offline, False otherwise - - """ - bootids = [] - for node in instance.snodes: - bootids.append(self.bootids[node]) - - return compat.any(offline for (_, offline) in bootids) - - def VerifyDisks(self): - """Run gnt-cluster verify-disks. - - """ - job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()]) - result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0] - client.ArchiveJob(job_id) +def _CheckForOfflineNodes(nodes, instance): + """Checks if given instances has any secondary in offline status. - # Keep track of submitted jobs - jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug) + @param instance: The instance object + @return: True if any of the secondary is offline, False otherwise - archive_jobs = set() - for (status, job_id) in result[constants.JOB_IDS_KEY]: - jex.AddJobId(None, status, job_id) - if status: - archive_jobs.add(job_id) + """ + return compat.any(nodes[node_name].offline for node_name in instance.snodes) - offline_disk_instances = set() - for (status, result) in jex.GetResults(): - if not status: - logging.error("Verify-disks job failed: %s", result) - continue +def _VerifyDisks(cl, uuid, nodes, instances): + """Run a per-group "gnt-cluster verify-disks". - ((_, instances, _), ) = result + """ + job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)]) + ((_, offline_disk_instances, _), ) = \ + cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug) + cl.ArchiveJob(job_id) - offline_disk_instances.update(instances) + if not offline_disk_instances: + # nothing to do + logging.debug("Verify-disks reported no offline disks, nothing to do") + return - for job_id in archive_jobs: - client.ArchiveJob(job_id) + logging.debug("Will activate disks for instance(s) %s", + utils.CommaJoin(offline_disk_instances)) - if not offline_disk_instances: - # nothing to do - logging.debug("verify-disks reported no offline disks, nothing to do") - return + # We submit only one job, and wait for it. Not optimal, but this puts less + # load on the job queue. + job = [] + for name in offline_disk_instances: + try: + inst = instances[name] + except KeyError: + logging.info("Can't find instance '%s', maybe it was ignored", name) + continue - logging.debug("Will activate disks for instance(s) %s", - utils.CommaJoin(offline_disk_instances)) + if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst): + logging.info("Skipping instance '%s' because it is in a helpless state or" + " has offline secondaries", name) + continue - # we submit only one job, and wait for it. not optimal, but spams - # less the job queue - job = [] - for name in offline_disk_instances: - instance = self.instances[name] - if (instance.status in HELPLESS_STATES or - self._CheckForOfflineNodes(instance)): - logging.info("Skip instance %s because it is in helpless state or has" - " one offline secondary", name) - continue - job.append(opcodes.OpInstanceActivateDisks(instance_name=name)) + job.append(opcodes.OpInstanceActivateDisks(instance_name=name)) - if job: - job_id = cli.SendJob(job, cl=client) + if job: + job_id = cli.SendJob(job, cl=cl) - try: - cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) - except Exception: # pylint: disable-msg=W0703 - logging.exception("Error while activating disks") + try: + cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug) + except Exception: # pylint: disable-msg=W0703 + logging.exception("Error while activating disks") def IsRapiResponding(hostname): @@ -421,11 +353,14 @@ def ParseOptions(): constants.RELEASE_VERSION) parser.add_option(cli.DEBUG_OPT) + parser.add_option(cli.NODEGROUP_OPT) parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600, help="Autoarchive jobs older than this age (default" " 6 hours)") parser.add_option("--ignore-pause", dest="ignore_pause", default=False, action="store_true", help="Ignore cluster pause setting") + parser.add_option("--wait-children", dest="wait_children", default=False, + action="store_true", help="Wait for child processes") options, args = parser.parse_args() options.job_age = cli.ParseTimespec(options.job_age) @@ -454,13 +389,258 @@ def _UpdateInstanceStatus(cl, filename): for (name, status) in result)) +def GetLuxiClient(try_restart): + """Tries to connect to the master daemon. + + @type try_restart: bool + @param try_restart: Whether to attempt to restart the master daemon + + """ + try: + return cli.GetClient() + except errors.OpPrereqError, err: + # this is, from cli.GetClient, a not-master case + raise NotMasterError("Not on master node (%s)" % err) + + except luxi.NoMasterError, err: + if not try_restart: + raise + + logging.warning("Master daemon seems to be down (%s), trying to restart", + err) + + if not utils.EnsureDaemon(constants.MASTERD): + raise errors.GenericError("Can't start the master daemon") + + # Retry the connection + return cli.GetClient() + + +def _StartGroupChildren(cl, wait): + """Starts a new instance of the watcher for every node group. + + """ + assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME) + for arg in sys.argv) + + result = cl.QueryGroups([], ["name", "uuid"], False) + + children = [] + + for (idx, (name, uuid)) in enumerate(result): + args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid] + + if idx > 0: + # Let's not kill the system + time.sleep(CHILD_PROCESS_DELAY) + + logging.debug("Spawning child for group '%s' (%s), arguments %s", + name, uuid, args) + + try: + # TODO: Should utils.StartDaemon be used instead? + pid = os.spawnv(os.P_NOWAIT, args[0], args) + except Exception: # pylint: disable-msg=W0703 + logging.exception("Failed to start child for group '%s' (%s)", + name, uuid) + else: + logging.debug("Started with PID %s", pid) + children.append(pid) + + if wait: + for pid in children: + logging.debug("Waiting for child PID %s", pid) + try: + result = utils.RetryOnSignal(os.waitpid, pid, 0) + except EnvironmentError, err: + result = str(err) + + logging.debug("Child PID %s exited with status %s", pid, result) + + +def _ArchiveJobs(cl, age): + """Archives old jobs. + + """ + (arch_count, left_count) = cl.AutoArchiveJobs(age) + logging.debug("Archived %s jobs, left %s", arch_count, left_count) + + +def _CheckMaster(cl): + """Ensures current host is master node. + + """ + (master, ) = cl.QueryConfigValues(["master_node"]) + if master != netutils.Hostname.GetSysName(): + raise NotMasterError("This is not the master node") + + @rapi.client.UsesRapiClient +def _GlobalWatcher(opts): + """Main function for global watcher. + + At the end child processes are spawned for every node group. + + """ + StartNodeDaemons() + RunWatcherHooks() + + # Run node maintenance in all cases, even if master, so that old masters can + # be properly cleaned up + if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602 + nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602 + + try: + client = GetLuxiClient(True) + except NotMasterError: + # Don't proceed on non-master nodes + return constants.EXIT_SUCCESS + + # we are on master now + utils.EnsureDaemon(constants.RAPI) + + # If RAPI isn't responding to queries, try one restart + logging.debug("Attempting to talk to remote API on %s", + constants.IP4_ADDRESS_LOCALHOST) + if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): + logging.warning("Couldn't get answer from remote API, restaring daemon") + utils.StopDaemon(constants.RAPI) + utils.EnsureDaemon(constants.RAPI) + logging.debug("Second attempt to talk to remote API") + if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): + logging.fatal("RAPI is not responding") + logging.debug("Successfully talked to remote API") + + _CheckMaster(client) + _ArchiveJobs(client, opts.job_age) + _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE) + + # Spawn child processes for all node groups + _StartGroupChildren(client, opts.wait_children) + + return constants.EXIT_SUCCESS + + +def _GetGroupData(cl, uuid): + """Retrieves instances and nodes per node group. + + """ + # TODO: Implement locking + job = [ + # Get all primary instances in group + opcodes.OpQuery(what=constants.QR_INSTANCE, + fields=["name", "status", "admin_state", "snodes", + "pnode.group.uuid", "snodes.group.uuid"], + filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]), + + # Get all nodes in group + opcodes.OpQuery(what=constants.QR_NODE, + fields=["name", "bootid", "offline"], + filter=[qlang.OP_EQUAL, "group.uuid", uuid]), + ] + + job_id = cl.SubmitJob(job) + results = map(objects.QueryResponse.FromDict, + cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)) + cl.ArchiveJob(job_id) + + results_data = map(operator.attrgetter("data"), results) + + # Ensure results are tuples with two values + assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data)) + + # Extract values ignoring result status + (raw_instances, raw_nodes) = [[map(compat.snd, values) + for values in res] + for res in results_data] + + secondaries = {} + instances = [] + + # Load all instances + for (name, status, autostart, snodes, pnode_group_uuid, + snodes_group_uuid) in raw_instances: + if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid): + logging.error("Ignoring split instance '%s', primary group %s, secondary" + " groups %s", name, pnode_group_uuid, + utils.CommaJoin(snodes_group_uuid)) + else: + instances.append(Instance(name, status, autostart, snodes)) + + for node in snodes: + secondaries.setdefault(node, set()).add(name) + + # Load all nodes + nodes = [Node(name, bootid, offline, secondaries.get(name, set())) + for (name, bootid, offline) in raw_nodes] + + return (dict((node.name, node) for node in nodes), + dict((inst.name, inst) for inst in instances)) + + +def _KnownGroup(uuid): + """Checks if a group UUID is known by ssconf. + + """ + groups = ssconf.SimpleStore().GetNodegroupList() + + return compat.any(line.strip() and line.split()[0] == uuid + for line in groups) + + +def _GroupWatcher(opts): + """Main function for per-group watcher process. + + """ + group_uuid = opts.nodegroup.lower() + + if not utils.UUID_RE.match(group_uuid): + raise errors.GenericError("Node group parameter (%s) must be given a UUID," + " got '%s'" % + (cli.NODEGROUP_OPT_NAME, group_uuid)) + + logging.info("Watcher for node group '%s'", group_uuid) + + # Check if node group is known + if not _KnownGroup(group_uuid): + raise errors.GenericError("Node group '%s' is not known by ssconf" % + group_uuid) + + state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid + + logging.debug("Using state file %s", state_path) + + # Global watcher + statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602 + if not statefile: + return constants.EXIT_FAILURE + + notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602 + try: + # Connect to master daemon + client = GetLuxiClient(False) + + _CheckMaster(client) + + (nodes, instances) = _GetGroupData(client, group_uuid) + + started = _CheckInstances(client, notepad, instances) + _CheckDisks(client, notepad, nodes, instances, started) + _VerifyDisks(client, group_uuid, nodes, instances) + except Exception, err: + logging.info("Not updating status file due to failure: %s", err) + raise + else: + # Save changes for next run + notepad.Save(state_path) + + return constants.EXIT_SUCCESS + + def Main(): """Main function. """ - global client # pylint: disable-msg=W0603 - (options, _) = ParseOptions() utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0], @@ -470,69 +650,24 @@ def Main(): logging.debug("Pause has been set, exiting") return constants.EXIT_SUCCESS - statefile = \ - state.OpenStateFile(constants.WATCHER_STATEFILE) - if not statefile: - return constants.EXIT_FAILURE - - update_file = False + # Try to acquire global watcher lock in shared mode + lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE) try: - StartNodeDaemons() - RunWatcherHooks() - # run node maintenance in all cases, even if master, so that old - # masters can be properly cleaned up too - if nodemaint.NodeMaintenance.ShouldRun(): - nodemaint.NodeMaintenance().Exec() - - notepad = state.WatcherState(statefile) - try: - try: - client = cli.GetClient() - except errors.OpPrereqError: - # this is, from cli.GetClient, a not-master case - logging.debug("Not on master, exiting") - update_file = True - return constants.EXIT_SUCCESS - except luxi.NoMasterError, err: - logging.warning("Master seems to be down (%s), trying to restart", - str(err)) - if not utils.EnsureDaemon(constants.MASTERD): - logging.critical("Can't start the master, exiting") - return constants.EXIT_FAILURE - # else retry the connection - client = cli.GetClient() - - # we are on master now - utils.EnsureDaemon(constants.RAPI) - - # If RAPI isn't responding to queries, try one restart. - logging.debug("Attempting to talk with RAPI.") - if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): - logging.warning("Couldn't get answer from Ganeti RAPI daemon." - " Restarting Ganeti RAPI.") - utils.StopDaemon(constants.RAPI) - utils.EnsureDaemon(constants.RAPI) - logging.debug("Second attempt to talk with RAPI") - if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): - logging.fatal("RAPI is not responding. Please investigate.") - logging.debug("Successfully talked to RAPI.") + lock.Shared(blocking=False) + except (EnvironmentError, errors.LockError), err: + logging.error("Can't acquire lock on %s: %s", + constants.WATCHER_LOCK_FILE, err) + return constants.EXIT_SUCCESS - try: - watcher = Watcher(options, notepad) - except errors.ConfigurationError: - # Just exit if there's no configuration - update_file = True - return constants.EXIT_SUCCESS - - watcher.Run() - update_file = True - - finally: - if update_file: - notepad.Save(constants.WATCHER_STATEFILE) - else: - logging.debug("Not updating status file due to failure") - except SystemExit: + if options.nodegroup is None: + fn = _GlobalWatcher + else: + # Per-nodegroup watcher + fn = _GroupWatcher + + try: + return fn(options) + except (SystemExit, KeyboardInterrupt): raise except NotMasterError: logging.debug("Not master, exiting") diff --git a/qa/qa_daemon.py b/qa/qa_daemon.py index 85d0a0fe2..9a043208f 100644 --- a/qa/qa_daemon.py +++ b/qa/qa_daemon.py @@ -76,7 +76,7 @@ def _RunWatcherDaemon(): """Runs the ganeti-watcher daemon on the master node. """ - AssertCommand(["ganeti-watcher", "-d", "--ignore-pause"]) + AssertCommand(["ganeti-watcher", "-d", "--ignore-pause", "--wait-children"]) def TestPauseWatcher(): -- GitLab