watcher: Write per-group instance status, merge into global one

Each per-group watcher process writes its own instance status file. Once
that's done it tries to acquire an exclusive lock on the global file and
will proceed to read all status file, merging them based on each file's
mtime. If an instance is moved to another group, the newer status will
supersede that of an older file which hasn't yet been updated.
......@@ -161,6 +161,10 @@ WATCHER_LOCK_FILE = LOCK_DIR + "/ganeti-watcher.lock"
#: Status file for per-group watcher, locked in exclusive mode by watcher
#: File for per-group instance status, merged into L{INSTANCE_STATUS_FILE} by
#: per-group processes
WATCHER_GROUP_INSTANCE_STATUS_FILE = DATA_DIR + "/watcher.%s.instance-status"
#: File containing Unix timestamp until which watcher should be paused
WATCHER_PAUSEFILE = DATA_DIR + "/watcher.pause"
......@@ -33,6 +33,7 @@ import sys
import time
import logging
import operator
import errno
from optparse import OptionParser
from ganeti import utils
......@@ -69,6 +70,9 @@ ERROR = "ERROR"
#: Number of seconds to wait between starting child processes for node groups
#: How many seconds to wait for instance status file lock
class NotMasterError(errors.GenericError):
"""Exception raised when this host is not the master."""
......@@ -370,23 +374,124 @@ def ParseOptions():
return (options, args)
def _UpdateInstanceStatus(cl, filename):
"""Get a list of instances on this cluster.
def _WriteInstanceStatus(filename, data):
"""Writes the per-group instance status file.
The entries are sorted.
@todo: Think about doing this per nodegroup, too
@type filename: string
@param filename: Path to instance status file
@type data: list of tuple; (instance name as string, status as string)
@param data: Instance name and status
op = opcodes.OpInstanceQuery(output_fields=["name", "status"], names=[],
job_id = cl.SubmitJob([op])
(result, ) = cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
logging.debug("Updating instance status file '%s' with %s instances",
filename, len(data))
data="".join(map(compat.partial(operator.mod, "%s %s\n"),
def _UpdateInstanceStatus(filename, instances):
"""Writes an instance status file from L{Instance} objects.
@type filename: string
@param filename: Path to status file
@type instances: list of L{Instance}
_WriteInstanceStatus(filename, [(, inst.status)
for inst in instances])
class _StatCb:
"""Helper to store file handle's C{fstat}.
def __init__(self):
"""Initializes this class.
""" = None
def __call__(self, fh):
"""Calls C{fstat} on file handle.
logging.debug("Got instance data, writing status file %s", filename)
""" = os.fstat(fh.fileno())
utils.WriteFile(filename, data="".join("%s %s\n" % (name, status)
for (name, status) in result))
def _ReadInstanceStatus(filename):
"""Reads an instance status file.
@type filename: string
@param filename: Path to status file
@rtype: tuple; (None or number, list of lists containing instance name and
@return: File's mtime and instance status contained in the file; mtime is
C{None} if file can't be read
logging.debug("Reading per-group instance status from '%s'", filename)
statcb = _StatCb()
content = utils.ReadFile(filename, preread=statcb)
except EnvironmentError, err:
if err.errno == errno.ENOENT:
logging.error("Can't read '%s', does not exist (yet)", filename)
logging.exception("Unable to read '%s', ignoring", filename)
return (None, None)
return (, [line.split(1)
for line in content.splitlines()])
def _MergeInstanceStatus(filename, pergroup_filename, groups):
"""Merges all per-group instance status files into a global one.
@type filename: string
@param filename: Path to global instance status file
@type pergroup_filename: string
@param pergroup_filename: Path to per-group status files, must contain "%s"
to be replaced with group UUID
@type groups: sequence
@param groups: UUIDs of known groups
# Lock global status file in exclusive mode
lock = utils.FileLock.Open(filename)
lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
except errors.LockError, err:
# All per-group processes will lock and update the file. None of them
# should take longer than 10 seconds (the value of
logging.error("Can't acquire lock on instance status file '%s', not"
" updating: %s", filename, err)
logging.debug("Acquired exclusive lock on '%s'", filename)
data = {}
# Load instance status from all groups
for group_uuid in groups:
(mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
if mtime is not None:
for (instance_name, status) in instdata:
data.setdefault(instance_name, []).append((mtime, status))
# Select last update based on file mtime
inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
for (instance_name, status) in data.items()]
# Write the global status file. Don't touch file after it's been
# updated--there is no lock anymore.
_WriteInstanceStatus(filename, inststatus)
def GetLuxiClient(try_restart):
......@@ -513,7 +618,6 @@ def _GlobalWatcher(opts):
_ArchiveJobs(client, opts.job_age)
_UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
# Spawn child processes for all node groups
_StartGroupChildren(client, opts.wait_children)
......@@ -578,14 +682,19 @@ def _GetGroupData(cl, uuid):
dict((, inst) for inst in instances))
def _KnownGroup(uuid):
"""Checks if a group UUID is known by ssconf.
def _LoadKnownGroups():
"""Returns a list of all node groups known by L{ssconf}.
groups = ssconf.SimpleStore().GetNodegroupList()
return compat.any(line.strip() and line.split()[0] == uuid
for line in groups)
result = list(line.split(None, 1)[0] for line in groups
if line.strip())
if not compat.all(map(utils.UUID_RE.match, result)):
raise errors.GenericError("Ssconf contains invalid group UUID")
return result
def _GroupWatcher(opts):
......@@ -601,12 +710,16 @@ def _GroupWatcher(opts):"Watcher for node group '%s'", group_uuid)
known_groups = _LoadKnownGroups()
# Check if node group is known
if not _KnownGroup(group_uuid):
if group_uuid not in known_groups:
raise errors.GenericError("Node group '%s' is not known by ssconf" %
# Group UUID has been verified and should not contain any dangerous characters
state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
logging.debug("Using state file %s", state_path)
......@@ -624,6 +737,13 @@ def _GroupWatcher(opts):
(nodes, instances) = _GetGroupData(client, group_uuid)
# Update per-group instance status file
_UpdateInstanceStatus(inst_status_path, instances.values())
started = _CheckInstances(client, notepad, instances)
_CheckDisks(client, notepad, nodes, instances, started)
_VerifyDisks(client, group_uuid, nodes, instances)
