backend.py 41.6 KB
Newer Older
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
Giorgos Verigakis's avatar
Giorgos Verigakis committed
3
4
5
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
6
#
Giorgos Verigakis's avatar
Giorgos Verigakis committed
7
8
9
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
10
#
Giorgos Verigakis's avatar
Giorgos Verigakis committed
11
12
13
14
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
15
#
Giorgos Verigakis's avatar
Giorgos Verigakis committed
16
17
18
19
20
21
22
23
24
25
26
27
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
28
#
Giorgos Verigakis's avatar
Giorgos Verigakis committed
29
30
31
32
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
Giorgos Verigakis's avatar
Giorgos Verigakis committed
33
from django.conf import settings
Giorgos Verigakis's avatar
Giorgos Verigakis committed
34
from django.db import transaction
35
from datetime import datetime, timedelta
Giorgos Verigakis's avatar
Giorgos Verigakis committed
36

37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
Christos Stavrakakis's avatar
Christos Stavrakakis committed
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
Giorgos Verigakis's avatar
Giorgos Verigakis committed
46

47
48
from logging import getLogger
log = getLogger(__name__)
49

Giorgos Verigakis's avatar
Giorgos Verigakis committed
50

51
52
53
54
55
56
57
# Maps each firewall profile name to the Ganeti tag configured for it in the
# deployment settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG,
}

# Inverse lookup, keyed by the fourth ':'-separated component of each tag.
_reverse_tags = dict((tag.split(':')[3], profile)
                     for profile, tag in _firewall_tags.items())

# NIC attributes that are copied as-is from a Ganeti NIC onto the DB NIC.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC attributes whose change goes through change_address_of_port().
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix of the key given to Ganeti NICs that carry no name.
UNKNOWN_NIC_PREFIX = "unknown-"
62

63

64
65
66
67
68
69
70
71
72
73
74
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Settle the quota commissions of a VirtualMachine after a backend job.

    Jobs that are not finalized yet, and jobs whose action is "BUILD", leave
    the quotas untouched.

    If the job is the one the VM was waiting for (`vm.task_job_id`) and a
    commission serial is pending on the VM, that commission is accepted when
    the job succeeded and rejected when it failed or was canceled; the
    pending serial is then cleared.

    Otherwise, for a successful job that implies a quotable change, a new
    commission is issued in forced, auto-accepted mode.

    :param vm: the VirtualMachine the job refers to.
    :param job_id: ID of the backend job.
    :param job_opcode: opcode of the backend job.
    :param job_status: status of the backend job.
    :param job_fields: fields of the job, used to resolve the action.
    :returns: the same `vm` object.

    """
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Resolve which (possibly quotable) action the job stands for.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    job_succeeded = (job_status == rapi.JOB_STATUS_SUCCESS)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this very job is pending: just resolve it.
        pending_serial = vm.serial
        if job_succeeded:
            quotas.accept_serial(pending_serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      pending_serial)
            quotas.reject_serial(pending_serial)
        vm.serial = None
        return vm

    if not job_succeeded:
        return vm

    commission_info = quotas.get_commission_info(resource=vm,
                                                 action=action,
                                                 action_fields=job_fields)
    if commission_info is None:
        return vm

    # No commission was issued for this change, or the issued one was
    # unaware of it: issue a new commission in forced mode.
    log.debug("Expected job was %s. Processing job %s.",
              vm.task_job_id, job_id)
    reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
              % (vm, job_id))
    quotas.handle_resource_commission(vm, action,
                                      action_fields=job_fields,
                                      commission_name=reason,
                                      force=True,
                                      auto_accept=True)
    log.debug("Issued new commission: %s", vm.serial)
    return vm


120
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend.

    Record the job details on the VM. Notifications with a terminating
    status (success, error, or canceled) additionally update the operating
    state of the VM, its NICs (when `nics` is given) and its quotas.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: event time of the notification.
    :param jobid: ID of the backend job.
    :param opcode: opcode of the backend job.
    :param status: status of the backend job.
    :param logmsg: log message of the backend job.
    :param nics: optional NIC list reported by the backend.
    :param job_fields: optional dict with the fields of the job.
    :raises VirtualMachine.InvalidBackendMsgError: if `status` is not one of
        BACKEND_STATUSES.

    """
    # The opcode is deliberately not validated; see #1492, #1031, #1111.
    known_statuses = [entry[0] for entry in BACKEND_STATUSES]
    if status not in known_statuses:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status not in rapi.JOB_STATUS_FINALIZED:
        # Job still in progress: only the job details are stored.
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # A successful job may move the VM to a new operating state
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # The job changed the flavor of the VM
            _process_resize(vm, beparams)

        # backendtime is updated only for successful jobs, since only these
        # change the state of the VM. Otherwise a "race condition" may occur
        # when a successful job (e.g. OP_INSTANCE_REMOVE) completes before a
        # failed one and the messages arrive in reversed order.
        vm.backendtime = etime

    if nics is not None and status in rapi.JOB_STATUS_FINALIZED:
        # Bring the NICs of the VM in sync with the backend
        _process_net_status(vm, etime, nics)

    if opcode == 'OP_INSTANCE_CREATE':
        # Special case: if OP_INSTANCE_CREATE fails --> ERROR
        if status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR):
            new_operstate = "ERROR"
            vm.backendtime = etime
            # The associated NICs go to ERROR as well
            vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # The VM is gone: release the IPs and drop the NICs
            for nic in vm.nics.all():
                remove_nic_ips(nic)
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # From here on the removal is handled as a successful job
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: handle quotas/commissioning...
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # ...and clear the task fields if this was the pending task
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate

    vm.save()
208

209

210
211
212
def _process_resize(vm, beparams):
    """Switch a VirtualMachine to the flavor matching its new beparams.

    The new CPU count and RAM are read from the "vcpus" and "maxmem" keys of
    `beparams`, falling back to the values of the current flavor. When
    neither differs from the current flavor nothing is done; otherwise the
    flavor with the new cpu/ram and the same disk and disk template is
    assigned to the VM and the VM is saved.

    :raises Exception: if no such flavor exists.

    """
    current = vm.flavor
    new_cpu = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)

    unchanged = (new_cpu == current.cpu and new_ram == current.ram)
    if unchanged:
        return

    lookup = {
        "cpu": new_cpu,
        "ram": new_ram,
        "disk": current.disk,
        "disk_template": current.disk_template,
    }
    try:
        resized_flavor = Flavor.objects.get(**lookup)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")

    vm.flavor = resized_flavor
    vm.save()


Giorgos Verigakis's avatar
Giorgos Verigakis committed
227
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Run _process_net_status() wrapped by transaction.commit_on_success.

    All arguments are forwarded unchanged.

    """
    _process_net_status(vm=vm, etime=etime, nics=nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend, detailing the NIC
    configuration of a VM instance, and update the NICs of the VM in the DB
    accordingly:

    * a DB NIC that Ganeti does not report is removed (together with its IP
      addresses) if nic_is_stale() says so, otherwise it is left alone;
    * a Ganeti NIC that is missing from the DB is only logged as an error;
    * a NIC known to both sides that differs gets its simple fields copied
      from Ganeti and its IPv4/IPv6 addresses replaced where they changed.

    Finally `vm.backendtime` is set to `etime` and the VM is saved.

    :param vm: the VirtualMachine whose NICs are updated.
    :param etime: event time of the notification.
    :param nics: NIC list as reported by Ganeti.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict((nic.id, nic)
                   for nic in vm.nics.prefetch_related("ips__subnet"))

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics) | set(ganeti_nics):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # The staleness check must look at the NIC of this iteration.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'", db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created", db_nic)
        elif db_nic is None:
            log.error("NIC/%s of VM %s does not exist in DB! Cannot"
                      " automatically fix this issue!", nic_name, vm)
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from the Ganeti NIC and
            # persist it once, after all simple fields have been copied.
            for field in SIMPLE_NIC_FIELDS:
                setattr(db_nic, field, ganeti_nic[field])
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()


291
292
293
294
295
def change_address_of_port(port, userid, old_address, new_address, version):
    """Replace the IPv4 or IPv6 address of a port (NIC) with a new one.

    The existing addresses of the given IP version are removed from the port
    and `new_address` is attached in their place: for version 4 through
    ips.allocate_ip(), for version 6 by creating an IPAddress in the IPv6
    subnet of the port's network. An IPAddressLog entry is created for the
    new address.

    :param port: the NIC whose address changes.
    :param userid: owner passed on for the new address.
    :param old_address: previous address; if not None the change is logged
        as an error.
    :param new_address: the address to attach.
    :param version: IP version, 4 or 6.
    :returns: the new IPAddress object.
    :raises ValueError: if `version` is neither 4 nor 6 (raised after the
        old addresses have been removed).

    """
    server_id = port.machine_id

    if old_address is not None:
        log.error("IPv%s Address of server '%s' changed from '%s' to '%s'",
                  version, server_id, old_address, new_address)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        new_ip = ips.allocate_ip(port.network, userid, address=new_address)
        new_ip.nic = port
        new_ip.save()
    elif version == 6:
        ipv6_subnet = port.network.subnet6
        new_ip = IPAddress.objects.create(userid=userid,
                                          network=port.network,
                                          subnet=ipv6_subnet,
                                          nic=port,
                                          address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # Keep track of the new address in the IP address log
    log_entry = IPAddressLog.objects.create(server_id=server_id,
                                            network_id=port.network_id,
                                            address=new_address,
                                            active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             log_entry.id, new_address, server_id)

    return new_ip


326
327
328
329
330
331
332
def nics_are_equal(db_nic, gnt_nic):
    """Return True if the DB NIC matches the Ganeti NIC on all NIC_FIELDS.

    `db_nic` is read by attribute and `gnt_nic` by key.

    """
    return not any(getattr(db_nic, name) != gnt_nic[name]
                   for name in NIC_FIELDS)


333
def process_ganeti_nics(ganeti_nics):
    """Convert the NIC list reported by Ganeti into a dict of NIC info.

    Each NIC is keyed by the ID derived from its name, or by
    UNKNOWN_NIC_PREFIX followed by its position in the list when it has no
    name. The value is a dict with the keys 'index', 'network', 'mac',
    'ipv4_address', 'ipv6_address', 'firewall_profile' and 'state'.

    """
    nics_by_id = {}
    for position, raw_nic in enumerate(ganeti_nics):
        name = raw_nic.get("name", None)
        if name is None:
            # Fall back to the index. If it is a NIC unknown to synnefo it
            # will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(position)
        else:
            nic_id = utils.id_from_nic_name(name)

        network_id = utils.id_from_network_name(raw_nic.get('network', ''))
        network = Network.objects.get(id=network_id)

        # Addresses: IPv4 as reported, IPv6 computed from the MAC when the
        # network has an IPv6 subnet.
        mac = raw_nic.get('mac')
        ipv4 = raw_nic.get('ip')
        subnet6 = network.subnet6
        if subnet6:
            ipv6 = mac2eui64(mac, subnet6.cidr)
        else:
            ipv6 = None

        firewall_profile = _reverse_tags.get(raw_nic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics_by_id[nic_id] = {
            'index': position,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE',
        }
    return nics_by_id
370
371


372
def remove_nic_ips(nic, version=None):
    """Detach the IP addresses of a NetworkInterface.

    Every IP address of the NIC gets its active IP log entry terminated.
    A floating IP is then only disassociated from the NIC; any other address
    is released and its IPAddress object deleted.

    If `version` is given, only addresses of that IP version are touched.

    """
    for address in nic.ips.all():
        if not version or address.ipversion == version:
            # Update the DB table holding the logging of all IP addresses
            terminate_active_ipaddress_log(nic, address)

            if address.floating_ip:
                address.nic = None
                address.save()
            else:
                address.release_address()
                address.delete()
396
397


398
def terminate_active_ipaddress_log(nic, ip):
    """Mark the active IPAddressLog entry of an IP address as released.

    Nothing is done when the network of the IP is not public or the NIC has
    no machine. If no active entry exists, one is created (and an error is
    logged) before being marked as released.

    :raises IPAddressLog.MultipleObjectsReturned: if more than one active
        entry matches; the error is logged and re-raised.

    """
    loggable = ip.network.public and nic.machine is not None
    if not loggable:
        return

    try:
        entry, is_new = IPAddressLog.objects.get_or_create(
            server_id=nic.machine_id,
            network_id=ip.network_id,
            address=ip.address,
            active=True)
    except IPAddressLog.MultipleObjectsReturned:
        log.error("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!",
                  ip.address, ip.network, nic.machine)
        raise

    if is_new:
        log.error("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp.",
                  ip.address, ip.network, nic.machine)

    entry.released_at = datetime.now()
    entry.active = False
    entry.save()
423
424


425
426
427
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification that refers to a BackendNetwork.

    Record the job details on `back_network`, update its operating state
    according to the opcode and status of the job, save it and then
    recompute the state of the Network it belongs to.

    :raises Network.InvalidBackendMsgError: if `status` is not one of
        BACKEND_STATUSES.

    """
    known_statuses = [entry[0] for entry in BACKEND_STATUSES]
    if status not in known_statuses:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    job_succeeded = (status == rapi.JOB_STATUS_SUCCESS)
    job_failed = status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if job_succeeded and state_for_success is not None:
        back_network.operstate = state_for_success

    if job_failed and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal still counts as a deletion when the network does
        # not exist in the backend.
        removed = job_succeeded or (
            status == rapi.JOB_STATUS_ERROR and
            not network_exists_in_backend(back_network))
        if removed:
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if job_succeeded:
        back_network.backendtime = etime
    back_network.save()

    # The state of the Network must follow its BackendNetworks
    update_network_state(network)
461
462


463
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that the network exists in.

    The state of the network is:
    * ACTIVE: if it has no backend networks and is not being destroyed.
    * DELETED: if it is 'DELETED' in all backends that have been created.

    On deletion this function also releases the resources (MAC prefix or
    bridge) taken from a pool, marks the subnets as deleted, deletes the
    IPv4 pools and the backend networks, and issues the "DESTROY" commission
    for the quotas of the network.

    :raises Exception: if the network is to be deleted while it still has
        floating IPs that are not deleted.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        # Return unconditionally: with no backend networks the "all
        # DELETED" test below is vacuously true and would delete a network
        # that nobody asked to destroy.
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False

        # Only resources that the flavor takes from a pool are released
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since they are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)

    network.save()


538
539
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a notification of an OP_NETWORK_SET_PARAMS job.

    Record the job details on `back_network`, reserve in the network every
    address listed under the "add_reserved_ips" key of `job_fields` (marked
    as external), and update `backendtime` if the job succeeded.

    :param back_network: the BackendNetwork the job refers to.
    :param etime: event time of the notification.
    :param jobid: ID of the backend job.
    :param opcode: opcode of the job; must be "OP_NETWORK_SET_PARAMS".
    :param status: status of the backend job.
    :param job_fields: dict with the fields of the job.
    :raises Network.InvalidBackendMsgError: if `status` is not one of
        BACKEND_STATUSES.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Same attribute that process_network_status() records the opcode on.
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
558

559

560
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Store the build progress reported for a VM.

    `progress` is converted to an integer percentage, capped at 100, and
    saved on the VM together with `etime` as the backend time.

    :raises ValueError: if the percentage is negative.

    """
    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = min(int(progress), 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning when the percentage decreases, see #1033. The
    # check that the build percentage increases monotonically is disabled.

    # The operstate of the VM is not checked either: nothing guarantees that
    # a 'ganeti-create-progress' message is processed before the
    # 'ganeti-op-status' message for the successful OP_INSTANCE_CREATE, e.g.
    # when the two are handled by separate dispatcher threads. [vkoukis]

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
589

590

591
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    entry_fields = {
        "source": source,
        "source_date": etime,
        "message": message,
        "details": details,
    }
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, **entry_fields)
608
609


610
def create_instance(vm, nics, flavor, image):
    """Ask the backend of `vm` to create the corresponding instance.

    :param vm: the VirtualMachine to create an instance for.
    :param nics: the NICs of the instance; their networks are made active in
        the backend first and the resulting jobs become dependencies of the
        creation.
    :param flavor: the flavor providing disk, cpu and ram values.
    :param image: a dictionary which should contain the keys 'backend_id',
        'format' and 'metadata'; the metadata value should be a dictionary.
    :returns: whatever the RAPI client's CreateInstance() returns.

    """
    backend = vm.backend

    # The arguments to CreateInstance() start from a deployment-specific
    # dictionary (settings.GANETI_CREATEINSTANCE_KWARGS), which lets the
    # administrator override things such as the disk template, the os
    # provider and hypervisor parameters (see Synnefo #785, #835). This is
    # also where 'os' and 'hvparams' come from.
    kw = backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id

    disk = {"size": flavor.disk * 1024}
    provider = flavor.disk_provider
    if provider:
        disk['provider'] = provider
        disk['origin'] = flavor.disk_origin
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [disk]

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # The networks of all NICs must be active before the instance is created
    depend_jobs = []
    for nic in nics:
        _, network_jobs = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(network_jobs)
    kw["depends"] = create_job_dependencies(depend_jobs)

    kw['ip_check'] = False
    kw['name_check'] = False

    # No node is specified explicitly ('pnode'); Ganeti uses an iallocator

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram,
    }

    # The image id and format are stored to Ganeti through the OS parameters
    kw['osparams'] = {
        'config_url': vm.config_url,
        'img_id': image['backend_id'],
        'img_format': image['format'],
    }

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
679

Giorgos Verigakis's avatar
Giorgos Verigakis committed
680
681

def delete_instance(vm):
    """Call DeleteInstance for `vm` through a pooled RAPI client.

    `settings.TEST` is passed as the `dry_run` flag. Returns the result of
    the client call.

    """
    instance_name = vm.backend_vm_id
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.DeleteInstance(instance_name,
                                          dry_run=settings.TEST)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
684

685

Giorgos Verigakis's avatar
Giorgos Verigakis committed
686
687
def reboot_instance(vm, reboot_type):
    """Call RebootInstance for `vm` through a pooled RAPI client.

    :param reboot_type: 'soft' or 'hard', as in the OS API.
    :returns: the result of the client call.

    """
    assert reboot_type in ('soft', 'hard')

    # The reboot type of the Ganeti job must always be hard. The 'soft' and
    # 'hard' types of the OS API differ from the Ganeti ones and map to
    # 'shutdown_timeout' instead.
    kwargs = dict(instance=vm.backend_vm_id, reboot_type="hard")

    if reboot_type == "hard":
        # 'shutdown_timeout' is only supported by snf-ganeti>=2.8.2 and
        # Ganeti > 2.10. Other versions ignore it and fall back to the
        # default timeout of Ganeti (120s).
        kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.RebootInstance(**kwargs)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
702

703

Giorgos Verigakis's avatar
Giorgos Verigakis committed
704
def startup_instance(vm):
    """Call StartupInstance for `vm` through a pooled RAPI client.

    `settings.TEST` is passed as the `dry_run` flag. Returns the result of
    the client call.

    """
    instance_name = vm.backend_vm_id
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.StartupInstance(instance_name,
                                           dry_run=settings.TEST)
Giorgos Verigakis's avatar
Giorgos Verigakis committed
707

708

Giorgos Verigakis's avatar
Giorgos Verigakis committed
709
def shutdown_instance(vm):
    """Ask the Ganeti backend of `vm` to shut down its instance.

    The request is submitted as a dry run when settings.TEST is set.
    Returns the result of the RAPI client's ShutdownInstance() call.

    """
    with pooled_rapi_client(vm) as rapi_client:
        result = rapi_client.ShutdownInstance(vm.backend_vm_id,
                                              dry_run=settings.TEST)
        return result
Giorgos Verigakis's avatar
Giorgos Verigakis committed
712

713

714
715
716
717
718
719
720
721
def resize_instance(vm, vcpus, memory):
    """Change the number of vCPUs and the memory of the instance of `vm`.

    Both 'minmem' and 'maxmem' are set to `memory`. `vcpus` and `memory` are
    converted to integers before being sent. Returns the result of the RAPI
    client's ModifyInstance() call.

    """
    cpu_count = int(vcpus)
    mem = int(memory)
    new_beparams = dict(vcpus=cpu_count, minmem=mem, maxmem=mem)
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.ModifyInstance(vm.backend_vm_id,
                                          beparams=new_beparams)


Giorgos Verigakis's avatar
Giorgos Verigakis committed
722
def get_instance_console(vm):
    """Build the VNC console info for the instance of `vm`.

    Returns a dict with the keys 'kind' (always 'vnc'), 'host' (the
    instance's 'pnode') and 'port' (the instance's 'network_port').

    Raises Exception if the hypervisor of the backend is "kvm" and the
    'serial_console' hypervisor parameter of the instance is true.

    """
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi_client:
        instance = rapi_client.GetInstance(vm.backend_vm_id)

    is_kvm = vm.backend.hypervisor == "kvm"
    if is_kvm and instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {
        'kind': 'vnc',
        'host': instance['pnode'],
        'port': instance['network_port'],
    }
748
749


750
751
def get_instance_info(vm):
    """Return the result of the RAPI GetInstance() call for `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        instance_info = rapi_client.GetInstance(vm.backend_vm_id)
        return instance_info


def vm_exists_in_backend(vm):
    """Check whether the instance of `vm` exists in its Ganeti backend.

    Returns True if the instance info can be retrieved and False if the
    backend answers with a 404 error. Any other GanetiApiError is re-raised.

    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as e:
        if e.code != 404:
            raise e
        return False
    else:
        return True


def get_network_info(backend_network):
    """Return the result of the RAPI GetNetwork() call for the network.

    The network is looked up in the backend of `backend_network` by the
    'backend_id' of the associated network.

    """
    with pooled_rapi_client(backend_network) as rapi_client:
        ganeti_name = backend_network.network.backend_id
        return rapi_client.GetNetwork(ganeti_name)


def network_exists_in_backend(backend_network):
    """Check whether a network exists in its Ganeti backend.

    Returns True if the network info can be retrieved and False if the
    backend answers with a 404 error.

    Any other GanetiApiError is re-raised, consistently with
    vm_exists_in_backend(). Swallowing it would make the function return
    None, which callers would treat as "network does not exist" even though
    the real state of the network is unknown.

    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        # Not a "not found" answer: propagate instead of returning None.
        raise
777

778

779
def job_is_still_running(vm, job_id=None):
    """Check whether a Ganeti job has not reached a finalized status yet.

    If `job_id` is None, the 'backendjobid' of `vm` is used. Returns False
    if the job status is one of rapi.JOB_STATUS_FINALIZED, or if the status
    cannot be retrieved because of a GanetiApiError.

    """
    with pooled_rapi_client(vm) as rapi_client:
        if job_id is None:
            job_id = vm.backendjobid
        try:
            status = rapi_client.GetJobStatus(job_id)["status"]
        except rapi.GanetiApiError:
            return False
        return status not in rapi.JOB_STATUS_FINALIZED


790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend.

    A NIC is only considered not stale while it is in "BUILD" state and the
    VM has a pending "CONNECT" task, and additionally one of the following
    holds: the NIC was created less than `timeout` seconds ago, the job of
    the task is still running, or the NIC is listed in the instance info.

    """
    # Without a NIC in BUILD state and a pending CONNECT there is nothing
    # to wait for.
    if nic.state != "BUILD" or vm.task != "CONNECT":
        return True

    # Do not check for too recent NICs to avoid the time overhead
    grace_period_end = nic.created + timedelta(seconds=timeout)
    if datetime.now() < grace_period_end:
        return False

    if job_is_still_running(vm, job_id=vm.task_job_id):
        return False

    # If job has finished, check that the NIC exists, because the
    # message may have been lost or stuck in the queue.
    instance_info = get_instance_info(vm)
    return nic.backend_uuid not in instance_info["nic.names"]


808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
def ensure_network_is_active(backend, network_id):
    """Make sure that a network is active in the given backend.

    The network is fetched with select_for_update() and its BackendNetwork
    for `backend` is fetched or created. If the BackendNetwork is not in
    "ACTIVE" operstate, the network is (re-)created and connected.

    Returns a tuple of the BackendNetwork object and the list of IDs of the
    Ganeti jobs that were submitted to create the network (empty if the
    network was already active).

    """
    network = Network.objects.select_for_update().get(id=network_id)
    bnet, _ = BackendNetwork.objects.get_or_create(backend=backend,
                                                   network=network)
    if bnet.operstate == "ACTIVE":
        job_ids = []
    else:
        job_ids = create_network(network, backend, connect=True)

    return bnet, job_ids


826
827
828
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend

    If `connect` is true, the network is also connected, with the connect
    jobs depending on the creation job, and the result of connect_network()
    is returned. Otherwise a list holding only the creation job ID is
    returned.

    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)
    if not connect:
        return [creation_job]

    return connect_network(network, backend, depends=[creation_job])
837

838

839
840
def _create_network(network, backend):
    """Create a network in a Ganeti backend.

    The CreateNetwork parameters are derived from the network: the IPv4 and
    IPv6 subnets and gateways come from its subnets, the "nfdhcpd" tag is
    added if any subnet uses DHCP, and the 'public'/'private' tag and the
    conflicts check follow `network.public`.

    Returns the result of the RAPI client's CreateNetwork() call.

    Raises Exception if there is no BackendNetwork for this network and
    backend.

    """
    # Work on a copy: the tags are extended below, and the list handed out
    # by 'network.backend_tag' must not be modified as a side effect.
    tags = list(network.backend_tag)

    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


889
def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups.

    The network is connected only to `group` if it is given, otherwise to
    all the groups reported by the backend. `depends` is a list of job IDs
    that the connect jobs depend on. Conflicts are checked only for public
    networks.

    Returns the list of IDs of the submitted connect jobs.

    """
    log.debug("Connecting network %s to backend %s", network, backend)

    conflicts_check = True if network.public else False
    depends = create_job_dependencies(depends)

    with pooled_rapi_client(backend) as client:
        if group is not None:
            nodegroups = [group]
        else:
            nodegroups = client.GetGroups()
        job_ids = [client.ConnectNetwork(network.backend_id, nodegroup,
                                         network.mode, network.link,
                                         conflicts_check,
                                         depends=depends)
                   for nodegroup in nodegroups]
    return job_ids
909
910


911
912
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    If `disconnect` is true, the network is first disconnected and the
    delete job is made to depend on the disconnect jobs.

    """
    log.debug("Deleting network %s from backend %s", network, backend)

    if disconnect:
        disconnect_jobs = disconnect_network(network, backend)
    else:
        disconnect_jobs = []
    _delete_network(network, backend, depends=disconnect_jobs)
Christos Stavrakakis's avatar
Christos Stavrakakis committed
918

919

920
def _delete_network(network, backend, depends=[]):
    """Submit the deletion of a network to a Ganeti backend.

    `depends` is a list of job IDs that the delete job depends on. Returns
    the result of the RAPI client's DeleteNetwork() call.

    """
    job_dependencies = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.DeleteNetwork(network.backend_id,
                                         job_dependencies)
924
925


926
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    The network is disconnected only from `group` if it is given, otherwise
    from all the groups reported by the backend.

    Returns the list of IDs of the submitted disconnect jobs.

    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        if group is not None:
            nodegroups = [group]
        else:
            nodegroups = client.GetGroups()
        job_ids = [client.DisconnectNetwork(network.backend_id, nodegroup)
                   for nodegroup in nodegroups]
    return job_ids
936
937


938
939
def connect_to_network(vm, nic):
    """Add a NIC to the instance of `vm`, connected to the NIC's network.

    The network is first ensured to be active in the backend of the VM, and
    the NIC addition is made to depend on any job submitted for that. The
    NIC is hotplugged if the backend uses hotplug, and the request is a dry
    run when settings.TEST is set.

    Returns the result of the RAPI client's ModifyInstance() call.

    """
    network = nic.network
    bnet, depend_jobs = ensure_network_is_active(vm.backend, network.id)
    depends = create_job_dependencies(depend_jobs)

    # From here on 'nic' is the NIC description that is sent to Ganeti.
    nic = dict(name=nic.backend_uuid,
               network=network.backend_id,
               ip=nic.ipv4_address)

    log.debug("Adding NIC %s to VM %s", nic, vm)

    kwargs = dict(instance=vm.backend_vm_id,
                  nics=[("add", "-1", nic)],
                  depends=depends)
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.ModifyInstance(**kwargs)
963
964


965
def disconnect_from_network(vm, nic):
966
    log.debug("Removing NIC %s of VM %s", nic, vm)
967

968
969
    kwargs = {
        "instance": vm.backend_vm_id,
970
        "nics": [("remove", nic.backend_uuid, {})],
Christos Stavrakakis's avatar