#
#

# Copyright (C) 2006, 2007, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging
import random
import time

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils


# Naming convention linking opcodes to logical units: the opcode class
# "Op<Name>" is handled by the logical unit class "LU<Name>" in cmdlib.
_OP_PREFIX = "Op"  # prefix of every opcode class name
_LU_PREFIX = "LU"  # prefix of every logical unit class name


class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised when the lock manager could not grant the requested locks
  within the given timeout.

  """
  pass


def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  The first attempt uses a timeout of one second. Every following timeout
  is derived from the previous one as C{(previous * 1.05) ** 1.25} and is
  clamped to the range [0.1, 10.0] seconds. Timeouts are appended until
  their sum reaches 150 seconds.

  @rtype: list of float
  @return: timeouts, in seconds, for the successive lock attempts

  """
  result = [1.0]

  # Wait for a total of at least 150s before doing a blocking acquire
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap timeout at 10 seconds. This gives other jobs a chance to run
    # even if we're still trying to get our locks, before finally moving
    # to a blocking acquire. The 0.1s lower boundary is only for safety.
    result.append(min(max(timeout, 0.1), 10.0))

  return result


class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out the precomputed per-attempt timeouts one at a time, each with
  a small random variation. Once they are used up, C{None} is returned to
  request a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    # NOTE(review): _time_fn is stored but not used by this class
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: timeout in seconds for the next lock attempt, or C{None} once
        all timeouts have been handed out (do a blocking acquire then)

    """
    # Take at most one element from the iterator. A single-step "for" is
    # used because "iterator.next()" only exists in Python 2 and the
    # "next()" builtin only exists since Python 2.6. If the iterator is
    # exhausted the loop body never runs and timeout stays None, meaning
    # "no more timeouts, do blocking acquire".
    timeout = None
    for timeout in self._timeouts:
      break

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout


class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  Every callback here is a no-op. Subclasses override the ones they need.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    @param args: the feedback message arguments, passed through unchanged
        by the processor's logging helpers

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    The processor calls this before every lock acquisition. Presumably
    subclasses raise an exception to abort a cancelled job; the base class
    does nothing.

    """

def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  The opcode prefix is replaced by the logical unit prefix, so for example
  C{"OpFoo"} maps to C{"LUFoo"}.

  @type opname: string
  @param opname: name of an opcode class; must start with the opcode prefix
  @rtype: string
  @return: name of the logical unit class handling that opcode

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  suffix = opname[len(_OP_PREFIX):]
  return "%s%s" % (_LU_PREFIX, suffix)


def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  Every opcode class in C{opcodes.OP_MAPPING} whose C{WITH_LU} attribute is
  true is mapped to the C{cmdlib} class named by L{_LUNameForOpName}.
  A missing LU class makes C{getattr} raise AttributeError.

  @rtype: dict
  @return: mapping from opcode class to logical unit class

  """
  table = {}
  for opcode_class in opcodes.OP_MAPPING.values():
    if not opcode_class.WITH_LU:
      continue
    lu_name = _LUNameForOpName(opcode_class.__name__)
    table[opcode_class] = getattr(cmdlib, lu_name)
  return table


class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    Before trying to acquire, the execution callbacks (if any) are asked
    whether the job has been cancelled.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @return: the (non-None) value returned by the lock manager's acquire
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the LU's prerequisite checks and the pre-phase hooks, then executes
    the LU, followed by the post-phase hooks. In dry-run mode only the
    prerequisites and pre-phase hooks run. The configuration update hook
    runs if the configuration's write counter changed during execution.

    @param lu: the logical unit to execute
    @return: the LU's result as post-processed by its post-phase hooks
        callback, or the LU's C{dry_run_result} in dry-run mode

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self.Log)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: lock level handled by this call; once it is not one of
        C{locking.LEVELS} all locks are held and the LU is executed
    @param calc_timeout: callable returning the timeout (or None) to use for
        the next lock acquisition
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @return: the result of L{_ExecLU}

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU.
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                          not lu_class.REQ_BGL, calc_timeout(),
                          priority)
      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                     priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

  def Log(self, *args):
    """Forward call to feedback callback function.

    Does nothing when no callbacks are registered.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context id was given

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id

class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes the hooks master for one logical unit.

    @param callfn: function doing the remote call; it is invoked as
        C{callfn(node_list, hpath, phase, env)}
    @param lu: the logical unit whose hooks are run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @return: tuple of (environment dict, frozenset of pre-phase nodes,
        frozenset of post-phase nodes); both node sets are empty when the
        LU has no hooks path

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are namespaced with the GANETI_ prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call: it adds
    the phase, hooks path and (when a configuration is available) cluster
    name and master node, then converts all keys and values to strings.
    The stored base environment is not modified.

    @return: whatever C{self.callfn} returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # items() rather than iteritems() keeps this working on Python 3 too
    env = dict([(str(key), str(val)) for key, val in env.items()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase;
        an empty override means no hooks are run
    @return: the processed results of the hooks multi-node rpc call, or
        C{None} if there were no nodes to run on
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if nodes is None:
      nodes = self.node_list[phase]

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all; an
      # explicitly passed empty override is handled the same way instead
      # of issuing an RPC to zero nodes
      return

    hpath = self.lu.HPATH
    results = self._RunWrapper(nodes, hpath, phase)

    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
        return results

    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue

      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue

      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Pre-phase failures are collected and abort the operation below
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)