instance.py 45.9 KB
Newer Older
1
2
3
#
#

4
# Copyright (C) 2010, 2011 Google Inc.
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Instance-related functions and classes for masterd.

"""

import logging
import time
28
import OpenSSL
29
30
31
32

from ganeti import constants
from ganeti import errors
from ganeti import compat
33
34
from ganeti import utils
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54


class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised internally (e.g. on daemon start failure or timeout expiry) and
  caught within this module, where it is turned into a failed transfer via
  C{Finalize(error=...)} rather than propagated to callers.

  """

class ImportExportTimeouts(object):
  """Container for the timeout values governing an import/export.

  All values are in seconds. Only the connect timeout has no default and
  must be supplied by the caller.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Stores the given timeout values.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress


class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  Subclasses override the C{Report*} methods they are interested in; the
  default implementations do nothing.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """


class _DiskImportExportBase(object):
  """Base class tracking one import/export daemon on a node.

  Concrete subclasses (L{DiskImport}, L{DiskExport}) supply the RPC call to
  start the daemon; this class keeps its status, timestamps and timeouts and
  drives the callbacks.

  """
  # Overridden by subclasses ("import"/"export"); used in log messages
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Copy options as connect_timeout is modified below
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @rtype: string or None
    @return: Newline-joined recent output lines, or C{None} if no status
      data has been received yet

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: Tuple of (mbytes, throughput, percent, eta), or C{None} if no
      status data has been received yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    A transfer is active until C{_ReportFinished} sets C{success}.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop has already been set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses; returns an RPC result object.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # Daemon-relative timeouts (ready/listen) are measured from here
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    @rtype: string

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (or no daemon was running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available
    @raise _ImportExportError: If the daemon produced no status data within
      the "ready" timeout

    """
    assert self._ts_begin is not None

    if not data:
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether fresh status data is now available
    @raise _ImportExportError: If fetching failed for longer than the error
      timeout

    """
    if not success:
      # Tolerate transient fetch errors up to the error timeout
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If no connection was made within the connect
      timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Reports at most once per progress interval, and only once the daemon
    has supplied progress data.

    """
    if ((self._ts_last_progress is None or
        utils.TimeoutExpired(self._ts_last_progress,
                             self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Still running; maybe emit a progress update
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Daemon was never started (or already finalized)
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message marking the transfer as failed
    @rtype: bool
    @return: Whether finalization succeeded

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True


class DiskImport(_DiskImportExportBase):
  """Handles one disk import daemon (the receiving side).

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port number, or C{None} if no status data is available yet

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the daemon did not start listening within
      the listen timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening


class DiskExport(_DiskImportExportBase):
  """Handles one disk export daemon (the sending side).

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin


638
639
640
641
def FormatProgress(progress):
  """Renders a progress tuple as a human-readable string.

  @param progress: Tuple of (mbytes, throughput, percent, eta); percent and
    eta may be C{None} and are then omitted
  @rtype: string

  """
  mbytes, throughput, percent, eta = progress

  # Throughput is formatted manually; FormatUnit doesn't support kilobytes
  fields = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]

  if percent is not None:
    fields.append("%d%%" % percent)

  if eta is not None:
    fields.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(fields)


660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
class ImportExportLoop:
  """Main loop driving a set of import/export daemons to completion.

  """
  # Bounds for the polling delay in L{Run}, in seconds
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param lu: Logical unit instance
    @param daemons: Dictionary mapping node name to list of daemon names
    @rtype: dict
    @return: Nested dictionary mapping node name to (daemon name, status)

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Also starts daemons which haven't been started yet; a daemon that fails
    to start is finalized with an error immediately.

    @rtype: dict
    @return: Dictionary mapping node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls all queued transfers until none is active, adapting the polling
    delay to the least-advanced transfer.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collection daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
823
824
825
826


class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the instance data transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this transfer
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Source-side callbacks (C{None} when this object is the
      source side itself)
    @type dest_node: string
    @param dest_node: Destination node name
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of a transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    @param ie: L{DiskExport} instance
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data shared with the destination side

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of a transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    Starts the corresponding export daemon on the source node, pointing it
    at the port the import daemon is listening on.

    @param ie: L{DiskImport} instance
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Private data shared with the source side
    @param component: transfer component name

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn


class _DiskTransferPrivate(object):
  """Per-transfer private data shared between source and destination callbacks.

  """
  def __init__(self, data, success, export_opts):
    """Stores the transfer's private state.

    @type data: L{DiskTransfer}
    @param data: Transfer description
    @type success: bool
    @param success: Initial success status
    @param export_opts: Options for the export daemon

    """
    self.data = data
    self.export_opts = export_opts
    self.success = success

    # Filled in once the respective daemons have been created
    self.src_export = None
    self.dest_import = None

  def RecordResult(self, success):
    """Folds one part's result into the overall status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success


987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest over the protocol version, seed, instance name and
    disk index

  """
  digest = compat.sha1_hash()
  for piece in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    digest.update(piece)
  return digest.hexdigest()


1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)

        # No encryption or compression for intra-cluster moves
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # The matching export is started once the import daemon is listening
        # (see the destination callbacks)
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a disk that's not being transferred
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]


class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for disk exports to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # Per-disk result; None until the corresponding transfer has finished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()


class ExportInstanceHelper:
  """Helper for exporting an instance, locally or to a remote cluster.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects, one per instance disk (False where snapshotting
    # failed); filled in by L{CreateSnapshots}
    self._snap_disks = []
    # Tracks which snapshots have already been removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name,
                               params=disk_params)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshotting failed for this disk; nothing to transfer
        transfers.append(None)
        continue

      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)


