Commit 6dfcc47b authored by Iustin Pop's avatar Iustin Pop
Browse files

Change the watcher to use jobs instead of queries

As per the mailing list discussion, this patch changes the watcher to
use a single job (two opcodes) for getting the cluster state (node list
and instance list); it will then compute the needed actions based on
this data.

The patch also archives this job and the verify-disks job.

Reviewed-by: imsnah
parent 7dd106d3
...@@ -225,45 +225,42 @@ class Instance(object): ...@@ -225,45 +225,42 @@ class Instance(object):
cli.SubmitOpCode(op, cl=client) cli.SubmitOpCode(op, cl=client)
def GetInstanceList(with_secondaries=None): def GetClusterData():
"""Get a list of instances on this cluster. """Get a list of instances on this cluster.
""" """
fields = ["name", "status", "admin_state"] op1_fields = ["name", "status", "admin_state", "snodes"]
op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
use_locking=True)
op2_fields = ["name", "bootid", "offline"]
op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
use_locking=True)
if with_secondaries is not None: job_id = client.SubmitJob([op1, op2])
fields.append("snodes")
result = client.QueryInstances([], fields, True) all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
instances = [] result = all_results[0]
for fields in result: smap = {}
if with_secondaries is not None:
(name, status, autostart, snodes) = fields
if not snodes:
continue
for node in with_secondaries: instances = {}
if node in snodes: for fields in result:
break (name, status, autostart, snodes) = fields
else:
continue
else:
(name, status, autostart) = fields
instances.append(Instance(name, status, autostart)) # update the secondary node map
for node in snodes:
if node not in smap:
smap[node] = []
smap[node].append(name)
return instances instances[name] = Instance(name, status, autostart)
nodes = dict([(name, (bootid, offline))
for name, bootid, offline in all_results[1]])
def GetNodeBootIDs(): client.ArchiveJob(job_id)
"""Get a dict mapping nodes to boot IDs.
""" return instances, nodes, smap
result = client.QueryNodes([], ["name", "bootid", "offline"], True)
return dict([(name, (bootid, offline)) for name, bootid, offline in result])
class Watcher(object): class Watcher(object):
...@@ -279,8 +276,7 @@ class Watcher(object): ...@@ -279,8 +276,7 @@ class Watcher(object):
master = client.QueryConfigValues(["master_node"])[0] master = client.QueryConfigValues(["master_node"])[0]
if master != utils.HostInfo().name: if master != utils.HostInfo().name:
raise NotMasterError("This is not the master node") raise NotMasterError("This is not the master node")
self.instances = GetInstanceList() self.instances, self.bootids, self.smap = GetClusterData()
self.bootids = GetNodeBootIDs()
self.started_instances = set() self.started_instances = set()
self.opts = opts self.opts = opts
...@@ -321,21 +317,25 @@ class Watcher(object): ...@@ -321,21 +317,25 @@ class Watcher(object):
if check_nodes: if check_nodes:
# Activate disks for all instances with any of the checked nodes as a # Activate disks for all instances with any of the checked nodes as a
# secondary node. # secondary node.
for instance in GetInstanceList(with_secondaries=check_nodes): for node in check_nodes:
if not instance.autostart: if node not in self.smap:
logging.info(("Skipping disk activation for non-autostart"
" instance %s"), instance.name)
continue
if instance.name in self.started_instances:
# we already tried to start the instance, which should have
# activated its drives (if they can be at all)
continue continue
try: for instance_name in self.smap[node]:
logging.info("Activating disks for instance %s", instance.name) instance = self.instances[instance_name]
instance.ActivateDisks() if not instance.autostart:
except Exception: logging.info(("Skipping disk activation for non-autostart"
logging.exception("Error while activating disks for instance %s", " instance %s"), instance.name)
instance.name) continue
if instance.name in self.started_instances:
# we already tried to start the instance, which should have
# activated its drives (if they can be at all)
continue
try:
logging.info("Activating disks for instance %s", instance.name)
instance.ActivateDisks()
except Exception:
logging.exception("Error while activating disks for instance %s",
instance.name)
# Keep changed boot IDs # Keep changed boot IDs
for name in check_nodes: for name in check_nodes:
...@@ -345,7 +345,7 @@ class Watcher(object): ...@@ -345,7 +345,7 @@ class Watcher(object):
"""Make a pass over the list of instances, restarting downed ones. """Make a pass over the list of instances, restarting downed ones.
""" """
for instance in self.instances: for instance in self.instances.values():
if instance.state in BAD_STATES: if instance.state in BAD_STATES:
n = notepad.NumberOfRestartAttempts(instance) n = notepad.NumberOfRestartAttempts(instance)
...@@ -383,7 +383,9 @@ class Watcher(object): ...@@ -383,7 +383,9 @@ class Watcher(object):
""" """
op = opcodes.OpVerifyDisks() op = opcodes.OpVerifyDisks()
result = cli.SubmitOpCode(op, cl=client) job_id = client.SubmitJob([op])
result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
client.ArchiveJob(job_id)
if not isinstance(result, (tuple, list)): if not isinstance(result, (tuple, list)):
logging.error("Can't get a valid result from verify-disks") logging.error("Can't get a valid result from verify-disks")
return return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment