Commit aa66c183 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Merge branch 'devel-2.5'

* devel-2.5:
  jqueue: Factorize checking job processor's result
  jqueue unittest: Rename simple fake-job class
  jqueue: Fix epylint errors introduced in 37d76f1e


  jqueue: Fix deadlock between job queue and dependency manager
  locking: Add “__repr__” to SharedLock and PipeCondition
  daemon.GenericMain: Don't generate backtrace on conflicting daemons
  utils.io.WritePidFile: Improve error reporting
  utils.ListVisibleFiles: Hide “lost+found” directories
  Fix race condition in test for *FileID functions

Conflicts:
	lib/utils/io.py: Trivial
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parents fc4498e9 4f44e311
......@@ -796,7 +796,12 @@ def GenericMain(daemon_name, optionparser,
signal.signal(signal.SIGHUP,
compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
try:
utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
except errors.PidFileLockError, err:
print >> sys.stderr, "Error while locking PID file:\n%s" % err
sys.exit(constants.EXIT_FAILURE)
try:
try:
logging.info("%s daemon startup", daemon_name)
......
......@@ -82,6 +82,12 @@ class LockError(GenericError):
pass
class PidFileLockError(LockError):
"""PID file is already locked by another process.
"""
class HypervisorError(GenericError):
"""Hypervisor-related exception.
......
......@@ -1237,6 +1237,29 @@ class _JobProcessor(object):
queue.release()
def _EvaluateJobProcessorResult(depmgr, job, result):
"""Looks at a result from L{_JobProcessor} for a job.
To be used in a L{_JobQueueWorker}.
"""
if result == _JobProcessor.FINISHED:
# Notify waiting jobs
depmgr.NotifyWaiters(job.id)
elif result == _JobProcessor.DEFER:
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
elif result == _JobProcessor.WAITDEP:
# No-op, dependency manager will re-schedule
pass
else:
raise errors.ProgrammerError("Job processor returned unknown status %s" %
(result, ))
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -1277,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
proc.ExecOpCode)
result = _JobProcessor(queue, wrap_execop_fn, job)()
if result == _JobProcessor.FINISHED:
# Notify waiting jobs
queue.depmgr.NotifyWaiters(job.id)
elif result == _JobProcessor.DEFER:
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
elif result == _JobProcessor.WAITDEP:
# No-op, dependency manager will re-schedule
pass
else:
raise errors.ProgrammerError("Job processor returned unknown status %s" %
(result, ))
_EvaluateJobProcessorResult(queue.depmgr, job,
_JobProcessor(queue, wrap_execop_fn, job)())
@staticmethod
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
......@@ -1439,28 +1447,39 @@ class _JobDependencyManager:
" not one of '%s' as required" %
(dep_job_id, status, utils.CommaJoin(dep_status)))
@locking.ssynchronized(_LOCK)
def _RemoveEmptyWaitersUnlocked(self):
"""Remove all jobs without actual waiters.
"""
for job_id in [job_id for (job_id, waiters) in self._waiters.items()
if not waiters]:
del self._waiters[job_id]
def NotifyWaiters(self, job_id):
"""Notifies all jobs waiting for a certain job ID.
@attention: Do not call until L{CheckAndRegister} returned a status other
than C{WAITDEP} for C{job_id}, or behaviour is undefined
@type job_id: string
@param job_id: Job ID
"""
assert ht.TString(job_id)
jobs = self._waiters.pop(job_id, None)
self._lock.acquire()
try:
self._RemoveEmptyWaitersUnlocked()
jobs = self._waiters.pop(job_id, None)
finally:
self._lock.release()
if jobs:
# Re-add jobs to workerpool
logging.debug("Re-adding %s jobs which were waiting for job %s",
len(jobs), job_id)
self._enqueue_fn(jobs)
# Remove all jobs without actual waiters
for job_id in [job_id for (job_id, waiters) in self._waiters.items()
if not waiters]:
del self._waiters[job_id]
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
......
......@@ -357,6 +357,11 @@ class PipeCondition(_BaseCondition):
return bool(self._waiters)
def __repr__(self):
return ("<%s.%s waiters=%s at %#x>" %
(self.__class__.__module__, self.__class__.__name__,
self._waiters, id(self)))
class _PipeConditionWithMode(PipeCondition):
__slots__ = [
......@@ -436,6 +441,11 @@ class SharedLock(object):
logging.debug("Adding lock %s to monitor", name)
monitor.RegisterLock(self)
def __repr__(self):
return ("<%s.%s name=%s at %#x>" %
(self.__class__.__module__, self.__class__.__name__,
self.name, id(self)))
def GetLockInfo(self, requested):
"""Retrieves information for querying locks.
......
......@@ -38,6 +38,10 @@ from ganeti.utils import filelock
#: Path generating random UUID
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
#: Directory used by fsck(8) to store recovered data, usually at a file
#: system's root directory
_LOST_AND_FOUND = "lost+found"
# Possible values for keep_perms in WriteFile()
KP_NEVER = 0
KP_ALWAYS = 1
......@@ -523,7 +527,7 @@ def CreateBackup(file_name):
return backup_name
def ListVisibleFiles(path):
def ListVisibleFiles(path, _is_mountpoint=os.path.ismount):
"""Returns a list of visible files in a directory.
@type path: str
......@@ -536,8 +540,22 @@ def ListVisibleFiles(path):
if not IsNormAbsPath(path):
raise errors.ProgrammerError("Path passed to ListVisibleFiles is not"
" absolute/normalized: '%s'" % path)
files = [i for i in os.listdir(path) if not i.startswith(".")]
return files
mountpoint = _is_mountpoint(path)
def fn(name):
"""File name filter.
Ignores files starting with a dot (".") as by Unix convention they're
considered hidden. The "lost+found" directory found at the root of some
filesystems is also hidden.
"""
return not (name.startswith(".") or
(mountpoint and name == _LOST_AND_FOUND and
os.path.isdir(os.path.join(path, name))))
return filter(fn, os.listdir(path))
def EnsureDirs(dirs):
......@@ -739,13 +757,24 @@ def ReadPidFile(pidfile):
logging.exception("Can't read pid file")
return 0
return _ParsePidFileContents(raw_data)
def _ParsePidFileContents(data):
"""Tries to extract a process ID from a PID file's content.
@type data: string
@rtype: int
@return: Zero if nothing could be read, PID otherwise
"""
try:
pid = int(raw_data)
except (TypeError, ValueError), err:
pid = int(data)
except (TypeError, ValueError):
logging.info("Can't parse pid file contents", exc_info=True)
return 0
return pid
else:
return pid
def ReadLockedPidFile(path):
......@@ -872,13 +901,21 @@ def WritePidFile(pidfile):
"""
# We don't rename nor truncate the file to not drop locks under
# existing processes
fd_pidfile = os.open(pidfile, os.O_WRONLY | os.O_CREAT, 0600)
fd_pidfile = os.open(pidfile, os.O_RDWR | os.O_CREAT, 0600)
# Lock the PID file (and fail if not possible to do so). Any code
# wanting to send a signal to the daemon should try to lock the PID
# file before reading it. If acquiring the lock succeeds, the daemon is
# no longer running and the signal should not be sent.
filelock.LockFile(fd_pidfile)
try:
filelock.LockFile(fd_pidfile)
except errors.LockError:
msg = ["PID file '%s' is already locked by another process" % pidfile]
# Try to read PID file
pid = _ParsePidFileContents(os.read(fd_pidfile, 100))
if pid > 0:
msg.append(", PID read from file is %s" % pid)
raise errors.PidFileLockError("".join(msg))
os.write(fd_pidfile, "%d\n" % os.getpid())
......
......@@ -38,6 +38,7 @@ from ganeti import opcodes
from ganeti import compat
from ganeti import mcpu
from ganeti import query
from ganeti import workerpool
import testutils
......@@ -1625,6 +1626,43 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
class TestEvaluateJobProcessorResult(unittest.TestCase):
def testFinished(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(30953)
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.FINISHED)
self.assertEqual(depmgr.GetNextNotification(), job.id)
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testDefer(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(11326, priority=5463)
try:
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.DEFER)
except workerpool.DeferTask, err:
self.assertEqual(err.priority, 5463)
else:
self.fail("Didn't raise exception")
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testWaitdep(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(21317)
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.WAITDEP)
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testOther(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(5813)
self.assertRaises(errors.ProgrammerError,
jqueue._EvaluateJobProcessorResult,
depmgr, job, "Other result")
self.assertRaises(IndexError, depmgr.GetNextNotification)
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
self.timeouts = timeouts
......@@ -1858,11 +1896,16 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, self.queue.GetNextUpdate)
class TestJobDependencyManager(unittest.TestCase):
class _FakeJob:
def __init__(self, job_id):
self.id = str(job_id)
class _IdOnlyFakeJob:
def __init__(self, job_id, priority=NotImplemented):
self.id = str(job_id)
self._priority = priority
def CalcPriority(self):
return self._priority
class TestJobDependencyManager(unittest.TestCase):
def setUp(self):
self._status = []
self._queue = []
......@@ -1874,10 +1917,13 @@ class TestJobDependencyManager(unittest.TestCase):
return result
def _Enqueue(self, jobs):
self.assertFalse(self.jdm._lock.is_owned(),
msg=("Must not own manager lock while re-adding jobs"
" (potential deadlock)"))
self._queue.append(jobs)
def testNotFinalizedThenCancel(self):
job = self._FakeJob(17697)
job = _IdOnlyFakeJob(17697)
job_id = str(28625)
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
......@@ -1902,7 +1948,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireCancel(self):
job = self._FakeJob(5278)
job = _IdOnlyFakeJob(5278)
job_id = str(9610)
dep_status = [constants.JOB_STATUS_CANCELED]
......@@ -1928,7 +1974,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireError(self):
job = self._FakeJob(21459)
job = _IdOnlyFakeJob(21459)
job_id = str(25519)
dep_status = [constants.JOB_STATUS_ERROR]
......@@ -1954,7 +2000,7 @@ class TestJobDependencyManager(unittest.TestCase):
dep_status = list(constants.JOBS_FINALIZED)
for end_status in dep_status:
job = self._FakeJob(21343)
job = _IdOnlyFakeJob(21343)
job_id = str(14609)
self._status.append((job_id, constants.JOB_STATUS_WAITING))
......@@ -1979,7 +2025,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testNotify(self):
job = self._FakeJob(8227)
job = _IdOnlyFakeJob(8227)
job_id = str(4113)
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
......@@ -1999,7 +2045,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertEqual(self._queue, [set([job])])
def testWrongStatus(self):
job = self._FakeJob(10102)
job = _IdOnlyFakeJob(10102)
job_id = str(1271)
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
......@@ -2022,7 +2068,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self.jdm.JobWaiting(job))
def testCorrectStatus(self):
job = self._FakeJob(24273)
job = _IdOnlyFakeJob(24273)
job_id = str(23885)
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
......@@ -2045,7 +2091,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self.jdm.JobWaiting(job))
def testFinalizedRightAway(self):
job = self._FakeJob(224)
job = _IdOnlyFakeJob(224)
job_id = str(3081)
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
......@@ -2072,7 +2118,7 @@ class TestJobDependencyManager(unittest.TestCase):
job_ids = map(str, rnd.sample(range(1, 10000), 150))
waiters = dict((job_ids.pop(),
set(map(self._FakeJob,
set(map(_IdOnlyFakeJob,
[job_ids.pop()
for _ in range(rnd.randint(1, 20))])))
for _ in range(10))
......@@ -2132,14 +2178,14 @@ class TestJobDependencyManager(unittest.TestCase):
assert not waiters
def testSelfDependency(self):
job = self._FakeJob(18937)
job = _IdOnlyFakeJob(18937)
self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
(result, _) = self.jdm.CheckAndRegister(job, job.id, [])
self.assertEqual(result, self.jdm.ERROR)
def testJobDisappears(self):
job = self._FakeJob(30540)
job = _IdOnlyFakeJob(30540)
job_id = str(23769)
def _FakeStatus(_):
......
......@@ -239,6 +239,27 @@ class TestListVisibleFiles(unittest.TestCase):
self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
"/bin/../tmp")
def testMountpoint(self):
lvfmp_fn = compat.partial(utils.ListVisibleFiles,
_is_mountpoint=lambda _: True)
self.assertEqual(lvfmp_fn(self.path), [])
# Create "lost+found" as a regular file
self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
self.assertEqual(set(lvfmp_fn(self.path)),
set(["foo", "bar", "lost+found"]))
# Replace "lost+found" with a directory
laf_path = utils.PathJoin(self.path, "lost+found")
utils.RemoveFile(laf_path)
os.mkdir(laf_path)
self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
def testLostAndFoundNoMountpoint(self):
files = ["foo", "bar", ".Hello World", "lost+found"]
expected = ["foo", "bar", "lost+found"]
self._test(files, expected)
class TestWriteFile(testutils.GanetiTestCase):
def setUp(self):
......@@ -415,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase):
def testUpdate(self):
name = self._CreateTempFile()
oldi = utils.GetFileID(path=name)
os.utime(name, None)
fd = os.open(name, os.O_RDWR)
try:
newi = utils.GetFileID(fd=fd)
......@@ -723,7 +743,7 @@ class TestPidFileFunctions(unittest.TestCase):
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, os.getpid())
self.failUnless(utils.IsProcessAlive(read_pid))
self.failUnlessRaises(errors.LockError, utils.WritePidFile,
self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
self.f_dpn('test'))
os.close(fd)
utils.RemoveFile(self.f_dpn("test"))
......@@ -759,11 +779,28 @@ class TestPidFileFunctions(unittest.TestCase):
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, new_pid)
self.failUnless(utils.IsProcessAlive(new_pid))
# Try writing to locked file
try:
utils.WritePidFile(pid_file)
except errors.PidFileLockError, err:
errmsg = str(err)
self.assertTrue(errmsg.endswith(" %s" % new_pid),
msg=("Error message ('%s') didn't contain correct"
" PID (%s)" % (errmsg, new_pid)))
else:
self.fail("Writing to locked file didn't fail")
utils.KillProcess(new_pid, waitpid=True)
self.failIf(utils.IsProcessAlive(new_pid))
utils.RemoveFile(self.f_dpn('child'))
self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
def testExceptionType(self):
# Make sure the PID lock error is a subclass of LockError in case some code
# depends on it
self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
def tearDown(self):
shutil.rmtree(self.dir)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment