backend.py 50.8 KB
Newer Older
Vangelis Koukis's avatar
Vangelis Koukis committed
1
# Copyright (C) 2010-2014 GRNET S.A.
2
#
Vangelis Koukis's avatar
Vangelis Koukis committed
3 4 5 6
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
7
#
Vangelis Koukis's avatar
Vangelis Koukis committed
8 9 10 11
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
12
#
Vangelis Koukis's avatar
Vangelis Koukis committed
13 14
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
15
from django.conf import settings
Giorgos Verigakis's avatar
Giorgos Verigakis committed
16
from django.db import transaction
17
from django.utils import simplejson as json
18
from datetime import datetime, timedelta
Giorgos Verigakis's avatar
Giorgos Verigakis committed
19

20
from synnefo.db.models import (VirtualMachine, Network, Volume,
21
                               BackendNetwork, BACKEND_STATUSES,
22
                               pooled_rapi_client, VirtualMachineDiagnostic,
23
                               Flavor, IPAddress, IPAddressLog)
Christos Stavrakakis's avatar
Christos Stavrakakis committed
24
from synnefo.logic import utils, ips
25
from synnefo import quotas
26
from synnefo.api.util import release_resource
27
from synnefo.util.mac2eui64 import mac2eui64
28
from synnefo.logic import rapi
Giorgos Verigakis's avatar
Giorgos Verigakis committed
29

30 31
from logging import getLogger
log = getLogger(__name__)
32

Giorgos Verigakis's avatar
Giorgos Verigakis committed
33

34 35 36 37 38 39 40
# Map firewall profile names to the corresponding Ganeti instance tags,
# as configured in this deployment's settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: fourth ':'-separated component of each tag -> profile name.
# assumes every GANETI_FIREWALL_*_TAG has at least four ':'-separated
# fields -- TODO confirm against the settings
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# NIC attributes copied verbatim between Ganeti and the DB ...
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# ... and those that need special handling (IP allocation/release)
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
# All NIC fields that are compared between DB and Ganeti
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Disk fields that are compared between DB and Ganeti
DISK_FIELDS = ["status", "size", "index"]
# Key prefixes used for NICs/disks found in Ganeti without a Cyclades name
UNKNOWN_NIC_PREFIX = "unknown-nic-"
UNKNOWN_DISK_PREFIX = "unknown-disk-"
47

48

49 50 51 52 53 54 55 56 57 58 59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    @param vm: The VirtualMachine the job operated on
    @param job_id: The ID of the Ganeti job
    @param job_opcode: The opcode of the Ganeti job
    @param job_status: The status of the Ganeti job
    @param job_fields: Dictionary with the quotable fields of the job
    @return: The (possibly updated) VirtualMachine

    """
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        # Only jobs that reached a terminal state can affect quotas
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_resource_serial(vm)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_resource_serial(vm)
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s. "
                      "Attached serial %s",
                      vm.task_job_id, job_id, vm.serial)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            try:
                serial = quotas.handle_resource_commission(
                    vm, action,
                    action_fields=job_fields,
                    commission_name=reason,
                    force=True,
                    auto_accept=True)
            except:
                # Bare except: log whatever went wrong while issuing the
                # commission, then re-raise -- nothing is swallowed here.
                log.exception("Error while handling new commission")
                raise
            log.debug("Issued new commission: %s", serial)
    return vm


109
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      disks=None, job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    @param vm: The VirtualMachine the job refers to
    @param etime: The time the event occurred at the backend
    @param jobid: The ID of the Ganeti job
    @param opcode: The opcode of the Ganeti job
    @param status: The status of the Ganeti job
    @param logmsg: The log message sent by the backend
    @param nics: The NICs of the instance, as reported by Ganeti (optional)
    @param disks: The disks of the instance, as reported by Ganeti (optional)
    @param job_fields: Dictionary with the job's quotable fields (optional)

    """
    # See #1492, #1031, #1111 why this line has been removed
    # if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Snapshot jobs only update the snapshot state; they do not touch the VM
    if opcode == "OP_INSTANCE_SNAPSHOT":
        for disk_id, disk_info in job_fields.get("disks", []):
            # NOTE(review): "snasphot_info" looks misspelled, but it must
            # match the key emitted by the backend -- confirm before changing
            snap_info = disk_info.get("snasphot_info", None)
            if snap_info is not None:
                snap_info = json.loads(snap_info)
                snap_id = snap_info["snapshot_id"]
                update_snapshot(snap_id, user_id=vm.userid, job_id=jobid,
                                job_status=status, etime=etime)
        return

    # Record the job details on the VM
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status not in rapi.JOB_STATUS_FINALIZED:
        # Non-terminal notification: just record the job details
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    new_flavor = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        if state_for_success is not None:
            new_operstate = state_for_success

        # A resize job carries the new CPU/RAM in 'beparams'
        beparams = job_fields.get("beparams")
        if beparams:
            cpu = beparams.get("vcpus")
            ram = beparams.get("maxmem")
            new_flavor = find_new_flavor(vm, cpu=cpu, ram=ram)

        # XXX: Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED:
        if nics is not None:
            update_vm_nics(vm, nics, etime)
        if disks is not None:
            # XXX: Replace the job fields with mocked changes as produced by
            # the diff between the DB and Ganeti disks. This is required in
            # order to update quotas for disks that changed, but not from this
            # job!
            disk_changes = update_vm_disks(vm, disks, etime)
            job_fields["disks"] = disk_changes

    vm_deleted = False
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated attachments
        vm.nics.all().update(state="ERROR")
        vm.volumes.all().update(status="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            vm_deleted = True
            for nic in vm.nics.all():
                # but first release the IP
                remove_nic_ips(nic)
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful so that quotas are released
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    # Update VM's state and flavor after handling of quotas, since computation
    # of quotas depends on these attributes
    if vm_deleted:
        vm.volumes.filter(deleted=False).update(deleted=True, status="DELETED",
                                                machine=None)
    if new_operstate is not None:
        vm.operstate = new_operstate
    if new_flavor is not None:
        vm.flavor = new_flavor

    vm.save()
223

224

225 226 227 228 229
def find_new_flavor(vm, cpu=None, ram=None):
    """Find VM's new flavor based on the new CPU and RAM.

    Returns None when neither value is given, or when the given values match
    the VM's current flavor. Otherwise, looks up the Flavor with the new
    CPU/RAM but the same disk and volume type as the current one.

    """
    if cpu is None and ram is None:
        # Nothing to change
        return None

    current = vm.flavor
    # Missing values default to the current flavor's specs
    if ram is None:
        ram = current.ram
    if cpu is None:
        cpu = current.cpu
    if cpu == current.cpu and ram == current.ram:
        # Effective specs are unchanged
        return None

    try:
        new_flavor = Flavor.objects.get(
            cpu=cpu, ram=ram, disk=current.disk,
            volume_type_id=current.volume_type_id)
    except Flavor.DoesNotExist:
        raise Exception("There is no flavor to match the instance specs!"
                        " Instance: %s CPU: %s RAM %s: Disk: %s VolumeType: %s"
                        % (vm.backend_vm_id, cpu, ram, current.disk,
                           current.volume_type_id))
    log.info("Flavor of VM '%s' changed from '%s' to '%s'", vm,
             current.name, new_flavor.name)
    return new_flavor
248 249


250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
def nics_are_equal(db_nic, gnt_nic):
    """Check if DB and Ganeti NICs are equal."""
    # Equal iff every compared field matches between the DB object and the
    # Ganeti dictionary.
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)


def parse_instance_nics(gnt_nics):
    """Parse NICs of a Ganeti instance.

    Returns a dictionary mapping each NIC's Cyclades id (or an
    'unknown-nic-<index>' key when the NIC carries no Cyclades name) to a
    dictionary of its attributes.

    """
    parsed = {}
    for idx, raw_nic in enumerate(gnt_nics):
        name = raw_nic.get("name", None)
        if name is not None:
            nic_id = utils.id_from_nic_name(name)
        else:
            # Unknown NIC: no Cyclades name to derive an id from
            nic_id = UNKNOWN_NIC_PREFIX + str(idx)

        network_id = utils.id_from_network_name(raw_nic.get('network', ''))
        network = Network.objects.get(id=network_id)
        subnet6 = network.subnet6

        # Collect the new NIC attributes
        mac = raw_nic.get('mac')
        # The IPv6 address is derived from the MAC via EUI-64
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        profile = _reverse_tags.get(raw_nic.get('firewall'))
        if not profile and network.public:
            # Public networks always get a firewall profile
            profile = settings.DEFAULT_FIREWALL_PROFILE

        parsed[nic_id] = {
            'index': idx,
            'network': network,
            'mac': mac,
            'ipv4_address': raw_nic.get('ip'),
            'ipv6_address': ipv6,
            'firewall_profile': profile,
            'state': 'ACTIVE'}
    return parsed
295 296


297 298
def update_vm_nics(vm, nics, etime=None):
    """Update VM's NICs to match with the NICs of the Ganeti instance

    This function will update the VM's NICs(update, delete or create) and
    return a list of quotable changes.

    @param vm: The VirtualMachine the NICs belong to
    @type vm: VirtualMachine object
    @param nics: The NICs of the Ganeti instance
    @type nics: List of dictionaries with NIC information
    @param etime: The datetime the Ganeti instance had these NICs
    @type etime: datetime

    @return: List of quotable changes (add/remove NIC) (currently empty list)
    @rtype: List of dictionaries

    """
    try:
        ganeti_nics = parse_instance_nics(nics)
    except Network.InvalidBackendIdError as e:
        log.warning("Server %s is connected to unknown network %s"
                    " Cannot reconcile server." % (vm.id, str(e)))
        return []

    # Map of DB NICs keyed by id; prefetch network/ips to avoid a query per
    # NIC in the loop below.
    db_nics = dict([(nic.id, nic) for nic in vm.nics.select_related("network")
                                                    .prefetch_related("ips")])

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but not in Ganeti.
            # BUGFIX: this used to call 'nic_is_stale(vm, nic)'; 'nic' is the
            # variable leaked from the list comprehension above (Python 2) and
            # refers to an arbitrary NIC, not the one under examination. The
            # correct argument is 'db_nic'.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from the Ganeti NIC
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            # Save once after all simple fields are set (the old code saved
            # inside the loop, issuing one UPDATE per field).
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
            db_ipv4_address = db_nic.ipv4_address
            if db_ipv4_address != gnt_ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv4_address,
                                       new_address=gnt_ipv4_address,
                                       version=4)

            # Same for the IPv6 address
            gnt_ipv6_address = ganeti_nic["ipv6_address"]
            db_ipv6_address = db_nic.ipv6_address
            if db_ipv6_address != gnt_ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv6_address,
                                       new_address=gnt_ipv6_address,
                                       version=6)

    return []
363 364


365
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object. If
    the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    """
    for addr in nic.ips.all():
        if version and addr.ipversion != version:
            # Filtering by version was requested: skip other versions
            continue

        # Close the active log entry for this address first
        terminate_active_ipaddress_log(nic, addr)

        if addr.floating_ip:
            # Floating IPs survive: just detach them from the NIC
            addr.nic = None
            addr.save()
        else:
            # Return the address to its pool and drop the DB row
            addr.release_address()
            addr.delete()
389 390


391
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Marks the active IPAddressLog entry for this (server, network, address)
    as released. Only public addresses of attached NICs are logged.

    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        entry, was_missing = IPAddressLog.objects.get_or_create(
            server_id=nic.machine_id,
            network_id=ip.network_id,
            address=ip.address,
            active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if was_missing:
        # An entry should have existed from when the address was attached;
        # one was created now so the release is at least recorded.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    entry.released_at = datetime.now()
    entry.active = False
    entry.save()
416 417


418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IP address of the given version that is attached to a port.

    Release the old address of the port (for the given IP version) and
    allocate/attach the new one, recording a new IPAddressLog entry for it.

    @param port: The NIC whose address has changed
    @param userid: The owner of the new address
    @param old_address: The address currently stored in the DB (may be None)
    @param new_address: The address reported by the backend
    @param version: The IP version, 4 or 6
    @return: The newly attached IPAddress object
    @raise ValueError: If version is neither 4 nor 6

    """
    if old_address is not None:
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        # An out-of-band address change is unexpected, hence logged as error
        log.error(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses come from the network's allocation pool
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are not pooled; create the DB object directly
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address,
                                             ipversion=6)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress

453

454 455
def update_vm_disks(vm, disks, etime=None):
    """Update VM's disks to match with the disks of the Ganeti instance

    This function will update the VM's disks(update, delete or create) and
    return a list of quotable changes.

    @param vm: The VirtualMachine the disks belong to
    @type vm: VirtualMachine object
    @param disks: The disks of the Ganeti instance
    @type disks: List of dictionaries with disk information
    @param etime: The datetime the Ganeti instance had these disks
    @type etime: datetime

    @return: List of quotable changes (add/remove disk)
    @rtype: List of dictionaries

    """
    gnt_disks = parse_instance_disks(disks)
    db_disks = dict([(disk.id, disk)
                     for disk in vm.volumes.filter(deleted=False)])

    db_keys = set(db_disks.keys())
    gnt_keys = set(gnt_disks.keys())
    skip_db_stale = False

    changes = []

    # Disks that exist in Ganeti but not in DB
    for disk_name in (gnt_keys - db_keys):
        gnt_disk = gnt_disks[disk_name]
        if ((disk_name.startswith(UNKNOWN_DISK_PREFIX)) and
           (len(db_keys - gnt_keys) > 0)):
            # Ambiguous situation: an unnamed Ganeti disk could correspond to
            # one of the stale DB volumes, so adopt nothing and keep the
            # stale volumes around.
            log.warning("Ganeti disk '%s' of VM '%s' does not exist in DB,"
                        " while there are stale DB volumes. Cannot"
                        " automatically fix this issue.", disk_name, vm)
            skip_db_stale = True
        else:
            log.warning("Automatically adopting unknown disk '%s' of"
                        " instance '%s'", disk_name, vm)
            adopt_instance_disk(vm, gnt_disk)

    # Disks that exist in DB but not in Ganeti
    for disk_name in (db_keys - gnt_keys):
        db_disk = db_disks[disk_name]
        if db_disk.status != "DELETING" and skip_db_stale:
            continue
        # BUGFIX: this used to call 'disk_is_stale(vm, disk)'; 'disk' is the
        # variable leaked from the list comprehension above (Python 2) and
        # refers to an arbitrary volume, not the one under examination. The
        # correct argument is 'db_disk'.
        if disk_is_stale(vm, db_disk):
            log.debug("Removing stale disk '%s'", db_disk)
            db_disk.status = "DELETED"
            db_disk.deleted = True
            db_disk.save()
            changes.append(("remove", db_disk, {}))
        else:
            log.info("disk '%s' is still being created" % db_disk)

    # Disks that exist both in DB and in Ganeti
    for disk_name in (db_keys & gnt_keys):
        db_disk = db_disks[disk_name]
        gnt_disk = gnt_disks[disk_name]
        if not disks_are_equal(db_disk, gnt_disk):  # Modified Disk
            if gnt_disk["size"] != db_disk.size:
                # Size of the disk has changed! TODO: Fix flavor!
                size_delta = gnt_disk["size"] - db_disk.size
                changes.append(("modify", db_disk, {"size_delta": size_delta}))
            if db_disk.status == "CREATING":
                # Disk has been created
                changes.append(("add", db_disk, {}))
            # Update the disk in DB with the values from Ganeti disk
            # (plain loop instead of a side-effecting list comprehension)
            for f in DISK_FIELDS:
                setattr(db_disk, f, gnt_disk[f])
            db_disk.save()

    return changes
526 527 528


def disks_are_equal(db_disk, gnt_disk):
    """Check if DB and Ganeti disks are equal"""
    # Equal iff every compared field matches between the DB object and the
    # Ganeti dictionary.
    return all(getattr(db_disk, field) == gnt_disk[field]
               for field in DISK_FIELDS)


536 537 538 539 540
def parse_instance_disks(gnt_disks):
    """Parse disks of a Ganeti instance.

    Returns a dictionary mapping each disk's Cyclades id (or an
    'unknown-disk-<index>' key when the disk carries no Cyclades name) to a
    dictionary of its attributes.

    """
    parsed = {}
    for idx, raw_disk in enumerate(gnt_disks):
        name = raw_disk.get("name", None)
        if name is not None:
            disk_id = utils.id_from_disk_name(name)
        else:
            # Unknown disk: no Cyclades name to derive an id from
            disk_id = UNKNOWN_DISK_PREFIX + str(idx)

        parsed[disk_id] = {
            'index': idx,
            'size': raw_disk["size"] >> 10,  # Size in GB
            'uuid': raw_disk['uuid'],
            'status': "IN_USE"}
    return parsed
554

555

556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
def adopt_instance_disk(server, gnt_disk):
    """Create a new Cyclades Volume by adopting an existing Ganeti Disk."""
    disk_uuid = gnt_disk["uuid"]
    # Create the DB volume first, in CREATING state
    volume = Volume.objects.create(
        userid=server.userid,
        project=server.project,
        size=gnt_disk["size"],
        volume_type=server.flavor.volume_type,
        name="",
        machine=server,
        description=None,
        delete_on_termination=True,
        source="blank",
        source_version=None,
        origin=None,
        index=gnt_disk.get("index", 0),
        status="CREATING")

    # Then rename the disk in Ganeti so that it carries the volume's UUID
    with pooled_rapi_client(server) as client:
        job_id = client.ModifyInstance(
            instance=server.backend_vm_id,
            disks=[("modify", disk_uuid,
                    {"name": volume.backend_volume_uuid})])
    log.info("Adopting disk '%s' of instance '%s' to volume '%s'. jobid: %s",
             disk_uuid, server, volume, job_id)
    return volume


584 585
def update_snapshot(snap_id, user_id, job_id, job_status, etime):
    """Update a snapshot based on result of a Ganeti job.

    Currently a no-op: the function returns immediately without updating any
    state. Kept so that OP_INSTANCE_SNAPSHOT notifications can be dispatched
    here without error.
    """
    return
587 588


589 590 591
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the Ganeti backend.

    Records the job details on the BackendNetwork, updates its operating
    state according to the job's opcode and status, and finally propagates
    the change to the parent Network.

    @param back_network: The BackendNetwork the job refers to
    @param etime: The time the event occurred at the backend
    @param jobid: The ID of the Ganeti job
    @param opcode: The opcode of the Ganeti job
    @param status: The status of the Ganeti job
    @param logmsg: The log message sent by the backend

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the job details on the BackendNetwork
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled creation leaves the backend network in ERROR
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A removal also counts as successful when it failed because the
        # network no longer exists at the backend
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
625 626


627
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        # BUGFIX: 'return' was inside the inner 'if' above, so a network that
        # was already ACTIVE with no backend networks fell through to the
        # reduce() below (which yields "DELETED" for an empty list) and was
        # wrongly deleted. Always return here: with no backend networks and
        # no DESTROY action there is nothing more to reconcile.
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        # BUGFIX: the log arguments were passed as (id, mac_prefix, link),
        # swapped relative to the format string's "link ... mac_prefix".
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        # Return pooled resources (MAC prefix / bridge) to their pools
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since they are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


702 703
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process an OP_NETWORK_SET_PARAMS job notification from the backend."""
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    valid_statuses = [x[0] for x in BACKEND_STATUSES]
    if status not in valid_statuses:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the job details on the BackendNetwork.
    # NOTE(review): the attribute set here is 'opcode', while sibling
    # handlers set 'backendopcode' -- confirm against the model whether this
    # is intentional.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    # Mark any addresses the job reserved as externally reserved in the DB
    reserved_ips = job_fields.get("add_reserved_ips")
    if reserved_ips:
        net = back_network.network
        for address in reserved_ips:
            net.reserve_address(address, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
722

723

724
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the image-copy build progress of a VM."""
    pct = int(progress)
    if pct < 0:
        raise ValueError("Percentage cannot be negative")
    # snf-image:copy-progress counts bytes read by every image handling
    # process, so the reported percentage may exceed 100%; clamp it.
    if pct > 100:
        pct = 100

    # FIXME: log a warning when the percentage decreases, see #1033
    # (a build percentage should increase monotonically).

    # NOTE: This assumes that no 'ganeti-create-progress' message arrives
    # once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and the
    # instance is STARTED. If two separate dispatcher threads process the
    # messages, the success notification may be handled before this progress
    # message. [vkoukis]
    #
    # if not vm.operstate == 'BUILD':
    #     raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = pct
    vm.backendtime = etime
    vm.save()
753

754

755
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime,
        message=message, details=details)


def create_instance(vm, nics, volumes, flavor, image):
    """Submit an OP_INSTANCE_CREATE job to Ganeti for this VM.

    `image` is a dictionary which should contain the keys:
            'pithosmap' and 'format'
    which are forwarded to the OS provider as osparams.

    Returns the id of the Ganeti creation job.
    """
    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = volumes[0].volume_type.template
    disks = []
    for volume in volumes:
        # Ganeti expects disk sizes in MiB; volume.size is in GiB.
        disk = {"name": volume.backend_volume_uuid,
                "size": volume.size * 1024}
        provider = volume.volume_type.provider
        if provider is not None:
            disk["provider"] = provider
            if provider in settings.GANETI_CLONE_PROVIDERS:
                disk["origin"] = volume.origin
                disk["origin_size"] = volume.origin_size
            extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS\
                                        .get(provider)
            if extra_disk_params is not None:
                disk.update(extra_disk_params)
        disks.append(disk)

    kw["disks"] = disks

    # --no-wait-for-sync option for DRBD disks
    kw["wait_for_sync"] = settings.GANETI_DISKS_WAIT_FOR_SYNC

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # The creation job must wait on any jobs needed to activate the
    # networks of the instance's NICs in this backend.
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        _, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    # kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['pithosmap'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)


def delete_instance(vm, shutdown_timeout=None):
    """Ask Ganeti to remove the instance backing `vm`.

    Returns the id of the Ganeti removal job.
    """
    with pooled_rapi_client(vm) as rapi_client:
        job_id = rapi_client.DeleteInstance(vm.backend_vm_id,
                                            shutdown_timeout=shutdown_timeout,
                                            dry_run=settings.TEST)
        return job_id


def reboot_instance(vm, reboot_type, shutdown_timeout=None):
Giorgos Verigakis's avatar
Giorgos Verigakis committed
868
    assert reboot_type in ('soft', 'hard')
869 870 871
    # Note that reboot type of Ganeti job must be always hard. The 'soft' and
    # 'hard' type of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
872 873
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
874 875