Commit ec79568d authored by Iustin Pop
Browse files

Implement lockless query operations

This patch adds the framework for lockless query operations and enables them
for OpQueryInstances. This means that, while a job is in progress, instances
may be shown in the ERROR_up or ERROR_down state even though this is not an
error (it is just an in-progress job).

The framework is implemented as follows:
  - the OpQueryInstances, OpQueryNodes and OpQueryExports opcodes take
    an additional “use_locking” flag which will denote whether to lock
    or not; this patch only implements this for LUQueryInstances
  - the luxi query functions take an additional argument use_locking
    which is passed to the master daemon, and then passed to the above
    opcodes
  - cli.py exports a new SYNC_OPT command line option (--sync) which
    implements setting this flag to True
  - except for gnt-instance list, which uses this option, and for
    name-only queries (e.g. QueryNodes(fields=["name"])), all other
    callers set this flag to True
  - RAPI also sets the flag to True

The patch was tested with a continuous (0.2s sleep in-between)
gnt-instance list during a burnin, and no problems were observed.

Reviewed-by: ultrotter
parent 00ad5362
......@@ -234,18 +234,20 @@ class ClientOps:
return queue.QueryJobs(job_ids, fields)
elif method == luxi.REQ_QUERY_INSTANCES:
(names, fields) = args
op = opcodes.OpQueryInstances(names=names, output_fields=fields)
(names, fields, use_locking) = args
op = opcodes.OpQueryInstances(names=names, output_fields=fields,
use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_NODES:
(names, fields) = args
op = opcodes.OpQueryNodes(names=names, output_fields=fields)
(names, fields, use_locking) = args
op = opcodes.OpQueryNodes(names=names, output_fields=fields,
use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_EXPORTS:
nodes = args
op = opcodes.OpQueryExports(nodes=nodes)
nodes, use_locking = args
op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_CONFIG_VALUES:
......
......@@ -236,7 +236,7 @@ def GetInstanceList(with_secondaries=None):
if with_secondaries is not None:
fields.append("snodes")
result = client.QueryInstances([], fields)
result = client.QueryInstances([], fields, True)
instances = []
for fields in result:
......@@ -264,7 +264,7 @@ def GetNodeBootIDs():
"""Get a dict mapping nodes to boot IDs.
"""
result = client.QueryNodes([], ["name", "bootid", "offline"])
result = client.QueryNodes([], ["name", "bootid", "offline"], True)
return dict([(name, (bootid, offline)) for name, bootid, offline in result])
......
......@@ -51,7 +51,7 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
"FormatError", "SplitNodeOption", "SubmitOrSend",
"JobSubmittedException", "FormatTimestamp", "ParseTimespec",
"ValidateBeParams", "ToStderr", "ToStdout", "UsesRPC",
"GetOnlineNodes", "JobExecutor",
"GetOnlineNodes", "JobExecutor", "SYNC_OPT",
]
......@@ -190,6 +190,11 @@ SUBMIT_OPT = make_option("--submit", dest="submit_only",
help="Submit the job and return the job ID, but"
" don't wait for the job to finish")
SYNC_OPT = make_option("--sync", dest="do_locking",
default=False, action="store_true",
help="Grab locks while doing the queries"
" in order to ensure more consistent results")
def ARGS_FIXED(val):
"""Macro-like function denoting a fixed number of arguments"""
......
......@@ -3074,7 +3074,7 @@ class LUQueryInstances(NoHooksLU):
"""Logical unit for querying instances.
"""
_OP_REQP = ["output_fields", "names"]
_OP_REQP = ["output_fields", "names", "use_locking"]
REQ_BGL = False
_FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
"admin_state", "admin_ram",
......@@ -3108,7 +3108,8 @@ class LUQueryInstances(NoHooksLU):
else:
self.wanted = locking.ALL_SET
self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
self.do_locking = self.do_node_query and self.op.use_locking
if self.do_locking:
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
self.needed_locks[locking.LEVEL_NODE] = []
......@@ -3157,7 +3158,7 @@ class LUQueryInstances(NoHooksLU):
bad_nodes = []
off_nodes = []
if self.do_locking:
if self.do_node_query:
live_data = {}
node_data = self.rpc.call_all_instances_info(nodes, hv_list)
for name in nodes:
......
......@@ -357,14 +357,14 @@ class Client(object):
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
def QueryInstances(self, names, fields):
return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
def QueryInstances(self, names, fields, use_locking):
return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
def QueryNodes(self, names, fields):
return self.CallMethod(REQ_QUERY_NODES, (names, fields))
def QueryNodes(self, names, fields, use_locking):
return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
def QueryExports(self, nodes):
return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
def QueryExports(self, nodes, use_locking):
return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
def QueryConfigValues(self, fields):
return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
......
......@@ -319,7 +319,7 @@ class OpAddNode(OpCode):
class OpQueryNodes(OpCode):
"""Compute the list of nodes."""
OP_ID = "OP_NODE_QUERY"
__slots__ = ["output_fields", "names"]
__slots__ = ["output_fields", "names", "use_locking"]
class OpQueryNodeVolumes(OpCode):
......@@ -450,7 +450,7 @@ class OpDeactivateInstanceDisks(OpCode):
class OpQueryInstances(OpCode):
"""Compute the list of instances."""
OP_ID = "OP_INSTANCE_QUERY"
__slots__ = ["output_fields", "names"]
__slots__ = ["output_fields", "names", "use_locking"]
class OpQueryInstanceData(OpCode):
......@@ -488,7 +488,7 @@ class OpDiagnoseOS(OpCode):
class OpQueryExports(OpCode):
"""Compute the list of exported images."""
OP_ID = "OP_BACKUP_QUERY"
__slots__ = ["nodes"]
__slots__ = ["nodes", "use_locking"]
class OpExportInstance(OpCode):
......
......@@ -208,11 +208,11 @@ class R_2_nodes(baserlib.R_Generic):
"""
client = luxi.Client()
nodesdata = client.QueryNodes([], ["name"])
nodesdata = client.QueryNodes([], ["name"], True)
nodeslist = [row[0] for row in nodesdata]
if 'bulk' in self.queryargs:
bulkdata = client.QueryNodes(nodeslist, N_FIELDS)
bulkdata = client.QueryNodes(nodeslist, N_FIELDS, True)
return baserlib.MapBulkFields(bulkdata, N_FIELDS)
return baserlib.BuildUriList(nodeslist, "/2/nodes/%s",
......@@ -292,12 +292,12 @@ class R_2_instances(baserlib.R_Generic):
"""
client = luxi.Client()
instancesdata = client.QueryInstances([], ["name"])
instancesdata = client.QueryInstances([], ["name"], True)
instanceslist = [row[0] for row in instancesdata]
if 'bulk' in self.queryargs:
bulkdata = client.QueryInstances(instanceslist, I_FIELDS)
bulkdata = client.QueryInstances(instanceslist, I_FIELDS, True)
return baserlib.MapBulkFields(bulkdata, I_FIELDS)
else:
......
......@@ -45,7 +45,7 @@ def PrintExportList(opts, args):
@return: the desired exit code
"""
exports = GetClient().QueryExports(opts.nodes)
exports = GetClient().QueryExports(opts.nodes, True)
retcode = 0
for node in exports:
ToStdout("Node: %s", node)
......
......@@ -81,7 +81,7 @@ def _ExpandMultiNames(mode, names, client=None):
if mode == _SHUTDOWN_CLUSTER:
if names:
raise errors.OpPrereqError("Cluster filter mode takes no arguments")
idata = client.QueryInstances([], ["name"])
idata = client.QueryInstances([], ["name"], False)
inames = [row[0] for row in idata]
elif mode in (_SHUTDOWN_NODES_BOTH,
......@@ -89,7 +89,8 @@ def _ExpandMultiNames(mode, names, client=None):
_SHUTDOWN_NODES_SEC):
if not names:
raise errors.OpPrereqError("No node names passed")
ndata = client.QueryNodes(names, ["name", "pinst_list", "sinst_list"])
ndata = client.QueryNodes(names, ["name", "pinst_list", "sinst_list"],
True)
ipri = [row[1] for row in ndata]
pri_names = list(itertools.chain(*ipri))
isec = [row[2] for row in ndata]
......@@ -106,7 +107,7 @@ def _ExpandMultiNames(mode, names, client=None):
elif mode == _SHUTDOWN_INSTANCES:
if not names:
raise errors.OpPrereqError("No instance names passed")
idata = client.QueryInstances(names, ["name"])
idata = client.QueryInstances(names, ["name"], False)
inames = [row[0] for row in idata]
else:
......@@ -191,7 +192,7 @@ def _EnsureInstancesExist(client, names):
"""
# TODO: change LUQueryInstances to that it actually returns None
# instead of raising an exception, or devise a better mechanism
result = client.QueryInstances(names, ["name"])
result = client.QueryInstances(names, ["name"], False)
for orig_name, row in zip(names, result):
if row[0] is None:
raise errors.OpPrereqError("Instance '%s' does not exist" % orig_name)
......@@ -214,7 +215,7 @@ def ListInstances(opts, args):
else:
selected_fields = opts.output.split(",")
output = GetClient().QueryInstances(args, selected_fields)
output = GetClient().QueryInstances(args, selected_fields, opts.do_locking)
if not opts.no_headers:
headers = {
......@@ -1370,7 +1371,7 @@ commands = {
], "[-s] [<instance>...]",
"Show information on the specified instance(s)"),
'list': (ListInstances, ARGS_ANY,
[DEBUG_OPT, NOHDR_OPT, SEP_OPT, USEUNITS_OPT, FIELDS_OPT],
[DEBUG_OPT, NOHDR_OPT, SEP_OPT, USEUNITS_OPT, FIELDS_OPT, SYNC_OPT],
"[<instance>...]",
"Lists the instances and their status. The available fields are"
" (see the man page for details): status, oper_state, oper_ram,"
......
......@@ -76,7 +76,7 @@ def AddNode(opts, args):
if not opts.readd:
try:
output = cl.QueryNodes(names=[node], fields=['name'])
output = cl.QueryNodes(names=[node], fields=['name'], use_locking=True)
except (errors.OpPrereqError, errors.OpExecError):
pass
else:
......@@ -119,7 +119,7 @@ def ListNodes(opts, args):
else:
selected_fields = opts.output.split(",")
output = GetClient().QueryNodes([], selected_fields)
output = GetClient().QueryNodes([], selected_fields, True)
if not opts.no_headers:
headers = _LIST_HEADERS
......@@ -183,7 +183,8 @@ def EvacuateNode(opts, args):
src_node = args[0]
op = opcodes.OpQueryNodes(output_fields=selected_fields, names=[src_node])
result = cl.QueryNodes(names=[src_node], fields=selected_fields)
result = cl.QueryNodes(names=[src_node], fields=selected_fields,
use_locking=True)
src_node, sinst = result[0]
if not sinst:
......@@ -191,7 +192,7 @@ def EvacuateNode(opts, args):
return constants.EXIT_SUCCESS
if dst_node is not None:
result = cl.QueryNodes(names=[dst_node], fields=["name"])
result = cl.QueryNodes(names=[dst_node], fields=["name"], use_locking=True)
dst_node = result[0][0]
if src_node == dst_node:
......@@ -276,7 +277,7 @@ def MigrateNode(opts, args):
force = opts.force
selected_fields = ["name", "pinst_list"]
result = cl.QueryNodes(names=args, fields=selected_fields)
result = cl.QueryNodes(names=args, fields=selected_fields, use_locking=True)
node, pinst = result[0]
if not pinst:
......
......@@ -323,7 +323,8 @@ class Burner(object):
else:
names = []
try:
op = opcodes.OpQueryNodes(output_fields=["name", "offline"], names=names)
op = opcodes.OpQueryNodes(output_fields=["name", "offline"],
names=names, use_locking=True)
result = self.ExecOp(op)
except errors.GenericError, err:
err_code, msg = cli.FormatError(err)
......@@ -484,7 +485,7 @@ class Burner(object):
Log("instance %s" % instance, indent=1)
# read the full name of the instance
nam_op = opcodes.OpQueryInstances(output_fields=["name"],
names=[instance])
names=[instance], use_locking=True)
full_name = self.ExecOp(nam_op)[0][0]
if self.opts.iallocator:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment