#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviates from the standard style guide since the
inheritance from parent classes requires it.

"""


30
import os
31
import sys
Iustin Pop's avatar
Iustin Pop committed
32
33
34
35
import SocketServer
import time
import collections
import signal
36
import logging
Iustin Pop's avatar
Iustin Pop committed
37
38

from cStringIO import StringIO
39
from optparse import OptionParser
Iustin Pop's avatar
Iustin Pop committed
40

41
from ganeti import config
Iustin Pop's avatar
Iustin Pop committed
42
from ganeti import constants
43
from ganeti import daemon
Iustin Pop's avatar
Iustin Pop committed
44
45
46
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
47
from ganeti import locking
Iustin Pop's avatar
Iustin Pop committed
48
49
from ganeti import luxi
from ganeti import utils
50
51
from ganeti import errors
from ganeti import ssconf
52
from ganeti import workerpool
53
from ganeti import rpc
54
from ganeti import bootstrap
55
from ganeti import serializer
56
57


58
59
CLIENT_REQUEST_WORKERS = 16

60
61
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
Iustin Pop's avatar
Iustin Pop committed
62
63


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process one client request.

    This mirrors the logic of SocketServer.ThreadingMixIn, but runs in
    a worker pool thread instead of a dedicated per-request thread.

    @param server: server object providing finish_request, handle_error
        and close_request
    @param request: the client connection being served
    @param client_address: address of the connected client

    """
    try:
      server.finish_request(request, client_address)
    except: # pylint: disable-msg=W0702
      # Deliberate bare except: errors must be reported via the server
      # and never propagate out of the worker
      server.handle_error(request, client_address)
    finally:
      # Close exactly once, whether or not the request succeeded
      # (the previous version could call close_request twice if the
      # close itself raised)
      server.close_request(request)


Iustin Pop's avatar
Iustin Pop committed
79
80
81
82
83
84
85
86
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Threads are started only after the daemon has forked, via
    # setup_queue()
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Creates the shared context and the client worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None):
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # SignalHandled registers the same handler object for every signal
    # in its list, so any entry of the dict will do
    handler = signal_handlers.values()[0]
    while not handler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
Iustin Pop's avatar
Iustin Pop committed
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
153
        logging.debug("client closed connection")
Iustin Pop's avatar
Iustin Pop committed
154
        break
155

156
      request = serializer.LoadJson(msg)
157
      logging.debug("request: %s", request)
Iustin Pop's avatar
Iustin Pop committed
158
      if not isinstance(request, dict):
159
        logging.error("wrong request received: %s", msg)
Iustin Pop's avatar
Iustin Pop committed
160
        break
161
162
163
164
165

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
Iustin Pop's avatar
Iustin Pop committed
166
        break
167
168
169
170
171

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
172
173
      except errors.GenericError, err:
        success = False
174
        result = errors.EncodeException(err)
175
176
177
178
179
180
181
182
183
184
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
185
      self.send_message(serializer.DumpJson(response))
Iustin Pop's avatar
Iustin Pop committed
186
187
188
189
190
191
192
193
194
195
196
197
198

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
199
    # TODO: sendall is not guaranteed to send everything
Iustin Pop's avatar
Iustin Pop committed
200
201
202
203
204
205
206
207
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatches a LUXI request to the appropriate handler.

    @param method: LUXI method name, one of the luxi.REQ_* constants
    @param args: request arguments, in the per-method format
    @return: the per-method result (job id, query rows, etc.)
    @raise ValueError: if an unknown method is requested
    @raise errors.OpPrereqError: for queries requesting locking, which
        are not allowed here

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # Every branch either returns or raises, so a single uniform
    # if/elif chain is used for dispatching (the first branch was
    # previously a separate "if" chain, which worked only because it
    # returned)
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    """No-op feedback callback passed to opcode execution."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
329

Iustin Pop's avatar
Iustin Pop committed
330

331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton instance; doubles as the "initialization finished"
  # marker checked by __setattr__ below
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    # While __init__ is still running, _instance is None, so the
    # assignments above pass this check; once _instance is set, any
    # further assignment attempt fails the assertion
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

402

403
404
405
406
407
408
409
410
411
412
413
414
415
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      None removes the pause file instead

  """
  if until is not None:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))
  else:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)

  return until

418

419
420
421
422
423
424
425
426
427
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary config writer, used only to obtain the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  # Up to 6 attempts, 10 seconds apart, while the top vote is undecided
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False

  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False
  if top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
    return False

  return True
475

Michael Hanselmann's avatar
Michael Hanselmann committed
476

477
def CheckAgreementWithRpc():
  """Checks master agreement with the RPC layer initialized.

  Wraps L{CheckAgreement}, making sure the RPC layer is brought up
  before the check and shut down afterwards in all cases.

  """
  rpc.Init()
  try:
    result = CheckAgreement()
  finally:
    rpc.Shutdown()
  return result
Iustin Pop's avatar
Iustin Pop committed
483

484

485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except:
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result)

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      if sys.stdin.readline().strip() != "YES":
        sys.stderr.write("Aborting.\n")
        sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


Michael Hanselmann's avatar
Michael Hanselmann committed
559
560
def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  @param options: parsed command line options
  @param args: positional arguments (unused)

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.RemoteFailMsg()
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    # Remove the socket last, after all threads and RPC are down
    utils.RemoveFile(constants.MASTER_SOCKET)
587

Iustin Pop's avatar
Iustin Pop committed
588

589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
def main():
  """Main function"""
  usage = "%prog [-f] [-d]"
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage=usage, version=version)
  parser.add_option("--no-voting", dest="no_voting",
                    action="store_true", default=False,
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    action="store_true", default=False,
                    help="Override interactive check for --no-voting")

  # Directories that must exist (with their modes) before starting
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)

608

Iustin Pop's avatar
Iustin Pop committed
609
610
if __name__ == "__main__":
  main()