#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import sys
import time
import logging
import errno

from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


# Number of restart attempts made for an instance in BAD_STATES before
# the watcher gives up on it (see Watcher.CheckInstances)
MAXTRIES = 5
# Instance states in which the watcher tries to restart the instance
BAD_STATES = ['ERROR_down']
# Instance states the watcher cannot fix; it only clears restart records
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']

# Severity labels (currently unused in this module)
NOTICE = 'NOTICE'
ERROR = 'ERROR'

# Keys of the per-instance and per-node records kept in the state file
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object; set by main() once connected to the master daemon
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line
  @rtype: str
  @return: the prefixed lines joined with newlines, always terminated by
      a newline (an empty input yields just the prefix and a newline)

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


def ShouldPause():
  """Check whether we should pause.

  @rtype: bool
  @return: True if utils.ReadWatcherPauseFile returns a true value for
      the watcher pause file (constants.WATCHER_PAUSEFILE)

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def EnsureDaemon(name):
  """Check for and start daemon if not alive.

  Runs the daemon utility (constants.DAEMON_UTIL) with the
  "check-and-start" action for the given daemon.

  @type name: str
  @param name: the daemon name passed to the daemon utility
  @rtype: bool
  @return: True if the command succeeded, False otherwise (the failure
      is logged)

  """
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
  if result.failed:
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False

  return True


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The parsed state is a dictionary with an "instance" sub-dictionary
  (restart records keyed by instance name) and a "node" sub-dictionary
  (boot ID records keyed by node name), stored serialized in
  constants.WATCHER_STATEFILE.  The file is kept open and locked while
  this object holds it.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Unreadable or malformed contents are logged and replaced by an empty
    state.  Raises exception on lock contention; the file is closed before
    the exception propagates.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    try:
      utils.LockFile(self.statefile.fileno())
    except Exception:
      # Don't leak the file descriptor when the lock can't be acquired
      self.statefile.close()
      self.statefile = None
      raise

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception:
      # Ignore errors while loading the file and treat it as empty.
      # sys.exc_info() is valid on every Python version, unlike
      # "except X, e" (Python 2 only) or "except X as e" (Python 2.6+).
      msg = sys.exc_info()[1]
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if not isinstance(self._data, dict):
      # Data that deserializes fine but is not a dictionary would make the
      # lookups below raise; treat it like an invalid file instead.
      logging.warning(("Invalid state file contents (type %s)."
                       " Using defaults."), type(self._data).__name__)
      self._data = {}

    self._data.setdefault("instance", {})
    self._data.setdefault("node", {})

    # Serialized snapshot used by Save() to detect changes
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save the state to the file if it changed.

    If the serialized state is unchanged, only the file's timestamps are
    updated.  Otherwise the new contents are written and this object
    switches to the new file handle returned by utils.WriteFile, which
    stays open and locked; the previous handle is closed.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    old_statefile = self.statefile
    self.statefile = os.fdopen(fd, 'w+')
    self._orig_data = serialized_form
    # Close the previous handle explicitly instead of relying on garbage
    # collection to do it
    old_statefile.close()

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    @type name: str
    @param name: the node name
    @return: the boot ID recorded for the node, or None if there is none

    """
    return self._data["node"].get(name, {}).get(KEY_BOOT_ID, None)

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    @type name: str
    @param name: the node name
    @param bootid: the new boot ID; must be a true value

    """
    assert bootid

    self._data["node"].setdefault(name, {})[KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up
    @rtype: int
    @return: the recorded restart count, 0 if there is no record (or the
        record has no count)

    """
    idata = self._data["instance"]
    return idata.get(instance.name, {}).get(KEY_RESTART_COUNT, 0)

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Stores the current time and increments the restart count.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    inst = self._data["instance"].setdefault(instance.name, {})
    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).  Missing records are ignored.

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    self._data["instance"].pop(instance.name, None)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    """Store the instance data returned by the cluster query.

    @type name: str
    @param name: the instance name
    @param state: the instance status; the watcher compares it against
        BAD_STATES and HELPLESS_STATES
    @param autostart: the "admin_state" query field; when false, the
        watcher skips disk activation for this instance

    """
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    Submits an OpStartupInstance opcode (without force) through the
    global client.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    Submits an OpActivateInstanceDisks opcode through the global client.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get the instance and node data of this cluster.

  Submits a single job querying all instances and all nodes, writes the
  instance status file (one "name status" line per instance) and
  archives the job.

  @rtype: tuple
  @return: (instances, nodes, smap), where instances maps instance names
      to L{Instance} objects, nodes maps node names to (bootid, offline)
      tuples and smap maps node names to the list of instances using
      that node as a secondary node

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}
  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      smap.setdefault(node, []).append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    """Check mastership and collect the cluster state for this run.

    Old jobs are archived before the cluster query job is submitted.

    @param opts: the parsed command line options (C{job_age} is used)
    @type notepad: L{WatcherState}
    @param notepad: the state file object recording restart attempts
    @raise NotMasterError: if this host is not the master node

    """
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    # names of instances a restart was submitted for during this run
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    Restarts downed instances, re-activates disks after node reboots and
    finally runs a disk verification.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    @param age: jobs older than this age are archived

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    A node whose boot ID differs from the one recorded in the state file
    is considered restarted; disks are then activated for the autostart
    instances that use it as a secondary node, unless a restart was
    already submitted for them in this run.  Finally the new boot IDs of
    those nodes are recorded.

    @type notepad: L{WatcherState}
    @param notepad: the state file object

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.items():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Instances in BAD_STATES are restarted while fewer than MAXTRIES
    attempts are recorded; when exactly MAXTRIES are recorded, one more
    attempt is recorded and an error logged, after which the instance is
    left alone.  Instances in any other state get their restart record
    removed.

    @type notepad: L{WatcherState}
    @param notepad: the state file object

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    Submits an OpVerifyDisks job; the third element of its result is
    treated as the list of instance names whose disks should be
    activated, which is then done in a single job.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # We submit only one job and wait for it; not optimal, but it spams
    # the job queue less.
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  Supports the common debug option (cli.DEBUG_OPT) and -A/--job-age,
  whose value (default 6*3600) is converted with cli.ParseTimespec.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  Sets up logging, exits early if the watcher is paused, makes sure the
  node daemon runs, connects to the master (trying to start the master
  daemon if it seems down), makes sure the RAPI daemon runs and executes
  a L{Watcher} run.  The state file is saved only if the run completed
  (or when exiting because this is not the master / there is no
  configuration), and is always closed afterwards.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  update_file = False
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError:
        # sys.exc_info() is valid on every Python version, unlike
        # "except X, e" (Python 2 only) or "except X as e" (Python 2.6+)
        err = sys.exc_info()[1]
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      try:
        if update_file:
          notepad.Save()
        else:
          logging.debug("Not updating status file due to failure")
      finally:
        # Release the state file and its lock explicitly instead of
        # leaving it open until the process exits
        notepad.Close()

  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError:
    err = sys.exc_info()[1]
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception:
    err = sys.exc_info()[1]
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)

if __name__ == '__main__':
  main()