ganeti-masterd 18.9 KB
Newer Older
1
#!/usr/bin/python
Iustin Pop's avatar
Iustin Pop committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide because
inheritance from their parent classes requires it.

"""

Iustin Pop's avatar
Iustin Pop committed
29 30
# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
33
import sys
Iustin Pop's avatar
Iustin Pop committed
34 35 36 37
import SocketServer
import time
import collections
import signal
38
import logging
Iustin Pop's avatar
Iustin Pop committed
39

40
from optparse import OptionParser
Iustin Pop's avatar
Iustin Pop committed
41

42
from ganeti import config
Iustin Pop's avatar
Iustin Pop committed
43
from ganeti import constants
44
from ganeti import daemon
Iustin Pop's avatar
Iustin Pop committed
45 46 47
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
48
from ganeti import locking
Iustin Pop's avatar
Iustin Pop committed
49 50
from ganeti import luxi
from ganeti import utils
51 52
from ganeti import errors
from ganeti import ssconf
53
from ganeti import workerpool
54
from ganeti import rpc
55
from ganeti import bootstrap
56
from ganeti import serializer
57 58


59 60
# Size of the worker pool (see IOServer.setup_queue) that processes client
# requests
CLIENT_REQUEST_WORKERS = 16

# Module-level aliases of the exit codes defined in the constants module
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
Iustin Pop's avatar
Iustin Pop committed
63 64


65
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker processing one accepted client connection per task.

  Tasks are queued by C{IOServer.process_request} with the arguments
  C{(server, request, client_address)}.

  """
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This reproduces the per-request body of C{SocketServer.ThreadingMixIn}:
    on success the request is closed; on any error the server's
    C{handle_error} is called and the request is closed as well.

    @param server: the server that accepted the connection
    @param request: the client connection object handed out by the server
    @param client_address: the peer address of that connection

    """
    request_args = (request, client_address)
    try:
      server.finish_request(*request_args)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # Deliberately catches everything, like ThreadingMixIn does, so the
      # failure is routed to the server's error handler.
      server.handle_error(*request_args)
      server.close_request(request)


Iustin Pop's avatar
Iustin Pop committed
81 82 83 84 85 86 87 88
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the Ganeti context and the client request worker pool.

    This is kept out of the constructor because threads must only be
    started once the daemon has forked.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    This overrides the synchronous SocketServer implementation: the
    request is handled (and closed) by a L{ClientRequestWorker}.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit.

    @param signal_handlers: supplied by the C{SignalHandled} decorator; a
        dict mapping each handled signal to its handler object

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket. Each step sits in its own C{finally} clause, so an exception
    in one step (e.g. while terminating the workers) does not prevent the
    later steps (e.g. the job queue shutdown) from running.

    """
    try:
      self.server_close()
    finally:
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          self.context.jobqueue.Shutdown()
Iustin Pop's avatar
Iustin Pop committed
140 141 142 143 144 145 146 147


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
Iustin Pop's avatar
Iustin Pop committed
148 149
    # pylint: disable-msg=W0201
    # setup() is the api for initialising for this class
Iustin Pop's avatar
Iustin Pop committed
150 151 152 153 154 155 156 157
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
158
        logging.debug("client closed connection")
Iustin Pop's avatar
Iustin Pop committed
159
        break
160

161
      request = serializer.LoadJson(msg)
162
      logging.debug("request: %s", request)
Iustin Pop's avatar
Iustin Pop committed
163
      if not isinstance(request, dict):
164
        logging.error("wrong request received: %s", msg)
Iustin Pop's avatar
Iustin Pop committed
165
        break
166 167 168 169 170

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
Iustin Pop's avatar
Iustin Pop committed
171
        break
172 173 174 175 176

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
177 178
      except errors.GenericError, err:
        success = False
179
        result = errors.EncodeException(err)
180 181 182 183 184 185 186 187 188 189
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
190
      self.send_message(serializer.DumpJson(response))
Iustin Pop's avatar
Iustin Pop committed
191 192 193 194 195 196 197 198 199 200 201 202 203

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
204
    # TODO: sendall is not guaranteed to send everything
Iustin Pop's avatar
Iustin Pop committed
205 206 207 208 209 210 211 212
    self.request.sendall(msg + self.EOM)


class ClientOps(object):
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Initialise the operations handler.

    @param server: the server object; its C{context} attribute provides the
        job queue and the context used for queries

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Execute a single client request.

    @param method: the request name, compared against the C{luxi.REQ_*}
        constants
    @param args: the request arguments; their structure depends on C{method}
    @return: the result of the operation, to be sent back to the client
    @raise errors.OpPrereqError: for queries asking for locking (sync
        queries are not allowed)
    @raise errors.GenericError: when asked to pause the watcher until a
        time in the past
    @raise TypeError: when the watcher pause end time is not a number
    @raise ValueError: for an unknown request name

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
              for ops in args]
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      # Log before validating, like the other query requests
      logging.info("Received exports query request")
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    The opcode is executed directly by an C{mcpu.Processor}, without
    going through the job queue.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
342

Iustin Pop's avatar
Iustin Pop committed
343

344 345 346 347 348 349
class GanetiContext(object):
  """Context common to all ganeti threads.

  Holds the objects shared by all threads: the configuration writer
  (C{cfg}), the Ganeti lock manager (C{glm}) and the job queue
  (C{jobqueue}). Only one instance may be created, and once it has been
  constructed no attribute may be set any more. Both rules are enforced
  with C{assert} statements, so they are not checked under C{python -O}.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    cls = type(self)
    assert cls._instance is None, "double GanetiContext instance"

    # The configuration comes first: the lock manager is seeded from its
    # node and instance lists, and the job queue gets the whole context.
    cfg = config.ConfigWriter()
    self.cfg = cfg

    node_names = cfg.GetNodeList()
    instance_names = cfg.GetInstanceList()
    self.glm = locking.GanetiLockManager(node_names, instance_names)

    self.jobqueue = jqueue.JobQueue(self)

    # Registering the singleton also makes __setattr__ refuse further changes
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert type(self)._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: the node object to add; its C{name} attribute is used as
        the lock name
    @param ec_id: passed unchanged to the configuration's C{AddNode}

    """
    self.cfg.AddNode(node, ec_id)

    # If the job queue raises while preseeding the node, the node lock
    # below is never created
    self.jobqueue.AddNode(node)

    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    Only the job queue is notified again; the configuration and the lock
    manager are left untouched.

    """
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    The configuration is updated first, then the job queue is notified,
    and finally the node lock is removed.

    """
    self.cfg.RemoveNode(name)
    self.jobqueue.RemoveNode(name)
    self.glm.remove(locking.LEVEL_NODE, name)

417

418 419 420 421 422 423 424 425 426 427 428 429 430
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp until which the watcher shouldn't run, or
      None to remove the pause file; the value is written with C{%d}, so
      fractional seconds are dropped
  @return: C{until}, unchanged

  """
  pause_file = constants.WATCHER_PAUSEFILE

  if until is not None:
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until

433

434 435 436 437 438 439 440 441 442
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get at least as
  many positive as negative answers. Since in most of the cases we are
  the master, we'll use our own config file for getting the node list.
  In the future we could collect the current node list from our
  (possibly obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  about a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if this node may start as master, False otherwise

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  attempts = 6
  retry_delay = 10
  for attempt in range(attempts):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      break
    # No real node is top-voted yet; wait before the next attempt, but
    # don't sleep pointlessly after the last one
    if attempt < attempts - 1:
      time.sleep(retry_delay)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False

  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    # A tie is accepted; only strictly fewer votes for us than against fails
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
490

Michael Hanselmann's avatar
Michael Hanselmann committed
491

492
def CheckAgreementWithRpc():
  """Run L{CheckAgreement} with the RPC layer set up around it.

  @rtype: bool
  @return: the result of L{CheckAgreement}

  """
  rpc.Init()
  try:
    agreed = CheckAgreement()
  finally:
    # Tear RPC down again even if the agreement check raised
    rpc.Shutdown()
  return agreed
Iustin Pop's avatar
Iustin Pop committed
498

499

500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
Iustin Pop's avatar
Iustin Pop committed
517
    except: # pylint: disable-msg=W0702
518 519 520 521
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

Iustin Pop's avatar
Iustin Pop committed
522
    os._exit(result) # pylint: disable-msg=W0212
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


544 545 546 547
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  The process exits with C{constants.EXIT_FAILURE} in any of these cases:
  positional arguments are given; a C{--no-voting} start is not
  confirmed; or the master agreement check fails.

  @param options: parsed command line options (C{debug}, C{no_voting} and
      C{yes_do_it} are used)
  @param args: positional command line arguments; none are accepted

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes -- unless voting was explicitly disabled.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    if not _RunInSeparateProcess(CheckAgreementWithRpc):
      sys.exit(constants.EXIT_FAILURE)
    return

  if options.yes_do_it:
    # Non-interactive override of the confirmation below
    return

  sys.stdout.write("The 'no voting' option has been selected.\n")
  sys.stdout.write("This is dangerous, please confirm by"
                   " typing uppercase 'yes': ")
  sys.stdout.flush()

  answer = sys.stdin.readline().strip()
  if answer == "YES":
    return

  print >> sys.stderr, "Aborting."
  sys.exit(constants.EXIT_FAILURE)


578
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  Binds the master socket and tries to activate the master IP address;
  a failure to activate it is only logged. It then serves client requests
  until C{serve_forever} returns. The master socket file is removed both
  before binding and on the way out.

  @param options: parsed command line options (unused)
  @param args: remaining command line arguments (unused)

  """
  socket_path = constants.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  master = IOServer(socket_path, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      start_result = rpc.RpcRunner.call_node_start_master(master_node,
                                                          False, False)
      error_msg = start_result.fail_msg
      if error_msg:
        logging.error("Can't activate master IP address: %s", error_msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(socket_path)
606

Iustin Pop's avatar
Iustin Pop committed
607

608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
def main():
  """Main function.

  Builds the masterd command line parser and hands control to
  C{daemon.GenericMain}, passing L{CheckMasterd} as the check callback
  and L{ExecMasterd} as the execution callback.

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  # Both masterd-specific options are plain boolean flags
  boolean_flags = [
    ("--no-voting", "no_voting",
     "Do not check that the nodes agree on this node"
     " being the master and start the daemon unconditionally"),
    ("--yes-do-it", "yes_do_it",
     "Override interactive check for --no-voting"),
    ]
  for (flag, dest, help_text) in boolean_flags:
    parser.add_option(flag, dest=dest, help=help_text,
                      default=False, action="store_true")

  # (directory, mode) pairs handed to GenericMain
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()