#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-master-candidates it will run in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if given instances has any secondary in offline status.

  @param nodes: Dictionary of L{Node} objects, indexed by node name
  @param instance: The L{Instance} object to check
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
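  # The single opcode returns a 3-tuple; only its middle element, the list of
  # instances with offline disks, is used here.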
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

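  # Render each (name, status) pair as "<name> <status>\n" and write them in
  # sorted order.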
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
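    # Each line has the form "<instance name> <status>"; split on the first
    # whitespace only.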
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restaring daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open this group's watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

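  # The "notepad" keeps per-instance restart counts and node boot IDs between
  # watcher runs.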
  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS