mcpu.py 18.9 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
#
Iustin Pop's avatar
Iustin Pop committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

31
import logging
32 33
import random
import time
Iustin Pop's avatar
Iustin Pop committed
34 35 36 37 38 39

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
40
from ganeti import locking
41
from ganeti import utils
Iustin Pop's avatar
Iustin Pop committed
42

43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
class _LockAcquireTimeout(Exception):
  """Signals that a lock acquire attempt timed out.

  Private to this module: it is raised when a timed lock acquire returns
  C{None} and is caught by the opcode execution loop, which then retries
  with the next timeout.

  """
  pass


class _LockTimeoutStrategy(object):
  """Computes the timeout to use for each lock acquire attempt.

  Every attempt gets a time budget that grows exponentially with the
  attempt number (measured from the first calculation), is clamped to the
  range 0.1..10 seconds and then slightly randomized. After
  L{_MAX_ATTEMPTS} attempts the caller is told to use a blocking acquire.

  """
  __slots__ = [
    "_attempts",
    "_random_fn",
    "_start_time",
    ]

  # Number of timed attempts before switching to a blocking acquire
  _MAX_ATTEMPTS = 10

  # Base of the exponential growth of the per-attempt time budget
  _ATTEMPT_FACTOR = 1.75

  def __init__(self, _random_fn=None):
    """Sets up the strategy with no attempts made yet.

    @param _random_fn: Replacement for C{random.random}, intended for
        unittests; called without arguments

    """
    object.__init__(self)

    if _random_fn is not None:
      self._random_fn = _random_fn
    else:
      self._random_fn = random.random

    self._attempts = 0
    self._start_time = None

  def NextAttempt(self):
    """Records that another acquire attempt is about to be made.

    """
    assert self._attempts >= 0
    self._attempts = self._attempts + 1

  def CalcRemainingTimeout(self):
    """Computes the timeout for the current attempt.

    @rtype: float or None
    @return: Timeout in seconds, or C{None} when the maximum number of
        timed attempts was reached and a blocking acquire should be used
    @raise RuntimeError: if called after the blocking attempt

    """
    attempts = self._attempts
    assert attempts >= 0

    if attempts > self._MAX_ATTEMPTS:
      # The previous (blocking) acquire must not have timed out
      raise RuntimeError("Blocking acquire ran into timeout")

    if attempts == self._MAX_ATTEMPTS:
      # All timed attempts used up, only blocking acquires from now on
      return None

    # The clock starts ticking on the very first calculation
    if self._start_time is None:
      self._start_time = time.time()

    # Time left until the deadline of this attempt
    deadline = self._start_time + (self._ATTEMPT_FACTOR ** attempts)
    remaining = deadline - time.time()

    # Keep the value within 0.1..10 seconds. The upper bound gives other
    # jobs a chance to run while we're still trying to get our locks, before
    # finally moving to a blocking acquire.
    remaining = min(max(remaining, 0.1), 10.0)

    # Shift the timeout by up to 5% in either direction (for a random
    # function returning values between 0 and 1). This helps in situations
    # where two or more jobs are fighting for the same lock(s).
    spread = remaining * 0.1
    jitter = (self._random_fn() * spread) - (spread * 0.5)
    remaining = remaining + jitter

    assert remaining >= 0.0, "Timeout must be positive"

    return remaining


130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
class OpExecCbBase:
  """Base class for OpCode execution callbacks.

  All methods are no-ops here; subclasses override the ones they are
  interested in.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    @param args: Feedback data, passed through unchanged from the LU code

    """

  def ReportLocks(self, msg):
    """Report lock operations.

    @type msg: string
    @param msg: Description of the lock operation being reported

    """

152

