Commit 683335b3 authored by Helga Velroyen's avatar Helga Velroyen
Browse files

Utility functions for storage types



Handling various storage types for the free space reporting
requires some utility functions. They will be invoked in
later patches. Unit tests are included.
Signed-off-by: default avatarHelga Velroyen <helgav@google.com>
Reviewed-by: default avatarThomas Thrainer <thomasth@google.com>
parent 13669ecd
......@@ -1251,6 +1251,7 @@ python_tests = \
test/py/ganeti.utils.nodesetup_unittest.py \
test/py/ganeti.utils.process_unittest.py \
test/py/ganeti.utils.retry_unittest.py \
test/py/ganeti.utils.storage_unittest.py \
test/py/ganeti.utils.text_unittest.py \
test/py/ganeti.utils.wrapper_unittest.py \
test/py/ganeti.utils.x509_unittest.py \
......
......@@ -22,7 +22,10 @@
"""
import logging
from ganeti import constants
from ganeti import pathutils
def GetDiskTemplatesOfStorageType(storage_type):
......@@ -52,3 +55,108 @@ def LvmGetsEnabled(enabled_disk_templates, new_enabled_disk_templates):
return False
return set(GetLvmDiskTemplates()).intersection(
set(new_enabled_disk_templates))
def _GetDefaultStorageUnitForDiskTemplate(cfg, disk_template):
"""Retrieves the identifier of the default storage entity for the given
storage type.
@type disk_template: string
@param disk_template: a disk template, for example 'drbd'
@rtype: string
@return: identifier for a storage unit, for example the vg_name for lvm
storage
"""
storage_type = constants.DISK_TEMPLATES_STORAGE_TYPE[disk_template]
if disk_template in GetLvmDiskTemplates():
return (storage_type, cfg.GetVGName())
# FIXME: Adjust this, once FILE_STORAGE_DIR and SHARED_FILE_STORAGE_DIR
# are not in autoconf anymore.
elif disk_template == constants.DT_FILE:
return (storage_type, pathutils.DEFAULT_FILE_STORAGE_DIR)
elif disk_template == constants.DT_SHARED_FILE:
return (storage_type, pathutils.DEFAULT_SHARED_FILE_STORAGE_DIR)
else:
return (storage_type, None)
def _GetDefaultStorageUnitForSpindles(cfg):
"""Creates a 'spindle' storage unit, by retrieving the volume group
name and associating it to the lvm-pv storage type.
@rtype: (string, string)
@return: tuple (storage_type, storage_key), where storage type is
'lvm-pv' and storage_key the name of the default volume group
"""
return (constants.ST_LVM_PV, cfg.GetVGName())
# List of storage type for which space reporting is implemented.
# FIXME: Remove this, once the backend is capable to do this for all
# storage types.
_DISK_TEMPLATES_SPACE_QUERYABLE = GetLvmDiskTemplates() \
+ GetDiskTemplatesOfStorageType(constants.ST_FILE)
def GetStorageUnitsOfCluster(cfg, include_spindles=False):
"""Examines the cluster's configuration and returns a list of storage
units and their storage keys, ordered by the order in which they
are enabled.
@type cfg: L{config.ConfigWriter}
@param cfg: Cluster configuration
@type include_spindles: boolean
@param include_spindles: flag to include an extra storage unit for physical
volumes
@rtype: list of tuples (string, string)
@return: list of storage units, each storage unit being a tuple of
(storage_type, storage_key); storage_type is in
C{constants.VALID_STORAGE_TYPES} and the storage_key a string to
identify an entity of that storage type, for example a volume group
name for LVM storage or a file for file storage.
"""
cluster_config = cfg.GetClusterInfo()
storage_units = []
for disk_template in cluster_config.enabled_disk_templates:
if disk_template in _DISK_TEMPLATES_SPACE_QUERYABLE:
storage_units.append(
_GetDefaultStorageUnitForDiskTemplate(cfg, disk_template))
if include_spindles:
included_storage_types = set([st for (st, _) in storage_units])
if not constants.ST_LVM_PV in included_storage_types:
storage_units.append(
_GetDefaultStorageUnitForSpindles(cfg))
return storage_units
def LookupSpaceInfoByStorageType(storage_space_info, storage_type):
"""Looks up the storage space info for a given storage type.
Note that this lookup can be ambiguous if storage space reporting for several
units of the same storage type was requested. This function is only supposed
to be used for legacy code in situations where it actually is unambiguous.
@type storage_space_info: list of dicts
@param storage_space_info: result of C{GetNodeInfo}
@type storage_type: string
@param storage_type: a storage type, which is included in the storage_units
list
@rtype: tuple
@return: returns the element of storage_space_info that matches the given
storage type
"""
result = None
for unit_info in storage_space_info:
if unit_info["type"] == storage_type:
if result is None:
result = unit_info
else:
# There is more than one storage type in the query, log a warning
logging.warning("Storage space information requested for"
" ambiguous storage type '%s'.", storage_type)
return result
#!/usr/bin/python
#
# Copyright (C) 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the ganeti.utils.storage module"""
import mock
import unittest
from ganeti import constants
from ganeti import errors
from ganeti import objects
from ganeti import pathutils
from ganeti.utils import storage
import testutils
class TestGetStorageUnitForDiskTemplate(unittest.TestCase):
def setUp(self):
self._default_vg_name = "some_vg_name"
self._cfg = mock.Mock()
self._cfg.GetVGName = mock.Mock(return_value=self._default_vg_name)
def testGetDefaultStorageUnitForDiskTemplateLvm(self):
for disk_template in [constants.DT_DRBD8, constants.DT_PLAIN]:
(storage_type, storage_key) = \
storage._GetDefaultStorageUnitForDiskTemplate(self._cfg,
disk_template)
self.assertEqual(storage_type, constants.ST_LVM_VG)
self.assertEqual(storage_key, self._default_vg_name)
def testGetDefaultStorageUnitForDiskTemplateFile(self):
(storage_type, storage_key) = \
storage._GetDefaultStorageUnitForDiskTemplate(self._cfg,
constants.DT_FILE)
self.assertEqual(storage_type, constants.ST_FILE)
self.assertEqual(storage_key, pathutils.DEFAULT_FILE_STORAGE_DIR)
def testGetDefaultStorageUnitForDiskTemplateSharedFile(self):
(storage_type, storage_key) = \
storage._GetDefaultStorageUnitForDiskTemplate(self._cfg,
constants.DT_SHARED_FILE)
self.assertEqual(storage_type, constants.ST_FILE)
self.assertEqual(storage_key, pathutils.DEFAULT_SHARED_FILE_STORAGE_DIR)
def testGetDefaultStorageUnitForDiskTemplateDiskless(self):
(storage_type, storage_key) = \
storage._GetDefaultStorageUnitForDiskTemplate(self._cfg,
constants.DT_DISKLESS)
self.assertEqual(storage_type, constants.ST_DISKLESS)
self.assertEqual(storage_key, None)
def testGetDefaultStorageUnitForSpindles(self):
(storage_type, storage_key) = \
storage._GetDefaultStorageUnitForSpindles(self._cfg)
self.assertEqual(storage_type, constants.ST_LVM_PV)
self.assertEqual(storage_key, self._default_vg_name)
class TestGetStorageUnitsOfCluster(unittest.TestCase):
def setUp(self):
storage._GetDefaultStorageUnitForDiskTemplate = \
mock.Mock(return_value=("foo", "bar"))
self._cluster_cfg = objects.Cluster()
self._enabled_disk_templates = \
[constants.DT_DRBD8, constants.DT_PLAIN, constants.DT_FILE,
constants.DT_SHARED_FILE]
self._cluster_cfg.enabled_disk_templates = \
self._enabled_disk_templates
self._cfg = mock.Mock()
self._cfg.GetClusterInfo = mock.Mock(return_value=self._cluster_cfg)
self._cfg.GetVGName = mock.Mock(return_value="some_vg_name")
def testGetStorageUnitsOfCluster(self):
storage_units = storage.GetStorageUnitsOfCluster(self._cfg)
self.assertEqual(len(storage_units), len(self._enabled_disk_templates))
def testGetStorageUnitsOfClusterWithSpindles(self):
storage_units = storage.GetStorageUnitsOfCluster(
self._cfg, include_spindles=True)
self.assertEqual(len(storage_units), len(self._enabled_disk_templates) + 1)
self.assertTrue(constants.ST_LVM_PV in [st for (st, sk) in storage_units])
class TestLookupSpaceInfoByStorageType(unittest.TestCase):
def setUp(self):
self._space_info = [
{"type": st, "name": st + "_key", "storage_size": 0, "storage_free": 0}
for st in constants.VALID_STORAGE_TYPES]
def testValidLookup(self):
query_type = constants.ST_LVM_PV
result = storage.LookupSpaceInfoByStorageType(self._space_info, query_type)
self.assertEqual(query_type, result["type"])
def testNotInList(self):
result = storage.LookupSpaceInfoByStorageType(self._space_info,
"non_existing_type")
self.assertEqual(None, result)
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment