# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime, timedelta
from synnefo.db.models import (Backend, VirtualMachine, Network,
BackendNetwork, BACKEND_STATUSES,
pooled_rapi_client, VirtualMachineDiagnostic,
Flavor, IPAddress, IPAddressLog)
from synnefo.logic import utils, ips
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic import rapi
from logging import getLogger
log = getLogger(__name__)
_firewall_tags = {
'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
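# Illustrative note: assuming firewall tags of the form
# 'synnefo:network:%s:protected' (the actual values come from the settings
# referenced above), the fourth colon-separated field is the profile keyword,
# so _reverse_tags maps it back to the profile name:
#
#     'synnefo:network:%s:protected'.split(':')[3]   # -> 'protected'
#     _reverse_tags.get('protected')                 # -> 'PROTECTED'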
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
UNKNOWN_NIC_PREFIX = "unknown-"
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
"""Handle quotas for updated VirtualMachine.
    Update quotas for the updated VirtualMachine based on the job that ran on
    the Ganeti backend. If a commission has already been issued for this job,
    then this commission is simply accepted or rejected according to the job
    status. Otherwise, a new commission for the given change is issued in
    forced and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.
"""
if job_status not in rapi.JOB_STATUS_FINALIZED:
return vm
    # Check whether the successful completion of the job will trigger any
    # quotable change in the state of the VM.
action = utils.get_action_from_opcode(job_opcode, job_fields)
if action == "BUILD":
# Quotas for new VMs are automatically accepted by the API
return vm
if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued, so just
        # accept or reject it. OP_INSTANCE_CREATE is a special case: it must
        # be accepted even if it fails, because the user has to manually
        # remove the failed server.
serial = vm.serial
if job_status == rapi.JOB_STATUS_SUCCESS:
quotas.accept_serial(serial)
elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
log.debug("Job %s failed. Rejecting related serial %s", job_id,
serial)
quotas.reject_serial(serial)
vm.serial = None
elif job_status == rapi.JOB_STATUS_SUCCESS:
commission_info = quotas.get_commission_info(resource=vm,
action=action,
action_fields=job_fields)
if commission_info is not None:
# Commission for this change has not been issued, or the issued
# commission was unaware of the current change. Reject all previous
# commissions and create a new one in forced mode!
log.debug("Expected job was %s. Processing job %s.",
vm.task_job_id, job_id)
reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
% (vm, job_id))
quotas.handle_resource_commission(vm, action,
action_fields=job_fields,
commission_name=reason,
force=True,
auto_accept=True)
log.debug("Issued new commission: %s", vm.serial)
return vm
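# Minimal usage sketch (hypothetical values): when the dispatcher processes a
# successful job for which a commission has already been issued (the job id
# matches vm.task_job_id and vm.serial is set), the pending serial is simply
# accepted:
#
#     vm = handle_vm_quotas(vm, job_id=vm.task_job_id,
#                           job_opcode="OP_INSTANCE_SHUTDOWN",
#                           job_status=rapi.JOB_STATUS_SUCCESS,
#                           job_fields={})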
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
job_fields=None):
"""Process a job progress notification from the backend
Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
also update the operating state of the VM.
"""
# See #1492, #1031, #1111 why this line has been removed
#if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
if status not in [x[0] for x in BACKEND_STATUSES]:
raise VirtualMachine.InvalidBackendMsgError(opcode, status)
vm.backendjobid = jobid
vm.backendjobstatus = status
vm.backendopcode = opcode
vm.backendlogmsg = logmsg
if status not in rapi.JOB_STATUS_FINALIZED:
vm.save()
return
if job_fields is None:
job_fields = {}
new_operstate = None
new_flavor = None
state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
if status == rapi.JOB_STATUS_SUCCESS:
# If job succeeds, change operating state if needed
if state_for_success is not None:
new_operstate = state_for_success
beparams = job_fields.get("beparams", None)
if beparams:
# Change the flavor of the VM
new_flavor = _process_resize(vm, beparams)
# Update backendtime only for jobs that have been successfully
# completed, since only these jobs update the state of the VM. Else a
# "race condition" may occur when a successful job (e.g.
# OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reverse order.
vm.backendtime = etime
if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
# Update the NICs of the VM
_process_net_status(vm, etime, nics)
# Special case: if OP_INSTANCE_CREATE fails --> ERROR
if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
rapi.JOB_STATUS_ERROR):
new_operstate = "ERROR"
vm.backendtime = etime
# Update state of associated NICs
vm.nics.all().update(state="ERROR")
elif opcode == 'OP_INSTANCE_REMOVE':
# Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
# when no instance exists at the Ganeti backend.
# See ticket #799 for all the details.
if (status == rapi.JOB_STATUS_SUCCESS or
(status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
# VM has been deleted
for nic in vm.nics.all():
# Release the IP
remove_nic_ips(nic)
# And delete the NIC.
nic.delete()
vm.deleted = True
new_operstate = state_for_success
vm.backendtime = etime
status = rapi.JOB_STATUS_SUCCESS
if status in rapi.JOB_STATUS_FINALIZED:
# Job is finalized: Handle quotas/commissioning
vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
job_status=status, job_fields=job_fields)
# and clear task fields
if vm.task_job_id == jobid:
vm.task = None
vm.task_job_id = None
if new_operstate is not None:
vm.operstate = new_operstate
if new_flavor is not None:
vm.flavor = new_flavor
vm.save()
def _process_resize(vm, beparams):
"""Change flavor of a VirtualMachine based on new beparams."""
old_flavor = vm.flavor
vcpus = beparams.get("vcpus", old_flavor.cpu)
ram = beparams.get("maxmem", old_flavor.ram)
if vcpus == old_flavor.cpu and ram == old_flavor.ram:
return
try:
new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
disk=old_flavor.disk,
disk_template=old_flavor.disk_template)
except Flavor.DoesNotExist:
        raise Exception("Cannot find an existing flavor for VM %s" % vm)
return new_flavor
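# For example (hypothetical job fields): a finished resize job carrying
# beparams={"vcpus": 2, "maxmem": 2048} makes _process_resize() look up the
# Flavor with cpu=2 and ram=2048 that keeps the VM's current disk size and
# disk template, and return it as the new flavor:
#
#     new_flavor = _process_resize(vm, {"vcpus": 2, "maxmem": 2048})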
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
"""Wrap _process_net_status inside transaction."""
_process_net_status(vm, etime, nics)
def _process_net_status(vm, etime, nics):
"""Process a net status notification from the backend
Process an incoming message from the Ganeti backend,
detailing the NIC configuration of a VM instance.
Update the state of the VM in the DB accordingly.
"""
ganeti_nics = process_ganeti_nics(nics)
db_nics = dict([(nic.id, nic)
for nic in vm.nics.select_related("network")
.prefetch_related("ips")])
for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
db_nic = db_nics.get(nic_name)
ganeti_nic = ganeti_nics.get(nic_name)
if ganeti_nic is None:
            if nic_is_stale(vm, db_nic):
log.debug("Removing stale NIC '%s'" % db_nic)
remove_nic_ips(db_nic)
db_nic.delete()
else:
log.info("NIC '%s' is still being created" % db_nic)
elif db_nic is None:
msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
" fix this issue!" % (nic_name, vm))
log.error(msg)
continue
elif not nics_are_equal(db_nic, ganeti_nic):
for f in SIMPLE_NIC_FIELDS:
# Update the NIC in DB with the values from Ganeti NIC
setattr(db_nic, f, ganeti_nic[f])
db_nic.save()
            # Special case: the IPv4 address has changed, so the old address
            # must be released and the new one reserved in its place.
gnt_ipv4_address = ganeti_nic["ipv4_address"]
db_ipv4_address = db_nic.ipv4_address
if db_ipv4_address != gnt_ipv4_address:
change_address_of_port(db_nic, vm.userid,
old_address=db_ipv4_address,
new_address=gnt_ipv4_address,
version=4)
gnt_ipv6_address = ganeti_nic["ipv6_address"]
db_ipv6_address = db_nic.ipv6_address
if db_ipv6_address != gnt_ipv6_address:
change_address_of_port(db_nic, vm.userid,
old_address=db_ipv6_address,
new_address=gnt_ipv6_address,
version=6)
vm.backendtime = etime
vm.save()
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IPv4/IPv6 address that is associated with a port (NIC)."""
if old_address is not None:
msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
% (version, port.machine_id, old_address, new_address))
log.error(msg)
# Remove the old IP address
remove_nic_ips(port, version=version)
if version == 4:
ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
ipaddress.nic = port
ipaddress.save()
elif version == 6:
subnet6 = port.network.subnet6
ipaddress = IPAddress.objects.create(userid=userid,
network=port.network,
subnet=subnet6,
nic=port,
address=new_address,
ipversion=6)
else:
raise ValueError("Unknown version: %s" % version)
# New address log
ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
network_id=port.network_id,
address=new_address,
active=True)
log.info("Created IP log entry '%s' for address '%s' to server '%s'",
ip_log.id, new_address, port.machine_id)
return ipaddress
def nics_are_equal(db_nic, gnt_nic):
for field in NIC_FIELDS:
if getattr(db_nic, field) != gnt_nic[field]:
return False
return True
def process_ganeti_nics(ganeti_nics):
"""Process NIC dict from ganeti"""
new_nics = []
for index, gnic in enumerate(ganeti_nics):
nic_name = gnic.get("name", None)
if nic_name is not None:
nic_id = utils.id_from_nic_name(nic_name)
else:
            # Fall back to the index as the identifier. If the NIC is unknown
            # to Synnefo, it will be created automatically.
nic_id = UNKNOWN_NIC_PREFIX + str(index)
network_name = gnic.get('network', '')
network_id = utils.id_from_network_name(network_name)
network = Network.objects.get(id=network_id)
# Get the new nic info
mac = gnic.get('mac')
ipv4 = gnic.get('ip')
subnet6 = network.subnet6
ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
firewall = gnic.get('firewall')
firewall_profile = _reverse_tags.get(firewall)
if not firewall_profile and network.public:
firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
nic_info = {
'index': index,
'network': network,
'mac': mac,
'ipv4_address': ipv4,
'ipv6_address': ipv6,
'firewall_profile': firewall_profile,
'state': 'ACTIVE'}
new_nics.append((nic_id, nic_info))
return dict(new_nics)
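# Input/output sketch (field values are illustrative): a Ganeti NIC entry such
# as
#
#     {"name": "<nic-backend-uuid>", "network": "<network-backend-id>",
#      "mac": "aa:00:00:11:22:33", "ip": "192.0.2.10", "firewall": ""}
#
# becomes an entry keyed by the Synnefo NIC id, whose value carries the
# NIC_FIELDS defined above ('index', 'network', 'mac', 'ipv4_address',
# 'ipv6_address', 'firewall_profile' and 'state').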
def remove_nic_ips(nic, version=None):
"""Remove IP addresses associated with a NetworkInterface.
Remove all IP addresses that are associated with the NetworkInterface
object, by returning them to the pool and deleting the IPAddress object. If
the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addresses of that version will be
removed.
"""
for ip in nic.ips.all():
if version and ip.ipversion != version:
continue
# Update the DB table holding the logging of all IP addresses
terminate_active_ipaddress_log(nic, ip)
if ip.floating_ip:
ip.nic = None
ip.save()
else:
# Release the IPv4 address
ip.release_address()
ip.delete()
def terminate_active_ipaddress_log(nic, ip):
"""Update DB logging entry for this IP address."""
if not ip.network.public or nic.machine is None:
return
try:
ip_log, created = \
IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
network_id=ip.network_id,
address=ip.address,
active=True)
except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  " Server %s. Cannot proceed!"
% (ip.address, ip.network, nic.machine))
log.error(logmsg)
raise
if created:
logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
" but with wrong creation timestamp."
% (ip.address, ip.network, nic.machine))
log.error(logmsg)
ip_log.released_at = datetime.now()
ip_log.active = False
ip_log.save()
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
if status not in [x[0] for x in BACKEND_STATUSES]:
raise Network.InvalidBackendMsgError(opcode, status)
back_network.backendjobid = jobid
back_network.backendjobstatus = status
back_network.backendopcode = opcode
back_network.backendlogmsg = logmsg
# Note: Network is already locked!
network = back_network.network
# Notifications of success change the operating state
state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
back_network.operstate = state_for_success
if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
and opcode == 'OP_NETWORK_ADD'):
back_network.operstate = 'ERROR'
back_network.backendtime = etime
if opcode == 'OP_NETWORK_REMOVE':
network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
network_exists_in_backend(back_network)):
back_network.operstate = state_for_success
back_network.deleted = True
back_network.backendtime = etime
if status == rapi.JOB_STATUS_SUCCESS:
back_network.backendtime = etime
back_network.save()
    # Also update the state of the parent Network.
update_network_state(network)
def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.
    Update the state of a Network based on the operstate of the corresponding
    BackendNetworks in the backends in which the network exists.
    The state of the network is:
    * ACTIVE: if it is 'ACTIVE' in at least one backend.
    * DELETED: if it is 'DELETED' in all backends in which it has been
      created.
    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.
"""
if network.deleted:
# Network has already been deleted. Just assert that state is also
# DELETED
if not network.state == "DELETED":
network.state = "DELETED"
network.save()
return
backend_states = [s.operstate for s in network.backend_networks.all()]
if not backend_states and network.action != "DESTROY":
if network.state != "ACTIVE":
network.state = "ACTIVE"
network.save()
return
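    # Illustrative evaluation of the reduce below: the fold yields "DELETED"
    # only if every BackendNetwork operstate equals "DELETED"; any other state
    # collapses the chain to False, e.g.:
    #
    #     reduce(lambda x, y: x == y and "DELETED",
    #            ["DELETED", "DELETED"], "DELETED")   # -> "DELETED"
    #     reduce(lambda x, y: x == y and "DELETED",
    #            ["DELETED", "ACTIVE"], "DELETED")    # -> False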
# Network is deleted when all BackendNetworks go to "DELETED" operstate
deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
"DELETED")
# Release the resources on the deletion of the Network
if deleted:
if network.ips.filter(deleted=False, floating_ip=True).exists():
msg = "Cannot delete network %s! Floating IPs still in use!"
log.error(msg % network)
raise Exception(msg % network)
log.info("Network %r deleted. Releasing link %r mac_prefix %r",
network.id, network.mac_prefix, network.link)
network.deleted = True
network.state = "DELETED"
# Undrain the network, otherwise the network state will remain
# as 'SNF:DRAINED'
network.drained = False
if network.mac_prefix:
if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
release_resource(res_type="mac_prefix",
value=network.mac_prefix)
if network.link:
if network.FLAVORS[network.flavor]["link"] == "pool":
release_resource(res_type="bridge", value=network.link)
# Set all subnets as deleted
network.subnets.update(deleted=True)
# And delete the IP pools
for subnet in network.subnets.all():
if subnet.ipversion == 4:
subnet.ip_pools.all().delete()
        # And delete all the backend networks, since they are now useless
network.backend_networks.all().delete()
# Issue commission
if network.userid:
quotas.issue_and_accept_commission(network, action="DESTROY")
# the above has already saved the object and committed;
# a second save would override others' changes, since the
# object is now unlocked
return
elif not network.public:
log.warning("Network %s does not have an owner!", network.id)
network.save()
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
job_fields):
assert (opcode == "OP_NETWORK_SET_PARAMS")
if status not in [x[0] for x in BACKEND_STATUSES]:
raise Network.InvalidBackendMsgError(opcode, status)
back_network.backendjobid = jobid
back_network.backendjobstatus = status
    back_network.backendopcode = opcode
add_reserved_ips = job_fields.get("add_reserved_ips")
if add_reserved_ips:
network = back_network.network
for ip in add_reserved_ips:
network.reserve_address(ip, external=True)
if status == rapi.JOB_STATUS_SUCCESS:
back_network.backendtime = etime
back_network.save()
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
percentage = int(progress)
# The percentage may exceed 100%, due to the way
# snf-image:copy-progress tracks bytes read by image handling processes
percentage = 100 if percentage > 100 else percentage
if percentage < 0:
raise ValueError("Percentage cannot be negative")
# FIXME: log a warning here, see #1033
# if last_update > percentage:
# raise ValueError("Build percentage should increase monotonically " \
# "(old = %d, new = %d)" % (last_update, percentage))
# This assumes that no message of type 'ganeti-create-progress' is going to
# arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
# the instance is STARTED. What if the two messages are processed by two
# separate dispatcher threads, and the 'ganeti-op-status' message for
# successful creation gets processed before the 'ganeti-create-progress'
# message? [vkoukis]
#
#if not vm.operstate == 'BUILD':
# raise VirtualMachine.IllegalState("VM is not in building state")
vm.buildpercentage = percentage
vm.backendtime = etime
vm.save()
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
details=None):
"""
Create virtual machine instance diagnostic entry.
:param vm: VirtualMachine instance to create diagnostic for.
:param message: Diagnostic message.
:param source: Diagnostic source identifier (e.g. image-helper).
:param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
:param details: Additional details or debug information.
"""
VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
source_date=etime,
message=message,
details=details)
def create_instance(vm, nics, flavor, image):
    """Create an instance in the Ganeti backend.
    `image` is a dictionary which should contain the keys
    'backend_id', 'format' and 'metadata'; the 'metadata' value should itself
    be a dictionary.
"""
# Handle arguments to CreateInstance() as a dictionary,
# initialize it based on a deployment-specific value.
# This enables the administrator to override deployment-specific
# arguments, such as the disk template to use, name of os provider
# and hypervisor-specific parameters at will (see Synnefo #785, #835).
#
kw = vm.backend.get_create_params()
kw['mode'] = 'create'
kw['name'] = vm.backend_vm_id
# Defined in settings.GANETI_CREATEINSTANCE_KWARGS
kw['disk_template'] = flavor.disk_template
kw['disks'] = [{"size": flavor.disk * 1024}]
provider = flavor.disk_provider
if provider:
kw['disks'][0]['provider'] = provider
kw['disks'][0]['origin'] = flavor.disk_origin
extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS.get(provider)
if extra_disk_params is not None:
kw["disks"][0].update(extra_disk_params)
kw['nics'] = [{"name": nic.backend_uuid,
"network": nic.network.backend_id,
"ip": nic.ipv4_address}
for nic in nics]
backend = vm.backend
depend_jobs = []
for nic in nics:
bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
depend_jobs.extend(job_ids)
kw["depends"] = create_job_dependencies(depend_jobs)
# Defined in settings.GANETI_CREATEINSTANCE_KWARGS
# kw['os'] = settings.GANETI_OS_PROVIDER
kw['ip_check'] = False
kw['name_check'] = False
    # Do not specify a node explicitly; let
    # Ganeti pick one through an iallocator instead.
#kw['pnode'] = rapi.GetNodes()[0]
kw['dry_run'] = settings.TEST
kw['beparams'] = {
'auto_balance': True,
'vcpus': flavor.cpu,
'memory': flavor.ram}
kw['osparams'] = {
'config_url': vm.config_url,
# Store image id and format to Ganeti
'img_id': image['backend_id'],
'img_format': image['format']}
# Use opportunistic locking
kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING
# Defined in settings.GANETI_CREATEINSTANCE_KWARGS
# kw['hvparams'] = dict(serial_console=False)
log.debug("Creating instance %s", utils.hide_pass(kw))
with pooled_rapi_client(vm) as client:
return client.CreateInstance(**kw)
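# For example (hypothetical settings value): with
#
#     GANETI_DISK_PROVIDER_KWARGS = {"archipelago": {"origin_size": "20G"}}
#
# a flavor whose disk_provider is "archipelago" gets these extra keys merged
# into kw['disks'][0] before the CreateInstance() RAPI call above.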
def delete_instance(vm, shutdown_timeout=None):
with pooled_rapi_client(vm) as client:
return client.DeleteInstance(vm.backend_vm_id,
shutdown_timeout=shutdown_timeout,
dry_run=settings.TEST)
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
assert reboot_type in ('soft', 'hard')
    # Note that the reboot type of the Ganeti job must always be 'hard'. The
    # 'soft' and 'hard' reboot types of the OS API differ from Ganeti's and
    # map to the 'shutdown_timeout' parameter instead.
kwargs = {"instance": vm.backend_vm_id,
"reboot_type": "hard"}
    # The 'shutdown_timeout' parameter is only supported by snf-ganeti >= 2.8.2
    # and Ganeti > 2.10. Other versions ignore the parameter and fall back to
    # Ganeti's default timeout (120s).
if shutdown_timeout is not None:
kwargs["shutdown_timeout"] = shutdown_timeout
if reboot_type == "hard":
kwargs["shutdown_timeout"] = 0
if settings.TEST:
kwargs["dry_run"] = True
with pooled_rapi_client(vm) as client:
return client.RebootInstance(**kwargs)
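# Behaviour sketch: both OS API reboot types become Ganeti 'hard' reboots and
# only the shutdown grace period differs (values below are illustrative):
#
#     reboot_instance(vm, "soft", shutdown_timeout=30)
#         # -> RebootInstance(..., reboot_type="hard", shutdown_timeout=30)
#     reboot_instance(vm, "hard")
#         # -> RebootInstance(..., reboot_type="hard", shutdown_timeout=0)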
def startup_instance(vm):
with pooled_rapi_client(vm) as client:
return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
def shutdown_instance(vm, shutdown_timeout=None):
with pooled_rapi_client(vm) as client:
return client.ShutdownInstance(vm.backend_vm_id,
timeout=shutdown_timeout,
dry_run=settings.TEST)
def resize_instance(vm, vcpus, memory):
beparams = {"vcpus": int(vcpus),
"minmem": int(memory),
"maxmem": int(memory)}
with pooled_rapi_client(vm) as client:
return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
def get_instance_console(vm):
# RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
# which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
# useless (see #783).
#
# Until this is fixed on the Ganeti side, construct a console info reply
# directly.
#
# WARNING: This assumes that VNC runs on port network_port on
# the instance's primary node, and is probably
# hypervisor-specific.
#
log.debug("Getting console for vm %s", vm)
console = {}
console['kind'] = 'vnc'
with pooled_rapi_client(vm) as client:
i = client.GetInstance(vm.backend_vm_id)
if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
raise Exception("hv parameter serial_console cannot be true")
console['host'] = i['pnode']
console['port'] = i['network_port']
return console
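# The returned console info is a plain dict, e.g. (illustrative values):
#
#     {'kind': 'vnc', 'host': 'node1.example.com', 'port': 11000}
#
# where 'host' is the instance's primary node and 'port' its network_port, as
# reported by GetInstance().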
def get_instance_info(vm):
with pooled_rapi_client(vm) as client:
return client.GetInstance(vm.backend_vm_id)
def vm_exists_in_backend(vm):
try:
get_instance_info(vm)
return True
except rapi.GanetiApiError as e:
if e.code == 404:
return False
raise e
def get_network_info(backend_network):
with pooled_rapi_client(backend_network) as client:
return client.GetNetwork(backend_network.network.backend_id)
def network_exists_in_backend(backend_network):
try:
get_network_info(backend_network)
return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
def job_is_still_running(vm, job_id=None):
with pooled_rapi_client(vm) as c:
try:
if job_id is None:
job_id = vm.backendjobid
job_info = c.GetJobStatus(job_id)
return not (job_info["status"] in rapi.JOB_STATUS_FINALIZED)
except rapi.GanetiApiError:
return False
def nic_is_stale(vm, nic, timeout=60):
"""Check if a NIC is stale or exists in the Ganeti backend."""
# First check the state of the NIC and if there is a pending CONNECT
if nic.state == "BUILD" and vm.task == "CONNECT":
if datetime.now() < nic.created + timedelta(seconds=timeout):
# Do not check for too recent NICs to avoid the time overhead
return False
if job_is_still_running(vm, job_id=vm.task_job_id):
return False
else:
# If job has finished, check that the NIC exists, because the
# message may have been lost or stuck in the queue.
vm_info = get_instance_info(vm)
if nic.backend_uuid in vm_info["nic.names"]:
return False
return True
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend.
    Check that a network exists and is active in the specified backend. If it
    is not, (re-)create the network. Return the corresponding BackendNetwork
    object and the IDs of the Ganeti jobs that create the network.
"""
job_ids = []
try:
bnet = BackendNetwork.objects.select_related("network")\
.get(backend=backend, network=network_id)
if bnet.operstate != "ACTIVE":
job_ids = create_network(bnet.network, backend, connect=True)
except BackendNetwork.DoesNotExist:
network = Network.objects.select_for_update().get(id=network_id)
bnet = BackendNetwork.objects.create(backend=backend, network=network)
job_ids = create_network(network, backend, connect=True)
return bnet, job_ids
def create_network(network, backend, connect=True):
"""Create a network in a Ganeti backend"""
log.debug("Creating network %s in backend %s", network, backend)
job_id = _create_network(network, backend)
if connect:
job_ids = connect_network(network, backend, depends=[job_id])
return job_ids
else:
return [job_id]
def _create_network(network, backend):
"""Create a network."""
tags = network.backend_tag
subnet = None
subnet6 = None
gateway = None
gateway6 = None
for _subnet in network.subnets.all():
        if _subnet.dhcp and "nfdhcpd" not in tags:
tags.append("nfdhcpd")
if _subnet.ipversion == 4:
subnet = _subnet.cidr
gateway = _subnet.gateway
elif _subnet.ipversion == 6:
subnet6 = _subnet.cidr
gateway6 = _subnet.gateway
conflicts_check = False
if network.public:
tags.append('public')
if subnet is not None:
conflicts_check = True
else:
tags.append('private')
    # Use a dummy IPv4 subnet for IPv6-only networks, since Ganeti does not
    # currently support IPv6-only networks. To bypass this limitation, the
    # network is created with a dummy subnet, and Cyclades connects instances
    # to such networks with address=None.
if subnet is None:
subnet = "10.0.0.0/29"
try:
bn = BackendNetwork.objects.get(network=network, backend=backend)
mac_prefix = bn.mac_prefix
except BackendNetwork.DoesNotExist:
raise Exception("BackendNetwork for network '%s' in backend '%s'"
" does not exist" % (network.id, backend.id))
with pooled_rapi_client(backend) as client:
return client.CreateNetwork(network_name=network.backend_id,
network=subnet,
network6=subnet6,
gateway=gateway,
gateway6=gateway6,
mac_prefix=mac_prefix,
conflicts_check=conflicts_check,
tags=tags)
def connect_network(network, backend, depends=[], group=None):
"""Connect a network to nodegroups."""
log.debug("Connecting network %s to backend %s", network, backend)
conflicts_check = False
if network.public and (network.subnet4 is not None):
conflicts_check = True
depends = create_job_dependencies(depends)
with pooled_rapi_client(backend) as client:
groups = [group] if group is not None else client.GetGroups()
job_ids = []
for group in groups:
job_id = client.ConnectNetwork(network.backend_id, group,
network.mode, network.link,
conflicts_check,
depends=depends)
job_ids.append(job_id)
return job_ids
def delete_network(network, backend, disconnect=True):
log.debug("Deleting network %s from backend %s", network, backend)
depends = []
if disconnect:
depends = disconnect_network(network, backend)
_delete_network(network, backend, depends=depends)
def _delete_network(network, backend, depends=[]):
depends = create_job_dependencies(depends)
with pooled_rapi_client(backend) as client:
return client.DeleteNetwork(network.backend_id, depends)
def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)
with pooled_rapi_client(backend) as client:
groups = [group] if group is not None else client.GetGroups()
job_ids = []
for group in groups:
job_id = client.DisconnectNetwork(network.backend_id, group)
job_ids.append(job_id)
return job_ids
def connect_to_network(vm, nic):
network = nic.network
backend = vm.backend
bnet, depend_jobs = ensure_network_is_active(backend, network.id)
depends = create_job_dependencies(depend_jobs)
nic = {'name': nic.backend_uuid,
'network': network.backend_id,
'ip': nic.ipv4_address}
log.debug("Adding NIC %s to VM %s", nic, vm)
kwargs = {
"instance": vm.backend_vm_id,
"nics": [("add", "-1", nic)],
"depends": depends,
}
if vm.backend.use_hotplug():
kwargs["hotplug_if_possible"] = True
if settings.TEST:
kwargs["dry_run"] = True
with pooled_rapi_client(vm) as client:
return client.ModifyInstance(**kwargs)
def disconnect_from_network(vm, nic):
log.debug("Removing NIC %s of VM %s", nic, vm)
kwargs = {
"instance": vm.backend_vm_id,
"nics": [("remove", nic.backend_uuid, {})],
}
if vm.backend.use_hotplug():
kwargs["hotplug_if_possible"] = True
if settings.TEST:
kwargs["dry_run"] = True
with pooled_rapi_client(vm) as client:
jobID = client.ModifyInstance(**kwargs)
firewall_profile = nic.firewall_profile
if firewall_profile and firewall_profile != "DISABLED":
tag = _firewall_tags[firewall_profile] % nic.backend_uuid
client.DeleteInstanceTags(vm.backend_vm_id, [tag],
dry_run=settings.TEST)
return jobID
def set_firewall_profile(vm, profile, nic):
uuid = nic.backend_uuid
try:
tag = _firewall_tags[profile] % uuid
except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)
log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)
with pooled_rapi_client(vm) as client:
# Delete previous firewall tags
old_tags = client.GetInstanceTags(vm.backend_vm_id)
delete_tags = [(t % uuid) for t in _firewall_tags.values()
if (t % uuid) in old_tags]
if delete_tags:
client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
dry_run=settings.TEST)
if profile != "DISABLED":
client.AddInstanceTags(vm.backend_vm_id, [tag],
dry_run=settings.TEST)
# XXX NOP ModifyInstance call to force process_net_status to run
# on the dispatcher
os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
client.ModifyInstance(vm.backend_vm_id,
os_name=os_name)
return None
def get_instances(backend, bulk=True):
with pooled_rapi_client(backend) as c:
return c.GetInstances(bulk=bulk)
def get_nodes(backend, bulk=True):
with pooled_rapi_client(backend) as c:
return c.GetNodes(bulk=bulk)
def get_jobs(backend, bulk=True):
with pooled_rapi_client(backend) as c:
return c.GetJobs(bulk=bulk)
def get_physical_resources(backend):
""" Get the physical resources of a backend.
Get the resources of a backend as reported by the backend (not the db).
"""
nodes = get_nodes(backend, bulk=True)
attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
res = {}
for a in attr:
res[a] = 0
for n in nodes:
# Filter out drained, offline and not vm_capable nodes since they will
# not take part in the vm allocation process
can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
if can_host_vms and n['cnodes']:
for a in attr:
res[a] += int(n[a] or 0)
return res
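# The result aggregates the attributes above over all vm_capable, non-drained,
# non-offline nodes, e.g. (illustrative numbers):
#
#     {'mfree': 10240, 'mtotal': 65536, 'dfree': 500000, 'dtotal': 2000000,
#      'pinst_cnt': 12, 'ctotal': 64}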
def update_backend_resources(backend, resources=None):
""" Update the state of the backend resources in db.
"""
if not resources:
resources = get_physical_resources(backend)
backend.mfree = resources['mfree']
backend.mtotal = resources['mtotal']
backend.dfree = resources['dfree']
backend.dtotal = resources['dtotal']
backend.pinst_cnt = resources['pinst_cnt']
backend.ctotal = resources['ctotal']
backend.updated = datetime.now()
backend.save()
def get_memory_from_instances(backend):
    """Get the memory that is used by the instances of a backend.
    Get the used memory of a backend. Note: this differs from the real memory
    used, due to KVM's memory de-duplication.
"""
with pooled_rapi_client(backend) as client:
instances = client.GetInstances(bulk=True)
mem = 0
for i in instances:
mem += i['oper_ram']
return mem
def get_available_disk_templates(backend):
"""Get the list of available disk templates of a Ganeti backend.
The list contains the disk templates that are enabled in the Ganeti backend
and also included in ipolicy-disk-templates.
"""
with pooled_rapi_client(backend) as c:
info = c.GetInfo()
ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
try:
enabled_disk_templates = info["enabled_disk_templates"]
return [dp for dp in enabled_disk_templates
if dp in ipolicy_disk_templates]
except KeyError:
# Ganeti < 2.8 does not have 'enabled_disk_templates'
return ipolicy_disk_templates
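# For example (illustrative GetInfo() excerpt): with
#
#     info = {"enabled_disk_templates": ["plain", "drbd", "ext"],
#             "ipolicy": {"disk-templates": ["plain", "drbd"]}}
#
# the function returns ["plain", "drbd"]; on Ganeti < 2.8, where
# 'enabled_disk_templates' is missing, it falls back to the ipolicy list.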
def update_backend_disk_templates(backend):
disk_templates = get_available_disk_templates(backend)
backend.disk_templates = disk_templates
backend.save()
##
## Synchronized operations for reconciliation
##
def create_network_synced(network, backend):
result = _create_network_synced(network, backend)
if result[0] != rapi.JOB_STATUS_SUCCESS:
return result
result = connect_network_synced(network, backend)
return result
def _create_network_synced(network, backend):
with pooled_rapi_client(backend) as client:
job = _create_network(network, backend)
result = wait_for_job(client, job)
return result
def connect_network_synced(network, backend):
with pooled_rapi_client(backend) as client:
for group in client.GetGroups():
job = client.ConnectNetwork(network.backend_id, group,
network.mode, network.link)
result = wait_for_job(client, job)
if result[0] != rapi.JOB_STATUS_SUCCESS:
return result
return result
def wait_for_job(client, jobid):
result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
status = result['job_info'][0]
while status not in rapi.JOB_STATUS_FINALIZED:
result = client.WaitForJobChange(jobid, ['status', 'opresult'],
[result], None)
status = result['job_info'][0]
if status == rapi.JOB_STATUS_SUCCESS:
return (status, None)
else:
error = result['job_info'][1]
return (status, error)
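# wait_for_job() returns a (status, error) tuple, for example (illustrative
# error payload):
#
#     (rapi.JOB_STATUS_SUCCESS, None)
#     (rapi.JOB_STATUS_ERROR, <opresult of the failed job>)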
def create_job_dependencies(job_ids=[], job_states=None):
"""Transform a list of job IDs to Ganeti 'depends' attribute."""
if job_states is None:
job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
return [[job_id, job_states] for job_id in job_ids]
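# For example, assuming rapi.JOB_STATUS_FINALIZED holds the three finalized
# job states ("success", "error", "canceled"):
#
#     create_job_dependencies([10, 11])
#     # -> [[10, ["success", "error", "canceled"]],
#     #     [11, ["success", "error", "canceled"]]]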