class _RemoteImportCb(ImportExportCbBase):
  """Callbacks for disk imports from a remote cluster.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk result; None until the corresponding transfer has finished
    self._dresults = [None] * disk_count
    # Per-disk (port, magic) while the daemon is listening, None otherwise
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    # Tell the client where to connect for each disk
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)


def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results


def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version

  """
  # Message format is "<version>:<handshake constant>"
  return ":".join([str(version), str(constants.RIE_HANDSHAKE)])


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  # Fresh salt for every handshake; the peer verifies the HMAC using the
  # shared cluster domain secret
  hmac_salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  digest = utils.Sha1Hmac(cds, message, salt=hmac_salt)
  return (constants.RIE_VERSION, digest, hmac_salt)


def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None


def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1540
1541
1542
1543
1544
1545
1546
1547
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
1548
1549
  @type magic: string
  @param magic: Magic value
1550
1551

  """
1552
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)


def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer

  """
  try:
1567
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1568
1569
1570
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

1571
1572
  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")
1573

1574
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1575
1576
1577
1578

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

1579
1580
1581
1582
1583
1584
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
1585
1586
          utils.ValidateServiceName(port),
          magic)


def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  # Sign the disk details so the peer can verify them (see
  # L{CheckRemoteExportDiskInfo})
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
  return (host, port, magic, hmac_digest, salt)


def CalculateGroupIPolicy(cluster, group):
  """Calculate instance policy for group.

  @param cluster: Cluster object providing the policy defaults
  @param group: Node group whose partial policy overrides the defaults

  """
  # Fill the group's partial policy with cluster-wide defaults
  group_policy = group.ipolicy
  return cluster.SimpleFillIPolicy(group_policy)


def ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements according to disk template

  @type disk_template: string
  @param disk_template: One of the C{constants.DT_*} disk templates
  @type disks: list of dicts
  @param disks: Disk definitions containing C{constants.IDISK_SIZE}
  @raise errors.ProgrammerError: If the disk template is unknown

  """
  # Total of the raw disk sizes; most templates need exactly this much
  plain_size = sum(d[constants.IDISK_SIZE] for d in disks)

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: plain_size,
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: plain_size + constants.DRBD_META_SIZE * len(disks),
    constants.DT_FILE: plain_size,
    constants.DT_SHARED_FILE: plain_size,
    constants.DT_BLOCK: 0,
    constants.DT_RBD: plain_size,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]