Commit 1c54156d authored by Luca Bigliardi's avatar Luca Bigliardi
Browse files

Merge branch 'devel-2.1'



Conflicts:
	daemons/ganeti-noded
	lib/daemon.py
	lib/rapi/baserlib.py
	lib/rapi/rlib2.py
	lib/utils.py
Signed-off-by: default avatarLuca Bigliardi <shammash@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parents 40523663 20601361
......@@ -353,10 +353,12 @@ python_tests = \
test/ganeti.opcodes_unittest.py \
test/ganeti.rapi.client_unittest.py \
test/ganeti.rapi.resources_unittest.py \
test/ganeti.rapi.rlib2_unittest.py \
test/ganeti.serializer_unittest.py \
test/ganeti.ssh_unittest.py \
test/ganeti.uidpool_unittest.py \
test/ganeti.utils_unittest.py \
test/ganeti.utils_mlockall_unittest.py \
test/ganeti.workerpool_unittest.py \
test/docs_unittest.py \
test/tempfile_fork_unittest.py
......
......@@ -108,6 +108,17 @@ def _DecodeImportExportIO(ieio, ieioargs):
return ieioargs
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
"""Custom Request Executor class that ensures NodeHttpServer children are
locked in ram.
"""
def __init__(self, *args, **kwargs):
utils.Mlockall()
http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
class NodeHttpServer(http.server.HttpServer):
"""The server implementation.
......@@ -895,6 +906,8 @@ def ExecNoded(options, _):
"""Main node daemon function, executed with the PID file held.
"""
utils.Mlockall()
# Read SSL certificate
if options.ssl:
ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
......@@ -911,7 +924,8 @@ def ExecNoded(options, _):
mainloop = daemon.Mainloop()
server = NodeHttpServer(mainloop, options.bind_address, options.port,
ssl_params=ssl_params, ssl_verify_peer=True)
ssl_params=ssl_params, ssl_verify_peer=True,
request_executor_class=MlockallRequestExecutor)
server.Start()
try:
mainloop.Run()
......@@ -934,7 +948,8 @@ def main():
dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
default_ssl_cert=constants.NODED_CERT_FILE,
default_ssl_key=constants.NODED_CERT_FILE)
default_ssl_key=constants.NODED_CERT_FILE,
console_logging=True)
if __name__ == '__main__':
......
......@@ -292,6 +292,35 @@ wasn't closed during the timeout, the waiting function returns to its
caller nonetheless.
Node daemon availability
~~~~~~~~~~~~~~~~~~~~~~~~
Current State and shortcomings
++++++++++++++++++++++++++++++
Currently, when a Ganeti node suffers serious system disk damage, the
migration/failover of an instance may not correctly shutdown the virtual
machine on the broken node causing instances duplication. The ``gnt-node
powercycle`` command can be used to force a node reboot and thus to
avoid duplicated instances. This command relies on node daemon
availability, though, and thus can fail if the node daemon has some
pages swapped out of ram, for example.
Proposed changes
++++++++++++++++
The proposed solution forces node daemon to run exclusively in RAM. It
uses python ctypes to to call ``mlockall(MCL_CURRENT | MCL_FUTURE)`` on
the node daemon process and all its children. In addition another log
handler has been implemented for node daemon to redirect to
``/dev/console`` messages that cannot be written on the logfile.
With these changes node daemon can successfully run basic tasks such as
a powercycle request even when the system disk is heavily damaged and
reading/writing to disk fails constantly.
Feature changes
---------------
......
......@@ -136,6 +136,11 @@ Usage examples
You can access the API using your favorite programming language as long
as it supports network connections.
Ganeti RAPI client
++++++++++++++++++
Ganeti includes a standalone RAPI client, ``lib/rapi/client.py``.
Shell
+++++
......@@ -280,6 +285,19 @@ It supports the following commands: ``PUT``.
Redistribute configuration to all nodes. The result will be a job id.
``/2/features``
+++++++++++++++
``GET``
~~~~~~~
Returns a list of features supported by the RAPI server. Available
features:
``instance-create-reqv1``
Instance creation request data version 1 supported.
``/2/instances``
++++++++++++++++
......@@ -351,6 +369,57 @@ nodes selected for the instance.
Returns: a job ID that can be used later for polling.
Body parameters:
``__version__`` (int, required)
Must be ``1`` (older Ganeti versions used a different format for
instance creation requests, version ``0``, but that format is not
documented).
``name`` (string, required)
Instance name
``disk_template`` (string, required)
Disk template for instance
``disks`` (list, required)
List of disk definitions. Example: ``[{"size": 100}, {"size": 5}]``.
Each disk definition must contain a ``size`` value and can contain an
optional ``mode`` value denoting the disk access mode (``ro`` or
``rw``).
``nics`` (list, required)
List of NIC (network interface) definitions. Example: ``[{}, {},
{"ip": "1.2.3.4"}]``. Each NIC definition can contain the optional
values ``ip``, ``mode``, ``link`` and ``bridge``.
``os`` (string)
Instance operating system.
``force_variant`` (bool)
Whether to force an unknown variant.
``pnode`` (string)
Primary node.
``snode`` (string)
Secondary node.
``src_node`` (string)
Source node for import.
``src_path`` (string)
Source directory for import.
``start`` (bool)
Whether to start instance after creation.
``ip_check`` (bool)
Whether to ensure instance's IP address is inactive.
``name_check`` (bool)
Whether to ensure instance's name is resolvable.
``file_storage_dir`` (string)
File storage directory.
``file_driver`` (string)
File storage driver.
``iallocator`` (string)
Instance allocator name.
``hypervisor`` (string)
Hypervisor name.
``hvparams`` (dict)
Hypervisor parameters, hypervisor-dependent.
``beparams``
Backend parameters.
``/2/instances/[instance_name]``
++++++++++++++++++++++++++++++++
......
......@@ -2916,6 +2916,11 @@ def PowercycleNode(hypervisor_type):
pid = 0
if pid > 0:
return "Reboot scheduled in 5 seconds"
# ensure the child is running on ram
try:
utils.Mlockall()
except Exception: # pylint: disable-msg=W0703
pass
time.sleep(5)
hyper.PowercycleNode()
......
......@@ -168,6 +168,8 @@ LOG_WATCHER = LOG_DIR + "watcher.log"
LOG_COMMANDS = LOG_DIR + "commands.log"
LOG_BURNIN = LOG_DIR + "burnin.log"
DEV_CONSOLE = "/dev/console"
# one of 'no', 'yes', 'only'
SYSLOG_USAGE = _autoconf.SYSLOG_USAGE
SYSLOG_NO = "no"
......
......@@ -242,7 +242,7 @@ class Mainloop(object):
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
multithreaded=False,
multithreaded=False, console_logging=False,
default_ssl_cert=None, default_ssl_key=None):
"""Shared main function for daemons.
......@@ -262,6 +262,9 @@ def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
runs the daemon itself.
@type multithreaded: bool
@param multithreaded: Whether the daemon uses threads
@type console_logging: boolean
@param console_logging: if True, the daemon will fall back to the system
console if logging fails
@type default_ssl_cert: string
@param default_ssl_cert: Default SSL certificate path
@type default_ssl_key: string
......@@ -345,7 +348,8 @@ def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
stderr_logging=not options.fork,
multithreaded=multithreaded,
program=daemon_name,
syslog=options.syslog)
syslog=options.syslog,
console_logging=console_logging)
logging.info("%s daemon startup", daemon_name)
exec_fn(options, args)
finally:
......
......@@ -38,6 +38,10 @@ from ganeti import opcodes
from ganeti import errors
# Dummy value to detect unchanged parameters
_DEFAULT = object()
def BuildUriList(ids, uri_format, uri_fields=("name", "uri")):
"""Builds a URI list as used by index resources.
......@@ -213,6 +217,53 @@ def FeedbackFn(ts, log_type, log_msg): # pylint: disable-msg=W0613
logging.info("%s: %s", log_type, log_msg)
def CheckType(value, exptype, descr):
"""Abort request if value type doesn't match expected type.
@param value: Value
@type exptype: type
@param exptype: Expected type
@type descr: string
@param descr: Description of value
@return: Value (allows inline usage)
"""
if not isinstance(value, exptype):
raise http.HttpBadRequest("%s: Type is '%s', but '%s' is expected" %
(descr, type(value).__name__, exptype.__name__))
return value
def CheckParameter(data, name, default=_DEFAULT, exptype=_DEFAULT):
"""Check and return the value for a given parameter.
If no default value was given and the parameter doesn't exist in the input
data, an error is raise.
@type data: dict
@param data: Dictionary containing input data
@type name: string
@param name: Parameter name
@param default: Default value (can be None)
@param exptype: Expected type (can be None)
"""
try:
value = data[name]
except KeyError:
if default is not _DEFAULT:
return default
raise http.HttpBadRequest("Required parameter '%s' is missing" %
name)
if exptype is _DEFAULT:
return value
return CheckType(value, exptype, "'%s' parameter" % name)
class R_Generic(object):
"""Generic class for resources.
......@@ -280,13 +331,10 @@ class R_Generic(object):
@param name: the required parameter
"""
try:
return self.request_body[name]
except KeyError:
if args:
return args[0]
if args:
return CheckParameter(self.request_body, name, default=args[0])
raise http.HttpBadRequest("Required parameter '%s' is missing" % name)
return CheckParameter(self.request_body, name)
def useLocking(self):
"""Check if the request specifies locking.
......
......@@ -21,6 +21,9 @@
"""Ganeti RAPI client."""
# No Ganeti-specific modules should be imported. The RAPI client is supposed to
# be standalone.
import httplib
import urllib2
import logging
......@@ -39,6 +42,7 @@ HTTP_GET = "GET"
HTTP_PUT = "PUT"
HTTP_POST = "POST"
HTTP_OK = 200
HTTP_NOT_FOUND = 404
HTTP_APP_JSON = "application/json"
REPLACE_DISK_PRI = "replace_on_primary"
......@@ -52,6 +56,10 @@ NODE_ROLE_MASTER = "master"
NODE_ROLE_OFFLINE = "offline"
NODE_ROLE_REGULAR = "regular"
# Internal constants
_REQ_DATA_VERSION_FIELD = "__version__"
_INST_CREATE_REQV1 = "instance-create-reqv1"
class Error(Exception):
"""Base error class for this module.
......@@ -433,6 +441,23 @@ class GanetiRapiClient(object):
"""
return self._SendRequest(HTTP_GET, "/version", None, None)
def GetFeatures(self):
"""Gets the list of optional features supported by RAPI server.
@rtype: list
@return: List of optional features
"""
try:
return self._SendRequest(HTTP_GET, "/%s/features" % GANETI_RAPI_VERSION,
None, None)
except GanetiApiError, err:
# Older RAPI servers don't support this resource
if err.code == HTTP_NOT_FOUND:
return []
raise
def GetOperatingSystems(self):
"""Gets the Operating Systems running in the Ganeti cluster.
......@@ -534,23 +559,64 @@ class GanetiRapiClient(object):
("/%s/instances/%s" %
(GANETI_RAPI_VERSION, instance)), None, None)
def CreateInstance(self, dry_run=False):
def CreateInstance(self, mode, name, disk_template, disks, nics,
**kwargs):
"""Creates a new instance.
More details for parameters can be found in the RAPI documentation.
@type mode: string
@param mode: Instance creation mode
@type name: string
@param name: Hostname of the instance to create
@type disk_template: string
@param disk_template: Disk template for instance (e.g. plain, diskless,
file, or drbd)
@type disks: list of dicts
@param disks: List of disk definitions
@type nics: list of dicts
@param nics: List of NIC definitions
@type dry_run: bool
@param dry_run: whether to perform a dry run
@keyword dry_run: whether to perform a dry run
@rtype: int
@return: job id
"""
# TODO: Pass arguments needed to actually create an instance.
query = []
if dry_run:
if kwargs.get("dry_run"):
query.append(("dry-run", 1))
if _INST_CREATE_REQV1 in self.GetFeatures():
# All required fields for request data version 1
body = {
_REQ_DATA_VERSION_FIELD: 1,
"mode": mode,
"name": name,
"disk_template": disk_template,
"disks": disks,
"nics": nics,
}
conflicts = set(kwargs.iterkeys()) & set(body.iterkeys())
if conflicts:
raise GanetiApiError("Required fields can not be specified as"
" keywords: %s" % ", ".join(conflicts))
body.update((key, value) for key, value in kwargs.iteritems()
if key != "dry_run")
else:
# TODO: Implement instance creation request data version 0
# When implementing version 0, care should be taken to refuse unknown
# parameters and invalid values. The interface of this function must stay
# exactly the same for version 0 and 1 (e.g. they aren't allowed to
# require different data types).
raise NotImplementedError("Support for instance creation request data"
" version 0 is not yet implemented")
return self._SendRequest(HTTP_POST, "/%s/instances" % GANETI_RAPI_VERSION,
query, None)
query, body)
def DeleteInstance(self, instance, dry_run=False):
"""Deletes an instance.
......
......@@ -216,6 +216,7 @@ def GetHandlers(node_name_pattern, instance_name_pattern, job_id_pattern):
"/2/info": rlib2.R_2_info,
"/2/os": rlib2.R_2_os,
"/2/redistribute-config": rlib2.R_2_redist_config,
"/2/features": rlib2.R_2_features,
}
......
......@@ -45,6 +45,7 @@ from ganeti import opcodes
from ganeti import http
from ganeti import constants
from ganeti import cli
from ganeti import utils
from ganeti import rapi
from ganeti.rapi import baserlib
......@@ -83,6 +84,12 @@ _NR_MAP = {
"R": _NR_REGULAR,
}
# Request data version field
_REQ_DATA_VERSION = "__version__"
# Feature string for instance creation request data version 1
_INST_CREATE_REQV1 = "instance-create-reqv1"
# Timeout for /2/jobs/[job_id]/wait. Gives job up to 10 seconds to change.
_WFJC_TIMEOUT = 10
......@@ -115,6 +122,18 @@ class R_2_info(baserlib.R_Generic):
return client.QueryClusterInfo()
class R_2_features(baserlib.R_Generic):
"""/2/features resource.
"""
@staticmethod
def GET():
"""Returns list of optional RAPI features implemented.
"""
return [_INST_CREATE_REQV1]
class R_2_os(baserlib.R_Generic):
"""/2/os resource.
......@@ -471,6 +490,100 @@ class R_2_nodes_name_storage_repair(baserlib.R_Generic):
return baserlib.SubmitJob([op])
def _ParseInstanceCreateRequestVersion1(data, dry_run):
"""Parses an instance creation request version 1.
@rtype: L{opcodes.OpCreateInstance}
@return: Instance creation opcode
"""
# Disks
disks_input = baserlib.CheckParameter(data, "disks", exptype=list)
disks = []
for idx, i in enumerate(disks_input):
baserlib.CheckType(i, dict, "Disk %d specification" % idx)
# Size is mandatory
try:
size = i["size"]
except KeyError:
raise http.HttpBadRequest("Disk %d specification wrong: missing disk"
" size" % idx)
disk = {
"size": size,
}
# Optional disk access mode
try:
disk_access = i["mode"]
except KeyError:
pass
else:
disk["mode"] = disk_access
disks.append(disk)
assert len(disks_input) == len(disks)
# Network interfaces
nics_input = baserlib.CheckParameter(data, "nics", exptype=list)
nics = []
for idx, i in enumerate(nics_input):
baserlib.CheckType(i, dict, "NIC %d specification" % idx)
nic = {}
for field in ["mode", "ip", "link", "bridge"]:
try:
value = i[field]
except KeyError:
continue
nic[field] = value
nics.append(nic)
assert len(nics_input) == len(nics)
# HV/BE parameters
hvparams = baserlib.CheckParameter(data, "hvparams", default={})
utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
beparams = baserlib.CheckParameter(data, "beparams", default={})
utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
return opcodes.OpCreateInstance(
mode=baserlib.CheckParameter(data, "mode"),
instance_name=baserlib.CheckParameter(data, "name"),
os_type=baserlib.CheckParameter(data, "os", default=None),
force_variant=baserlib.CheckParameter(data, "force_variant",
default=False),
pnode=baserlib.CheckParameter(data, "pnode", default=None),
snode=baserlib.CheckParameter(data, "snode", default=None),
disk_template=baserlib.CheckParameter(data, "disk_template"),
disks=disks,
nics=nics,
src_node=baserlib.CheckParameter(data, "src_node", default=None),
src_path=baserlib.CheckParameter(data, "src_path", default=None),
start=baserlib.CheckParameter(data, "start", default=True),
wait_for_sync=True,
ip_check=baserlib.CheckParameter(data, "ip_check", default=True),
name_check=baserlib.CheckParameter(data, "name_check", default=True),
file_storage_dir=baserlib.CheckParameter(data, "file_storage_dir",
default=None),
file_driver=baserlib.CheckParameter(data, "file_driver",
default=constants.FD_LOOP),
iallocator=baserlib.CheckParameter(data, "iallocator", default=None),
hypervisor=baserlib.CheckParameter(data, "hypervisor", default=None),
hvparams=hvparams,
beparams=beparams,
dry_run=dry_run,
)
class R_2_instances(baserlib.R_Generic):
"""/2/instances resource.
......@@ -491,15 +604,16 @@ class R_2_instances(baserlib.R_Generic):
return baserlib.BuildUriList(instanceslist, "/2/instances/%s",
uri_fields=("id", "uri"))
def POST(self):
"""Create an instance.
def _ParseVersion0CreateRequest(self):
"""Parses an instance creation request version 0.
@return: a job id
Request data version 0 is deprecated and should not be used anymore.
"""
if not isinstance(self.request_body, dict):
raise http.HttpBadRequest("Invalid body contents, not a dictionary")
@rtype: L{opcodes.OpCreateInstance}
@return: Instance creation opcode
"""
# Do not modify anymore, request data version 0 is deprecated
beparams = baserlib.MakeParamsDict(self.request_body,
constants.BES_PARAMETERS)
hvparams = baserlib.MakeParamsDict(self.request_body,
......@@ -516,6 +630,7 @@ class R_2_instances(baserlib.R_Generic):
raise http.HttpBadRequest("Disk %d specification wrong: should"
" be an integer" % idx)
disks.append({"size": d})
# nic processing (one nic only)
nics = [{"mac": fn("mac", constants.VALUE_AUTO)}]
if fn("ip", None) is not None:
......@@ -527,7 +642,8 @@ class R_2_instances(baserlib.R_Generic):
if fn("bridge", None) is not None:
nics[0]["bridge"] = fn("bridge")
op = opcodes.OpCreateInstance(
# Do not modify anymore, request data version 0 is deprecated
return opcodes.OpCreateInstance(
mode=constants.INSTANCE_CREATE,
instance_name=fn('name'),
disks=disks,
......@@ -545,10 +661,31 @@ class R_2_instances(baserlib.R_Generic):
hvparams=hvparams,
beparams=beparams,
file_storage_dir=fn('file_storage_dir', None),
file_driver=fn('file_driver', 'loop'),
file_driver=fn('file_driver', constants.FD_LOOP),
dry_run=bool(self.dryRun()),
)
def POST(self):