#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno

from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-master-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return
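
  # (Hooks are plain executables dropped into the watcher hooks directory,
  # typically /etc/ganeti/hooks/watcher/ on a default install; utils.RunParts
  # executes each of them below and the results are logged.)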

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
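        # (This extra attempt record pushes the counter past MAXTRIES, so
        # later runs take the "retries exhausted" branch above instead.)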
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if given instances has any secondary in offline status.
269

270
271
  @param instance: The instance object
  @return: True if any of the secondary is offline, False otherwise
272

273
274
  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false", help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
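  # (Illustrative: "-A 1d" or "-A 12h" would be accepted here; values without
  # a suffix are interpreted as seconds.)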

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))
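  # (Illustrative: data [("inst2", "running"), ("inst1", "ERROR_down")] yields
  # a file containing the lines "inst1 ERROR_down" and "inst2 running".)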


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


class _StatCb:
  """Helper to store file handle's C{fstat}.

  """
  def __init__(self):
    """Initializes this class.

    """
    self.st = None

  def __call__(self, fh):
    """Calls C{fstat} on file handle.

    """
    self.st = os.fstat(fh.fileno())
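    # (Instances of this class are used as the "preread" callback of
    # utils.ReadFile in _ReadInstanceStatus below, capturing the file's
    # mtime at the time it is read.)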


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
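    # Each line was written by _WriteInstanceStatus as "<name> <status>",
    # so splitting on the first whitespace recovers name/status pairs.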
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]
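
  # (Illustrative: if one group's file reported "inst1 running" at mtime 100
  # and another's reported "inst1 ERROR_down" at mtime 200, the merged status
  # for inst1 would be "ERROR_down".)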

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
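    # (Each child re-executes this same watcher binary with the node group
    # option appended, so it takes the per-group code path in Main.)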

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restaring daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())
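  # (Each ssconf line has the form "<uuid> <name>"; only the first field,
  # the UUID, is used here.)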

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS