ganeti-masterd 18.8 KB
Newer Older
1
#!/usr/bin/python
Iustin Pop's avatar
Iustin Pop committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

Iustin Pop's avatar
Iustin Pop committed
29 30
# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
33
import sys
Iustin Pop's avatar
Iustin Pop committed
34 35 36 37
import SocketServer
import time
import collections
import signal
38
import logging
Iustin Pop's avatar
Iustin Pop committed
39

40
from optparse import OptionParser
Iustin Pop's avatar
Iustin Pop committed
41

42
from ganeti import config
Iustin Pop's avatar
Iustin Pop committed
43
from ganeti import constants
44
from ganeti import daemon
Iustin Pop's avatar
Iustin Pop committed
45 46 47
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
48
from ganeti import locking
Iustin Pop's avatar
Iustin Pop committed
49 50
from ganeti import luxi
from ganeti import utils
51 52
from ganeti import errors
from ganeti import ssconf
53
from ganeti import workerpool
54
from ganeti import rpc
55
from ganeti import bootstrap
56
from ganeti import serializer
57 58


59 60
# Number of workers in the pool that processes client requests (see
# IOServer.setup_queue)
CLIENT_REQUEST_WORKERS = 16

# Exit codes, aliased from ganeti.constants under module-level names
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
Iustin Pop's avatar
Iustin Pop committed
63 64


65
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker that processes one queued client request.

  L{IOServer.process_request} queues one task per request, passing the
  server itself plus the request and the client address.

  """
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    @param server: server the request arrived on; its C{finish_request},
        C{close_request} and C{handle_error} methods are used
    @param request: request object as received by the server's
        C{process_request}
    @param client_address: client address as received by the server's
        C{process_request}

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # Deliberately broad: whatever went wrong is reported through the
      # server's error hook and the request is closed in any case
      server.handle_error(request, client_address)
      server.close_request(request)


Iustin Pop's avatar
Iustin Pop committed
81 82 83 84 85 86 87 88
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared context and the client request worker pool.

    Until this is called both C{context} and C{request_workers} are
    C{None}, so it has to run before any request is processed.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    @param request: request object handed over by the socket server
    @param client_address: address of the client

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit.

    @param signal_handlers: dictionary of signal handlers; expected to be
        filled in by the L{utils.SignalHandled} decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    Every step is attempted even if an earlier one raised; the exception
    still propagates to the caller afterwards.

    """
    try:
      self.server_close()
    finally:
      try:
        # The pool only exists once setup_queue() has been called
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        # Nested so that a failure while terminating the workers cannot
        # prevent the job queue from being shut down
        if self.context:
          self.context.jobqueue.Shutdown()
Iustin Pop's avatar
Iustin Pop committed
139 140 141 142 143 144 145 146


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler

  Reads C{EOM}-terminated JSON messages from the client socket, passes
  each one to L{ClientOps.handle_request} and writes back a JSON response
  holding a success flag and the result.

  """
  # Byte terminating every message on the wire
  EOM = '\3'
  # Maximum number of bytes requested from the socket per recv() call
  READ_SIZE = 4096

  def setup(self):
    """Initialise the per-connection receive state.

    """
    # pylint: disable-msg=W0201
    # setup() is the api for initialising for this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests until the client disconnects or sends garbage.

    A message that does not decode to a dictionary, or that lacks the
    method or arguments key, ends the connection without a response.
    Exceptions raised while executing a request are reported to the client
    as a failed response instead.

    """
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError:
        # sys.exc_info() is used rather than binding the exception in the
        # except clause, as the binding syntax differs between Python
        # versions
        result = errors.EncodeException(sys.exc_info()[1])
      except: # pylint: disable-msg=W0702
        # Deliberately broad: the client always gets an answer
        logging.error("Unexpected exception", exc_info=True)
        result = "Caught exception: %s" % str(sys.exc_info()[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    """Return the next complete message from the client.

    Data is read from the socket until at least one C{EOM}-terminated
    message is available; a trailing partial message is kept in the buffer
    for the next call.

    @return: the message without its terminator, or C{None} if the client
        closed the connection

    """
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      # The last element is whatever followed the final EOM, i.e. an
      # incomplete message (possibly the empty string)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message, followed by the terminator, to the client.

    @param msg: serialized message to send

    """
    # sendall() either transmits all the data or raises an exception; on
    # error there is no way to tell how much was sent, so it is propagated
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remember the server whose context is used to serve requests.

    @param server: server object providing C{context}

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Execute a single client request.

    @param method: request method, compared against the C{luxi.REQ_*}
        constants
    @param args: request arguments; the expected shape depends on the
        method (a single value for some, a sequence to unpack for others)
    @return: whatever the job queue call or the query opcode returns
    @raise ValueError: if the method is not known
    @raise errors.OpPrereqError: if a query asks for locking
    @raise TypeError: if a watcher pause end time is not a number
    @raise errors.GenericError: if a watcher pause end time is in the past

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # Only a non-empty list or tuple of IDs is joined for the log message
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: opcode object to execute
    @return: the result of executing the opcode

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
341

Iustin Pop's avatar
Iustin Pop committed
342

343 344 345 346 347 348
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    The check is an C{assert} on the class-level C{_instance} marker, so
    assignments only succeed while no instance has been registered.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: node object; its C{name} is registered with the lock
        manager
    @param ec_id: passed through to the configuration's C{AddNode}

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: node object to hand to the job queue again

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: name of the node to remove

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

416

417 418 419 420 421 422 423 424 425 426 427 428 429
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None, int or float
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      C{None} removes the pause file. The value is written with C{%d}, so a
      float is stored as a whole number of seconds
  @return: the C{until} value, unchanged

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until

432

433 434 435 436 437 438 439 440 441
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at least
  half of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if no votes were gathered at all (one node cluster) or if
      this node is the top-voted one with at least as many votes for it
      as against it; False otherwise

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  # Six attempts, ten seconds apart
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      # The top entry is not a real node yet; wait and ask again
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
489

Michael Hanselmann's avatar
Michael Hanselmann committed
490

491
def CheckAgreementWithRpc():
  """Run L{CheckAgreement} with the RPC layer initialized.

  The RPC layer is shut down again afterwards, even if the check raises.

  @return: the result of L{CheckAgreement}

  """
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()
Iustin Pop's avatar
Iustin Pop committed
497

498

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
Iustin Pop's avatar
Iustin Pop committed
516
    except: # pylint: disable-msg=W0702
517 518 519 520
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

Iustin Pop's avatar
Iustin Pop committed
521
    os._exit(result) # pylint: disable-msg=W0212
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


543 544 545 546
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process with C{constants.EXIT_FAILURE} if arguments were
  given, if the interactive confirmation for skipping the vote is not
  given, or if the voting check fails.

  @param options: parsed command line options; C{debug}, C{no_voting} and
      C{yes_do_it} are used
  @param args: remaining command line arguments, must be empty

  """
  if args: # masterd doesn't take any arguments
    sys.stderr.write("Usage: %s [-f] [-d]\n" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      sys.stderr.write("Aborting.\n")
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


577
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  Teardown happens in reverse order of setup: the server is cleaned up,
  then the RPC layer is shut down, then the master socket is removed.

  @param options: parsed command line options (unused)
  @param args: remaining command line arguments (unused)

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        # Only logged; the daemon keeps starting up
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
605

Iustin Pop's avatar
Iustin Pop committed
606

607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
def main():
  """Main function

  Builds the option parser and the list of directories with their modes,
  then hands both to L{daemon.GenericMain} together with the check and
  exec functions of this daemon.

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)

626

Iustin Pop's avatar
Iustin Pop committed
627 628
# Start the daemon only when this file is executed as a script
if __name__ == "__main__":
  main()