diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py index b830dfe2838fa50b32a0d902b7ab549e09d03856..1e13d111cb2e781586c3ba5482ed18c0bb093f36 100644 --- a/lib/masterd/instance.py +++ b/lib/masterd/instance.py @@ -50,17 +50,22 @@ class ImportExportTimeouts(object): #: Time after which daemon must be listening DEFAULT_LISTEN_TIMEOUT = 10 + #: Progress update interval + DEFAULT_PROGRESS_INTERVAL = 60 + __slots__ = [ "error", "ready", "listen", "connect", + "progress", ] def __init__(self, connect, listen=DEFAULT_LISTEN_TIMEOUT, error=DEFAULT_ERROR_TIMEOUT, - ready=DEFAULT_READY_TIMEOUT): + ready=DEFAULT_READY_TIMEOUT, + progress=DEFAULT_PROGRESS_INTERVAL): """Initializes this class. @type connect: number @@ -71,12 +76,15 @@ class ImportExportTimeouts(object): @param error: Length of time until errors cause hard failure @type ready: number @param ready: Timeout for daemon to become ready + @type progress: number + @param progress: Progress update interval """ self.error = error self.ready = ready self.listen = listen self.connect = connect + self.progress = progress class ImportExportCbBase(object): @@ -101,6 +109,15 @@ class ImportExportCbBase(object): """ + def ReportProgress(self, ie, private): + """Called when new progress information should be reported. + + @type ie: Subclass of L{_DiskImportExportBase} + @param ie: Import/export object + @param private: Private data passed to import/export object + + """ + def ReportFinished(self, ie, private): """Called when a transfer has finished. @@ -157,6 +174,7 @@ class _DiskImportExportBase(object): self._ts_connected = None self._ts_finished = None self._ts_cleanup = None + self._ts_last_progress = None self._ts_last_error = None # Transfer status @@ -177,6 +195,19 @@ class _DiskImportExportBase(object): return None + @property + def progress(self): + """Returns transfer progress information. + + """ + if not self._daemon: + return None + + return (self._daemon.progress_mbytes, + self._daemon.progress_throughput, + self._daemon.progress_percent, + self._daemon.progress_eta) + @property def active(self): """Determines whether this transport is still active. @@ -345,6 +376,18 @@ class _DiskImportExportBase(object): return False + def _CheckProgress(self): + """Checks whether a progress update should be reported. + + """ + if ((self._ts_last_progress is None or + _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and + self._daemon and + self._daemon.progress_mbytes is not None and + self._daemon.progress_throughput is not None): + self._cbs.ReportProgress(self, self._private) + self._ts_last_progress = time.time() + def CheckFinished(self): """Checks whether the daemon exited. @@ -358,6 +401,8 @@ class _DiskImportExportBase(object): return True if self._daemon.exit_status is None: + # TODO: Adjust delay for ETA expiring soon + self._CheckProgress() return False self._ts_finished = time.time() @@ -404,8 +449,6 @@ class _DiskImportExportBase(object): """Finalizes this import/export. """ - assert error or self.success is not None - if self._daemon_name: logging.info("Finalizing %s %r on %s", self.MODE_TEXT, self._daemon_name, self.node_name) @@ -576,6 +619,27 @@ class DiskExport(_DiskImportExportBase): return self._ts_begin +def FormatProgress(progress): + """Formats progress information for user consumption + + """ + (mbytes, throughput, percent, _) = progress + + parts = [ + utils.FormatUnit(mbytes, "h"), + + # Not using FormatUnit as it doesn't support kilobytes + "%0.1f MiB/s" % throughput, + ] + + if percent is not None: + parts.append("%d%%" % percent) + + # TODO: Format ETA + + return utils.CommaJoin(parts) + + class ImportExportLoop: MIN_DELAY = 1.0 MAX_DELAY = 20.0 @@ -772,6 +836,16 @@ class _TransferInstSourceCb(_TransferInstCbBase): self.feedback_fn("%s is sending data on %s" % (dtp.data.name, ie.node_name)) + def ReportProgress(self, ie, dtp): + """Called when new progress information should be reported. + + """ + progress = ie.progress + if not progress: + return + + self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress))) + def ReportFinished(self, ie, dtp): """Called when a transfer has finished. @@ -993,6 +1067,18 @@ class _RemoteExportCb(ImportExportCbBase): self._feedback_fn("Disk %s is now sending data" % idx) + def ReportProgress(self, ie, private): + """Called when new progress information should be reported. + + """ + (idx, _) = private + + progress = ie.progress + if not progress: + return + + self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress))) + def ReportFinished(self, ie, private): """Called when a transfer has finished. diff --git a/test/ganeti.masterd.instance_unittest.py b/test/ganeti.masterd.instance_unittest.py index 06228b3fe25360f4a67d75d7c0cdd6971cc415ed..066da7d764819a0cdd98284ff6114567c6183b5f 100755 --- a/test/ganeti.masterd.instance_unittest.py +++ b/test/ganeti.masterd.instance_unittest.py @@ -33,7 +33,8 @@ from ganeti import masterd from ganeti.masterd.instance import \ ImportExportTimeouts, _TimeoutExpired, _DiskImportExportBase, \ ComputeRemoteExportHandshake, CheckRemoteExportHandshake, \ - ComputeRemoteImportDiskInfo, CheckRemoteExportDiskInfo + ComputeRemoteImportDiskInfo, CheckRemoteExportDiskInfo, \ + FormatProgress import testutils @@ -45,10 +46,19 @@ class TestMisc(unittest.TestCase): self.assertEqual(tmo.listen, ImportExportTimeouts.DEFAULT_LISTEN_TIMEOUT) self.assertEqual(tmo.ready, ImportExportTimeouts.DEFAULT_READY_TIMEOUT) self.assertEqual(tmo.error, ImportExportTimeouts.DEFAULT_ERROR_TIMEOUT) + self.assertEqual(tmo.progress, + ImportExportTimeouts.DEFAULT_PROGRESS_INTERVAL) tmo = ImportExportTimeouts(999) self.assertEqual(tmo.connect, 999) + tmo = ImportExportTimeouts(1, listen=2, error=3, ready=4, progress=5) + self.assertEqual(tmo.connect, 1) + self.assertEqual(tmo.listen, 2) + self.assertEqual(tmo.error, 3) + self.assertEqual(tmo.ready, 4) + self.assertEqual(tmo.progress, 5) + def testTimeoutExpired(self): self.assert_(_TimeoutExpired(100, 300, _time_fn=lambda: 500)) self.assertFalse(_TimeoutExpired(100, 300, _time_fn=lambda: 0)) @@ -119,5 +129,15 @@ class TestRieDiskInfo(unittest.TestCase): cds, 0, ("nodeX", 123, "fakehash", "xyz")) +class TestFormatProgress(unittest.TestCase): + def test(self): + FormatProgress((0, 0, None, None)) + FormatProgress((100, 3.3, 30, None)) + FormatProgress((100, 3.3, 30, 900)) + + self.assertEqual(FormatProgress((1500, 12, 30, None)), + "1.5G, 12.0 MiB/s, 30%") + + if __name__ == "__main__": testutils.GanetiTestProgram()