Commit adf6301e authored by Michael Hanselmann

watcher: Split state class into separate module

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent bcf0450d
......@@ -245,7 +245,8 @@ impexpd_PYTHON = \
watcher_PYTHON = \
lib/watcher/__init__.py \
lib/watcher/nodemaint.py
lib/watcher/nodemaint.py \
lib/watcher/state.py
server_PYTHON = \
lib/server/__init__.py \
......
......@@ -37,7 +37,6 @@ from optparse import OptionParser
from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
......@@ -46,22 +45,16 @@ from ganeti import rapi
from ganeti import netutils
import ganeti.rapi.client # pylint: disable-msg=W0611
import ganeti.watcher.nodemaint # pylint: disable-msg=W0611
from ganeti.watcher import nodemaint
from ganeti.watcher import state
MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
# Global LUXI client object
......@@ -118,166 +111,13 @@ def RunWatcherHooks():
runresult.output)
class WatcherState(object):
"""Interface to a state file recording restart attempts.
"""
def __init__(self, statefile):
"""Open, lock, read and parse the file.
@type statefile: file
@param statefile: State file object
"""
self.statefile = statefile
try:
state_data = self.statefile.read()
if not state_data:
self._data = {}
else:
self._data = serializer.Load(state_data)
except Exception, msg: # pylint: disable-msg=W0703
# Ignore errors while loading the file and treat it as empty
self._data = {}
logging.warning(("Invalid state file. Using defaults."
" Error message: %s"), msg)
if "instance" not in self._data:
self._data["instance"] = {}
if "node" not in self._data:
self._data["node"] = {}
self._orig_data = serializer.Dump(self._data)
def Save(self):
"""Save state to file, then unlock and close it.
"""
assert self.statefile
serialized_form = serializer.Dump(self._data)
if self._orig_data == serialized_form:
logging.debug("Data didn't change, just touching status file")
os.utime(constants.WATCHER_STATEFILE, None)
return
# We need to make sure the file is locked before renaming it, otherwise
# starting ganeti-watcher again at the same time will create a conflict.
fd = utils.WriteFile(constants.WATCHER_STATEFILE,
data=serialized_form,
prewrite=utils.LockFile, close=False)
self.statefile = os.fdopen(fd, 'w+')
def Close(self):
"""Unlock configuration file and close it.
"""
assert self.statefile
# Files are automatically unlocked when closing them
self.statefile.close()
self.statefile = None
def GetNodeBootID(self, name):
"""Returns the last boot ID of a node or None.
"""
ndata = self._data["node"]
if name in ndata and KEY_BOOT_ID in ndata[name]:
return ndata[name][KEY_BOOT_ID]
return None
def SetNodeBootID(self, name, bootid):
"""Sets the boot ID of a node.
"""
assert bootid
ndata = self._data["node"]
if name not in ndata:
ndata[name] = {}
ndata[name][KEY_BOOT_ID] = bootid
def NumberOfRestartAttempts(self, instance):
"""Returns number of previous restart attempts.
@type instance: L{Instance}
@param instance: the instance to look up
"""
idata = self._data["instance"]
if instance.name in idata:
return idata[instance.name][KEY_RESTART_COUNT]
return 0
def MaintainInstanceList(self, instances):
"""Perform maintenance on the recorded instances.
@type instances: list of string
@param instances: the list of currently existing instances
"""
idict = self._data["instance"]
# First, delete obsolete instances
obsolete_instances = set(idict).difference(instances)
for inst in obsolete_instances:
logging.debug("Forgetting obsolete instance %s", inst)
del idict[inst]
# Second, delete expired records
earliest = time.time() - RETRY_EXPIRATION
expired_instances = [i for i in idict
if idict[i][KEY_RESTART_WHEN] < earliest]
for inst in expired_instances:
logging.debug("Expiring record for instance %s", inst)
del idict[inst]
def RecordRestartAttempt(self, instance):
"""Record a restart attempt.
@type instance: L{Instance}
@param instance: the instance being restarted
"""
idata = self._data["instance"]
if instance.name not in idata:
inst = idata[instance.name] = {}
else:
inst = idata[instance.name]
inst[KEY_RESTART_WHEN] = time.time()
inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
def RemoveInstance(self, instance):
"""Update state to reflect that a machine is running.
This method removes the record for a named instance (as we only
track down instances).
@type instance: L{Instance}
@param instance: the instance to remove from books
"""
idata = self._data["instance"]
if instance.name in idata:
del idata[instance.name]
class Instance(object):
"""Abstraction for a Virtual Machine instance.
"""
def __init__(self, name, state, autostart, snodes):
def __init__(self, name, status, autostart, snodes):
self.name = name
self.state = state
self.status = status
self.autostart = autostart
self.snodes = snodes
......@@ -431,7 +271,7 @@ class Watcher(object):
notepad.MaintainInstanceList(self.instances.keys())
for instance in self.instances.values():
if instance.state in BAD_STATES:
if instance.status in BAD_STATES:
n = notepad.NumberOfRestartAttempts(instance)
if n > MAXTRIES:
......@@ -454,7 +294,7 @@ class Watcher(object):
instance.name)
notepad.RecordRestartAttempt(instance)
elif instance.state in HELPLESS_STATES:
elif instance.status in HELPLESS_STATES:
if notepad.NumberOfRestartAttempts(instance):
notepad.RemoveInstance(instance)
else:
......@@ -519,7 +359,7 @@ class Watcher(object):
job = []
for name in offline_disk_instances:
instance = self.instances[name]
if (instance.state in HELPLESS_STATES or
if (instance.status in HELPLESS_STATES or
self._CheckForOfflineNodes(instance)):
logging.info("Skip instance %s because it is in helpless state or has"
" one offline secondary", name)
......@@ -535,30 +375,6 @@ class Watcher(object):
logging.exception("Error while activating disks")
def OpenStateFile(path):
"""Opens the state file and acquires a lock on it.
@type path: string
@param path: Path to state file
"""
# The two-step dance below is necessary to allow both opening existing
# file read/write and creating if not existing. Vanilla open will truncate
# an existing file -or- allow creating if not existing.
statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
# Try to acquire lock on state file. If this fails, another watcher instance
# might already be running or another program is temporarily blocking the
# watcher from running.
try:
utils.LockFile(statefile_fd)
except errors.LockError, err:
logging.error("Can't acquire lock on state file %s: %s", path, err)
return None
return os.fdopen(statefile_fd, "w+")
def IsRapiResponding(hostname):
"""Connects to RAPI port and does a simple test.
......@@ -628,7 +444,8 @@ def Main():
logging.debug("Pause has been set, exiting")
return constants.EXIT_SUCCESS
statefile = OpenStateFile(constants.WATCHER_STATEFILE)
statefile = \
state.OpenStateFile(constants.WATCHER_STATEFILE)
if not statefile:
return constants.EXIT_FAILURE
......@@ -638,10 +455,10 @@ def Main():
RunWatcherHooks()
# run node maintenance in all cases, even if master, so that old
# masters can be properly cleaned up too
if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
if nodemaint.NodeMaintenance.ShouldRun():
nodemaint.NodeMaintenance().Exec()
notepad = WatcherState(statefile)
notepad = state.WatcherState(statefile)
try:
try:
client = cli.GetClient()
......
#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module keeping state for Ganeti watcher.
"""
import os
import time
import logging
from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time.
# The value is in seconds, as it is subtracted from time.time().
RETRY_EXPIRATION = 8 * 3600

# Keys of a per-instance record: number of restart attempts recorded so
# far and the time.time() value of the most recent attempt
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"

# Key of a per-node record: the last boot ID stored for the node
KEY_BOOT_ID = "bootid"
def OpenStateFile(path):
"""Opens the state file and acquires a lock on it.
@type path: string
@param path: Path to state file
"""
# The two-step dance below is necessary to allow both opening existing
# file read/write and creating if not existing. Vanilla open will truncate
# an existing file -or- allow creating if not existing.
statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
# Try to acquire lock on state file. If this fails, another watcher instance
# might already be running or another program is temporarily blocking the
# watcher from running.
try:
utils.LockFile(statefile_fd)
except errors.LockError, err:
logging.error("Can't acquire lock on state file %s: %s", path, err)
return None
return os.fdopen(statefile_fd, "w+")
class WatcherState(object):
"""Interface to a state file recording restart attempts.
"""
def __init__(self, statefile):
"""Open, lock, read and parse the file.
@type statefile: file
@param statefile: State file object
"""
self.statefile = statefile
try:
state_data = self.statefile.read()
if not state_data:
self._data = {}
else:
self._data = serializer.Load(state_data)
except Exception, msg: # pylint: disable-msg=W0703
# Ignore errors while loading the file and treat it as empty
self._data = {}
logging.warning(("Invalid state file. Using defaults."
" Error message: %s"), msg)
if "instance" not in self._data:
self._data["instance"] = {}
if "node" not in self._data:
self._data["node"] = {}
self._orig_data = serializer.Dump(self._data)
def Save(self):
"""Save state to file, then unlock and close it.
"""
assert self.statefile
serialized_form = serializer.Dump(self._data)
if self._orig_data == serialized_form:
logging.debug("Data didn't change, just touching status file")
os.utime(constants.WATCHER_STATEFILE, None)
return
# We need to make sure the file is locked before renaming it, otherwise
# starting ganeti-watcher again at the same time will create a conflict.
fd = utils.WriteFile(constants.WATCHER_STATEFILE,
data=serialized_form,
prewrite=utils.LockFile, close=False)
self.statefile = os.fdopen(fd, 'w+')
def Close(self):
"""Unlock configuration file and close it.
"""
assert self.statefile
# Files are automatically unlocked when closing them
self.statefile.close()
self.statefile = None
def GetNodeBootID(self, name):
"""Returns the last boot ID of a node or None.
"""
ndata = self._data["node"]
if name in ndata and KEY_BOOT_ID in ndata[name]:
return ndata[name][KEY_BOOT_ID]
return None
def SetNodeBootID(self, name, bootid):
"""Sets the boot ID of a node.
"""
assert bootid
ndata = self._data["node"]
if name not in ndata:
ndata[name] = {}
ndata[name][KEY_BOOT_ID] = bootid
def NumberOfRestartAttempts(self, instance):
"""Returns number of previous restart attempts.
@type instance: L{Instance}
@param instance: the instance to look up
"""
idata = self._data["instance"]
if instance.name in idata:
return idata[instance.name][KEY_RESTART_COUNT]
return 0
def MaintainInstanceList(self, instances):
"""Perform maintenance on the recorded instances.
@type instances: list of string
@param instances: the list of currently existing instances
"""
idict = self._data["instance"]
# First, delete obsolete instances
obsolete_instances = set(idict).difference(instances)
for inst in obsolete_instances:
logging.debug("Forgetting obsolete instance %s", inst)
del idict[inst]
# Second, delete expired records
earliest = time.time() - RETRY_EXPIRATION
expired_instances = [i for i in idict
if idict[i][KEY_RESTART_WHEN] < earliest]
for inst in expired_instances:
logging.debug("Expiring record for instance %s", inst)
del idict[inst]
def RecordRestartAttempt(self, instance):
"""Record a restart attempt.
@type instance: L{Instance}
@param instance: the instance being restarted
"""
idata = self._data["instance"]
if instance.name not in idata:
inst = idata[instance.name] = {}
else:
inst = idata[instance.name]
inst[KEY_RESTART_WHEN] = time.time()
inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
def RemoveInstance(self, instance):
"""Update state to reflect that a machine is running.
This method removes the record for a named instance (as we only
track down instances).
@type instance: L{Instance}
@param instance: the instance to remove from books
"""
idata = self._data["instance"]
if instance.name in idata:
del idata[instance.name]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment