#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviates from the standard style guide since the
inheritance from parent classes requires it.

"""


import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


# Size of the thread pool serving luxi client requests.
CLIENT_REQUEST_WORKERS = 16

# Exit codes re-exported from constants for convenience.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process one client request.

    This is adapted from the code in SocketServer.ThreadingMixIn.

    @param server: the server object (an L{IOServer} instance)
    @param request: the connected client socket
    @param client_address: the address of the connected client

    """
    try:
      server.finish_request(request, client_address)
    except Exception:
      # Narrowed from a bare "except:" so that SystemExit and
      # KeyboardInterrupt are not silently swallowed in worker threads;
      # error reporting is delegated to the server, as ThreadingMixIn does.
      server.handle_error(request, client_address)
    finally:
      # The request must be closed on both the success and the error
      # path; a finally clause guarantees exactly one close.
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared Ganeti context and the client worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    # SIGINT/SIGTERM are caught here (the main thread) and only set a
    # flag, so the loop below exits cleanly after the current request.
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      # Nested try/finally so that a failure while terminating the
      # worker pool cannot skip the job queue shutdown.
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
149
        logging.debug("client closed connection")
Iustin Pop's avatar
Iustin Pop committed
150
        break
151

152
      request = serializer.LoadJson(msg)
153
      logging.debug("request: %s", request)
Iustin Pop's avatar
Iustin Pop committed
154
      if not isinstance(request, dict):
155
        logging.error("wrong request received: %s", msg)
Iustin Pop's avatar
Iustin Pop committed
156
        break
157 158 159 160 161

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
Iustin Pop's avatar
Iustin Pop committed
162
        break
163 164 165 166 167

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
168 169 170
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
171 172 173 174 175 176 177 178 179 180
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
181
      self.send_message(serializer.DumpJson(response))
Iustin Pop's avatar
Iustin Pop committed
182 183 184 185 186 187 188 189 190 191 192 193 194

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
195
    # TODO: sendall is not guaranteed to send everything
Iustin Pop's avatar
Iustin Pop committed
196 197 198 199 200 201 202 203
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatch a single luxi request.

    @param method: one of the luxi.REQ_* constants
    @param args: method-specific arguments, already deserialized
    @return: the method-specific result
    @raise ValueError: if the method is unknown

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    # Consistency fix: this branch used a bare "if" (working only
    # because the branch above returns); it is now part of the chain.
    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    """No-op feedback function passed to the opcode processor."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton reference; also doubles as the "initialization finished"
  # flag checked by __setattr__ below.
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the node/instance lists known to the
    # configuration at startup
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications;
    # it must therefore be the very last assignment in __init__
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  cli_parser = OptionParser(description="Ganeti master daemon",
                            usage="%prog [-f] [-d]",
                            version=version_string)

  # Note: "fork" defaults to True and -f turns it off
  cli_parser.add_option("-f", "--foreground", dest="fork",
                        help="Don't detach from the current terminal",
                        default=True, action="store_false")
  cli_parser.add_option("-d", "--debug", dest="debug",
                        help="Enable some debug messages",
                        default=False, action="store_true")
  cli_parser.add_option("--no-voting", dest="no_voting",
                        help="Do not check that the nodes agree on this node"
                        " being the master and start the daemon unconditionally",
                        default=False, action="store_true")
  cli_parser.add_option("--yes-do-it", dest="yes_do_it",
                        help="Override interactive check for --no-voting",
                        default=False, action="store_true")

  return cli_parser.parse_args()


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Instantiate the config writer only temporarily, to fetch the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # An empty vote list means a one-node cluster: we are the master
      return True
    if votes[0][0] is not None:
      # A real node won the vote, stop retrying
      break
    retries -= 1
    time.sleep(10)
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False

  # Here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False
  if top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
    return False

  return True


def main():
  """Main function.

  Checks/confirms master status (via voting or the --no-voting
  override), daemonizes if requested, and runs the luxi request server
  until terminated.

  """
  options, args = ParseOptions()
  # Disable implicit forking in utils; daemonization is handled
  # explicitly below
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  # First rpc.Init/Shutdown cycle: only for the pre-daemonization
  # master checks
  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      # --no-voting without --yes-do-it requires interactive confirmation
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return

    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
           ]
    utils.EnsureDirs(dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    # Binding the master socket must happen before daemonizing
    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    # Second rpc.Init/Shutdown cycle: for the daemon's own lifetime
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False, False):
        # Best-effort: log but keep running without the master IP
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    # Always clean up pid file and socket, even on abnormal exit
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


# Run the daemon only when executed directly, not when imported
if __name__ == "__main__":
  main()