Commit 6b92f2af authored by Michael Hanselmann

Merge branch 'devel-2.1'

* devel-2.1:
  Add a basic unittest for uidpool.RequestUnusedUid
  Factorize LUExportInstance
  Use ints instead of strings to represent user-ids
  QA: fix reimporting instance with different name
  Fix broken commit 9e302a8c
  ssh: make quiet configurable

Conflicts:
	lib/cmdlib.py: Mostly trivial
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parents 37549316 17f25f78
@@ -45,6 +45,7 @@ from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
class LogicalUnit(object):
@@ -8895,6 +8896,94 @@ class LUExportInstance(LogicalUnit):
raise errors.OpPrereqError("Export not supported for instances with"
" file-based disks", errors.ECODE_INVAL)
  def _CreateSnapshots(self, feedback_fn):
    """Creates an LVM snapshot for every disk of the instance.

    @return: List of snapshots as L{objects.Disk} instances

    """
    instance = self.instance
    src_node = instance.primary_node
    vgname = self.cfg.GetVGName()

    snap_disks = []

    for idx, disk in enumerate(instance.disks):
      feedback_fn("Creating a snapshot of disk/%s on node %s" %
                  (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                        idx, src_node, msg)
        snap_disks.append(False)
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)
        snap_disks.append(new_dev)

    return snap_disks
  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
    """Removes an LVM snapshot.

    @type snap_disks: list
    @param snap_disks: The list of all snapshots as returned by
      L{_CreateSnapshots}
    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed
    @rtype: bool
    @return: Whether removal was successful or not

    """
    disk = snap_disks[disk_index]
    if disk:
      src_node = self.instance.primary_node

      feedback_fn("Removing snapshot of disk/%s on node %s" %
                  (disk_index, src_node))

      result = self.rpc.call_blockdev_remove(src_node, disk)
      if not result.fail_msg:
        return True

      self.LogWarning("Could not remove snapshot for disk/%d from node"
                      " %s: %s", disk_index, src_node, result.fail_msg)

    return False
  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(self.dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.
@@ -8912,10 +9001,6 @@ class LUExportInstance(LogicalUnit):
result.Raise("Could not shutdown instance %s on"
" node %s" % (instance.name, src_node))
vgname = self.cfg.GetVGName()
snap_disks = []
# set the disks ID correctly since call_instance_start needs the
# correct drbd minor to create the symlinks
for disk in instance.disks:
@@ -8931,69 +9016,72 @@ class LUExportInstance(LogicalUnit):
try:
# per-disk results
dresults = []
removed_snaps = [False] * len(instance.disks)
snap_disks = None
try:
for idx, disk in enumerate(instance.disks):
feedback_fn("Creating a snapshot of disk/%s on node %s" %
(idx, src_node))
# result.payload will be a snapshot of an lvm leaf of the one we
# passed
result = self.rpc.call_blockdev_snapshot(src_node, disk)
msg = result.fail_msg
if msg:
self.LogWarning("Could not snapshot disk/%s on node %s: %s",
idx, src_node, msg)
snap_disks.append(False)
else:
disk_id = (vgname, result.payload)
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
logical_id=disk_id, physical_id=disk_id,
iv_name=disk.iv_name)
snap_disks.append(new_dev)
try:
snap_disks = self._CreateSnapshots(feedback_fn)
finally:
if (self.op.shutdown and instance.admin_up and
not self.remove_instance):
feedback_fn("Starting instance %s" % instance.name)
result = self.rpc.call_instance_start(src_node, instance,
None, None)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance: %s" % msg)
finally:
if self.op.shutdown and instance.admin_up and not self.remove_instance:
feedback_fn("Starting instance %s" % instance.name)
result = self.rpc.call_instance_start(src_node, instance, None, None)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Could not start instance: %s" % msg)
# TODO: check for size
cluster_name = self.cfg.GetClusterName()
for idx, dev in enumerate(snap_disks):
feedback_fn("Exporting snapshot %s from %s to %s" %
(idx, src_node, dst_node.name))
if dev:
# FIXME: pass debug from opcode to backend
result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
instance, cluster_name,
idx, self.op.debug_level)
msg = result.fail_msg
if msg:
self.LogWarning("Could not export disk/%s from node %s to"
" node %s: %s", idx, src_node, dst_node.name, msg)
dresults.append(False)
assert len(snap_disks) == len(instance.disks)
assert len(removed_snaps) == len(instance.disks)
# TODO: check for size
cluster_name = self.cfg.GetClusterName()
for idx, dev in enumerate(snap_disks):
feedback_fn("Exporting snapshot %s from %s to %s" %
(idx, src_node, dst_node.name))
if dev:
# FIXME: pass debug from opcode to backend
result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
instance, cluster_name,
idx, self.op.debug_level)
msg = result.fail_msg
if msg:
self.LogWarning("Could not export disk/%s from node %s to"
" node %s: %s", idx, src_node, dst_node.name, msg)
dresults.append(False)
else:
dresults.append(True)
# Remove snapshot
if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
removed_snaps[idx] = True
else:
dresults.append(True)
msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
if msg:
self.LogWarning("Could not remove snapshot for disk/%d from node"
" %s: %s", idx, src_node, msg)
else:
dresults.append(False)
dresults.append(False)
feedback_fn("Finalizing export on %s" % dst_node.name)
result = self.rpc.call_finalize_export(dst_node.name, instance,
snap_disks)
fin_resu = True
msg = result.fail_msg
if msg:
self.LogWarning("Could not finalize export for instance %s"
" on node %s: %s", instance.name, dst_node.name, msg)
fin_resu = False
assert len(dresults) == len(instance.disks)
# Check for backwards compatibility
assert compat.all(isinstance(i, bool) for i in dresults), \
"Not all results are boolean: %r" % dresults
feedback_fn("Finalizing export on %s" % dst_node.name)
result = self.rpc.call_finalize_export(dst_node.name, instance,
snap_disks)
msg = result.fail_msg
fin_resu = not msg
if msg:
self.LogWarning("Could not finalize export for instance %s"
" on node %s: %s", instance.name, dst_node.name, msg)
finally:
# Remove all snapshots
assert len(removed_snaps) == len(instance.disks)
for idx, removed in enumerate(removed_snaps):
if not removed:
self._RemoveSnapshot(feedback_fn, snap_disks, idx)
finally:
if activate_disks:
@@ -9005,24 +9093,7 @@ class LUExportInstance(LogicalUnit):
feedback_fn("Removing instance %s" % instance.name)
_RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
nodelist = self.cfg.GetNodeList()
nodelist.remove(dst_node.name)
# on one-node clusters nodelist will be empty after the removal
# if we proceed the backup would be removed because OpQueryExports
# substitutes an empty list with the full cluster node list.
iname = instance.name
if nodelist:
feedback_fn("Removing old exports for instance %s" % iname)
exportlist = self.rpc.call_export_list(nodelist)
for node in exportlist:
if exportlist[node].fail_msg:
continue
if iname in exportlist[node].payload:
msg = self.rpc.call_export_remove(node, iname).fail_msg
if msg:
self.LogWarning("Could not remove older export for instance %s"
" on node %s: %s", iname, node, msg)
self._CleanupExports(feedback_fn)
return fin_resu, dresults
......
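The hunks above (lib/cmdlib.py, per the Conflicts note) factor the snapshot, export and cleanup steps of LUExportInstance.Exec into the _CreateSnapshots, _RemoveSnapshot and _CleanupExports helpers, and track which snapshots were already removed so any leftovers are swept up in a finally block. A rough standalone sketch of that control flow, using a hypothetical Exporter class with in-memory snapshot names instead of Ganeti's RPC calls:

class Exporter(object):
  """Toy stand-in for the refactored LUExportInstance flow."""

  def __init__(self, disks):
    self.disks = disks

  def _CreateSnapshots(self, feedback_fn):
    snaps = []
    for idx, disk in enumerate(self.disks):
      feedback_fn("snapshotting disk/%s" % idx)
      snaps.append("%s.snap" % disk)  # a False entry would mark a failed snapshot
    return snaps

  def _RemoveSnapshot(self, feedback_fn, snaps, idx):
    if snaps[idx]:
      feedback_fn("removing snapshot of disk/%s" % idx)
      return True
    return False

  def Exec(self, feedback_fn):
    removed = [False] * len(self.disks)
    snaps = None
    try:
      snaps = self._CreateSnapshots(feedback_fn)
      dresults = []
      for idx, snap in enumerate(snaps):
        feedback_fn("exporting snapshot %s" % idx)
        dresults.append(bool(snap))
        # remove each snapshot as soon as its export has finished ...
        if self._RemoveSnapshot(feedback_fn, snaps, idx):
          removed[idx] = True
      return dresults
    finally:
      # ... and sweep up anything left behind on error paths
      if snaps:
        for idx, done in enumerate(removed):
          if not done:
            self._RemoveSnapshot(feedback_fn, snaps, idx)

results = Exporter(["disk0", "disk1"]).Exec(lambda msg: None)
assert results == [True, True]

Removing each snapshot right after its export keeps the LVM snapshots short-lived, while the final sweep covers failures part-way through the loop.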
@@ -216,6 +216,11 @@ class ChrootManager(hv_base.BaseHypervisor):
"""Cleanup after a stopped instance
"""
root_dir = self._InstanceDir(instance_name)
if not os.path.exists(root_dir):
return
if self._IsDirLive(root_dir):
raise HypervisorError("Processes are still using the chroot")
@@ -223,7 +228,7 @@ class ChrootManager(hv_base.BaseHypervisor):
utils.RunCmd(["umount", mpath])
result = utils.RunCmd(["umount", root_dir])
if result.failed and force:
if result.failed:
msg = ("Processes still alive in the chroot: %s" %
utils.RunCmd("fuser -vm %s" % root_dir).output)
logging.error(msg)
......
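The ChrootManager hunks above make instance cleanup return early when the chroot directory is already gone, refuse to proceed while processes are still using it, and attempt the unmount unconditionally instead of only when force is set. A minimal sketch of that pattern, with a hypothetical cleanup_chroot helper using plain subprocess calls rather than the hypervisor's utils.RunCmd wrapper (requires fuser and umount on the host):

import os
import subprocess

def dir_is_busy(path):
  # "fuser -s" exits 0 when at least one process is using the path
  return subprocess.call(["fuser", "-s", path]) == 0

def cleanup_chroot(root_dir):
  if not os.path.exists(root_dir):
    return  # instance directory already removed, nothing to do
  if dir_is_busy(root_dir):
    raise RuntimeError("Processes are still using the chroot")
  # Try the unmount unconditionally (previously this was only done with force)
  if subprocess.call(["umount", root_dir]) != 0:
    raise RuntimeError("Could not unmount %s" % root_dir)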
@@ -75,7 +75,7 @@ class SshRunner:
self.cluster_name = cluster_name
def _BuildSshOptions(self, batch, ask_key, use_cluster_key,
strict_host_check, private_key=None):
strict_host_check, private_key=None, quiet=True):
"""Builds a list with needed SSH options.
@param batch: same as ssh's batch option
@@ -85,6 +85,7 @@ class SshRunner:
HostKeyAlias name
@param strict_host_check: this makes the host key checking strict
@param private_key: use this private key instead of the default
@param quiet: whether to enable -q to ssh
@rtype: list
@return: the list of options ready to use in L{utils.RunCmd}
@@ -101,6 +102,9 @@ class SshRunner:
if use_cluster_key:
options.append("-oHostKeyAlias=%s" % self.cluster_name)
if quiet:
options.append("-q")
if private_key:
options.append("-i%s" % private_key)
@@ -133,7 +137,7 @@ class SshRunner:
def BuildCmd(self, hostname, user, command, batch=True, ask_key=False,
tty=False, use_cluster_key=True, strict_host_check=True,
private_key=None):
private_key=None, quiet=True):
"""Build an ssh command to execute a command on a remote node.
@param hostname: the target host, string
@@ -147,13 +151,15 @@ class SshRunner:
cluster-global SSH key
@param strict_host_check: whether to check the host's SSH key at all
@param private_key: use this private key instead of the default
@param quiet: whether to enable -q to ssh
@return: the ssh call to run 'command' on the remote host.
"""
argv = [constants.SSH, "-q"]
argv = [constants.SSH]
argv.extend(self._BuildSshOptions(batch, ask_key, use_cluster_key,
strict_host_check, private_key))
strict_host_check, private_key,
quiet=quiet))
if tty:
argv.append("-t")
argv.extend(["%s@%s" % (user, hostname), command])
@@ -191,7 +197,7 @@ class SshRunner:
logging.error("File %s does not exist", filename)
return False
command = [constants.SCP, "-q", "-p"]
command = [constants.SCP, "-p"]
command.extend(self._BuildSshOptions(True, False, True, True))
command.append(filename)
command.append("%s:%s" % (node, filename))
......
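With quiet now a parameter of _BuildSshOptions and BuildCmd, callers keep the old behaviour by default (quiet=True still adds -q) but can request verbose ssh output. A small usage sketch, assuming a built Ganeti source tree on the Python path and placeholder cluster and host names:

from ganeti import ssh

runner = ssh.SshRunner("cluster.example.com")

# Default behaviour is unchanged: "-q" is still passed to ssh.
quiet_cmd = runner.BuildCmd("node1.example.com", "root", "uptime")

# New: drop "-q" to see ssh's own diagnostics, e.g. while debugging key setup.
verbose_cmd = runner.BuildCmd("node1.example.com", "root", "uptime",
                              quiet=False)

assert "-q" in quiet_cmd
assert "-q" not in verbose_cmd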
@@ -182,6 +182,9 @@ def ExpandUidPool(uid_pool):
def _IsUidUsed(uid):
"""Check if there is any process in the system running with the given user-id
@type uid: integer
@param uid: the user-id to be checked.
"""
pgrep_command = [constants.PGREP, "-u", uid]
result = utils.RunCmd(pgrep_command)
@@ -218,7 +221,7 @@ class LockedUid(object):
def GetUid(self):
return self._uid
def __str__(self):
def AsStr(self):
return "%s" % self._uid
@@ -255,6 +258,7 @@ def RequestUnusedUid(all_uids):
<stop the process>
uidpool.ReleaseUid(uid)
@type all_uids: set of integers
@param all_uids: a set containing all the user-ids in the user-id pool
@return: a LockedUid object representing the unused uid. It's the caller's
responsibility to unlock the uid once an instance is started with
@@ -269,12 +273,20 @@ def RequestUnusedUid(all_uids):
# Get list of currently used uids from the filesystem
try:
taken_uids = set(os.listdir(constants.UIDPOOL_LOCKDIR))
# Filter out spurious entries from the directory listing
taken_uids = all_uids.intersection(taken_uids)
taken_uids = set()
for taken_uid in os.listdir(constants.UIDPOOL_LOCKDIR):
try:
taken_uid = int(taken_uid)
except ValueError, err:
# Skip directory entries that can't be converted into an integer
continue
taken_uids.add(taken_uid)
except OSError, err:
raise errors.LockError("Failed to get list of used user-ids: %s" % err)
# Filter out spurious entries from the directory listing
taken_uids = all_uids.intersection(taken_uids)
# Remove the list of used uids from the list of all uids
unused_uids = list(all_uids - taken_uids)
if not unused_uids:
@@ -330,12 +342,16 @@ def ReleaseUid(uid):
if isinstance(uid, LockedUid):
# Make sure we release the exclusive lock, if there is any
uid.Unlock()
uid_filename = uid.AsStr()
else:
uid_filename = str(uid)
try:
uid_path = utils.PathJoin(constants.UIDPOOL_LOCKDIR, str(uid))
uid_path = utils.PathJoin(constants.UIDPOOL_LOCKDIR, uid_filename)
os.remove(uid_path)
except OSError, err:
raise errors.LockError("Failed to remove user-id lockfile"
" for user-id %s: %s" % (uid, err))
" for user-id %s: %s" % (uid_filename, err))
def ExecWithUnusedUid(fn, all_uids, *args, **kwargs):
@@ -344,17 +360,18 @@ def ExecWithUnusedUid(fn, all_uids, *args, **kwargs):
This wrapper function provides a simple way to handle the requesting,
unlocking and releasing a user-id.
"fn" is called by passing a "uid" keyword argument that
contains an unused user-id (as a string) selected from the set of user-ids
contains an unused user-id (as an integer) selected from the set of user-ids
passed in all_uids.
If there is an error while executing "fn", the user-id is returned
to the pool.
@param fn: a callable
@param fn: a callable that accepts a keyword argument called "uid"
@type all_uids: a set of integers
@param all_uids: a set containing all user-ids in the user-id pool
"""
uid = RequestUnusedUid(all_uids)
kwargs["uid"] = str(uid)
kwargs["uid"] = uid.GetUid()
try:
return_value = fn(*args, **kwargs)
except:
......
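After this change the uid pool is integer-based end to end: RequestUnusedUid returns a LockedUid wrapping an int, and ExecWithUnusedUid passes that int to its callback via the uid keyword argument. A usage sketch, assuming a built Ganeti tree on the Python path, pgrep available, and uids in the example range unused on the host; the lock directory is redirected to a tempdir here, as the new unittest does:

import tempfile

from ganeti import constants
from ganeti import uidpool

# Keep the sketch self-contained: use a scratch lock directory.
constants.UIDPOOL_LOCKDIR = tempfile.mkdtemp()

all_uids = set(range(10000, 10010))   # user-ids are plain ints now

uid = uidpool.RequestUnusedUid(all_uids)
try:
  print(uid.GetUid())   # an int from the pool, e.g. 10003
  # Once a process runs under this uid, the exclusive lock can be dropped.
  uid.Unlock()
finally:
  # Return the uid to the pool when the process has stopped (or on error).
  uidpool.ReleaseUid(uid)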
@@ -294,6 +294,7 @@ def TestInstanceImport(node, newinst, expnode, name):
cmd = (['gnt-backup', 'import',
'--disk-template=plain',
'--no-ip-check',
'--net', '0:mac=generate',
'--src-node=%s' % expnode['primary'],
'--src-dir=%s/%s' % (constants.EXPORT_DIR, name),
'--node=%s' % node['primary']] +
......
@@ -22,6 +22,8 @@
"""Script for unittesting the uidpool module"""
import os
import tempfile
import unittest
from ganeti import constants
@@ -39,10 +41,14 @@ class TestUidPool(testutils.GanetiTestCase):
    self.old_uid_max = constants.UIDPOOL_UID_MAX
    constants.UIDPOOL_UID_MIN = 1
    constants.UIDPOOL_UID_MAX = 10
    constants.UIDPOOL_LOCKDIR = tempfile.mkdtemp()

  def tearDown(self):
    constants.UIDPOOL_UID_MIN = self.old_uid_min
    constants.UIDPOOL_UID_MAX = self.old_uid_max
    for name in os.listdir(constants.UIDPOOL_LOCKDIR):
      os.unlink(os.path.join(constants.UIDPOOL_LOCKDIR, name))
    os.rmdir(constants.UIDPOOL_LOCKDIR)

  def testParseUidPool(self):
    self.assertEqualValues(
@@ -80,6 +86,36 @@ class TestUidPool(testutils.GanetiTestCase):
uidpool.FormatUidPool([(1, 100), (200, 200)], separator="\n"),
"1-100\n200")
  def testRequestUnusedUid(self):
    # Check with known used user-ids
    #
    # Test with user-id "0" and with our own user-id, both
    # of which are guaranteed to be used user-ids
    for uid in 0, os.getuid():
      self.assertRaises(errors.LockError,
                        uidpool.RequestUnusedUid,
                        set([uid]))

    # Check with a single, known unused user-id
    #
    # We use "-1" here, which is not a valid user-id, so it's
    # guaranteed that it's unused.
    uid = uidpool.RequestUnusedUid(set([-1]))
    self.assertEqualValues(uid.GetUid(), -1)

    # Check uid-pool exhaustion
    #
    # uid "-1" is locked now, so RequestUnusedUid is expected to fail
    self.assertRaises(errors.LockError,
                      uidpool.RequestUnusedUid,
                      set([-1]))

    # Check unlocking
    uid.Unlock()
    # After unlocking, "-1" should be available again
    uid = uidpool.RequestUnusedUid(set([-1]))
    self.assertEqualValues(uid.GetUid(), -1)


if __name__ == '__main__':
  testutils.GanetiTestProgram()