Commit b7a1c816 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

gnt-node migrate: Use LU-generated jobs



Until now LUNodeMigrate used multiple tasklets to evacuate all primary
instances on a node. In some cases it would acquire all node locks,
which isn't good on big clusters. With upcoming improvements to the LUs
for instance failover and migration, switching to separate jobs looks
like a better option. This patch changes LUNodeMigrate to use
LU-generated jobs.

While working on this patch, I identified a race condition in
LUNodeMigrate.ExpandNames. A node's instances were retrieved without a
lock and no verification was done.

For RAPI, a new feature string is added and can be used to detect
clusters which support more parameters for node migration. The client
is updated.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
Signed-off-by: Michael Hanselmann <hansmi@google.com>
parent b99b607f
......@@ -364,7 +364,7 @@ def MigrateNode(opts, args):
selected_fields = ["name", "pinst_list"]
result = cl.QueryNodes(names=args, fields=selected_fields, use_locking=False)
node, pinst = result[0]
((node, pinst), ) = result
if not pinst:
ToStdout("No primary instances on node %s, exiting." % node)
......@@ -372,9 +372,10 @@ def MigrateNode(opts, args):
pinst = utils.NiceSort(pinst)
if not force and not AskUser("Migrate instance(s) %s?" %
(",".join("'%s'" % name for name in pinst))):
return 2
if not (force or
AskUser("Migrate instance(s) %s?" %
utils.CommaJoin(utils.NiceSort(pinst)))):
return constants.EXIT_CONFIRMATION
# this should be removed once --non-live is deprecated
if not opts.live and opts.migration_mode is not None:
......@@ -385,10 +386,29 @@ def MigrateNode(opts, args):
mode = constants.HT_MIGRATION_NONLIVE
else:
mode = opts.migration_mode
op = opcodes.OpNodeMigrate(node_name=args[0], mode=mode,
iallocator=opts.iallocator,
target_node=opts.dst_node)
SubmitOpCode(op, cl=cl, opts=opts)
result = SubmitOpCode(op, cl=cl, opts=opts)
# Keep track of submitted jobs
jex = JobExecutor(cl=cl, opts=opts)
for (status, job_id) in result[constants.JOB_IDS_KEY]:
jex.AddJobId(None, status, job_id)
results = jex.GetResults()
bad_cnt = len([row for row in results if not row[0]])
if bad_cnt == 0:
ToStdout("All instances migrated successfully.")
rcode = constants.EXIT_SUCCESS
else:
ToStdout("There were %s errors during the node migration.", bad_cnt)
rcode = constants.EXIT_FAILURE
return rcode
def ShowNodeConfig(opts, args):
......
......@@ -6653,40 +6653,10 @@ class LUNodeMigrate(LogicalUnit):
def ExpandNames(self):
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
self.needed_locks = {}
# Create tasklets for migrating instances for all instances on this node
names = []
tasklets = []
self.lock_all_nodes = False
for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
if inst.disk_template in constants.DTS_EXT_MIRROR:
# We need to lock all nodes, as the iallocator will choose the
# destination nodes afterwards
self.lock_all_nodes = True
self.tasklets = tasklets
# Declare node locks
if self.lock_all_nodes:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
else:
self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
# Declare instance locks
self.needed_locks[locking.LEVEL_INSTANCE] = names
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE and not self.lock_all_nodes:
self._LockInstancesNodes()
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
self.needed_locks = {
locking.LEVEL_NODE: [self.op.node_name],
}
def BuildHooksEnv(self):
"""Build hooks env.
......@@ -6705,6 +6675,30 @@ class LUNodeMigrate(LogicalUnit):
nl = [self.cfg.GetMasterNode()]
return (nl, nl)
def CheckPrereq(self):
  # Nothing to verify here: each generated OpInstanceMigrate job performs
  # its own prerequisite checks when it runs.
  pass
def Exec(self, feedback_fn):
  """Create one migration job per primary instance on the node.

  Rather than migrating the instances itself, this LU hands the
  generated jobs back to the master daemon for scheduling.

  """
  # Only our own node lock should still be held at this point
  assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
          frozenset([self.op.node_name]))

  # TODO: Run iallocator in this opcode and pass correct placement options
  # to OpInstanceMigrate. Since other jobs can modify the cluster between
  # running the iallocator and the actual migration, a good consistency
  # model will have to be found.

  # One single-opcode job per primary instance on the node
  jobs = []
  for instance in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
    migrate_op = opcodes.OpInstanceMigrate(instance_name=instance.name,
                                           mode=self.op.mode,
                                           live=self.op.live,
                                           iallocator=self.op.iallocator,
                                           target_node=self.op.target_node)
    jobs.append([migrate_op])

  return ResultWithJobs(jobs)
class TLMigrateInstance(Tasklet):
"""Tasklet class for instance migration.
......
......@@ -92,6 +92,7 @@ JOB_STATUS_ALL = frozenset([
_REQ_DATA_VERSION_FIELD = "__version__"
_INST_CREATE_REQV1 = "instance-create-reqv1"
_INST_REINSTALL_REQV1 = "instance-reinstall-reqv1"
_NODE_MIGRATE_REQV1 = "node-migrate-reqv1"
_INST_NIC_PARAMS = frozenset(["mac", "ip", "mode", "link"])
_INST_CREATE_V0_DISK_PARAMS = frozenset(["size"])
_INST_CREATE_V0_PARAMS = frozenset([
......@@ -1289,7 +1290,8 @@ class GanetiRapiClient(object): # pylint: disable-msg=R0904
("/%s/nodes/%s/evacuate" %
(GANETI_RAPI_VERSION, node)), query, None)
def MigrateNode(self, node, mode=None, dry_run=False):
def MigrateNode(self, node, mode=None, dry_run=False, iallocator=None,
target_node=None):
"""Migrates all primary instances from a node.
@type node: str
......@@ -1299,20 +1301,46 @@ class GanetiRapiClient(object): # pylint: disable-msg=R0904
otherwise the hypervisor default will be used
@type dry_run: bool
@param dry_run: whether to perform a dry run
@type iallocator: string
@param iallocator: instance allocator to use
@type target_node: string
@param target_node: Target node for shared-storage instances
@rtype: string
@return: job id
"""
query = []
if mode is not None:
query.append(("mode", mode))
if dry_run:
query.append(("dry-run", 1))
return self._SendRequest(HTTP_POST,
("/%s/nodes/%s/migrate" %
(GANETI_RAPI_VERSION, node)), query, None)
if _NODE_MIGRATE_REQV1 in self.GetFeatures():
body = {}
if mode is not None:
body["mode"] = mode
if iallocator is not None:
body["iallocator"] = iallocator
if target_node is not None:
body["target_node"] = target_node
assert len(query) <= 1
return self._SendRequest(HTTP_POST,
("/%s/nodes/%s/migrate" %
(GANETI_RAPI_VERSION, node)), query, body)
else:
# Use old request format
if target_node is not None:
raise GanetiApiError("Server does not support specifying target node"
" for node migration")
if mode is not None:
query.append(("mode", mode))
return self._SendRequest(HTTP_POST,
("/%s/nodes/%s/migrate" %
(GANETI_RAPI_VERSION, node)), query, None)
def GetNodeRole(self, node):
"""Gets the current role for a node.
......
......@@ -104,6 +104,9 @@ _INST_CREATE_REQV1 = "instance-create-reqv1"
# Feature string for instance reinstall request version 1
_INST_REINSTALL_REQV1 = "instance-reinstall-reqv1"
# Feature string for node migration version 1
_NODE_MIGRATE_REQV1 = "node-migrate-reqv1"
# Timeout for /2/jobs/[job_id]/wait. Gives job up to 10 seconds to change.
_WFJC_TIMEOUT = 10
......@@ -145,7 +148,7 @@ class R_2_features(baserlib.R_Generic):
"""Returns list of optional RAPI features implemented.
"""
return [_INST_CREATE_REQV1, _INST_REINSTALL_REQV1]
return [_INST_CREATE_REQV1, _INST_REINSTALL_REQV1, _NODE_MIGRATE_REQV1]
class R_2_os(baserlib.R_Generic):
......@@ -455,18 +458,29 @@ class R_2_nodes_name_migrate(baserlib.R_Generic):
"""
node_name = self.items[0]
if "live" in self.queryargs and "mode" in self.queryargs:
raise http.HttpBadRequest("Only one of 'live' and 'mode' should"
" be passed")
elif "live" in self.queryargs:
if self._checkIntVariable("live", default=1):
mode = constants.HT_MIGRATION_LIVE
if self.queryargs:
# Support old-style requests
if "live" in self.queryargs and "mode" in self.queryargs:
raise http.HttpBadRequest("Only one of 'live' and 'mode' should"
" be passed")
if "live" in self.queryargs:
if self._checkIntVariable("live", default=1):
mode = constants.HT_MIGRATION_LIVE
else:
mode = constants.HT_MIGRATION_NONLIVE
else:
mode = constants.HT_MIGRATION_NONLIVE
mode = self._checkStringVariable("mode", default=None)
data = {
"mode": mode,
}
else:
mode = self._checkStringVariable("mode", default=None)
data = self.request_body
op = opcodes.OpNodeMigrate(node_name=node_name, mode=mode)
op = baserlib.FillOpcode(opcodes.OpNodeMigrate, data, {
"node_name": node_name,
})
return baserlib.SubmitJob([op])
......
......@@ -151,6 +151,7 @@ class TestConstants(unittest.TestCase):
self.assertEqual(client._REQ_DATA_VERSION_FIELD, rlib2._REQ_DATA_VERSION)
self.assertEqual(client._INST_CREATE_REQV1, rlib2._INST_CREATE_REQV1)
self.assertEqual(client._INST_REINSTALL_REQV1, rlib2._INST_REINSTALL_REQV1)
self.assertEqual(client._NODE_MIGRATE_REQV1, rlib2._NODE_MIGRATE_REQV1)
self.assertEqual(client._INST_NIC_PARAMS, constants.INIC_PARAMS)
self.assertEqual(client.JOB_STATUS_QUEUED, constants.JOB_STATUS_QUEUED)
self.assertEqual(client.JOB_STATUS_WAITLOCK, constants.JOB_STATUS_WAITLOCK)
......@@ -835,13 +836,16 @@ class GanetiRapiClientTests(testutils.GanetiTestCase):
"node-4", iallocator="hail", remote_node="node-5")
def testMigrateNode(self):
self.rapi.AddResponse(serializer.DumpJson([]))
self.rapi.AddResponse("1111")
self.assertEqual(1111, self.client.MigrateNode("node-a", dry_run=True))
self.assertHandler(rlib2.R_2_nodes_name_migrate)
self.assertItems(["node-a"])
self.assert_("mode" not in self.rapi.GetLastHandler().queryargs)
self.assertDryRun()
self.assertFalse(self.rapi.GetLastRequestData())
self.rapi.AddResponse(serializer.DumpJson([]))
self.rapi.AddResponse("1112")
self.assertEqual(1112, self.client.MigrateNode("node-a", dry_run=True,
mode="live"))
......@@ -849,6 +853,36 @@ class GanetiRapiClientTests(testutils.GanetiTestCase):
self.assertItems(["node-a"])
self.assertQuery("mode", ["live"])
self.assertDryRun()
self.assertFalse(self.rapi.GetLastRequestData())
self.rapi.AddResponse(serializer.DumpJson([]))
self.assertRaises(client.GanetiApiError, self.client.MigrateNode,
"node-c", target_node="foonode")
self.assertEqual(self.rapi.CountPending(), 0)
def testMigrateNodeBodyData(self):
  """Tests new-style node migration requests sent via the request body."""
  # Server advertises body support: "mode" goes into the body, not the query
  self.rapi.AddResponse(serializer.DumpJson([rlib2._NODE_MIGRATE_REQV1]))
  self.rapi.AddResponse("27539")
  job_id = self.client.MigrateNode("node-a", dry_run=False, mode="live")
  self.assertEqual(27539, job_id)
  self.assertHandler(rlib2.R_2_nodes_name_migrate)
  self.assertItems(["node-a"])
  self.assertFalse(self.rapi.GetLastHandler().queryargs)
  body = serializer.LoadJson(self.rapi.GetLastRequestData())
  self.assertEqual(body, {"mode": "live"})

  # Target node and iallocator are also passed through the body
  self.rapi.AddResponse(serializer.DumpJson([rlib2._NODE_MIGRATE_REQV1]))
  self.rapi.AddResponse("14219")
  job_id = self.client.MigrateNode("node-x", dry_run=True,
                                   target_node="node9",
                                   iallocator="ial")
  self.assertEqual(14219, job_id)
  self.assertHandler(rlib2.R_2_nodes_name_migrate)
  self.assertItems(["node-x"])
  self.assertDryRun()
  body = serializer.LoadJson(self.rapi.GetLastRequestData())
  self.assertEqual(body, {"target_node": "node9", "iallocator": "ial"})

  self.assertEqual(self.rapi.CountPending(), 0)
def testGetNodeRole(self):
self.rapi.AddResponse("\"master\"")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment