diff --git a/Makefile.am b/Makefile.am index 67f453bdcae82f7a88da209b140fa6b282346839..6b27721a3cfbb70c4fa1e6b026815ca0c2561a53 100644 --- a/Makefile.am +++ b/Makefile.am @@ -144,7 +144,8 @@ confd_PYTHON = \ lib/confd/server.py masterd_PYTHON = \ - lib/masterd/__init__.py + lib/masterd/__init__.py \ + lib/masterd/instance.py docrst = \ doc/admin.rst \ @@ -345,6 +346,7 @@ python_tests = \ test/ganeti.http_unittest.py \ test/ganeti.locking_unittest.py \ test/ganeti.luxi_unittest.py \ + test/ganeti.masterd.instance_unittest.py \ test/ganeti.mcpu_unittest.py \ test/ganeti.objects_unittest.py \ test/ganeti.opcodes_unittest.py \ diff --git a/lib/constants.py b/lib/constants.py index 1086271ad2b1f3546123dca2773097d8caf48607..8e566391e79eacd156ace6630b8d5cb4b570d130 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -376,6 +376,8 @@ LVM_STRIPECOUNT = _autoconf.LVM_STRIPECOUNT # default maximum instance wait time, in seconds. DEFAULT_SHUTDOWN_TIMEOUT = 120 NODE_MAX_CLOCK_SKEW = 150 +# Time for an intra-cluster disk transfer to wait for a connection +DISK_TRANSFER_CONNECT_TIMEOUT = 30 # runparts results (RUNPARTS_SKIP, diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py new file mode 100644 index 0000000000000000000000000000000000000000..3c3ed08d6e3d09ef454c94c2e85a1182852a3dc8 --- /dev/null +++ b/lib/masterd/instance.py @@ -0,0 +1,749 @@ +# +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Instance-related functions and classes for masterd. + +""" + +import logging +import time + +from ganeti import constants +from ganeti import errors +from ganeti import compat + + +class _ImportExportError(Exception): + """Local exception to report import/export errors. + + """ + + +class ImportExportTimeouts(object): + #: Time until daemon starts writing status file + DEFAULT_READY_TIMEOUT = 10 + + #: Length of time until errors cause hard failure + DEFAULT_ERROR_TIMEOUT = 10 + + #: Time after which daemon must be listening + DEFAULT_LISTEN_TIMEOUT = 10 + + __slots__ = [ + "error", + "ready", + "listen", + "connect", + ] + + def __init__(self, connect, + listen=DEFAULT_LISTEN_TIMEOUT, + error=DEFAULT_ERROR_TIMEOUT, + ready=DEFAULT_READY_TIMEOUT): + """Initializes this class. + + @type connect: number + @param connect: Timeout for establishing connection + @type listen: number + @param listen: Timeout for starting to listen for connections + @type error: number + @param error: Length of time until errors cause hard failure + @type ready: number + @param ready: Timeout for daemon to become ready + + """ + self.error = error + self.ready = ready + self.listen = listen + self.connect = connect + + +class ImportExportCbBase(object): + """Callbacks for disk import/export. + + """ + def ReportListening(self, ie, private): + """Called when daemon started listening. + + @type ie: Subclass of L{_DiskImportExportBase} + @param ie: Import/export object + @param private: Private data passed to import/export object + + """ + + def ReportConnected(self, ie, private): + """Called when a connection has been established. + + @type ie: Subclass of L{_DiskImportExportBase} + @param ie: Import/export object + @param private: Private data passed to import/export object + + """ + + def ReportFinished(self, ie, private): + """Called when a transfer has finished. + + @type ie: Subclass of L{_DiskImportExportBase} + @param ie: Import/export object + @param private: Private data passed to import/export object + + """ + + +def _TimeoutExpired(epoch, timeout, _time_fn=time.time): + """Checks whether a timeout has expired. + + """ + return _time_fn() > (epoch + timeout) + + +class _DiskImportExportBase(object): + MODE_TEXT = None + + def __init__(self, lu, node_name, x509_key_name, remote_x509_ca, + instance, timeouts, cbs, private=None): + """Initializes this class. + + @param lu: Logical unit instance + @type node_name: string + @param node_name: Node name for import + @type x509_key_name: string + @param x509_key_name: Name of X509 key (None for node daemon key) + @type remote_x509_ca: string + @param remote_x509_ca: Remote peer's CA (None for node daemon certificate) + @type instance: L{objects.Instance} + @param instance: Instance object + @type timeouts: L{ImportExportTimeouts} + @param timeouts: Timeouts for this import + @type cbs: L{ImportExportCbBase} + @param cbs: Callbacks + @param private: Private data for callback functions + + """ + assert self.MODE_TEXT + + self._lu = lu + self.node_name = node_name + self._x509_key_name = x509_key_name + self._remote_x509_ca = remote_x509_ca + self._instance = instance + self._timeouts = timeouts + self._cbs = cbs + self._private = private + + # Parent loop + self._loop = None + + # Timestamps + self._ts_begin = None + self._ts_connected = None + self._ts_finished = None + self._ts_cleanup = None + self._ts_last_error = None + + # Transfer status + self.success = None + self.final_message = None + + # Daemon status + self._daemon_name = None + self._daemon = None + + @property + def recent_output(self): + """Returns the most recent output from the daemon. + + """ + if self._daemon: + return self._daemon.recent_output + + return None + + @property + def active(self): + """Determines whether this transport is still active. + + """ + return self.success is None + + @property + def loop(self): + """Returns parent loop. + + @rtype: L{ImportExportLoop} + + """ + return self._loop + + def SetLoop(self, loop): + """Sets the parent loop. + + @type loop: L{ImportExportLoop} + + """ + if self._loop: + raise errors.ProgrammerError("Loop can only be set once") + + self._loop = loop + + def _StartDaemon(self): + """Starts the import/export daemon. + + """ + raise NotImplementedError() + + def CheckDaemon(self): + """Checks whether daemon has been started and if not, starts it. + + @rtype: string + @return: Daemon name + + """ + assert self._ts_cleanup is None + + if self._daemon_name is None: + assert self._ts_begin is None + + result = self._StartDaemon() + if result.fail_msg: + raise _ImportExportError("Failed to start %s on %s: %s" % + (self.MODE_TEXT, self.node_name, + result.fail_msg)) + + daemon_name = result.payload + + logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name, + self.node_name) + + self._ts_begin = time.time() + self._daemon_name = daemon_name + + return self._daemon_name + + def GetDaemonName(self): + """Returns the daemon name. + + """ + assert self._daemon_name, "Daemon has not been started" + assert self._ts_cleanup is None + return self._daemon_name + + def Abort(self): + """Sends SIGTERM to import/export daemon (if still active). + + """ + if self._daemon_name: + self._lu.LogWarning("Aborting %s %r on %s", + self.MODE_TEXT, self._daemon_name, self.node_name) + result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name) + if result.fail_msg: + self._lu.LogWarning("Failed to abort %s %r on %s: %s", + self.MODE_TEXT, self._daemon_name, + self.node_name, result.fail_msg) + return False + + return True + + def _SetDaemonData(self, data): + """Internal function for updating status daemon data. + + @type data: L{objects.ImportExportStatus} + @param data: Daemon status data + + """ + assert self._ts_begin is not None + + if not data: + if _TimeoutExpired(self._ts_begin, self._timeouts.ready): + raise _ImportExportError("Didn't become ready after %s seconds" % + self._timeouts.ready) + + return False + + self._daemon = data + + return True + + def SetDaemonData(self, success, data): + """Updates daemon status data. + + @type success: bool + @param success: Whether fetching data was successful or not + @type data: L{objects.ImportExportStatus} + @param data: Daemon status data + + """ + if not success: + if self._ts_last_error is None: + self._ts_last_error = time.time() + + elif _TimeoutExpired(self._ts_last_error, self._timeouts.error): + raise _ImportExportError("Too many errors while updating data") + + return False + + self._ts_last_error = None + + return self._SetDaemonData(data) + + def CheckListening(self): + """Checks whether the daemon is listening. + + """ + raise NotImplementedError() + + def _GetConnectedCheckEpoch(self): + """Returns timeout to calculate connect timeout. + + """ + raise NotImplementedError() + + def CheckConnected(self): + """Checks whether the daemon is connected. + + @rtype: bool + @return: Whether the daemon is connected + + """ + assert self._daemon, "Daemon status missing" + + if self._ts_connected is not None: + return True + + if self._daemon.connected: + self._ts_connected = time.time() + + # TODO: Log remote peer + logging.debug("%s %r on %s is now connected", + self.MODE_TEXT, self._daemon_name, self.node_name) + + self._cbs.ReportConnected(self, self._private) + + return True + + if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect): + raise _ImportExportError("Not connected after %s seconds" % + self._timeouts.connect) + + return False + + def CheckFinished(self): + """Checks whether the daemon exited. + + @rtype: bool + @return: Whether the transfer is finished + + """ + assert self._daemon, "Daemon status missing" + + if self._ts_finished: + return True + + if self._daemon.exit_status is None: + return False + + self._ts_finished = time.time() + + self._ReportFinished(self._daemon.exit_status == 0, + self._daemon.error_message) + + return True + + def _ReportFinished(self, success, message): + """Transfer is finished or daemon exited. + + @type success: bool + @param success: Whether the transfer was successful + @type message: string + @param message: Error message + + """ + assert self.success is None + + self.success = success + self.final_message = message + + if success: + logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name, + self.node_name) + elif self._daemon_name: + self._lu.LogWarning("%s %r on %s failed: %s", + self.MODE_TEXT, self._daemon_name, self.node_name, + message) + else: + self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT, + self.node_name, message) + + self._cbs.ReportFinished(self, self._private) + + def _Finalize(self): + """Makes the RPC call to finalize this import/export. + + """ + return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name) + + def Finalize(self, error=None): + """Finalizes this import/export. + + """ + assert error or self.success is not None + + if self._daemon_name: + logging.info("Finalizing %s %r on %s", + self.MODE_TEXT, self._daemon_name, self.node_name) + + result = self._Finalize() + if result.fail_msg: + self._lu.LogWarning("Failed to finalize %s %r on %s: %s", + self.MODE_TEXT, self._daemon_name, + self.node_name, result.fail_msg) + return False + + # Daemon is no longer running + self._daemon_name = None + self._ts_cleanup = time.time() + + if error: + self._ReportFinished(False, error) + + return True + + +class DiskImport(_DiskImportExportBase): + MODE_TEXT = "import" + + def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance, + dest, dest_args, timeouts, cbs, private=None): + """Initializes this class. + + @param lu: Logical unit instance + @type node_name: string + @param node_name: Node name for import + @type x509_key_name: string + @param x509_key_name: Name of X509 key (None for node daemon key) + @type source_x509_ca: string + @param source_x509_ca: Remote peer's CA (None for node daemon certificate) + @type instance: L{objects.Instance} + @param instance: Instance object + @param dest: I/O destination + @param dest_args: I/O arguments + @type timeouts: L{ImportExportTimeouts} + @param timeouts: Timeouts for this import + @type cbs: L{ImportExportCbBase} + @param cbs: Callbacks + @param private: Private data for callback functions + + """ + _DiskImportExportBase.__init__(self, lu, node_name, + x509_key_name, source_x509_ca, + instance, timeouts, cbs, private) + self._dest = dest + self._dest_args = dest_args + + # Timestamps + self._ts_listening = None + + @property + def listen_port(self): + """Returns the port the daemon is listening on. + + """ + if self._daemon: + return self._daemon.listen_port + + return None + + def _StartDaemon(self): + """Starts the import daemon. + + """ + return self._lu.rpc.call_import_start(self.node_name, + self._x509_key_name, + self._remote_x509_ca, self._instance, + self._dest, self._dest_args) + + def CheckListening(self): + """Checks whether the daemon is listening. + + @rtype: bool + @return: Whether the daemon is listening + + """ + assert self._daemon, "Daemon status missing" + + if self._ts_listening is not None: + return True + + port = self._daemon.listen_port + if port is not None: + self._ts_listening = time.time() + + logging.debug("Import %r on %s is now listening on port %s", + self._daemon_name, self.node_name, port) + + self._cbs.ReportListening(self, self._private) + + return True + + if _TimeoutExpired(self._ts_begin, self._timeouts.listen): + raise _ImportExportError("Not listening after %s seconds" % + self._timeouts.listen) + + return False + + def _GetConnectedCheckEpoch(self): + """Returns the time since we started listening. + + """ + assert self._ts_listening is not None, \ + ("Checking whether an import is connected is only useful" + " once it's been listening") + + return self._ts_listening + + +class DiskExport(_DiskImportExportBase): + MODE_TEXT = "export" + + def __init__(self, lu, node_name, x509_key_name, dest_x509_ca, + dest_host, dest_port, instance, source, source_args, + timeouts, cbs, private=None): + """Initializes this class. + + @param lu: Logical unit instance + @type node_name: string + @param node_name: Node name for import + @type x509_key_name: string + @param x509_key_name: Name of X509 key (None for node daemon key) + @type dest_x509_ca: string + @param dest_x509_ca: Remote peer's CA (None for node daemon certificate) + @type dest_host: string + @param dest_host: Destination host name or IP address + @type dest_port: number + @param dest_port: Destination port number + @type instance: L{objects.Instance} + @param instance: Instance object + @param source: I/O source + @param source_args: I/O source + @type timeouts: L{ImportExportTimeouts} + @param timeouts: Timeouts for this import + @type cbs: L{ImportExportCbBase} + @param cbs: Callbacks + @param private: Private data for callback functions + + """ + _DiskImportExportBase.__init__(self, lu, node_name, + x509_key_name, dest_x509_ca, + instance, timeouts, cbs, private) + self._dest_host = dest_host + self._dest_port = dest_port + self._source = source + self._source_args = source_args + + def _StartDaemon(self): + """Starts the export daemon. + + """ + return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name, + self._remote_x509_ca, + self._dest_host, self._dest_port, + self._instance, self._source, + self._source_args) + + def CheckListening(self): + """Checks whether the daemon is listening. + + """ + # Only an import can be listening + return True + + def _GetConnectedCheckEpoch(self): + """Returns the time since the daemon started. + + """ + assert self._ts_begin is not None + + return self._ts_begin + + +class ImportExportLoop: + MIN_DELAY = 1.0 + MAX_DELAY = 20.0 + + def __init__(self, lu): + """Initializes this class. + + """ + self._lu = lu + self._queue = [] + self._pending_add = [] + + def Add(self, diskie): + """Adds an import/export object to the loop. + + @type diskie: Subclass of L{_DiskImportExportBase} + @param diskie: Import/export object + + """ + assert diskie not in self._pending_add + assert diskie.loop is None + + diskie.SetLoop(self) + + # Adding new objects to a staging list is necessary, otherwise the main + # loop gets confused if callbacks modify the queue while the main loop is + # iterating over it. + self._pending_add.append(diskie) + + @staticmethod + def _CollectDaemonStatus(lu, daemons): + """Collects the status for all import/export daemons. + + """ + daemon_status = {} + + for node_name, names in daemons.iteritems(): + result = lu.rpc.call_impexp_status(node_name, names) + if result.fail_msg: + lu.LogWarning("Failed to get daemon status on %s: %s", + node_name, result.fail_msg) + continue + + assert len(names) == len(result.payload) + + daemon_status[node_name] = dict(zip(names, result.payload)) + + return daemon_status + + @staticmethod + def _GetActiveDaemonNames(queue): + """Gets the names of all active daemons. + + """ + result = {} + for diskie in queue: + if not diskie.active: + continue + + try: + # Start daemon if necessary + daemon_name = diskie.CheckDaemon() + except _ImportExportError, err: + logging.exception("%s failed", diskie.MODE_TEXT) + diskie.Finalize(error=str(err)) + continue + + result.setdefault(diskie.node_name, []).append(daemon_name) + + assert len(queue) >= len(result) + assert len(queue) >= sum([len(names) for names in result.itervalues()]) + + logging.debug("daemons=%r", result) + + return result + + def _AddPendingToQueue(self): + """Adds all pending import/export objects to the internal queue. + + """ + assert compat.all(diskie not in self._queue and diskie.loop == self + for diskie in self._pending_add) + + self._queue.extend(self._pending_add) + + del self._pending_add[:] + + def Run(self): + """Utility main loop. + + """ + while True: + self._AddPendingToQueue() + + # Collect all active daemon names + daemons = self._GetActiveDaemonNames(self._queue) + if not daemons: + break + + # Collection daemon status data + data = self._CollectDaemonStatus(self._lu, daemons) + + # Use data + delay = self.MAX_DELAY + for diskie in self._queue: + if not diskie.active: + continue + + try: + try: + all_daemon_data = data[diskie.node_name] + except KeyError: + result = diskie.SetDaemonData(False, None) + else: + result = \ + diskie.SetDaemonData(True, + all_daemon_data[diskie.GetDaemonName()]) + + if not result: + # Daemon not yet ready, retry soon + delay = min(3.0, delay) + continue + + if diskie.CheckFinished(): + # Transfer finished + diskie.Finalize() + continue + + # Normal case: check again in 5 seconds + delay = min(5.0, delay) + + if not diskie.CheckListening(): + # Not yet listening, retry soon + delay = min(1.0, delay) + continue + + if not diskie.CheckConnected(): + # Not yet connected, retry soon + delay = min(1.0, delay) + continue + + except _ImportExportError, err: + logging.exception("%s failed", diskie.MODE_TEXT) + diskie.Finalize(error=str(err)) + + if not compat.any([diskie.active for diskie in self._queue]): + break + + # Wait a bit + delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay)) + logging.debug("Waiting for %ss", delay) + time.sleep(delay) + + def FinalizeAll(self): + """Finalizes all pending transfers. + + """ + success = True + + for diskie in self._queue: + success = diskie.Finalize() and success + + return success diff --git a/test/ganeti.masterd.instance_unittest.py b/test/ganeti.masterd.instance_unittest.py new file mode 100755 index 0000000000000000000000000000000000000000..05b0183ab5a5d98fdc37d0d60bd25717a99a2694 --- /dev/null +++ b/test/ganeti.masterd.instance_unittest.py @@ -0,0 +1,60 @@ +#!/usr/bin/python +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Script for testing ganeti.masterd.instance""" + +import os +import sys +import unittest + +from ganeti import utils +from ganeti import masterd + +from ganeti.masterd.instance import \ + ImportExportTimeouts, _TimeoutExpired, _DiskImportExportBase + +import testutils + + +class TestMisc(unittest.TestCase): + def testTimeouts(self): + tmo = ImportExportTimeouts(0) + self.assertEqual(tmo.connect, 0) + self.assertEqual(tmo.listen, ImportExportTimeouts.DEFAULT_LISTEN_TIMEOUT) + self.assertEqual(tmo.ready, ImportExportTimeouts.DEFAULT_READY_TIMEOUT) + self.assertEqual(tmo.error, ImportExportTimeouts.DEFAULT_ERROR_TIMEOUT) + + tmo = ImportExportTimeouts(999) + self.assertEqual(tmo.connect, 999) + + def testTimeoutExpired(self): + self.assert_(_TimeoutExpired(100, 300, _time_fn=lambda: 500)) + self.assertFalse(_TimeoutExpired(100, 300, _time_fn=lambda: 0)) + self.assertFalse(_TimeoutExpired(100, 300, _time_fn=lambda: 100)) + self.assertFalse(_TimeoutExpired(100, 300, _time_fn=lambda: 400)) + + def testDiskImportExportBaseDirect(self): + self.assertRaises(AssertionError, _DiskImportExportBase, + None, None, None, None, None, None, None) + + +if __name__ == "__main__": + testutils.GanetiTestProgram()