#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
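    """Initialize the job-processing machinery.

    This creates the GanetiContext and the client request worker
    pool. It is kept out of __init__ so that no extra threads are
    started before the daemon has forked (see the note in __init__).

    """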
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None):
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
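  # Messages exchanged with clients are JSON documents terminated by
  # the EOM character; READ_SIZE is the chunk size used when reading
  # from the socket.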

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
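    """Serve requests from a single client connection.

    EOM-delimited JSON requests are read in a loop, dispatched to
    ClientOps, and answered with a response containing the success
    flag and the result (or error details).

    """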
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
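    """Return the next complete message received from the client.

    Incoming data is buffered until at least one EOM-terminated
    message is available; returns None when the client closes the
    connection.

    """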
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
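    """Send one message to the client, terminated by the EOM character."""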
    #print "sending", msg
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
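    """Dispatch a single luxi request to the job queue or an opcode query.

    @param method: one of the luxi.REQ_* constants
    @param args: method-specific arguments, as sent by the client

    """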
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
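    """Feedback callback for queries that discards all log messages."""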
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for a cold start of all nodes, we retry for up
  to a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync than that, manual startup of the
  master should be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def CheckMASTERD(options, args):
  """Initial checks whether to run or exit with a failure

  """
  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return
  finally:
    rpc.Shutdown()


def ExecMASTERD(options, args):
  """Main MASTERD function, executed with the pidfile held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.RemoteFailMsg()
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMASTERD, ExecMASTERD)


if __name__ == "__main__":
  main()