From b7a1c816102553df4c73118ce14db328ca59ae24 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Mon, 23 May 2011 18:55:50 +0200 Subject: [PATCH] gnt-node migrate: Use LU-generated jobs Until now LUNodeMigrate used multiple tasklets to evacuate all primary instances on a node. In some cases it would acquire all node locks, which isn't good on big clusters. With upcoming improvements to the LUs for instance failover and migration, switching to separate jobs looks like a better option. This patch changes LUNodeMigrate to use LU-generated jobs. While working on this patch, I identified a race condition in LUNodeMigrate.ExpandNames. A node's instances were retrieved without a lock and no verification was done. For RAPI, a new feature string is added and can be used to detect clusters which support more parameters for node migration. The client is updated. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> Signed-off-by: Michael Hanselmann <hansmi@google.com> --- lib/client/gnt_node.py | 30 +++++++++++--- lib/cmdlib.py | 62 +++++++++++++---------------- lib/rapi/client.py | 40 ++++++++++++++++--- lib/rapi/rlib2.py | 34 +++++++++++----- test/ganeti.rapi.client_unittest.py | 34 ++++++++++++++++ 5 files changed, 145 insertions(+), 55 deletions(-) diff --git a/lib/client/gnt_node.py b/lib/client/gnt_node.py index 0b0dbf12f..a56e44910 100644 --- a/lib/client/gnt_node.py +++ b/lib/client/gnt_node.py @@ -364,7 +364,7 @@ def MigrateNode(opts, args): selected_fields = ["name", "pinst_list"] result = cl.QueryNodes(names=args, fields=selected_fields, use_locking=False) - node, pinst = result[0] + ((node, pinst), ) = result if not pinst: ToStdout("No primary instances on node %s, exiting." % node) @@ -372,9 +372,10 @@ def MigrateNode(opts, args): pinst = utils.NiceSort(pinst) - if not force and not AskUser("Migrate instance(s) %s?" % - (",".join("'%s'" % name for name in pinst))): - return 2 + if not (force or + AskUser("Migrate instance(s) %s?" % + utils.CommaJoin(utils.NiceSort(pinst)))): + return constants.EXIT_CONFIRMATION # this should be removed once --non-live is deprecated if not opts.live and opts.migration_mode is not None: @@ -385,10 +386,29 @@ def MigrateNode(opts, args): mode = constants.HT_MIGRATION_NONLIVE else: mode = opts.migration_mode + op = opcodes.OpNodeMigrate(node_name=args[0], mode=mode, iallocator=opts.iallocator, target_node=opts.dst_node) - SubmitOpCode(op, cl=cl, opts=opts) + + result = SubmitOpCode(op, cl=cl, opts=opts) + + # Keep track of submitted jobs + jex = JobExecutor(cl=cl, opts=opts) + + for (status, job_id) in result[constants.JOB_IDS_KEY]: + jex.AddJobId(None, status, job_id) + + results = jex.GetResults() + bad_cnt = len([row for row in results if not row[0]]) + if bad_cnt == 0: + ToStdout("All instances migrated successfully.") + rcode = constants.EXIT_SUCCESS + else: + ToStdout("There were %s errors during the node migration.", bad_cnt) + rcode = constants.EXIT_FAILURE + + return rcode def ShowNodeConfig(opts, args): diff --git a/lib/cmdlib.py b/lib/cmdlib.py index d83a4481d..49f7eb511 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -6653,40 +6653,10 @@ class LUNodeMigrate(LogicalUnit): def ExpandNames(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - self.needed_locks = {} - - # Create tasklets for migrating instances for all instances on this node - names = [] - tasklets = [] - - self.lock_all_nodes = False - - for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): - logging.debug("Migrating instance %s", inst.name) - names.append(inst.name) - - tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False)) - - if inst.disk_template in constants.DTS_EXT_MIRROR: - # We need to lock all nodes, as the iallocator will choose the - # destination nodes afterwards - self.lock_all_nodes = True - - self.tasklets = tasklets - - # Declare node locks - if self.lock_all_nodes: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - - # Declare instance locks - self.needed_locks[locking.LEVEL_INSTANCE] = names - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE and not self.lock_all_nodes: - self._LockInstancesNodes() + self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } def BuildHooksEnv(self): """Build hooks env. @@ -6705,6 +6675,30 @@ class LUNodeMigrate(LogicalUnit): nl = [self.cfg.GetMasterNode()] return (nl, nl) + def CheckPrereq(self): + pass + + def Exec(self, feedback_fn): + # Prepare jobs for migration instances + jobs = [ + [opcodes.OpInstanceMigrate(instance_name=inst.name, + mode=self.op.mode, + live=self.op.live, + iallocator=self.op.iallocator, + target_node=self.op.target_node)] + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) + ] + + # TODO: Run iallocator in this opcode and pass correct placement options to + # OpInstanceMigrate. Since other jobs can modify the cluster between + # running the iallocator and the actual migration, a good consistency model + # will have to be found. + + assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) == + frozenset([self.op.node_name])) + + return ResultWithJobs(jobs) + class TLMigrateInstance(Tasklet): """Tasklet class for instance migration. diff --git a/lib/rapi/client.py b/lib/rapi/client.py index b752915ed..d2aa4acc5 100644 --- a/lib/rapi/client.py +++ b/lib/rapi/client.py @@ -92,6 +92,7 @@ JOB_STATUS_ALL = frozenset([ _REQ_DATA_VERSION_FIELD = "__version__" _INST_CREATE_REQV1 = "instance-create-reqv1" _INST_REINSTALL_REQV1 = "instance-reinstall-reqv1" +_NODE_MIGRATE_REQV1 = "node-migrate-reqv1" _INST_NIC_PARAMS = frozenset(["mac", "ip", "mode", "link"]) _INST_CREATE_V0_DISK_PARAMS = frozenset(["size"]) _INST_CREATE_V0_PARAMS = frozenset([ @@ -1289,7 +1290,8 @@ class GanetiRapiClient(object): # pylint: disable-msg=R0904 ("/%s/nodes/%s/evacuate" % (GANETI_RAPI_VERSION, node)), query, None) - def MigrateNode(self, node, mode=None, dry_run=False): + def MigrateNode(self, node, mode=None, dry_run=False, iallocator=None, + target_node=None): """Migrates all primary instances from a node. @type node: str @@ -1299,20 +1301,46 @@ class GanetiRapiClient(object): # pylint: disable-msg=R0904 otherwise the hypervisor default will be used @type dry_run: bool @param dry_run: whether to perform a dry run + @type iallocator: string + @param iallocator: instance allocator to use + @type target_node: string + @param target_node: Target node for shared-storage instances @rtype: string @return: job id """ query = [] - if mode is not None: - query.append(("mode", mode)) if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_POST, - ("/%s/nodes/%s/migrate" % - (GANETI_RAPI_VERSION, node)), query, None) + if _NODE_MIGRATE_REQV1 in self.GetFeatures(): + body = {} + + if mode is not None: + body["mode"] = mode + if iallocator is not None: + body["iallocator"] = iallocator + if target_node is not None: + body["target_node"] = target_node + + assert len(query) <= 1 + + return self._SendRequest(HTTP_POST, + ("/%s/nodes/%s/migrate" % + (GANETI_RAPI_VERSION, node)), query, body) + else: + # Use old request format + if target_node is not None: + raise GanetiApiError("Server does not support specifying target node" + " for node migration") + + if mode is not None: + query.append(("mode", mode)) + + return self._SendRequest(HTTP_POST, + ("/%s/nodes/%s/migrate" % + (GANETI_RAPI_VERSION, node)), query, None) def GetNodeRole(self, node): """Gets the current role for a node. diff --git a/lib/rapi/rlib2.py b/lib/rapi/rlib2.py index eef6fc788..d6bd9ce76 100644 --- a/lib/rapi/rlib2.py +++ b/lib/rapi/rlib2.py @@ -104,6 +104,9 @@ _INST_CREATE_REQV1 = "instance-create-reqv1" # Feature string for instance reinstall request version 1 _INST_REINSTALL_REQV1 = "instance-reinstall-reqv1" +# Feature string for node migration version 1 +_NODE_MIGRATE_REQV1 = "node-migrate-reqv1" + # Timeout for /2/jobs/[job_id]/wait. Gives job up to 10 seconds to change. _WFJC_TIMEOUT = 10 @@ -145,7 +148,7 @@ class R_2_features(baserlib.R_Generic): """Returns list of optional RAPI features implemented. """ - return [_INST_CREATE_REQV1, _INST_REINSTALL_REQV1] + return [_INST_CREATE_REQV1, _INST_REINSTALL_REQV1, _NODE_MIGRATE_REQV1] class R_2_os(baserlib.R_Generic): @@ -455,18 +458,29 @@ class R_2_nodes_name_migrate(baserlib.R_Generic): """ node_name = self.items[0] - if "live" in self.queryargs and "mode" in self.queryargs: - raise http.HttpBadRequest("Only one of 'live' and 'mode' should" - " be passed") - elif "live" in self.queryargs: - if self._checkIntVariable("live", default=1): - mode = constants.HT_MIGRATION_LIVE + if self.queryargs: + # Support old-style requests + if "live" in self.queryargs and "mode" in self.queryargs: + raise http.HttpBadRequest("Only one of 'live' and 'mode' should" + " be passed") + + if "live" in self.queryargs: + if self._checkIntVariable("live", default=1): + mode = constants.HT_MIGRATION_LIVE + else: + mode = constants.HT_MIGRATION_NONLIVE else: - mode = constants.HT_MIGRATION_NONLIVE + mode = self._checkStringVariable("mode", default=None) + + data = { + "mode": mode, + } else: - mode = self._checkStringVariable("mode", default=None) + data = self.request_body - op = opcodes.OpNodeMigrate(node_name=node_name, mode=mode) + op = baserlib.FillOpcode(opcodes.OpNodeMigrate, data, { + "node_name": node_name, + }) return baserlib.SubmitJob([op]) diff --git a/test/ganeti.rapi.client_unittest.py b/test/ganeti.rapi.client_unittest.py index 464d45178..23dacf8e0 100755 --- a/test/ganeti.rapi.client_unittest.py +++ b/test/ganeti.rapi.client_unittest.py @@ -151,6 +151,7 @@ class TestConstants(unittest.TestCase): self.assertEqual(client._REQ_DATA_VERSION_FIELD, rlib2._REQ_DATA_VERSION) self.assertEqual(client._INST_CREATE_REQV1, rlib2._INST_CREATE_REQV1) self.assertEqual(client._INST_REINSTALL_REQV1, rlib2._INST_REINSTALL_REQV1) + self.assertEqual(client._NODE_MIGRATE_REQV1, rlib2._NODE_MIGRATE_REQV1) self.assertEqual(client._INST_NIC_PARAMS, constants.INIC_PARAMS) self.assertEqual(client.JOB_STATUS_QUEUED, constants.JOB_STATUS_QUEUED) self.assertEqual(client.JOB_STATUS_WAITLOCK, constants.JOB_STATUS_WAITLOCK) @@ -835,13 +836,16 @@ class GanetiRapiClientTests(testutils.GanetiTestCase): "node-4", iallocator="hail", remote_node="node-5") def testMigrateNode(self): + self.rapi.AddResponse(serializer.DumpJson([])) self.rapi.AddResponse("1111") self.assertEqual(1111, self.client.MigrateNode("node-a", dry_run=True)) self.assertHandler(rlib2.R_2_nodes_name_migrate) self.assertItems(["node-a"]) self.assert_("mode" not in self.rapi.GetLastHandler().queryargs) self.assertDryRun() + self.assertFalse(self.rapi.GetLastRequestData()) + self.rapi.AddResponse(serializer.DumpJson([])) self.rapi.AddResponse("1112") self.assertEqual(1112, self.client.MigrateNode("node-a", dry_run=True, mode="live")) @@ -849,6 +853,36 @@ class GanetiRapiClientTests(testutils.GanetiTestCase): self.assertItems(["node-a"]) self.assertQuery("mode", ["live"]) self.assertDryRun() + self.assertFalse(self.rapi.GetLastRequestData()) + + self.rapi.AddResponse(serializer.DumpJson([])) + self.assertRaises(client.GanetiApiError, self.client.MigrateNode, + "node-c", target_node="foonode") + self.assertEqual(self.rapi.CountPending(), 0) + + def testMigrateNodeBodyData(self): + self.rapi.AddResponse(serializer.DumpJson([rlib2._NODE_MIGRATE_REQV1])) + self.rapi.AddResponse("27539") + self.assertEqual(27539, self.client.MigrateNode("node-a", dry_run=False, + mode="live")) + self.assertHandler(rlib2.R_2_nodes_name_migrate) + self.assertItems(["node-a"]) + self.assertFalse(self.rapi.GetLastHandler().queryargs) + self.assertEqual(serializer.LoadJson(self.rapi.GetLastRequestData()), + { "mode": "live", }) + + self.rapi.AddResponse(serializer.DumpJson([rlib2._NODE_MIGRATE_REQV1])) + self.rapi.AddResponse("14219") + self.assertEqual(14219, self.client.MigrateNode("node-x", dry_run=True, + target_node="node9", + iallocator="ial")) + self.assertHandler(rlib2.R_2_nodes_name_migrate) + self.assertItems(["node-x"]) + self.assertDryRun() + self.assertEqual(serializer.LoadJson(self.rapi.GetLastRequestData()), + { "target_node": "node9", "iallocator": "ial", }) + + self.assertEqual(self.rapi.CountPending(), 0) def testGetNodeRole(self): self.rapi.AddResponse("\"master\"") -- GitLab