ganeti-watcher 15.6 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#!/usr/bin/python
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

28
"""
Iustin Pop's avatar
Iustin Pop committed
29

Iustin Pop's avatar
Iustin Pop committed
30
31
32
33
# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

Iustin Pop's avatar
Iustin Pop committed
34
35
36
import os
import sys
import time
37
import logging
Iustin Pop's avatar
Iustin Pop committed
38
39
40
41
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
42
from ganeti import serializer
43
from ganeti import errors
44
45
from ganeti import opcodes
from ganeti import cli
46
from ganeti import luxi
Iustin Pop's avatar
Iustin Pop committed
47
48


49
MAXTRIES = 5
50
BAD_STATES = ['ERROR_down']
51
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
52
53
NOTICE = 'NOTICE'
ERROR = 'ERROR'
54
55
56
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
57
58


59
60
61
62
# Global client object
client = None


63
class NotMasterError(errors.GenericError):
64
  """Exception raised when this host is not the master."""
Iustin Pop's avatar
Iustin Pop committed
65
66


67
68
69
70
71
72
73
def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


74
75
def EnsureDaemon(name):
  """Check for and start daemon if not alive.
76
77

  """
78
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
79
  if result.failed:
80
81
82
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False
83

84
  return True
85
86


87
class WatcherState(object):
Iustin Pop's avatar
Iustin Pop committed
88
89
90
91
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
92
93
    """Open, lock, read and parse the file.

94
    Raises exception on lock contention.
95
96

    """
Iustin Pop's avatar
Iustin Pop committed
97
98
99
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
100
101
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')
Iustin Pop's avatar
Iustin Pop committed
102

103
    utils.LockFile(self.statefile.fileno())
Iustin Pop's avatar
Iustin Pop committed
104

105
    try:
106
107
108
109
110
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
Iustin Pop's avatar
Iustin Pop committed
111
    except Exception, msg: # pylint: disable-msg=W0703
112
      # Ignore errors while loading the file and treat it as empty
113
      self._data = {}
114
      logging.warning(("Invalid state file. Using defaults."
115
                       " Error message: %s"), msg)
116

117
118
119
120
    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}
121

Iustin Pop's avatar
Iustin Pop committed
122
    self._orig_data = serializer.Dump(self._data)
123

124
125
  def Save(self):
    """Save state to file, then unlock and close it.
126
127

    """
128
129
    assert self.statefile

Iustin Pop's avatar
Iustin Pop committed
130
131
    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
132
133
134
135
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

136
137
138
    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
Iustin Pop's avatar
Iustin Pop committed
139
                         data=serialized_form,
140
                         prewrite=utils.LockFile, close=False)
141
    self.statefile = os.fdopen(fd, 'w+')
142

143
  def Close(self):
144
145
146
147
148
    """Unlock configuration file and close it.

    """
    assert self.statefile

149
    # Files are automatically unlocked when closing them
150
151
152
153
154
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.
Iustin Pop's avatar
Iustin Pop committed
155

156
    """
157
    ndata = self._data["node"]
158

159
160
    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
161
162
163
164
165
166
167
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid
Iustin Pop's avatar
Iustin Pop committed
168

169
    ndata = self._data["node"]
Iustin Pop's avatar
Iustin Pop committed
170

171
172
173
    if name not in ndata:
      ndata[name] = {}

174
    ndata[name][KEY_BOOT_ID] = bootid
175
176

  def NumberOfRestartAttempts(self, instance):
Iustin Pop's avatar
Iustin Pop committed
177
178
    """Returns number of previous restart attempts.

Iustin Pop's avatar
Iustin Pop committed
179
180
    @type instance: L{Instance}
    @param instance: the instance to look up
181

Iustin Pop's avatar
Iustin Pop committed
182
    """
183
    idata = self._data["instance"]
Iustin Pop's avatar
Iustin Pop committed
184

185
    if instance.name in idata:
186
      return idata[instance.name][KEY_RESTART_COUNT]
Iustin Pop's avatar
Iustin Pop committed
187
188
189

    return 0

190
  def RecordRestartAttempt(self, instance):
Iustin Pop's avatar
Iustin Pop committed
191
192
    """Record a restart attempt.

Iustin Pop's avatar
Iustin Pop committed
193
194
    @type instance: L{Instance}
    @param instance: the instance being restarted
195

Iustin Pop's avatar
Iustin Pop committed
196
    """
197
    idata = self._data["instance"]
Iustin Pop's avatar
Iustin Pop committed
198

199
200
201
202
    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]
Iustin Pop's avatar
Iustin Pop committed
203

204
205
    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
Iustin Pop's avatar
Iustin Pop committed
206

207
  def RemoveInstance(self, instance):
Iustin Pop's avatar
Iustin Pop committed
208
    """Update state to reflect that a machine is running.
Iustin Pop's avatar
Iustin Pop committed
209

Iustin Pop's avatar
Iustin Pop committed
210
211
    This method removes the record for a named instance (as we only
    track down instances).
Iustin Pop's avatar
Iustin Pop committed
212

Iustin Pop's avatar
Iustin Pop committed
213
214
    @type instance: L{Instance}
    @param instance: the instance to remove from books
215

Iustin Pop's avatar
Iustin Pop committed
216
    """
217
    idata = self._data["instance"]
Iustin Pop's avatar
Iustin Pop committed
218

219
220
    if instance.name in idata:
      del idata[instance.name]
Iustin Pop's avatar
Iustin Pop committed
221
222
223
224
225
226


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
227
  def __init__(self, name, state, autostart):
Iustin Pop's avatar
Iustin Pop committed
228
229
    self.name = name
    self.state = state
230
    self.autostart = autostart
Iustin Pop's avatar
Iustin Pop committed
231
232

  def Restart(self):
233
234
235
    """Encapsulates the start of an instance.

    """
236
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
237
    cli.SubmitOpCode(op, cl=client)
Iustin Pop's avatar
Iustin Pop committed
238

239
240
241
242
  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
243
244
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)
Iustin Pop's avatar
Iustin Pop committed
245
246


247
def GetClusterData():
248
249
250
  """Get a list of instances on this cluster.

  """
251
252
253
254
255
256
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)
Iustin Pop's avatar
Iustin Pop committed
257

258
  job_id = client.SubmitJob([op1, op2])
Iustin Pop's avatar
Iustin Pop committed
259

260
  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
261

262
263
  logging.debug("Got data from cluster, writing instance status file")

264
265
  result = all_results[0]
  smap = {}
266

267
  instances = {}
268
269
270
271
272

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

273
274
  for fields in result:
    (name, status, autostart, snodes) = fields
275

276
277
278
279
280
    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)
Iustin Pop's avatar
Iustin Pop committed
281

282
    instances[name] = Instance(name, status, autostart)
283

284
285
  nodes =  dict([(name, (bootid, offline))
                 for name, bootid, offline in all_results[1]])
286

287
  client.ArchiveJob(job_id)
288

289
  return instances, nodes, smap
Iustin Pop's avatar
Iustin Pop committed
290
291


292
class Watcher(object):
Iustin Pop's avatar
Iustin Pop committed
293
294
295
296
297
  """Encapsulate the logic for restarting erronously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.
298

Iustin Pop's avatar
Iustin Pop committed
299
  """
300
301
  def __init__(self, opts, notepad):
    self.notepad = notepad
Michael Hanselmann's avatar
Michael Hanselmann committed
302
    master = client.QueryConfigValues(["master_node"])[0]
303
    if master != utils.HostInfo().name:
304
      raise NotMasterError("This is not the master node")
305
306
307
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
308
    self.instances, self.bootids, self.smap = GetClusterData()
309
    self.started_instances = set()
Iustin Pop's avatar
Iustin Pop committed
310
    self.opts = opts
Iustin Pop's avatar
Iustin Pop committed
311
312

  def Run(self):
313
314
315
316
317
318
319
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()
320

321
322
  @staticmethod
  def ArchiveJobs(age):
Iustin Pop's avatar
Iustin Pop committed
323
324
325
326
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
Iustin Pop's avatar
Iustin Pop committed
327
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
Iustin Pop's avatar
Iustin Pop committed
328

329
330
  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.
331

Iustin Pop's avatar
Iustin Pop committed
332
    """
333
    check_nodes = []
334
    for name, (new_id, offline) in self.bootids.iteritems():
335
      old = notepad.GetNodeBootID(name)
Iustin Pop's avatar
Iustin Pop committed
336
337
      if new_id is None:
        # Bad node, not returning a boot id
338
339
340
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
Iustin Pop's avatar
Iustin Pop committed
341
        continue
Iustin Pop's avatar
Iustin Pop committed
342
      if old != new_id:
343
344
345
346
347
348
        # Node's boot ID has changed, proably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
349
350
      for node in check_nodes:
        if node not in self.smap:
351
          continue
352
353
354
355
356
357
358
359
360
361
362
363
364
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
Iustin Pop's avatar
Iustin Pop committed
365
          except Exception: # pylint: disable-msg=W0703
366
367
            logging.exception("Error while activating disks for instance %s",
                              instance.name)
368
369
370

      # Keep changed boot IDs
      for name in check_nodes:
Iustin Pop's avatar
Iustin Pop committed
371
        notepad.SetNodeBootID(name, self.bootids[name][0])
Iustin Pop's avatar
Iustin Pop committed
372

373
374
375
376
  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
377
    for instance in self.instances.values():
Iustin Pop's avatar
Iustin Pop committed
378
      if instance.state in BAD_STATES:
379
        n = notepad.NumberOfRestartAttempts(instance)
Iustin Pop's avatar
Iustin Pop committed
380
381
382
383
384
385
386

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
387
          notepad.RecordRestartAttempt(instance)
388
389
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
Iustin Pop's avatar
Iustin Pop committed
390
391
          continue
        try:
392
393
          logging.info("Restarting %s%s",
                        instance.name, last)
Iustin Pop's avatar
Iustin Pop committed
394
          instance.Restart()
395
          self.started_instances.add(instance.name)
Iustin Pop's avatar
Iustin Pop committed
396
        except Exception: # pylint: disable-msg=W0703
Iustin Pop's avatar
Iustin Pop committed
397
398
          logging.exception("Error while restarting instance %s",
                            instance.name)
Iustin Pop's avatar
Iustin Pop committed
399

400
        notepad.RecordRestartAttempt(instance)
Iustin Pop's avatar
Iustin Pop committed
401
      elif instance.state in HELPLESS_STATES:
402
403
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
Iustin Pop's avatar
Iustin Pop committed
404
      else:
405
406
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
407
          logging.info("Restart of %s succeeded", instance.name)
Iustin Pop's avatar
Iustin Pop committed
408

409
410
  @staticmethod
  def VerifyDisks():
411
412
413
    """Run gnt-cluster verify-disks.

    """
414
    op = opcodes.OpVerifyDisks()
415
416
417
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
418
419
420
421
422
423
424
425
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
426
                  utils.CommaJoin(offline_disk_instances))
427
428
429
430
431
432
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

433
434
435
436
    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")
Iustin Pop's avatar
Iustin Pop committed
437
438
439
440
441


def ParseOptions():
  """Parse the command line options.

Iustin Pop's avatar
Iustin Pop committed
442
  @return: (options, args) as from OptionParser.parse_args()
Iustin Pop's avatar
Iustin Pop committed
443
444
445
446
447
448
449

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

450
  parser.add_option(cli.DEBUG_OPT)
Iustin Pop's avatar
Iustin Pop committed
451
452
453
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
Iustin Pop's avatar
Iustin Pop committed
454
  options, args = parser.parse_args()
Iustin Pop's avatar
Iustin Pop committed
455
  options.job_age = cli.ParseTimespec(options.job_age)
Iustin Pop's avatar
Iustin Pop committed
456
457
458
459
460
461
462
  return options, args


def main():
  """Main function.

  """
Iustin Pop's avatar
Iustin Pop committed
463
  global client # pylint: disable-msg=W0603
464

465
466
467
468
469
  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
Iustin Pop's avatar
Iustin Pop committed
470

Iustin Pop's avatar
Iustin Pop committed
471
472
  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)
Iustin Pop's avatar
Iustin Pop committed
473

474
475
476
477
  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

478
  update_file = False
Iustin Pop's avatar
Iustin Pop committed
479
  try:
Guido Trotter's avatar
Guido Trotter committed
480
481
    # on master or not, try to start the node dameon
    EnsureDaemon(constants.NODED)
482
483
    # start confd as well. On non candidates it will be in disabled mode.
    EnsureDaemon(constants.CONFD)
484

485
    notepad = WatcherState()
486
    try:
487
488
489
490
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
491
        logging.debug("Not on master, exiting")
492
        update_file = True
493
        sys.exit(constants.EXIT_SUCCESS)
494
495
496
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
497
        if not EnsureDaemon(constants.MASTERD):
498
499
500
501
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()
502

Guido Trotter's avatar
Guido Trotter committed
503
504
      # we are on master now
      EnsureDaemon(constants.RAPI)
505

506
507
508
509
      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
510
        update_file = True
511
        sys.exit(constants.EXIT_SUCCESS)
512

513
      watcher.Run()
514
515
      update_file = True

516
    finally:
517
518
519
520
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
521
522
  except SystemExit:
    raise
523
  except NotMasterError:
524
    logging.debug("Not master, exiting")
525
    sys.exit(constants.EXIT_NOTMASTER)
526
  except errors.ResolverError, err:
527
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
528
    sys.exit(constants.EXIT_NODESETUP_ERROR)
529
530
531
532
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
533
534
535
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
Iustin Pop's avatar
Iustin Pop committed
536

537

Iustin Pop's avatar
Iustin Pop committed
538
539
if __name__ == '__main__':
  main()