Commit dff5f600 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Convert LUClusterConfigQuery to query2



The main intention of this patch is to make it possible to retrieve
cluster tags via query2. While at it I decided to convert
LUClusterConfigQuery right away. Some of the values returned by
LUClusterQuery are also included, but the conversion of LUClusterQuery
is not yet complete.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 496d5ac8
......@@ -6165,38 +6165,70 @@ class LUClusterConfigQuery(NoHooksLU):
 
"""
REQ_BGL = False
_FIELDS_DYNAMIC = utils.FieldSet()
_FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
"watcher_pause", "volume_group_name")
 
def CheckArguments(self):
_CheckOutputFields(static=self._FIELDS_STATIC,
dynamic=self._FIELDS_DYNAMIC,
selected=self.op.output_fields)
self.cq = _ClusterQuery(None, self.op.output_fields, False)
 
def ExpandNames(self):
self.needed_locks = {}
self.cq.ExpandNames(self)
def DeclareLocks(self, level):
self.cq.DeclareLocks(self, level)
 
def Exec(self, feedback_fn):
"""Dump a representation of the cluster config to the standard output.
"""
values = []
for field in self.op.output_fields:
if field == "cluster_name":
entry = self.cfg.GetClusterName()
elif field == "master_node":
entry = self.cfg.GetMasterNode()
elif field == "drain_flag":
entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
elif field == "watcher_pause":
entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
elif field == "volume_group_name":
entry = self.cfg.GetVGName()
else:
raise errors.ParameterError(field)
values.append(entry)
return values
result = self.cq.OldStyleQuery(self)
assert len(result) == 1
return result[0]
class _ClusterQuery(_QueryBase):
FIELDS = query.CLUSTER_FIELDS
#: Do not sort (there is only one item)
SORT_FIELD = None
def ExpandNames(self, lu):
lu.needed_locks = {}
# The following variables interact with _QueryBase._GetNames
self.wanted = locking.ALL_SET
self.do_locking = self.use_locking
if self.do_locking:
raise errors.OpPrereqError("Can not use locking for cluster queries",
errors.ECODE_INVAL)
def DeclareLocks(self, lu, level):
pass
def _GetQueryData(self, lu):
"""Computes the list of nodes and their attributes.
"""
# Locking is not used
assert not (compat.any(lu.glm.is_owned(level)
for level in locking.LEVELS
if level != locking.LEVEL_CLUSTER) or
self.do_locking or self.use_locking)
if query.CQ_CONFIG in self.requested_data:
cluster = lu.cfg.GetClusterInfo()
else:
cluster = NotImplemented
if query.CQ_QUEUE_DRAINED in self.requested_data:
drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
else:
drain_flag = NotImplemented
if query.CQ_WATCHER_PAUSE in self.requested_data:
watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
else:
watcher_pause = NotImplemented
return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
 
 
class LUInstanceActivateDisks(NoHooksLU):
......@@ -13080,6 +13112,7 @@ class _ExportQuery(_QueryBase):
 
"""
# Locking is not used
# TODO
assert not (compat.any(lu.glm.is_owned(level)
for level in locking.LEVELS
if level != locking.LEVEL_CLUSTER) or
......@@ -15214,6 +15247,7 @@ class LUTestAllocator(NoHooksLU):
 
#: Query type implementations
_QUERY_IMPL = {
constants.QR_CLUSTER: _ClusterQuery,
constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
......
......@@ -1611,6 +1611,7 @@ JQT_ALL = frozenset([
])
# Query resources
QR_CLUSTER = "cluster"
QR_INSTANCE = "instance"
QR_NODE = "node"
QR_LOCK = "lock"
......@@ -1621,6 +1622,7 @@ QR_EXPORT = "export"
#: List of resources which can be queried using L{opcodes.OpQuery}
QR_VIA_OP = frozenset([
QR_CLUSTER,
QR_INSTANCE,
QR_NODE,
QR_GROUP,
......
......@@ -62,6 +62,7 @@ from ganeti import utils
from ganeti import compat
from ganeti import objects
from ganeti import ht
from ganeti import runtime
from ganeti import qlang
from ganeti.constants import (QFT_UNKNOWN, QFT_TEXT, QFT_BOOL, QFT_NUMBER,
......@@ -94,6 +95,10 @@ from ganeti.constants import (QFT_UNKNOWN, QFT_TEXT, QFT_BOOL, QFT_NUMBER,
GQ_NODE,
GQ_INST) = range(200, 203)
(CQ_CONFIG,
CQ_QUEUE_DRAINED,
CQ_WATCHER_PAUSE) = range(300, 303)
# Query field flags
QFF_HOSTNAME = 0x01
QFF_IP_ADDRESS = 0x02
......@@ -874,6 +879,20 @@ def _MakeField(name, title, kind, doc):
doc=doc)
def _StaticValueInner(value, ctx, _): # pylint: disable=W0613
"""Returns a static value.
"""
return value
def _StaticValue(value):
"""Prepares a function to return a static value.
"""
return compat.partial(_StaticValueInner, value)
def _GetNodeRole(node, master_name):
"""Determine node role.
......@@ -2264,6 +2283,99 @@ def _BuildExportFields():
return _PrepareFieldList(fields, [])
_CLUSTER_VERSION_FIELDS = {
"software_version": ("SoftwareVersion", QFT_TEXT, constants.RELEASE_VERSION,
"Software version"),
"protocol_version": ("ProtocolVersion", QFT_NUMBER,
constants.PROTOCOL_VERSION,
"RPC protocol version"),
"config_version": ("ConfigVersion", QFT_NUMBER, constants.CONFIG_VERSION,
"Configuration format version"),
"os_api_version": ("OsApiVersion", QFT_NUMBER, max(constants.OS_API_VERSIONS),
"API version for OS template scripts"),
"export_version": ("ExportVersion", QFT_NUMBER, constants.EXPORT_VERSION,
"Import/export file format version"),
}
_CLUSTER_SIMPLE_FIELDS = {
"cluster_name": ("Name", QFT_TEXT, QFF_HOSTNAME, "Cluster name"),
"master_node": ("Master", QFT_TEXT, QFF_HOSTNAME, "Master node name"),
"volume_group_name": ("VgName", QFT_TEXT, 0, "LVM volume group name"),
}
class ClusterQueryData:
def __init__(self, cluster, drain_flag, watcher_pause):
"""Initializes this class.
@type cluster: L{objects.Cluster}
@param cluster: Instance of cluster object
@type drain_flag: bool
@param drain_flag: Whether job queue is drained
@type watcher_pause: number
@param watcher_pause: Until when watcher is paused (Unix timestamp)
"""
self._cluster = cluster
self.drain_flag = drain_flag
self.watcher_pause = watcher_pause
def __iter__(self):
return iter([self._cluster])
def _ClusterWatcherPause(ctx, _):
"""Returns until when watcher is paused (if available).
"""
if ctx.watcher_pause is None:
return _FS_UNAVAIL
else:
return ctx.watcher_pause
def _BuildClusterFields():
"""Builds list of fields for cluster information.
"""
fields = [
(_MakeField("tags", "Tags", QFT_OTHER, "Tags"), CQ_CONFIG, 0,
lambda ctx, cluster: list(cluster.GetTags())),
(_MakeField("architecture", "ArchInfo", QFT_OTHER,
"Architecture information"), None, 0,
lambda ctx, _: runtime.GetArchInfo()),
(_MakeField("drain_flag", "QueueDrained", QFT_BOOL,
"Flag whether job queue is drained"), CQ_QUEUE_DRAINED, 0,
lambda ctx, _: ctx.drain_flag),
(_MakeField("watcher_pause", "WatcherPause", QFT_TIMESTAMP,
"Until when watcher is paused"), CQ_WATCHER_PAUSE, 0,
_ClusterWatcherPause),
]
# Simple fields
fields.extend([
(_MakeField(name, title, kind, doc), CQ_CONFIG, flags, _GetItemAttr(name))
for (name, (title, kind, flags, doc)) in _CLUSTER_SIMPLE_FIELDS.items()
])
# Version fields
fields.extend([
(_MakeField(name, title, kind, doc), None, 0, _StaticValue(value))
for (name, (title, kind, value, doc)) in _CLUSTER_VERSION_FIELDS.items()
])
# Add timestamps
fields.extend(_GetItemTimestampFields(CQ_CONFIG))
return _PrepareFieldList(fields, [
("name", "cluster_name"),
])
#: Fields for cluster information
CLUSTER_FIELDS = _BuildClusterFields()
#: Fields available for node queries
NODE_FIELDS = _BuildNodeFields()
......@@ -2287,6 +2399,7 @@ EXPORT_FIELDS = _BuildExportFields()
#: All available resources
ALL_FIELDS = {
constants.QR_CLUSTER: CLUSTER_FIELDS,
constants.QR_INSTANCE: INSTANCE_FIELDS,
constants.QR_NODE: NODE_FIELDS,
constants.QR_LOCK: LOCK_FIELDS,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment