#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert

import ganeti.masterd.instance


def _UpdateMasterClientCert(
    lu, master_uuid, cluster, feedback_fn,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type feedback_fn: function
  @param feedback_fn: feedback function for config updates
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
                                cluster.candidate_certs)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate
  lu.cfg.Update(cluster, feedback_fn)

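  # Only now, after the new digest has been distributed, replace the old
  # client certificate file with the new one on disk.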
  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest


class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

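    # Temporarily add the digest of the master's server certificate to the
    # candidate certs map; the map is reset further below, once the new
    # client certificates are in place.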
    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                  server_digest,
                                  cluster.candidate_certs)
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
                                                feedback_fn)

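    # Reset the map of candidate certificates: from here on only the newly
    # created client certificates (master here, the other candidates in the
    # loop below) are trusted.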
    cluster.candidate_certs = {master_uuid: new_master_digest}
    nodes = self.cfg.GetAllNodesInfo()
    for (node_uuid, node_info) in nodes.items():
      if node_uuid != master_uuid:
        new_digest = CreateNewClientCert(self, node_uuid)
        if node_info.master_candidate:
          cluster.candidate_certs[node_uuid] = new_digest
    # Trigger another update of the config now with the new master cert
    self.cfg.Update(cluster, feedback_fn)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    cluster = self.cfg.GetClusterInfo()
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)

    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "max_running_jobs": cluster.max_running_jobs,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      "instance_communication_network": cluster.instance_communication_network,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
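    # Re-saving the (unmodified) cluster object forces the configuration to
    # be written and redistributed; ancillary files are pushed separately.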
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
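      # A DRBD8 disk has two children, the data device (index 0) and the
      # metadata device; only the data child has to match the parent's size.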
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
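        # The reported size is in bytes, whereas disk.size is kept in MiB;
        # hence the shift by 20 bits.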
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the VG name on all nodes and, in case it is
       going to be unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

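    # Semantics of self.op.vg_name: None means "no change", the empty string
    # means "unset the volume group", anything else sets a new VG name.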
    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.
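
    Example (hypothetical values): for op_enabled_disk_templates=["drbd"]
    and old_enabled_disk_templates=["plain", "drbd"] the result would be
    (["drbd"], [], ["plain"]): "drbd" stays enabled, nothing is newly
    enabled, and "plain" gets disabled.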

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

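      # Group-level ipolicies override the cluster-wide one, so new
      # violations have to be computed per node group.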
      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
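      # A helper is kept or newly set; if DRBD is in use or gets enabled,
      # verify that a usable helper actually exists on all nodes.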
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self._BuildOSParams(cluster)

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _BuildOSParams(self, cluster):
    "Calculate the new OS parameters for this operation."

    def _GetNewParams(source, new_params):
      "Wrapper around GetUpdatedParams."
      if new_params is None:
        return source
      result = objects.FillDict(source, {}) # deep copy of source
      for os_name in new_params:
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
                                           new_params[os_name],
                                           use_none=True)
        if not result[os_name]:
          del result[os_name] # we removed all parameters
      return result

    self.new_osp = _GetNewParams(cluster.osparams,
                                 self.op.osparams)
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
                                         self.op.osparams_private_cluster)

    # Re-validate the OSes whose parameters have changed
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
    for os_name in changed_oses:
      os_params = cluster.SimpleFillOS(
        os_name,
        self.new_osp.get(os_name, {}),
        os_params_private=self.new_osp_private.get(os_name, {})
      )
      # check the parameter validity (remote check)
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                    os_name, os_params)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if constants.DT_DRBD8 not in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.osparams_private_cluster:
      self.cluster.osparams_private_cluster = self.new_osp_private
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
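      # AdjustCandidatePool brings the set of master candidates in line
      # with the new pool size before the configuration is written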
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = \
        self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
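      # "mods" is a list of (DDM_ADD/DDM_REMOVE, os_name) pairs describing
      # the requested additions to and removals from the given OS list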
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
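      # the master IP has to be taken down on the old netdev before the
      # configuration is updated; it is brought up again on the new netdev
      # further below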
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
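      # now that the new netdev is committed to the configuration, bring
      # the master IP back up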
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
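      # A negative job ID in a dependency refers to a job submitted earlier
      # in the same call; -len(jobs) points back at the config verification
      # job appended above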
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
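      # only the group verification opcode has a "skip_checks" slot, so
      # guard against AttributeError for the config verification opcode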
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.
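
    For example, a node error reported with C{error_codes} set renders as
    C{ERROR:ENODEN1:node:node1:<msg>}, while the default rendering is
    C{ERROR: node node1: <msg>}.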

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # mark the operation as failed only for errors, not for warnings
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError as err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
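    # verifying the configuration needs a consistent view of the whole
    # cluster, so acquire all locks, but in shared mode only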
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():