Iustin Pop's avatar
Iustin Pop committed
153 154 155 156
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the logical unit class implementing it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    @param context: Object providing at least the C{cfg} (configuration) and
        C{glm} (lock manager) attributes used by this class

    """
    self.context = context
    # Callbacks are only set for the duration of an ExecOpCode call
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _ReportLocks(self, level, names, shared, acquired):
    """Reports lock operations.

    The report is a "/"-separated string made of the state ("acquired" or
    "waiting"), the level name, the lock names and the mode ("shared" or
    "exclusive"). It is logged at debug level and, if callbacks are set,
    passed to their C{ReportLocks} method.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the lock should be acquired in shared mode
    @type acquired: bool
    @param acquired: Whether the lock has already been acquired

    """
    parts = []

    # Build message
    if acquired:
      parts.append("acquired")
    else:
      parts.append("waiting")

    parts.append(locking.LEVEL_NAMES[level])

    if names == locking.ALL_SET:
      parts.append("ALL")
    elif isinstance(names, basestring):
      parts.append(names)
    else:
      parts.append(",".join(names))

    if shared:
      parts.append("shared")
    else:
      parts.append("exclusive")

    msg = "/".join(parts)

    logging.debug("LU locks %s", msg)

    if self._cbs:
      self._cbs.ReportLocks(msg)

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-hooks, executes the LU and runs
    the post-hooks. If the configuration's write count changed while the LU
    was executing, the configuration update hook is run as well, even if the
    execution failed.

    @param lu: the logical unit to execute
    @return: the LU's result as returned by its post-phase hooks callback,
        or C{lu.dry_run_result} in dry-run mode

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._Feedback, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._Feedback)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._Feedback, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: the lock level to handle in this call
    @type calc_timeout: callable
    @param calc_timeout: function returning the timeout to use for the next
        lock acquire (C{None} for a blocking acquire)
    @return: the result of executing the LU
    @raise _LockAcquireTimeout: if acquiring locks at this level timed out
    @raise errors.OpPrereqError: if the locks to be added couldn't be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # No more levels to lock, the LU can be run now
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._ReportLocks(level, needed_locks, share, False)
          acquired = self.context.glm.acquire(level,
                                              needed_locks,
                                              shared=share,
                                              timeout=calc_timeout())
          # TODO: Report timeout (for now "acquired" is reported even if the
          # acquire timed out)
          self._ReportLocks(level, needed_locks, share, True)

          if acquired is None:
            raise _LockAcquireTimeout()

          lu.acquired_locks[level] = acquired

        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          # Locks added here are removed again once the LU has finished
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks)

          lu.acquired_locks[level] = add_locks
        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to lock at this level, go on with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  def ExecOpCode(self, op, cbs):
    """Execute an opcode.

    The Big Ganeti Lock and the LU's locks are acquired with increasing
    timeouts; whenever an acquire times out, everything is released and the
    whole sequence is retried.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @return: the result of executing the opcode's logical unit
    @raise errors.ProgrammerError: if C{op} is not an opcode instance
    @raise errors.OpCodeUnknown: if no logical unit is registered for the
        opcode's class

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._cbs = cbs
    try:
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
      if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

      timeout_strategy = _LockTimeoutStrategy()
      calc_timeout = timeout_strategy.CalcRemainingTimeout

      while True:
        try:
          self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                            not lu_class.REQ_BGL, False)
          try:
            # Acquire the Big Ganeti Lock exclusively if this LU requires it,
            # and in a shared fashion otherwise (to prevent concurrent run with
            # an exclusive LU).
            acquired_bgl = self.context.glm.acquire(locking.LEVEL_CLUSTER,
                                                    [locking.BGL],
                                                    shared=not lu_class.REQ_BGL,
                                                    timeout=calc_timeout())
          finally:
            # TODO: Report timeout (for now "acquired" is reported even if
            # the acquire timed out or failed)
            self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                              not lu_class.REQ_BGL, True)

          if acquired_bgl is None:
            raise _LockAcquireTimeout()

          try:
            lu = lu_class(self, op, self.context, self.rpc)
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"

            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
          finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)

        except _LockAcquireTimeout:
          # Timeout while waiting for lock, try again
          pass

        timeout_strategy.NextAttempt()

    finally:
      self._cbs = None

  def _Feedback(self, *args):
    """Forward call to feedback callback function.

    Does nothing if no callbacks are set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @type current: int
    @param current: number of the current step
    @type total: int
    @param total: total number of steps
    @type message: string
    @param message: description of the current step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._Feedback("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @type message: string
    @param message: the warning; if C{args} are given, it is used as a
        format string for them

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._Feedback(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._Feedback("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @type message: string
    @param message: the message; if C{args} are given, it is used as a
        format string for them

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._Feedback(" - INFO: %s" % message)
472

Iustin Pop's avatar
Iustin Pop committed
473 474 475 476 477 478 479 480 481 482 483 484 485

class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    """Initializes the hooks master for a logical unit.

    @type callfn: callable
    @param callfn: function doing the remote call; called with the node
        list, the hooks path, the phase and the environment
    @param lu: the logical unit for which hooks are run

    """
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @rtype: tuple
    @return: the environment dictionary, the set of nodes for the pre phase
        and the set of nodes for the post phase

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # Variables provided by the LU get the "GANETI_" prefix
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    @param node_list: the nodes on which the hooks should run
    @param hpath: the hooks path
    @param phase: the hooks phase
    @return: whatever C{self.callfn} returns

    """
    # Work on a copy, the base environment is shared between phases
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # All keys and values are passed on as strings
    env = dict([(str(key), str(val)) for key, val in env.items()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call, or
        C{None} if there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        # Failures in the post phase are only reported, not raised
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      if res.offline:
        continue
      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Collected and raised together once all nodes were examined
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)