ganeti-masterd 18.6 KB
Newer Older
1
#!/usr/bin/python
Iustin Pop's avatar
Iustin Pop committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide because the
interfaces inherited from their parent classes require it.

"""

Iustin Pop's avatar
Iustin Pop committed
29
30
# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
33
import sys
Iustin Pop's avatar
Iustin Pop committed
34
35
36
37
import SocketServer
import time
import collections
import signal
38
import logging
Iustin Pop's avatar
Iustin Pop committed
39

40
from optparse import OptionParser
Iustin Pop's avatar
Iustin Pop committed
41

42
from ganeti import config
Iustin Pop's avatar
Iustin Pop committed
43
from ganeti import constants
44
from ganeti import daemon
Iustin Pop's avatar
Iustin Pop committed
45
46
47
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
48
from ganeti import locking
Iustin Pop's avatar
Iustin Pop committed
49
50
from ganeti import luxi
from ganeti import utils
51
52
from ganeti import errors
from ganeti import ssconf
53
from ganeti import workerpool
54
from ganeti import rpc
55
from ganeti import bootstrap
56
from ganeti import serializer
57
58


59
60
# Size of the worker pool that serves luxi client connections
CLIENT_REQUEST_WORKERS = 16

# Module-level aliases for exit codes defined in ganeti.constants
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
Iustin Pop's avatar
Iustin Pop committed
63
64


65
class ClientRequestWorker(workerpool.BaseWorker):
  """Pool worker that serves one accepted client connection per task.

  """
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Serve a single accepted connection, then close it.

    The logic is copied from SocketServer.ThreadingMixIn: on any failure
    the error is reported through C{server.handle_error} and the request
    is still closed.

    @param server: the server which accepted the connection
    @param request: the accepted connection
    @param client_address: the peer address of the connection

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # Report the failure, then make sure the connection gets closed
      server.handle_error(request, client_address)
      server.close_request(request)


Iustin Pop's avatar
Iustin Pop committed
81
82
83
84
85
86
87
88
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  Accepts client connections on a Unix socket and hands them over to a
  pool of worker threads. Signal handling (done only in this thread) ends
  the serving loop, and L{server_cleanup} tears everything down.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Both are only created after forking, see setup_queue()
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared Ganeti context and the request worker pool.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Queue an accepted connection for processing by the worker pool.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Serve requests one at a time until a handled signal is received.

    The handler flag is only checked between requests.

    @param signal_handlers: supplied by the C{utils.SignalHandled}
        decorator; maps each handled signal to its handler object

    """
    assert isinstance(signal_handlers, dict) and signal_handlers, \
           "Broken SignalHandled decorator"
    # A single SignalHandled decoration maps every signal to the same
    # handler object, so the first value is as good as any.
    handler = list(signal_handlers.values())[0]
    while not handler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    Closes the listening socket first. Afterwards, even if closing failed,
    the worker threads are terminated and the job queue is shut down, for
    whichever of them have been created.

    """
    try:
      self.server_close()
    finally:
      workers = self.request_workers
      if workers:
        workers.TerminateWorkers()
      ctx = self.context
      if ctx:
        ctx.jobqueue.Shutdown()
Iustin Pop's avatar
Iustin Pop committed
139
140
141
142
143
144
145
146


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
Iustin Pop's avatar
Iustin Pop committed
147
148
    # pylint: disable-msg=W0201
    # setup() is the api for initialising for this class
Iustin Pop's avatar
Iustin Pop committed
149
150
151
152
153
154
155
156
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
157
        logging.debug("client closed connection")
Iustin Pop's avatar
Iustin Pop committed
158
        break
159

160
      request = serializer.LoadJson(msg)
161
      logging.debug("request: %s", request)
Iustin Pop's avatar
Iustin Pop committed
162
      if not isinstance(request, dict):
163
        logging.error("wrong request received: %s", msg)
Iustin Pop's avatar
Iustin Pop committed
164
        break
165
166
167
168
169

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
Iustin Pop's avatar
Iustin Pop committed
170
        break
171
172
173
174
175

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
176
177
      except errors.GenericError, err:
        success = False
178
        result = errors.EncodeException(err)
179
180
181
182
183
184
185
186
187
188
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
189
      self.send_message(serializer.DumpJson(response))
Iustin Pop's avatar
Iustin Pop committed
190
191
192
193
194
195
196
197
198
199
200
201
202

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
203
    # TODO: sendall is not guaranteed to send everything
Iustin Pop's avatar
Iustin Pop committed
204
205
206
207
208
209
210
211
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def _CheckNoLocking(self, use_locking): # pylint: disable-msg=R0201
    """Refuse query requests which ask for locking.

    @param use_locking: the locking flag sent by the client
    @raise errors.OpPrereqError: if C{use_locking} is true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Execute a single luxi request.

    Job-related requests are forwarded to the job queue; query requests
    are run directly as opcodes through L{_Query}.

    @param method: the request name (one of the C{luxi.REQ_*} constants)
    @param args: the request arguments; their layout depends on C{method}
    @return: the result of the operation, to be sent back to the client
    @raise errors.OpPrereqError: if a query request asks for locking
    @raise TypeError: if a watcher pause end time is not a number
    @raise errors.GenericError: if a watcher pause end time is in the past
    @raise ValueError: if C{method} is not a known request

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
              for ops in args]
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._CheckNoLocking(use_locking)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._CheckNoLocking(use_locking)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      logging.info("Received exports query request")
      self._CheckNoLocking(use_locking)
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        # "until" is an absolute Unix timestamp (it is compared against
        # the current time below), not a duration
        if not isinstance(until, (int, float)):
          raise TypeError("Pause end time must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    The opcode is executed in the calling thread through an
    C{mcpu.Processor}, not submitted as a job.

    @param op: the query opcode to execute
    @return: whatever the opcode execution returns

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
335

Iustin Pop's avatar
Iustin Pop committed
336

337
338
339
340
341
342
class GanetiContext(object):
  """Context common to all ganeti threads.

  Creates and holds the objects shared by all threads: the configuration,
  the lock manager and the job queue. Only one instance may exist. Once it
  is fully constructed, its attributes can no longer be reassigned.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    klass = self.__class__
    assert klass._instance is None, "double GanetiContext instance"

    # Global configuration object
    cfg = config.ConfigWriter()
    self.cfg = cfg

    # Lock manager, seeded with the configured node and instance lists
    self.glm = locking.GanetiLockManager(cfg.GetNodeList(),
                                         cfg.GetInstanceList())

    # Job queue; it receives this context
    self.jobqueue = jqueue.JobQueue(self)

    # Registering the singleton makes __setattr__ refuse any further
    # attribute assignment
    klass._instance = self

  def __setattr__(self, name, value):
    """Refuse attribute assignment once initialization has completed.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Register a new node with the config, the job queue and the locks.

    @param node: the node object to add
    @param ec_id: passed through to C{ConfigWriter.AddNode}

    """
    self.cfg.AddNode(node, ec_id)
    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Update a node which is already part of the configuration.

    Only the job queue needs to be synchronized again.

    """
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Drop a node from the config, the job queue and the lock manager.

    @param name: the name of the node to remove

    """
    self.cfg.RemoveNode(name)
    self.jobqueue.RemoveNode(name)
    self.glm.remove(locking.LEVEL_NODE, name)

410

411
412
413
414
415
416
417
418
419
420
421
422
423
def _SetWatcherPause(until):
  """Create or remove the file which pauses the watcher.

  @type until: None or int
  @param until: Unix timestamp until which the watcher shouldn't run, or
      C{None} to lift the pause
  @return: C{until}, unchanged

  """
  if until is not None:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))
  else:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)

  return until

426

427
428
429
430
431
432
433
434
435
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get at least as many
  positive as negative answers. Since in most of the cases we are the
  master, we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we make up to six
  attempts, ten seconds apart (about a minute in total), until we get a
  real answer as the top-voted one. If the nodes are more out-of-sync,
  for now manual startup of the master should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: whether this node may start as the master

  """
  myself = utils.HostInfo().name
  # temporary config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  attempts_left = 6
  while True:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    attempts_left -= 1
    if attempts_left == 0:
      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                       " after multiple retries. Aborting startup")
      return False
    # only wait if another attempt follows
    time.sleep(10)

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  # a tie is accepted; only strictly fewer votes for us than against fail
  if top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
    return False

  return True
483

Michael Hanselmann's avatar
Michael Hanselmann committed
484

485
def CheckAgreementWithRpc():
  """Run CheckAgreement with the RPC layer initialized around it.

  @rtype: bool
  @return: the result of CheckAgreement

  """
  rpc.Init()
  try:
    agreed = CheckAgreement()
  finally:
    # RPC is shut down even if the check raised
    rpc.Shutdown()
  return agreed
Iustin Pop's avatar
Iustin Pop committed
491

492

493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
Iustin Pop's avatar
Iustin Pop committed
510
    except: # pylint: disable-msg=W0702
511
512
513
514
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

Iustin Pop's avatar
Iustin Pop committed
515
    os._exit(result) # pylint: disable-msg=W0212
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


537
538
539
540
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Exits with C{constants.EXIT_FAILURE} on unexpected arguments, when the
  user does not confirm --no-voting interactively, or when the vote of
  the other nodes does not confirm us as the master.

  """
  if args:
    # masterd doesn't take any arguments
    sys.stderr.write("Usage: %s [-f] [-d]\n" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master; unless voting
  # was disabled, this still has to be confirmed by the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    if not _RunInSeparateProcess(CheckAgreementWithRpc):
      sys.exit(constants.EXIT_FAILURE)
    return

  if options.yes_do_it:
    return

  sys.stdout.write("The 'no voting' option has been selected.\n")
  sys.stdout.write("This is dangerous, please confirm by"
                   " typing uppercase 'yes': ")
  sys.stdout.flush()

  if sys.stdin.readline().strip() != "YES":
    sys.stderr.write("Aborting.\n")
    sys.exit(constants.EXIT_FAILURE)


571
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  Binds the master socket, requests activation of the master IP address
  over RPC (a failure is only logged), then serves client requests until
  L{IOServer.serve_forever} returns. The socket file is removed on exit.

  """
  socket_path = constants.MASTER_SOCKET

  # Removing a stale socket is safe: the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  master = IOServer(socket_path, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      start_result = rpc.RpcRunner.call_node_start_master(master_node,
                                                          False, False)
      err_msg = start_result.fail_msg
      if err_msg:
        logging.error("Can't activate master IP address: %s", err_msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(socket_path)
599

Iustin Pop's avatar
Iustin Pop committed
600

601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
def main():
  """Main function: parse the command line and run the master daemon."""
  version_str = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_str)
  parser.add_option("--no-voting", dest="no_voting",
                    help=("Do not check that the nodes agree on this node"
                          " being the master and start the daemon"
                          " unconditionally"),
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  # (directory, mode) pairs handed over to daemon.GenericMain
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)

620

Iustin Pop's avatar
Iustin Pop committed
621
622
# Start the daemon only when executed as a script, not when imported
if __name__ == "__main__":
  main()