diff --git a/NEWS b/NEWS
index b411cb1bb7e05f0ae3b832329f475e537a0c9176..45f081c939ed564f6c1bd0471913b348e50e05ae 100644
--- a/NEWS
+++ b/NEWS
@@ -36,36 +36,27 @@ Version 2.6.1
 
 *(Released Fri, 12 Oct 2012)*
 
-A small bugfix release.
-
-Fix double use of PRIORITY_OPT in gnt-node migrate, that would make the
-command unusable.
-
-Commands that issue many jobs don't fail anymore just because some jobs
-take so long that other jobs are archived.
-
-Failures during gnt-instance reinstall are reflected by the exit status.
-
-Issue 190 fixed. Check for DRBD in cluster verify is enabled only when
-DRBD is enabled.
-
-When always_failover is set, --allow-failover is not required in migrate
-commands anymore.
-
-bash_completion works even if extglob is disabled
-
-Fix bug with locks that made failover for RDB-based instances fail.
-
-Fix bug in non-mirrored instance allocation that would make Ganeti
-choose a random node instead of one based on the allocator metric.
-
-Support for newer versions of pylint and pep8.
-
-Hail doesn't fail anymore when trying to add an instance of type
-'file', 'sharedfile' or 'rbd'.
-
-Add new Makefile target to rebuild the whole dist, so that all files are
-included.
+A small bugfix release. Among the bugs fixed:
+
+- Fixed double use of ``PRIORITY_OPT`` in ``gnt-node migrate``, that
+  made the command unusable.
+- Commands that issue many jobs don't fail anymore just because some jobs
+  take so long that other jobs are archived.
+- Failures during ``gnt-instance reinstall`` are reflected by the exit
+  status.
+- Issue 190 fixed. Check for DRBD in cluster verify is enabled only when
+  DRBD is enabled.
+- When ``always_failover`` is set, ``--allow-failover`` is not required
+  in migrate commands anymore.
+- ``bash_completion`` works even if extglob is disabled.
+- Fixed bug with locks that made failover for RDB-based instances fail.
+- Fixed bug in non-mirrored instance allocation that made Ganeti choose
+  a random node instead of one based on the allocator metric.
+- Support for newer versions of pylint and pep8.
+- Hail doesn't fail anymore when trying to add an instance of type
+  ``file``, ``sharedfile`` or ``rbd``.
+- Added new Makefile target to rebuild the whole distribution, so that
+  all files are included.
 
 
 Version 2.6.0
diff --git a/doc/devnotes.rst b/doc/devnotes.rst
index c5594e84386c96a05a16bb550a5ef93db9a027ae..7721ecd8808bdd4231d911130079e9fb97da4b6d 100644
--- a/doc/devnotes.rst
+++ b/doc/devnotes.rst
@@ -100,6 +100,13 @@ different python version)::
   $ ./autogen.sh && \
     ./configure --prefix=/usr/local --sysconfdir=/etc --localstatedir=/var
 
+Note that doing development on a machine which already has Ganeti
+installed is problematic, as ``PYTHONPATH`` behaviour can be confusing
+(see Issue 170 for a bit of history/details; in general it works if
+the installed and developed versions are very similar, and/or if
+PYTHONPATH is customised correctly). As such, in general it's
+recommended to use a "clean" machine for ganeti development.
+
 Haskell development notes
 -------------------------
 
diff --git a/lib/client/gnt_cluster.py b/lib/client/gnt_cluster.py
index 06bdaf6e1e753b1a02894a43dcaadc99df683af3..ec06e1cc4c0f3c05e58a33a89135df35e37644d4 100644
--- a/lib/client/gnt_cluster.py
+++ b/lib/client/gnt_cluster.py
@@ -57,6 +57,10 @@ SHOW_MACHINE_OPT = cli_option("-M", "--show-machine-names", default=False,
                               action="store_true",
                               help="Show machine name for every line in output")
 
+FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
+                            help="Override interactive check for --no-voting",
+                            default=False, action="store_true")
+
 _EPO_PING_INTERVAL = 30 # 30 seconds between pings
 _EPO_PING_TIMEOUT = 1 # 1 second
 _EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
@@ -706,7 +710,7 @@ def MasterFailover(opts, args):
   @return: the desired exit code
 
   """
-  if opts.no_voting:
+  if opts.no_voting and not opts.yes_do_it:
     usertext = ("This will perform the failover even if most other nodes"
                 " are down, or if this node is outdated. This is dangerous"
                 " as it can lead to a non-consistent cluster. Check the"
@@ -1507,7 +1511,7 @@ commands = {
     RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
     "[instance...]", "Updates mismatches in recorded disk sizes"),
   "master-failover": (
-    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
+    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
     "", "Makes the current node the master"),
   "master-ping": (
     MasterPing, ARGS_NONE, [],
diff --git a/lib/client/gnt_debug.py b/lib/client/gnt_debug.py
index ebc6af10f77d4c69da1aa57a7db95187e793fe34..0ac48b57e18943c54febb14a544693368ba96921 100644
--- a/lib/client/gnt_debug.py
+++ b/lib/client/gnt_debug.py
@@ -65,7 +65,7 @@ def Delay(opts, args):
                            on_master=opts.on_master,
                            on_nodes=opts.on_nodes,
                            repeat=opts.repeat)
-  SubmitOpCode(op, opts=opts)
+  SubmitOrSend(op, opts)
 
   return 0
 
@@ -626,7 +626,7 @@ commands = {
                 action="append", help="Select nodes to sleep on"),
      cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
                 help="Number of times to repeat the sleep"),
-     DRY_RUN_OPT, PRIORITY_OPT,
+     DRY_RUN_OPT, PRIORITY_OPT, SUBMIT_OPT,
      ],
     "[opts...] <duration>", "Executes a TestDelay OpCode"),
   "submit-job": (
diff --git a/lib/client/gnt_instance.py b/lib/client/gnt_instance.py
index b28b0169974bb59fc23c971d0dc926cbc4024656..e9b5687461d7c524802ce28687ae950d40bcb57c 100644
--- a/lib/client/gnt_instance.py
+++ b/lib/client/gnt_instance.py
@@ -1347,7 +1347,9 @@ def SetInstanceParams(opts, args):
     for param, data in result:
       ToStdout(" - %-5s -> %s", param, data)
     ToStdout("Please don't forget that most parameters take effect"
-             " only at the next start of the instance.")
+             " only at the next (re)start of the instance initiated by"
+             " ganeti; restarting from within the instance will"
+             " not be enough.")
   return 0
 
 
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index beac311ea5388377f852163503a9f8e062f435d4..fbc63e1375227138c87ed5ae4f667145775b4269 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1677,6 +1677,27 @@ def _GetDefaultIAllocator(cfg, ialloc):
   return ialloc
 
 
+def _CheckHostnameSane(lu, name):
+  """Ensures that a given hostname resolves to a 'sane' name.
+
+  The given name is required to be a prefix of the resolved hostname,
+  to prevent accidental mismatches.
+
+  @param lu: the logical unit on behalf of which we're checking
+  @param name: the name we should resolve and check
+  @return: the resolved hostname object
+
+  """
+  hostname = netutils.GetHostname(name=name)
+  if hostname.name != name:
+    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
+  if not utils.MatchNameComponent(name, [hostname.name]):
+    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
+                                " same as given hostname '%s'") %
+                               (hostname.name, name), errors.ECODE_INVAL)
+  return hostname
+
+
 class LUClusterPostInit(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -3371,11 +3392,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
               inst_config.primary_node)
 
       # If the instance is non-redundant we cannot survive losing its primary
-      # node, so we are not N+1 compliant. On the other hand we have no disk
-      # templates with more than one secondary so that situation is not well
-      # supported either.
-      # FIXME: does not support file-backed instances
-      if not inst_config.secondary_nodes:
+      # node, so we are not N+1 compliant.
+      if inst_config.disk_template not in constants.DTS_MIRRORED:
         i_non_redundant.append(instance)
 
       _ErrorIf(len(inst_config.secondary_nodes) > 1,
@@ -7462,15 +7480,7 @@ class LUInstanceRename(LogicalUnit):
 
     new_name = self.op.new_name
     if self.op.name_check:
-      hostname = netutils.GetHostname(name=new_name)
-      if hostname.name != new_name:
-        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                     hostname.name)
-      if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
-        raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
-                                    " same as given hostname '%s'") %
-                                   (hostname.name, self.op.new_name),
-                                   errors.ECODE_INVAL)
+      hostname = _CheckHostnameSane(self, new_name)
       new_name = self.op.new_name = hostname.name
     if (self.op.ip_check and
         netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
@@ -9232,7 +9242,7 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
     for port in ports_to_release:
       lu.cfg.AddTcpUdpPort(port)
 
-  if instance.disk_template == constants.DT_FILE:
+  if instance.disk_template in constants.DTS_FILEBASED:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     if target_node:
       tgt = target_node
@@ -9575,7 +9585,7 @@ class LUInstanceCreate(LogicalUnit):
 
     # instance name verification
     if self.op.name_check:
-      self.hostname1 = netutils.GetHostname(name=self.op.instance_name)
+      self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
@@ -10243,6 +10253,27 @@ class LUInstanceCreate(LogicalUnit):
         dsk[constants.IDISK_SIZE] = \
           int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
 
+    # Verify instance specs
+    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
+    ispec = {
+      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+      constants.ISPEC_DISK_COUNT: len(self.disks),
+      constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
+                                  for disk in self.disks],
+      constants.ISPEC_NIC_COUNT: len(self.nics),
+      constants.ISPEC_SPINDLE_USE: spindle_use,
+      }
+
+    group_info = self.cfg.GetNodeGroup(pnode.group)
+    ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+    if not self.op.ignore_ipolicy and res:
+      raise errors.OpPrereqError(("Instance allocation to group %s violates"
+                                  " policy: %s") % (pnode.group,
+                                                    utils.CommaJoin(res)),
+                                  errors.ECODE_INVAL)
+
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
     _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
@@ -12973,12 +13004,11 @@ class LUInstanceSetParams(LogicalUnit):
                                     self.be_proposed[constants.BE_MAXMEM]),
                                    errors.ECODE_INVAL)
 
-    if self.op.runtime_mem > current_memory:
+    delta = self.op.runtime_mem - current_memory
+    if delta > 0:
       _CheckNodeFreeMemory(self, instance.primary_node,
                            "ballooning memory for instance %s" %
-                           instance.name,
-                           self.op.memory - current_memory,
-                           instance.hypervisor)
+                           instance.name, delta, instance.hypervisor)
 
     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Disk operations not supported for"
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 165cfec838d333d52cb48831391fa1039cd03929..6929d6a049ffcee6df063d899da7a8d3fe2cf075 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -76,6 +76,12 @@ class CancelJob(Exception):
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -502,6 +508,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -1044,12 +1055,25 @@ class _JobProcessor(object):
         if op.status == constants.OP_STATUS_CANCELING:
           return (constants.OP_STATUS_CANCELING, None)
 
+        # Queue is shutting down, return to queued
+        if not self.queue.AcceptingJobsUnlocked():
+          return (constants.OP_STATUS_QUEUED, None)
+
         # Stay in waitlock while trying to re-acquire lock
         return (constants.OP_STATUS_WAITING, None)
       except CancelJob:
         logging.exception("%s: Canceling job", opctx.log_prefix)
         assert op.status == constants.OP_STATUS_CANCELING
         return (constants.OP_STATUS_CANCELING, None)
+
+      except QueueShutdown:
+        logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+        assert op.status == constants.OP_STATUS_WAITING
+
+        # Job hadn't been started yet, so it should return to the queue
+        return (constants.OP_STATUS_QUEUED, None)
+
       except Exception, err: # pylint: disable=W0703
         logging.exception("%s: Caught exception in %s",
                           opctx.log_prefix, opctx.summary)
@@ -1147,8 +1171,10 @@ class _JobProcessor(object):
 
       assert not waitjob
 
-      if op.status == constants.OP_STATUS_WAITING:
-        # Couldn't get locks in time
+      if op.status in (constants.OP_STATUS_WAITING,
+                       constants.OP_STATUS_QUEUED):
+        # waiting: Couldn't get locks in time
+        # queued: Queue is shutting down
         assert not op.end_timestamp
       else:
         # Finalize opcode
@@ -1160,7 +1186,19 @@ class _JobProcessor(object):
       else:
         assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITING or waitjob:
+      if op.status == constants.OP_STATUS_QUEUED:
+        # Queue is shutting down
+        assert not waitjob
+
+        finalize = False
+
+        # Reset context
+        job.cur_opctx = None
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+      elif op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -2540,6 +2578,17 @@ class JobQueue(object):
 
     return self._wpool.HasRunningTasks()
 
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
+
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
diff --git a/lib/mcpu.py b/lib/mcpu.py
index b807b911bf27ff31935954ab40e77bece0cebe9c..27f29fca13d9be82b08dd046c9596f1e0095a350 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -400,10 +400,12 @@ class Processor(object):
         try:
           self.context.glm.add(level, add_locks, acquired=1, shared=share)
         except errors.LockError:
+          logging.exception("Detected lock error in level %s for locks"
+                            " %s, shared=%s", level, add_locks, share)
           raise errors.OpPrereqError(
-            "Couldn't add locks (%s), probably because of a race condition"
-            " with another job, who added them first" % add_locks,
-            errors.ECODE_FAULT)
+            "Couldn't add locks (%s), most likely because of another"
+            " job who added them first" % add_locks,
+            errors.ECODE_NOTUNIQUE)
 
       try:
         result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 8a88bf02681bd869543f577163beb3864c610478..2da68ce06c1a48075da19901480bbef9fe1624e2 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -806,7 +806,7 @@ class OpClusterSetParams(OpCode):
   OP_PARAMS = [
     _PHvState,
     _PDiskState,
-    ("vg_name", None, ht.TMaybeString, "Volume group name"),
+    ("vg_name", None, ht.TOr(ht.TString, ht.TNone), "Volume group name"),
     ("enabled_hypervisors", None,
      ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
             ht.TNone),
diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index f53c914d6f1264f166044505bd776b9940b88ee7..7cd7a80c79528076b0b11c1be37ecb8338cf6ef0 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -67,6 +67,18 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  """
+  if status:
+    logging.info("New job with id %s, summary: %s",
+                 info, utils.CommaJoin(op.Summary() for op in ops))
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, utils.CommaJoin(op.Summary() for op in ops))
+
+
 class ClientRequestWorker(workerpool.BaseWorker):
   # pylint: disable=W0221
   def RunTask(self, server, message, client):
@@ -268,18 +280,23 @@ class ClientOps:
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Received new job")
+      logging.info("Receiving new job")
       (job_def, ) = args
       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-      return queue.SubmitJob(ops)
+      job_id = queue.SubmitJob(ops)
+      _LogNewJob(True, job_id, ops)
+      return job_id
 
     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Received multiple jobs")
+      logging.info("Receiving multiple jobs")
       (job_defs, ) = args
       jobs = []
       for ops in job_defs:
         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      return queue.SubmitManyJobs(jobs)
+      job_ids = queue.SubmitManyJobs(jobs)
+      for ((status, job_id), ops) in zip(job_ids, jobs):
+        _LogNewJob(status, job_id, ops)
+      return job_ids
 
     elif method == luxi.REQ_CANCEL_JOB:
       (job_id, ) = args
diff --git a/man/gnt-cluster.rst b/man/gnt-cluster.rst
index 3cf77d2ec5359310e0c157e2adb11a516e3c7495..e4bcc28c2ab6708be53cbbc0f9ddd5af705984ea 100644
--- a/man/gnt-cluster.rst
+++ b/man/gnt-cluster.rst
@@ -524,7 +524,7 @@ List the tags of the cluster.
 MASTER-FAILOVER
 ~~~~~~~~~~~~~~~
 
-**master-failover** [\--no-voting]
+**master-failover** [\--no-voting] [\--yes-do-it]
 
 Failover the master role to the current node.
 
@@ -538,6 +538,11 @@ You can pass ``--no-voting`` to **ganeti-masterd** on the new master to
 solve this problem, and run **gnt-cluster redist-conf** to make sure
 the cluster is consistent again.
 
+The option ``--yes-do-it`` is used together with ``--no-voting``, for
+skipping the interactive checks. This is even more dangerous, and should
+only be used in conjunction with other means (e.g. a HA suite) to
+confirm that the operation is indeed safe.
+
 MASTER-PING
 ~~~~~~~~~~~
 
diff --git a/man/gnt-instance.rst b/man/gnt-instance.rst
index 458aef4fafd85405eab3594c927b16b6fe6c549e..3cb2cecdaeb82ddbde565252ac08d8149a590937 100644
--- a/man/gnt-instance.rst
+++ b/man/gnt-instance.rst
@@ -1115,7 +1115,7 @@ case the more than one instance will be affected.
 
 The ``--no-remember`` option will perform the startup but not change
 the state of the instance in the configuration file (if it was stopped
-before, Ganeti will still thinks it needs to be stopped). This can be
+before, Ganeti will still think it needs to be stopped). This can be
 used for testing, or for a one shot-start where you don't want the
 watcher to restart the instance if it crashes.
 
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index 41b4c4be89ffbcc07b7b26bcd221ff7cd66ba98a..8b1cb250d45d05da75d5ade01e2146124cda2c32 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -505,6 +505,7 @@ class _FakeQueueForProc:
     self._acquired = False
     self._updates = []
     self._submitted = []
+    self._accepting_jobs = True
 
     self._submit_count = itertools.count(1000)
 
@@ -540,6 +541,12 @@ class _FakeQueueForProc:
     self._submitted.extend(zip(job_ids, jobs))
     return job_ids
 
+  def StopAcceptingJobs(self):
+    self._accepting_jobs = False
+
+  def AcceptingJobsUnlocked(self):
+    return self._accepting_jobs
+
 
 class _FakeExecOpCodeForProc:
   def __init__(self, queue, before_start, after_start):
@@ -884,7 +891,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
                       ["Job canceled by request" for _ in job.ops]])
 
-  def testCancelWhileWaitlockWithTimeout(self):
+  def _TestCancelWhileSomething(self, cb):
    queue = _FakeQueueForProc()
 
     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
@@ -907,8 +914,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))
 
-      # Fake an acquire attempt timing out
-      raise mcpu.LockAcquireTimeout()
+      cb(queue)
 
     def _AfterStart(op, cbs):
       self.fail("Should not reach this")
@@ -929,6 +935,19 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
                       ["Job canceled by request" for _ in job.ops]])
 
+    return queue
+
+  def testCancelWhileWaitlockWithTimeout(self):
+    def fn(_):
+      # Fake an acquire attempt timing out
+      raise mcpu.LockAcquireTimeout()
+
+    self._TestCancelWhileSomething(fn)
+
+  def testCancelDuringQueueShutdown(self):
+    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
+    self.assertFalse(queue.AcceptingJobsUnlocked())
+
   def testCancelWhileRunning(self):
     # Tests canceling a job with finished opcodes and more, unprocessed ones
     queue = _FakeQueueForProc()
@@ -975,6 +994,185 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                      ["Res0", "Job canceled by request",
"Job canceled by request"]]) + def _TestQueueShutdown(self, queue, opexec, job, runcount): + self.assertTrue(queue.AcceptingJobsUnlocked()) + + # Simulate shutdown + queue.StopAcceptingJobs() + + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.DEFER) + + # Check result + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED]) + self.assertFalse(job.cur_opctx) + self.assertTrue(job.start_timestamp) + self.assertFalse(job.end_timestamp) + self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp) + self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops[:runcount])) + self.assertFalse(job.ops[runcount].end_timestamp) + self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp + for op in job.ops[(runcount + 1):])) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [(([constants.OP_STATUS_SUCCESS] * runcount) + + ([constants.OP_STATUS_QUEUED] * + (len(job.ops) - runcount))), + (["Res%s" % i for i in range(runcount)] + + ([None] * (len(job.ops) - runcount)))]) + + # Must have been written and replicated + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + + def testQueueShutdownWhileRunning(self): + # Tests shutting down the queue while a job is running + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(3)] + + # Create job + job_id = 2718211587 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + opexec = _FakeExecOpCodeForProc(queue, None, None) + + self.assertRaises(IndexError, queue.GetNextUpdate) + + # Run one opcode + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.DEFER) + + # Job goes back to queued + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_QUEUED, + constants.OP_STATUS_QUEUED], + ["Res0", None, None]]) + self.assertFalse(job.cur_opctx) + + # Writes for waiting, running and result + for _ in range(3): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + + # Run second opcode + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.DEFER) + + # Job goes back to queued + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_QUEUED], + ["Res0", "Res1", None]]) + self.assertFalse(job.cur_opctx) + + # Writes for waiting, running and result + for _ in range(3): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + + self._TestQueueShutdown(queue, opexec, job, 2) + + def testQueueShutdownWithLockTimeout(self): + # Tests shutting down while a lock acquire times out + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(3)] + + # Create job + job_id = 1304231178 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + acquire_timeout = False + + def _BeforeStart(timeout, priority): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + if acquire_timeout: + raise mcpu.LockAcquireTimeout() + + opexec = 
_FakeExecOpCodeForProc(queue, _BeforeStart, None) + + self.assertRaises(IndexError, queue.GetNextUpdate) + + # Run one opcode + self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), + jqueue._JobProcessor.DEFER) + + # Job goes back to queued + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_QUEUED, + constants.OP_STATUS_QUEUED], + ["Res0", None, None]]) + self.assertFalse(job.cur_opctx) + + # Writes for waiting, running and result + for _ in range(3): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + + # The next opcode should have expiring lock acquires + acquire_timeout = True + + self._TestQueueShutdown(queue, opexec, job, 1) + + def testQueueShutdownWhileInQueue(self): + # This should never happen in reality (no new jobs are started by the + # workerpool once a shutdown has been initiated), but it's better to test + # the job processor for this scenario + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(5)] + + # Create job + job_id = 2031 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertRaises(IndexError, queue.GetNextUpdate) + + self.assertFalse(job.start_timestamp) + self.assertFalse(job.end_timestamp) + self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED + for op in job.ops)) + + opexec = _FakeExecOpCodeForProc(queue, None, None) + self._TestQueueShutdown(queue, opexec, job, 0) + + def testQueueShutdownWhileWaitlockInQueue(self): + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(5)] + + # Create job + job_id = 53125685 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + job.ops[0].status = constants.OP_STATUS_WAITING + + assert len(job.ops) == 5 + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + + self.assertRaises(IndexError, queue.GetNextUpdate) + + opexec = _FakeExecOpCodeForProc(queue, None, None) + self._TestQueueShutdown(queue, opexec, job, 0) + def testPartiallyRun(self): # Tests calling the processor on a job that's been partially run before the # program was restarted @@ -1958,6 +2156,28 @@ class TestJobDependencyManager(unittest.TestCase): self.assertFalse(self.jdm.JobWaiting(job)) self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING])) + def testNotFinalizedThenQueued(self): + # This can happen on a queue shutdown + job = _IdOnlyFakeJob(1320) + job_id = str(22971) + + for i in range(5): + if i > 2: + self._status.append((job_id, constants.JOB_STATUS_QUEUED)) + else: + self._status.append((job_id, constants.JOB_STATUS_RUNNING)) + (result, _) = self.jdm.CheckAndRegister(job, job_id, []) + self.assertEqual(result, self.jdm.WAIT) + self.assertFalse(self._status) + self.assertFalse(self._queue) + self.assertTrue(self.jdm.JobWaiting(job)) + self.assertEqual(self.jdm._waiters, { + job_id: set([job]), + }) + self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [ + ("job/22971", None, None, [("job", [job.id])]) + ]) + def testRequireCancel(self): job = _IdOnlyFakeJob(5278) job_id = str(9610)
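
The job-queue changes above establish a simple shutdown contract: once the
queue stops accepting jobs (the new JobQueue.AcceptingJobsUnlocked returns
False), an opcode that is still waiting for its locks is pushed back to the
"queued" state via the new QueueShutdown exception instead of being
finalized, so the job can be resumed after the daemon restarts. The
following minimal sketch only illustrates that contract; _QueueStub,
_QueueShutdown and _NextOpcodeStatus are hypothetical stand-ins for
illustration, not part of the patch or of Ganeti's API.

  class _QueueShutdown(Exception):
    """Stand-in for jqueue.QueueShutdown."""


  class _QueueStub(object):
    """Stand-in exposing only the accepting-jobs flag used by the patch."""

    def __init__(self):
      self._accepting_jobs = True

    def AcceptingJobsUnlocked(self):
      # Mirrors the new JobQueue.AcceptingJobsUnlocked query
      return self._accepting_jobs

    def PrepareShutdown(self):
      # Once called, no new jobs are accepted
      self._accepting_jobs = False


  def _NextOpcodeStatus(queue):
    """Returns the status a lock-waiting opcode should move to.

    While the queue accepts jobs the opcode stays in "waiting"; once the
    queue is shutting down it is pushed back to "queued" via the
    shutdown exception, mirroring the new _JobProcessor branches.

    """
    try:
      if not queue.AcceptingJobsUnlocked():
        raise _QueueShutdown()
      return "waiting"
    except _QueueShutdown:
      return "queued"


  queue = _QueueStub()
  assert _NextOpcodeStatus(queue) == "waiting"
  queue.PrepareShutdown()
  assert _NextOpcodeStatus(queue) == "queued"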