diff --git a/lib/daemon.py b/lib/daemon.py
index 6c872f20ea70f712e73e2aab710c7456442ea795..1e41b80781b1a2309ba9e2f11a009a829f92016c 100644
--- a/lib/daemon.py
+++ b/lib/daemon.py
@@ -796,7 +796,12 @@ def GenericMain(daemon_name, optionparser,
   signal.signal(signal.SIGHUP,
                 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
 
-  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+  try:
+    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+  except errors.PidFileLockError, err:
+    print >> sys.stderr, "Error while locking PID file:\n%s" % err
+    sys.exit(constants.EXIT_FAILURE)
+
   try:
     try:
       logging.info("%s daemon startup", daemon_name)
diff --git a/lib/errors.py b/lib/errors.py
index ff7cbf85126ae7fec90e0b6634241485bafd7281..3847353b78d22a5adb5988b31cd09a41f67d4254 100644
--- a/lib/errors.py
+++ b/lib/errors.py
@@ -82,6 +82,12 @@ class LockError(GenericError):
   pass
 
 
+class PidFileLockError(LockError):
+  """PID file is already locked by another process.
+
+  """
+
+
 class HypervisorError(GenericError):
   """Hypervisor-related exception.
 
diff --git a/lib/jqueue.py b/lib/jqueue.py
index e030558dfc319281e8ec5fb2f3eab9b38a94b176..34ac6a7ec149e65558b3d8095e445c0160c95073 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -1237,6 +1237,29 @@ class _JobProcessor(object):
         queue.release()
 
 
+def _EvaluateJobProcessorResult(depmgr, job, result):
+  """Looks at a result from L{_JobProcessor} for a job.
+
+  To be used in a L{_JobQueueWorker}.
+
+  """
+  if result == _JobProcessor.FINISHED:
+    # Notify waiting jobs
+    depmgr.NotifyWaiters(job.id)
+
+  elif result == _JobProcessor.DEFER:
+    # Schedule again
+    raise workerpool.DeferTask(priority=job.CalcPriority())
+
+  elif result == _JobProcessor.WAITDEP:
+    # No-op, dependency manager will re-schedule
+    pass
+
+  else:
+    raise errors.ProgrammerError("Job processor returned unknown status %s" %
+                                 (result, ))
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
@@ -1277,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
       wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                       proc.ExecOpCode)
 
-      result = _JobProcessor(queue, wrap_execop_fn, job)()
-
-      if result == _JobProcessor.FINISHED:
-        # Notify waiting jobs
-        queue.depmgr.NotifyWaiters(job.id)
-
-      elif result == _JobProcessor.DEFER:
-        # Schedule again
-        raise workerpool.DeferTask(priority=job.CalcPriority())
-
-      elif result == _JobProcessor.WAITDEP:
-        # No-op, dependency manager will re-schedule
-        pass
-
-      else:
-        raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                     (result, ))
+      _EvaluateJobProcessorResult(queue.depmgr, job,
+                                  _JobProcessor(queue, wrap_execop_fn, job)())
 
   @staticmethod
   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
@@ -1439,28 +1447,39 @@ class _JobDependencyManager:
               " not one of '%s' as required" %
               (dep_job_id, status, utils.CommaJoin(dep_status)))
 
-  @locking.ssynchronized(_LOCK)
+  def _RemoveEmptyWaitersUnlocked(self):
+    """Remove all jobs without actual waiters.
+
+    """
+    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+                   if not waiters]:
+      del self._waiters[job_id]
+
   def NotifyWaiters(self, job_id):
     """Notifies all jobs waiting for a certain job ID.
 
+    @attention: Do not call until L{CheckAndRegister} returned a status other
+      than C{WAITDEP} for C{job_id}, or behaviour is undefined
     @type job_id: string
     @param job_id: Job ID
 
     """
     assert ht.TString(job_id)
 
-    jobs = self._waiters.pop(job_id, None)
+    self._lock.acquire()
+    try:
+      self._RemoveEmptyWaitersUnlocked()
+
+      jobs = self._waiters.pop(job_id, None)
+    finally:
+      self._lock.release()
+
     if jobs:
       # Re-add jobs to workerpool
       logging.debug("Re-adding %s jobs which were waiting for job %s",
                     len(jobs), job_id)
       self._enqueue_fn(jobs)
 
-    # Remove all jobs without actual waiters
-    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
-                   if not waiters]:
-      del self._waiters[job_id]
-
 
 def _RequireOpenQueue(fn):
   """Decorator for "public" functions.
diff --git a/lib/locking.py b/lib/locking.py
index e5861c5bf6f28bc8cee05bc42b04f9054df0a56f..5ac20f97e2804d49b93cbeead4c10cdcf2ae34f0 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -357,6 +357,11 @@ class PipeCondition(_BaseCondition):
 
     return bool(self._waiters)
 
+  def __repr__(self):
+    return ("<%s.%s waiters=%s at %#x>" %
+            (self.__class__.__module__, self.__class__.__name__,
+             self._waiters, id(self)))
+
 
 class _PipeConditionWithMode(PipeCondition):
   __slots__ = [
@@ -436,6 +441,11 @@ class SharedLock(object):
       logging.debug("Adding lock %s to monitor", name)
       monitor.RegisterLock(self)
 
+  def __repr__(self):
+    return ("<%s.%s name=%s at %#x>" %
+            (self.__class__.__module__, self.__class__.__name__,
+             self.name, id(self)))
+
   def GetLockInfo(self, requested):
     """Retrieves information for querying locks.
 
diff --git a/lib/utils/io.py b/lib/utils/io.py
index ebae9dff6be791caaf1ac68df75fba879270bafc..8c785f7d791dcc6c08ea958053fa6e8e05bdb16d 100644
--- a/lib/utils/io.py
+++ b/lib/utils/io.py
@@ -38,6 +38,10 @@ from ganeti.utils import filelock
 #: Path generating random UUID
 _RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
 
+#: Directory used by fsck(8) to store recovered data, usually at a file
+#: system's root directory
+_LOST_AND_FOUND = "lost+found"
+
 # Possible values for keep_perms in WriteFile()
 KP_NEVER = 0
 KP_ALWAYS = 1
@@ -523,7 +527,7 @@ def CreateBackup(file_name):
   return backup_name
 
 
-def ListVisibleFiles(path):
+def ListVisibleFiles(path, _is_mountpoint=os.path.ismount):
   """Returns a list of visible files in a directory.
 
   @type path: str
@@ -536,8 +540,22 @@ def ListVisibleFiles(path):
   if not IsNormAbsPath(path):
     raise errors.ProgrammerError("Path passed to ListVisibleFiles is not"
                                  " absolute/normalized: '%s'" % path)
-  files = [i for i in os.listdir(path) if not i.startswith(".")]
-  return files
+
+  mountpoint = _is_mountpoint(path)
+
+  def fn(name):
+    """File name filter.
+
+    Ignores files starting with a dot (".") as by Unix convention they're
+    considered hidden. The "lost+found" directory found at the root of some
+    filesystems is also hidden.
+
+    """
+    return not (name.startswith(".") or
+                (mountpoint and name == _LOST_AND_FOUND and
+                 os.path.isdir(os.path.join(path, name))))
+
+  return filter(fn, os.listdir(path))
 
 
 def EnsureDirs(dirs):
@@ -739,13 +757,24 @@ def ReadPidFile(pidfile):
       logging.exception("Can't read pid file")
     return 0
 
+  return _ParsePidFileContents(raw_data)
+
+
+def _ParsePidFileContents(data):
+  """Tries to extract a process ID from a PID file's content.
+
+  @type data: string
+  @rtype: int
+  @return: Zero if nothing could be read, PID otherwise
+
+  """
   try:
-    pid = int(raw_data)
-  except (TypeError, ValueError), err:
+    pid = int(data)
+  except (TypeError, ValueError):
     logging.info("Can't parse pid file contents", exc_info=True)
     return 0
-
-  return pid
+  else:
+    return pid
 
 
 def ReadLockedPidFile(path):
@@ -872,13 +901,21 @@ def WritePidFile(pidfile):
   """
   # We don't rename nor truncate the file to not drop locks under
   # existing processes
-  fd_pidfile = os.open(pidfile, os.O_WRONLY | os.O_CREAT, 0600)
+  fd_pidfile = os.open(pidfile, os.O_RDWR | os.O_CREAT, 0600)
 
   # Lock the PID file (and fail if not possible to do so). Any code
   # wanting to send a signal to the daemon should try to lock the PID
   # file before reading it. If acquiring the lock succeeds, the daemon is
   # no longer running and the signal should not be sent.
-  filelock.LockFile(fd_pidfile)
+  try:
+    filelock.LockFile(fd_pidfile)
+  except errors.LockError:
+    msg = ["PID file '%s' is already locked by another process" % pidfile]
+    # Try to read PID file
+    pid = _ParsePidFileContents(os.read(fd_pidfile, 100))
+    if pid > 0:
+      msg.append(", PID read from file is %s" % pid)
+    raise errors.PidFileLockError("".join(msg))
 
   os.write(fd_pidfile, "%d\n" % os.getpid())
 
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index 45c298450bf54f1a93badd2dd8d05f1143a6a30d..ea5e8f4be522be31494e358b3546631a87336fe6 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -38,6 +38,7 @@ from ganeti import opcodes
 from ganeti import compat
 from ganeti import mcpu
 from ganeti import query
+from ganeti import workerpool
 
 import testutils
 
@@ -1625,6 +1626,43 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
 
+class TestEvaluateJobProcessorResult(unittest.TestCase):
+  def testFinished(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(30953)
+    jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                       jqueue._JobProcessor.FINISHED)
+    self.assertEqual(depmgr.GetNextNotification(), job.id)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testDefer(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(11326, priority=5463)
+    try:
+      jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                         jqueue._JobProcessor.DEFER)
+    except workerpool.DeferTask, err:
+      self.assertEqual(err.priority, 5463)
+    else:
+      self.fail("Didn't raise exception")
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testWaitdep(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(21317)
+    jqueue._EvaluateJobProcessorResult(depmgr, job,
+                                       jqueue._JobProcessor.WAITDEP)
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+  def testOther(self):
+    depmgr = _FakeDependencyManager()
+    job = _IdOnlyFakeJob(5813)
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._EvaluateJobProcessorResult,
+                      depmgr, job, "Other result")
+    self.assertRaises(IndexError, depmgr.GetNextNotification)
+
+
 class _FakeTimeoutStrategy:
   def __init__(self, timeouts):
     self.timeouts = timeouts
@@ -1858,11 +1896,16 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
 
-class TestJobDependencyManager(unittest.TestCase):
-  class _FakeJob:
-    def __init__(self, job_id):
-      self.id = str(job_id)
+class _IdOnlyFakeJob:
+  def __init__(self, job_id, priority=NotImplemented):
+    self.id = str(job_id)
+    self._priority = priority
+
+  def CalcPriority(self):
+    return self._priority
+
 
+class TestJobDependencyManager(unittest.TestCase):
   def setUp(self):
     self._status = []
     self._queue = []
@@ -1874,10 +1917,13 @@ class TestJobDependencyManager(unittest.TestCase):
     return result
 
   def _Enqueue(self, jobs):
+    self.assertFalse(self.jdm._lock.is_owned(),
+                     msg=("Must not own manager lock while re-adding jobs"
+                          " (potential deadlock)"))
     self._queue.append(jobs)
 
   def testNotFinalizedThenCancel(self):
-    job = self._FakeJob(17697)
+    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)
 
     self._status.append((job_id, constants.JOB_STATUS_RUNNING))
@@ -1902,7 +1948,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testRequireCancel(self):
-    job = self._FakeJob(5278)
+    job = _IdOnlyFakeJob(5278)
     job_id = str(9610)
     dep_status = [constants.JOB_STATUS_CANCELED]
 
@@ -1928,7 +1974,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testRequireError(self):
-    job = self._FakeJob(21459)
+    job = _IdOnlyFakeJob(21459)
     job_id = str(25519)
     dep_status = [constants.JOB_STATUS_ERROR]
 
@@ -1954,7 +2000,7 @@ class TestJobDependencyManager(unittest.TestCase):
     dep_status = list(constants.JOBS_FINALIZED)
 
     for end_status in dep_status:
-      job = self._FakeJob(21343)
+      job = _IdOnlyFakeJob(21343)
       job_id = str(14609)
 
       self._status.append((job_id, constants.JOB_STATUS_WAITING))
@@ -1979,7 +2025,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
   def testNotify(self):
-    job = self._FakeJob(8227)
+    job = _IdOnlyFakeJob(8227)
     job_id = str(4113)
 
     self._status.append((job_id, constants.JOB_STATUS_RUNNING))
@@ -1999,7 +2045,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertEqual(self._queue, [set([job])])
 
   def testWrongStatus(self):
-    job = self._FakeJob(10102)
+    job = _IdOnlyFakeJob(10102)
     job_id = str(1271)
 
     self._status.append((job_id, constants.JOB_STATUS_QUEUED))
@@ -2022,7 +2068,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self.jdm.JobWaiting(job))
 
   def testCorrectStatus(self):
-    job = self._FakeJob(24273)
+    job = _IdOnlyFakeJob(24273)
     job_id = str(23885)
 
     self._status.append((job_id, constants.JOB_STATUS_QUEUED))
@@ -2045,7 +2091,7 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self.jdm.JobWaiting(job))
 
   def testFinalizedRightAway(self):
-    job = self._FakeJob(224)
+    job = _IdOnlyFakeJob(224)
     job_id = str(3081)
 
     self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
@@ -2072,7 +2118,7 @@ class TestJobDependencyManager(unittest.TestCase):
     job_ids = map(str, rnd.sample(range(1, 10000), 150))
 
     waiters = dict((job_ids.pop(),
-                    set(map(self._FakeJob,
+                    set(map(_IdOnlyFakeJob,
                             [job_ids.pop()
                              for _ in range(rnd.randint(1, 20))])))
                    for _ in range(10))
@@ -2132,14 +2178,14 @@ class TestJobDependencyManager(unittest.TestCase):
     assert not waiters
 
   def testSelfDependency(self):
-    job = self._FakeJob(18937)
+    job = _IdOnlyFakeJob(18937)
 
     self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
     (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
     self.assertEqual(result, self.jdm.ERROR)
 
   def testJobDisappears(self):
-    job = self._FakeJob(30540)
+    job = _IdOnlyFakeJob(30540)
     job_id = str(23769)
 
     def _FakeStatus(_):
diff --git a/test/ganeti.utils.io_unittest.py b/test/ganeti.utils.io_unittest.py
index a4220c14a99b38998ae75c408c1368bcce7b9297..109232a399bb5cf39704a178935e8495e6fdb28d 100755
--- a/test/ganeti.utils.io_unittest.py
+++ b/test/ganeti.utils.io_unittest.py
@@ -239,6 +239,27 @@ class TestListVisibleFiles(unittest.TestCase):
     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
                           "/bin/../tmp")
 
+  def testMountpoint(self):
+    lvfmp_fn = compat.partial(utils.ListVisibleFiles,
+                              _is_mountpoint=lambda _: True)
+    self.assertEqual(lvfmp_fn(self.path), [])
+
+    # Create "lost+found" as a regular file
+    self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
+    self.assertEqual(set(lvfmp_fn(self.path)),
+                     set(["foo", "bar", "lost+found"]))
+
+    # Replace "lost+found" with a directory
+    laf_path = utils.PathJoin(self.path, "lost+found")
+    utils.RemoveFile(laf_path)
+    os.mkdir(laf_path)
+    self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
+
+  def testLostAndFoundNoMountpoint(self):
+    files = ["foo", "bar", ".Hello World", "lost+found"]
+    expected = ["foo", "bar", "lost+found"]
+    self._test(files, expected)
+
 
 class TestWriteFile(testutils.GanetiTestCase):
   def setUp(self):
@@ -415,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase):
   def testUpdate(self):
     name = self._CreateTempFile()
     oldi = utils.GetFileID(path=name)
-    os.utime(name, None)
     fd = os.open(name, os.O_RDWR)
     try:
       newi = utils.GetFileID(fd=fd)
@@ -723,7 +743,7 @@ class TestPidFileFunctions(unittest.TestCase):
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, os.getpid())
     self.failUnless(utils.IsProcessAlive(read_pid))
-    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
+    self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
                           self.f_dpn('test'))
     os.close(fd)
     utils.RemoveFile(self.f_dpn("test"))
@@ -759,11 +779,28 @@ class TestPidFileFunctions(unittest.TestCase):
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, new_pid)
     self.failUnless(utils.IsProcessAlive(new_pid))
+
+    # Try writing to locked file
+    try:
+      utils.WritePidFile(pid_file)
+    except errors.PidFileLockError, err:
+      errmsg = str(err)
+      self.assertTrue(errmsg.endswith(" %s" % new_pid),
+                      msg=("Error message ('%s') didn't contain correct"
+                           " PID (%s)" % (errmsg, new_pid)))
+    else:
+      self.fail("Writing to locked file didn't fail")
+
     utils.KillProcess(new_pid, waitpid=True)
     self.failIf(utils.IsProcessAlive(new_pid))
     utils.RemoveFile(self.f_dpn('child'))
     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
 
+  def testExceptionType(self):
+    # Make sure the PID lock error is a subclass of LockError in case some code
+    # depends on it
+    self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
+
   def tearDown(self):
     shutil.rmtree(self.dir)
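
Note on the new locking semantics (illustrative only, not part of the patch): the comment added to WritePidFile spells out the protocol other code is expected to follow before signalling a daemon: try to lock the PID file first, and only trust its contents if the lock cannot be acquired. ganeti.utils.io.ReadLockedPidFile already implements this protocol; the hypothetical helper below merely restates it under the same assumptions (Python 2, filelock.LockFile raising errors.LockError when the lock is held, open failures treated as "no PID file" for brevity).

    # Illustrative sketch; "DaemonStillLocksPidFile" is a hypothetical name.
    import os

    from ganeti import errors
    from ganeti.utils import filelock


    def DaemonStillLocksPidFile(pidfile):
      """Returns True if the PID file exists and its owner still holds the lock."""
      try:
        fd = os.open(pidfile, os.O_RDONLY)
      except EnvironmentError:
        # No readable PID file, so nothing can be holding a lock on it
        return False
      try:
        try:
          filelock.LockFile(fd)
        except errors.LockError:
          # Lock is held: the daemon is still running
          return True
        # Lock acquired: the previous owner is gone, any PID left in the file
        # is stale and must not be signalled
        return False
      finally:
        os.close(fd)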