ganeti-masterd 18 KB
Newer Older
1
#!/usr/bin/python
Iustin Pop's avatar
Iustin Pop committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

Iustin Pop's avatar
Iustin Pop committed
29 30
# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd
Iustin Pop's avatar
Iustin Pop committed
31

32
import sys
Iustin Pop's avatar
Iustin Pop committed
33 34 35 36
import SocketServer
import time
import collections
import signal
37
import logging
Iustin Pop's avatar
Iustin Pop committed
38

39
from optparse import OptionParser
Iustin Pop's avatar
Iustin Pop committed
40

41
from ganeti import config
Iustin Pop's avatar
Iustin Pop committed
42
from ganeti import constants
43
from ganeti import daemon
Iustin Pop's avatar
Iustin Pop committed
44 45 46
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
47
from ganeti import locking
Iustin Pop's avatar
Iustin Pop committed
48 49
from ganeti import luxi
from ganeti import utils
50 51
from ganeti import errors
from ganeti import ssconf
52
from ganeti import workerpool
53
from ganeti import rpc
54
from ganeti import bootstrap
55
from ganeti import serializer
56 57


58 59
# Number of worker threads in the "ClientReq" pool that IOServer.setup_queue
# creates to serve connections on the master socket
CLIENT_REQUEST_WORKERS = 16

60 61
# Module-level aliases for exit codes defined in the constants module
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
Iustin Pop's avatar
Iustin Pop committed
62 63


64
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker thread processing one client connection of the master socket."""
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is modelled on C{SocketServer.ThreadingMixIn}, with one
    difference: the request is always closed, even when the server's
    error handler itself raises.

    @param server: the server that accepted the connection
    @param request: the request object handed over by the server's
        C{process_request}
    @param client_address: the client address for this request

    """
    # Nested try blocks instead of try/except/finally keep this compatible
    # with Python 2.4
    try:
      try:
        server.finish_request(request, client_address)
      except: # pylint: disable-msg=W0702
        server.handle_error(request, client_address)
    finally:
      # Always release the connection, so a failing error handler cannot
      # leak the client socket
      server.close_request(request)


Iustin Pop's avatar
Iustin Pop committed
80 81 82 83 84 85 86 87
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared context and the client request worker pool.

    Must be called before L{serve_forever}, since L{process_request}
    hands every accepted connection to the worker pool.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This closes the master socket, then terminates the client request
    workers and finally shuts down the job queue. Each step is attempted
    even if an earlier one raised, so a failure while stopping the
    workers cannot leave the job queue running.

    """
    try:
      self.server_close()
    finally:
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          self.context.jobqueue.Shutdown()
Iustin Pop's avatar
Iustin Pop committed
139 140 141 142 143 144 145 146


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
Iustin Pop's avatar
Iustin Pop committed
147 148
    # pylint: disable-msg=W0201
    # setup() is the api for initialising for this class
Iustin Pop's avatar
Iustin Pop committed
149 150 151 152 153 154 155 156
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
157
        logging.debug("client closed connection")
Iustin Pop's avatar
Iustin Pop committed
158
        break
159

160
      request = serializer.LoadJson(msg)
161
      logging.debug("request: %s", request)
Iustin Pop's avatar
Iustin Pop committed
162
      if not isinstance(request, dict):
163
        logging.error("wrong request received: %s", msg)
Iustin Pop's avatar
Iustin Pop committed
164
        break
165

Guido Trotter's avatar
Guido Trotter committed
166 167
      method = request.get(luxi.KEY_METHOD, None) # pylint: disable-msg=E1103
      args = request.get(luxi.KEY_ARGS, None) # pylint: disable-msg=E1103
168 169
      if method is None or args is None:
        logging.error("no method or args in request")
Iustin Pop's avatar
Iustin Pop committed
170
        break
171 172 173 174 175

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
176 177
      except errors.GenericError, err:
        success = False
178
        result = errors.EncodeException(err)
179 180 181 182 183 184 185 186 187 188
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
189
      self.send_message(serializer.DumpJson(response))
Iustin Pop's avatar
Iustin Pop committed
190 191 192 193 194 195 196 197 198 199 200 201 202

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
203
    # TODO: sendall is not guaranteed to send everything
Iustin Pop's avatar
Iustin Pop committed
204 205 206 207 208 209 210 211
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatch one client request to the matching handler.

    @param method: the requested operation, compared against the
        C{luxi.REQ_*} names
    @param args: the operation arguments; their expected shape depends
        on C{method}
    @return: whatever the selected handler returns
    @raise ValueError: if C{method} does not name a known operation

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # Linear scan with == keeps the exact comparison semantics of a plain
    # if/elif chain (e.g. unhashable method values simply do not match)
    for (req_name, handler) in self._GetHandlers():
      if method == req_name:
        return handler(queue, args)

    logging.info("Received invalid request '%s'", method)
    raise ValueError("Invalid operation '%s'" % method)

  def _GetHandlers(self):
    """Return the (request name, handler) pairs in lookup order.

    Each handler is called as C{handler(queue, args)}.

    """
    return [
      (luxi.REQ_SUBMIT_JOB, self._HandleSubmitJob),
      (luxi.REQ_SUBMIT_MANY_JOBS, self._HandleSubmitManyJobs),
      (luxi.REQ_CANCEL_JOB, self._HandleCancelJob),
      (luxi.REQ_ARCHIVE_JOB, self._HandleArchiveJob),
      (luxi.REQ_AUTOARCHIVE_JOBS, self._HandleAutoArchiveJobs),
      (luxi.REQ_WAIT_FOR_JOB_CHANGE, self._HandleWaitForJobChange),
      (luxi.REQ_QUERY_JOBS, self._HandleQueryJobs),
      (luxi.REQ_QUERY_INSTANCES, self._HandleQueryInstances),
      (luxi.REQ_QUERY_NODES, self._HandleQueryNodes),
      (luxi.REQ_QUERY_EXPORTS, self._HandleQueryExports),
      (luxi.REQ_QUERY_CONFIG_VALUES, self._HandleQueryConfigValues),
      (luxi.REQ_QUERY_CLUSTER_INFO, self._HandleQueryClusterInfo),
      (luxi.REQ_QUERY_TAGS, self._HandleQueryTags),
      (luxi.REQ_QUEUE_SET_DRAIN_FLAG, self._HandleSetDrainFlag),
      (luxi.REQ_SET_WATCHER_PAUSE, self._HandleSetWatcherPause),
      ]

  def _RejectLockedQuery(self, use_locking): # pylint: disable-msg=R0201
    """Refuse queries that ask to be run with locking.

    @raise errors.OpPrereqError: if C{use_locking} is true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def _HandleSubmitJob(self, queue, args):
    """Submit one job built from a list of serialized opcodes."""
    logging.info("Received new job")
    ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
    return queue.SubmitJob(ops)

  def _HandleSubmitManyJobs(self, queue, args):
    """Submit several jobs, each given as a list of serialized opcodes."""
    logging.info("Received multiple jobs")
    jobs = [[opcodes.OpCode.LoadOpCode(state) for state in job_ops]
            for job_ops in args]
    return queue.SubmitManyJobs(jobs)

  def _HandleCancelJob(self, queue, args):
    """Cancel a job; the whole argument is the job ID."""
    logging.info("Received job cancel request for %s", args)
    return queue.CancelJob(args)

  def _HandleArchiveJob(self, queue, args):
    """Archive a job; the whole argument is the job ID."""
    logging.info("Received job archive request for %s", args)
    return queue.ArchiveJob(args)

  def _HandleAutoArchiveJobs(self, queue, args):
    """Archive jobs older than C{age}, bounded by C{timeout}."""
    (age, timeout) = args
    logging.info("Received job autoarchive request for age %s, timeout %s",
                 age, timeout)
    return queue.AutoArchiveJobs(age, timeout)

  def _HandleWaitForJobChange(self, queue, args):
    """Wait for changes of a job relative to the client's last view."""
    (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
    logging.info("Received job poll request for %s", job_id)
    return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                   prev_log_serial, timeout)

  def _HandleQueryJobs(self, queue, args):
    """Query the given fields of the given jobs."""
    (job_ids, fields) = args
    if isinstance(job_ids, (tuple, list)) and job_ids:
      desc = utils.CommaJoin(job_ids)
    else:
      desc = str(job_ids)
    logging.info("Received job query request for %s", desc)
    return queue.QueryJobs(job_ids, fields)

  def _HandleQueryInstances(self, queue, args): # pylint: disable-msg=W0613
    """Query instances; locked (sync) queries are refused."""
    (names, fields, use_locking) = args
    logging.info("Received instance query request for %s", names)
    self._RejectLockedQuery(use_locking)
    return self._Query(opcodes.OpQueryInstances(names=names,
                                                output_fields=fields,
                                                use_locking=use_locking))

  def _HandleQueryNodes(self, queue, args): # pylint: disable-msg=W0613
    """Query nodes; locked (sync) queries are refused."""
    (names, fields, use_locking) = args
    logging.info("Received node query request for %s", names)
    self._RejectLockedQuery(use_locking)
    return self._Query(opcodes.OpQueryNodes(names=names,
                                            output_fields=fields,
                                            use_locking=use_locking))

  def _HandleQueryExports(self, queue, args): # pylint: disable-msg=W0613
    """Query exports; locked (sync) queries are refused."""
    (nodes, use_locking) = args
    self._RejectLockedQuery(use_locking)
    logging.info("Received exports query request")
    return self._Query(opcodes.OpQueryExports(nodes=nodes,
                                              use_locking=use_locking))

  def _HandleQueryConfigValues(self, queue, args): # pylint: disable-msg=W0613
    """Query cluster configuration values; the argument is the field list."""
    logging.info("Received config values query request for %s", args)
    return self._Query(opcodes.OpQueryConfigValues(output_fields=args))

  def _HandleQueryClusterInfo(self, queue, args): # pylint: disable-msg=W0613
    """Query general cluster information."""
    logging.info("Received cluster info query request")
    return self._Query(opcodes.OpQueryClusterInfo())

  def _HandleQueryTags(self, queue, args): # pylint: disable-msg=W0613
    """Query the tags of an object given as (kind, name)."""
    (kind, name) = args
    logging.info("Received tags query request")
    return self._Query(opcodes.OpGetTags(kind=kind, name=name))

  def _HandleSetDrainFlag(self, queue, args):
    """Set the queue drain flag to the given value."""
    logging.info("Received queue drain flag change request to %s",
                 args)
    return queue.SetDrainFlag(args)

  def _HandleSetWatcherPause(self, queue, args): # pylint: disable-msg=W0613
    """Pause the watcher until a timestamp, or unpause it with None."""
    (until, ) = args

    if until is None:
      logging.info("Received request to no longer pause the watcher")
      return _SetWatcherPause(until)

    if not isinstance(until, (int, float)):
      raise TypeError("Duration must be an integer or float")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause the watcher until %s", until)
    return _SetWatcherPause(until)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id, hence None for both the processor's
    # job and the execution context
    return mcpu.Processor(self.server.context, None).ExecOpCode(op, None)
341

Iustin Pop's avatar
Iustin Pop committed
342

343 344 345 346 347 348
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds the objects shared by all threads: the
  configuration writer (C{cfg}), the lock manager (C{glm}) and the job
  queue (C{jobqueue}). Only one instance may be created, and attributes
  cannot be set once construction has finished; both rules are enforced
  with assertions.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time; creating a
    second one fails the assertion below.

    """
    klass = self.__class__
    assert klass._instance is None, "double GanetiContext instance"

    # Global configuration object; the lock manager is seeded from its
    # node and instance lists
    cfg = config.ConfigWriter()
    self.cfg = cfg

    # Locking manager
    self.glm = locking.GanetiLockManager(cfg.GetNodeList(),
                                         cfg.GetInstanceList())

    # Job queue, which is handed this context object
    self.jobqueue = jqueue.JobQueue(self)

    # Registering the instance on the class makes __setattr__ refuse any
    # further attribute assignment
    klass._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration, job queue and lock manager.

    @param node: the node object; its C{name} is used for the lock
    @param ec_id: passed unchanged to C{cfg.AddNode}

    """
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    Only the job queue needs to be synchronized again.

    """
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration, job queue and lock manager.

    @param name: the name of the node to remove

    """
    self.cfg.RemoveNode(name)
    self.jobqueue.RemoveNode(name)
    self.glm.remove(locking.LEVEL_NODE, name)

416

417 418 419 420 421 422 423 424 425 426 427 428 429
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp until which the watcher shouldn't run.
      It is written to the pause file with C{%d}, so floats are truncated.
      C{None} removes the pause file instead.
  @return: C{until}, unchanged

  """
  if until is not None:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))
  else:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)

  return until

432

433 434 435 436 437 438 439 440 441
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: the top-voted node must be
  ourselves, and it must get at least as many votes as all other answers
  combined. Since in most of the cases we are the master, we'll use our
  own config file for getting the node list. In the future we could
  collect the current node list from our (possibly obsolete) known
  nodes.

  In order to account for cold-start of all nodes, we retry for roughly
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (beside ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if we may start as master (or no votes were gathered at
      all), False otherwise

  """
  myself = utils.HostInfo().name
  # temporary config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  max_attempts = 6
  retry_delay = 10 # seconds
  attempt = 0
  while True:
    # votes is a list of (node, count) pairs; presumably sorted with the
    # most-voted entry first, as only votes[0] is inspected
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    attempt += 1
    if attempt >= max_attempts:
      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                       " after multiple retries. Aborting startup")
      logging.critical("Use the --no-voting option if you understand what"
                       " effects it has on the cluster state")
      return False
    # No sleep after the final failed attempt: there is nothing left to
    # wait for before giving up
    time.sleep(retry_delay)

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
491

Michael Hanselmann's avatar
Michael Hanselmann committed
492

493
def CheckAgreementWithRpc():
  """Run L{CheckAgreement} with the RPC layer initialized.

  RPC is shut down again afterwards, even if the check raises.

  @return: the result of L{CheckAgreement}

  """
  rpc.Init()
  try:
    agreed = CheckAgreement()
  finally:
    rpc.Shutdown()
  return agreed
Iustin Pop's avatar
Iustin Pop committed
499

500

501 502 503 504
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  The process exits with a failure if extra arguments are given, if the
  other nodes do not agree that we are the master, or if the user does
  not confirm a --no-voting start. ssconf.CheckMaster is expected to
  fail on its own when this node is not the master.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # CheckMaster didn't fail, so we believe we are the master; unless told
  # otherwise, this has to be confirmed with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
      sys.exit(constants.EXIT_FAILURE)
    return

  if options.yes_do_it:
    return

  sys.stdout.write("The 'no voting' option has been selected.\n")
  sys.stdout.write("This is dangerous, please confirm by"
                   " typing uppercase 'yes': ")
  sys.stdout.flush()

  if sys.stdin.readline().strip() != "YES":
    print >> sys.stderr, "Aborting."
    sys.exit(constants.EXIT_FAILURE)


535
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  Steps, in order:
    - bind the master socket;
    - try to activate the master IP (a failure is only logged);
    - set up the job queue and client workers;
    - serve requests until SIGINT/SIGTERM.

  The socket file is removed on exit.

  @param options: parsed command line options (unused)
  @param args: remaining command line arguments (unused)

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      try:
        # setup_queue is inside the try so that a failure half-way through
        # (e.g. after the job queue exists but before the worker pool does)
        # still closes the socket and shuts down whatever was started
        master.setup_queue()
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
563

Iustin Pop's avatar
Iustin Pop committed
564

565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
def main():
  """Main function

  Builds the option parser for the master daemon and hands control to
  the generic daemon entry point, using L{CheckMasterd} for the startup
  checks and L{ExecMasterd} as the daemon body.

  """
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version)

  # (option, destination, help text) for the boolean flags, in the order
  # they are registered
  flags = [
    ("--no-voting", "no_voting",
     "Do not check that the nodes agree on this node"
     " being the master and start the daemon unconditionally"),
    ("--yes-do-it", "yes_do_it",
     "Override interactive check for --no-voting"),
    ]
  for (opt, dest, helptext) in flags:
    parser.add_option(opt, dest=dest, help=helptext,
                      default=False, action="store_true")

  # directories to ensure before startup, with their modes
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)

584

Iustin Pop's avatar
Iustin Pop committed
585 586
# Start the daemon only when run as a script, not when imported
if __name__ == "__main__":
  main()