#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
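  # promote only if there are currently fewer candidates than this target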
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as a post hook node by name; it has no UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
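  """Returns the instances from the configuration for which C{fn} is true.

  """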
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " are '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
      raw_storage_units = utils.storage.GetStorageUnits(
          lu.cfg, [default_template])
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
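      # ignore nodes whose RPC call failed or that returned no payload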
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],