diff --git a/doc/devnotes.rst b/doc/devnotes.rst
index 3a8af6417c6a6d7c2af51b82b2563774c0c227cd..00d9132fa45c2396c41dc6543d6f3ee32a6cfa2b 100644
--- a/doc/devnotes.rst
+++ b/doc/devnotes.rst
@@ -66,6 +66,13 @@ different python version)::
   $ ./autogen.sh && \
     ./configure --prefix=/usr/local --sysconfdir=/etc --localstatedir=/var
 
+Note that doing development on a machine which already has Ganeti
+installed is problematic, as ``PYTHONPATH`` behaviour can be confusing
+(see Issue 170 for a bit of history/details; in general it works if
+the installed and developed versions are very similar, and/or if
+PYTHONPATH is customised correctly). As such, it is recommended to
+use a "clean" machine for Ganeti development.
+
 Haskell development notes
 -------------------------
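
A quick sanity check on such a machine is to ask the interpreter which
copy of Ganeti it actually imports (a minimal sketch; the paths in the
comment are examples only)::

  import ganeti
  # Prints e.g. "/usr/lib/python2.7/dist-packages/ganeti/__init__.pyc" for an
  # installed copy, or a path inside the working tree when PYTHONPATH points
  # at the development checkout
  print ganeti.__file__
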
<duration>", "Executes a TestDelay OpCode"), "submit-job": ( diff --git a/lib/client/gnt_instance.py b/lib/client/gnt_instance.py index 38ea6558514ab0f16a7ee1eecf1a747418a1cdd9..cdf83542646d54ae523a4d4e9c563b45714bc055 100644 --- a/lib/client/gnt_instance.py +++ b/lib/client/gnt_instance.py @@ -1441,7 +1441,9 @@ def SetInstanceParams(opts, args): for param, data in result: ToStdout(" - %-5s -> %s", param, data) ToStdout("Please don't forget that most parameters take effect" - " only at the next start of the instance.") + " only at the next (re)start of the instance initiated by" + " ganeti; restarting from within the instance will" + " not be enough.") return 0 diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 47a6e83886badc891d661484409470afb13b3d74..4f288bd8ef0346e621dd6ece2a34317c34fb400b 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1778,6 +1778,27 @@ def _InstanceRunning(lu, instance): return instance_running +def _CheckHostnameSane(lu, name): + """Ensures that a given hostname resolves to a 'sane' name. + + The given name is required to be a prefix of the resolved hostname, + to prevent accidental mismatches. + + @param lu: the logical unit on behalf of which we're checking + @param name: the name we should resolve and check + @return: the resolved hostname object + + """ + hostname = netutils.GetHostname(name=name) + if hostname.name != name: + lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name) + if not utils.MatchNameComponent(name, [hostname.name]): + raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" + " same as given hostname '%s'") % + (hostname.name, name), errors.ECODE_INVAL) + return hostname + + class LUClusterPostInit(LogicalUnit): """Logical unit for running hooks after cluster initialization. @@ -3428,11 +3449,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): inst_config.primary_node) # If the instance is non-redundant we cannot survive losing its primary - # node, so we are not N+1 compliant. On the other hand we have no disk - # templates with more than one secondary so that situation is not well - # supported either. - # FIXME: does not support file-backed instances - if not inst_config.secondary_nodes: + # node, so we are not N+1 compliant. 
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 47a6e83886badc891d661484409470afb13b3d74..4f288bd8ef0346e621dd6ece2a34317c34fb400b 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1778,6 +1778,27 @@ def _InstanceRunning(lu, instance):
   return instance_running
 
 
+def _CheckHostnameSane(lu, name):
+  """Ensures that a given hostname resolves to a 'sane' name.
+
+  The given name is required to be a prefix of the resolved hostname,
+  to prevent accidental mismatches.
+
+  @param lu: the logical unit on behalf of which we're checking
+  @param name: the name we should resolve and check
+  @return: the resolved hostname object
+
+  """
+  hostname = netutils.GetHostname(name=name)
+  if hostname.name != name:
+    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
+  if not utils.MatchNameComponent(name, [hostname.name]):
+    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
+                                " same as given hostname '%s'") %
+                               (hostname.name, name), errors.ECODE_INVAL)
+  return hostname
+
+
 class LUClusterPostInit(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -3428,11 +3449,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                inst_config.primary_node)
 
       # If the instance is non-redundant we cannot survive losing its primary
-      # node, so we are not N+1 compliant. On the other hand we have no disk
-      # templates with more than one secondary so that situation is not well
-      # supported either.
-      # FIXME: does not support file-backed instances
-      if not inst_config.secondary_nodes:
+      # node, so we are not N+1 compliant.
+      if inst_config.disk_template not in constants.DTS_MIRRORED:
         i_non_redundant.append(instance)
 
       _ErrorIf(len(inst_config.secondary_nodes) > 1,
@@ -7536,15 +7554,7 @@ class LUInstanceRename(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
-      hostname = netutils.GetHostname(name=new_name)
-      if hostname.name != new_name:
-        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                     hostname.name)
-      if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
-        raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
-                                    " same as given hostname '%s'") %
-                                   (hostname.name, self.op.new_name),
-                                   errors.ECODE_INVAL)
+      hostname = _CheckHostnameSane(self, new_name)
       new_name = self.op.new_name = hostname.name
 
     if (self.op.ip_check and
         netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
@@ -9328,7 +9338,7 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
   for port in ports_to_release:
     lu.cfg.AddTcpUdpPort(port)
 
-  if instance.disk_template == constants.DT_FILE:
+  if instance.disk_template in constants.DTS_FILEBASED:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     if target_node:
       tgt = target_node
@@ -9540,7 +9550,7 @@ class LUInstanceCreate(LogicalUnit):
 
     # instance name verification
     if self.op.name_check:
-      self.hostname1 = netutils.GetHostname(name=self.op.instance_name)
+      self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
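
The prefix rule enforced by _CheckHostnameSane can be illustrated in
isolation; this toy predicate mirrors what the utils.MatchNameComponent call
is used for here (an illustration, not the real implementation)::

  def _is_sane_resolution(given, resolved):
    # "inst1" may resolve to "inst1.example.com" (prefix on a dot boundary),
    # but a resolver mapping "inst1" to "inst2.example.com" must be rejected
    return resolved == given or resolved.startswith(given + ".")

  assert _is_sane_resolution("inst1", "inst1.example.com")
  assert not _is_sane_resolution("inst1", "inst2.example.com")
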
@@ -10288,26 +10298,6 @@ class LUInstanceCreate(LogicalUnit):
 
     nodenames = [pnode.name] + self.secondaries
 
-    # Verify instance specs
-    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
-    ispec = {
-      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
-      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
-      constants.ISPEC_DISK_COUNT: len(self.disks),
-      constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
-      constants.ISPEC_NIC_COUNT: len(self.nics),
-      constants.ISPEC_SPINDLE_USE: spindle_use,
-      }
-
-    group_info = self.cfg.GetNodeGroup(pnode.group)
-    ipolicy = _CalculateGroupIPolicy(cluster, group_info)
-    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
-    if not self.op.ignore_ipolicy and res:
-      raise errors.OpPrereqError(("Instance allocation to group %s violates"
-                                  " policy: %s") % (pnode.group,
-                                                    utils.CommaJoin(res)),
-                                 errors.ECODE_INVAL)
-
     if not self.adopt_disks:
       if self.op.disk_template == constants.DT_RBD:
         # _CheckRADOSFreeSpace() is just a placeholder.
@@ -10392,6 +10382,27 @@ class LUInstanceCreate(LogicalUnit):
         dsk[constants.IDISK_SIZE] = \
           int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
 
+    # Verify instance specs
+    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
+    ispec = {
+      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+      constants.ISPEC_DISK_COUNT: len(self.disks),
+      constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
+                                  for disk in self.disks],
+      constants.ISPEC_NIC_COUNT: len(self.nics),
+      constants.ISPEC_SPINDLE_USE: spindle_use,
+      }
+
+    group_info = self.cfg.GetNodeGroup(pnode.group)
+    ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+    if not self.op.ignore_ipolicy and res:
+      raise errors.OpPrereqError(("Instance allocation to group %s violates"
+                                  " policy: %s") % (pnode.group,
+                                                    utils.CommaJoin(res)),
+                                 errors.ECODE_INVAL)
+
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
     _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
@@ -13088,12 +13099,11 @@ class LUInstanceSetParams(LogicalUnit):
                                    self.be_proposed[constants.BE_MAXMEM]),
                                   errors.ECODE_INVAL)
 
-      if self.op.runtime_mem > current_memory:
+      delta = self.op.runtime_mem - current_memory
+      if delta > 0:
         _CheckNodeFreeMemory(self, instance.primary_node,
                              "ballooning memory for instance %s" %
-                             instance.name,
-                             self.op.memory - current_memory,
-                             instance.hypervisor)
+                             instance.name, delta, instance.hypervisor)
 
     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Disk operations not supported for"
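
Moving the spec verification below the adoption loop matters because adopted
disks only get their real sizes once the nodes have been queried; note also
the change from disk["size"] to disk[constants.IDISK_SIZE]. A toy sequence
showing why the ordering is significant (keys and values invented)::

  disks = [{"adopt": "xenvg/mylv"}]   # size unknown at submission time
  # ... the adoption code asks the node and fills in the actual size ...
  disks[0]["size"] = 10240
  ispec_disk_size = [d["size"] for d in disks]   # only now meaningful: [10240]
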
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 204966ec53bf673354cc34d62682725f5e4128aa..03b1fea615c55b7e59fab0ee9f010c4877f9e851 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -75,6 +75,12 @@ class CancelJob(Exception):
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
@@ -487,6 +493,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -1029,12 +1040,25 @@ class _JobProcessor(object):
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
+      # Queue is shutting down, return to queued
+      if not self.queue.AcceptingJobsUnlocked():
+        return (constants.OP_STATUS_QUEUED, None)
+
       # Stay in waitlock while trying to re-acquire lock
       return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
+
+    except QueueShutdown:
+      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+      assert op.status == constants.OP_STATUS_WAITING
+
+      # Job hadn't been started yet, so it should return to the queue
+      return (constants.OP_STATUS_QUEUED, None)
+
     except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
@@ -1132,8 +1156,10 @@ class _JobProcessor(object):
 
           assert not waitjob
 
-          if op.status == constants.OP_STATUS_WAITING:
-            # Couldn't get locks in time
+          if op.status in (constants.OP_STATUS_WAITING,
+                           constants.OP_STATUS_QUEUED):
+            # waiting: Couldn't get locks in time
+            # queued: Queue is shutting down
             assert not op.end_timestamp
           else:
             # Finalize opcode
@@ -1145,7 +1171,19 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-        if op.status == constants.OP_STATUS_WAITING or waitjob:
+        if op.status == constants.OP_STATUS_QUEUED:
+          # Queue is shutting down
+          assert not waitjob
+
+          finalize = False
+
+          # Reset context
+          job.cur_opctx = None
+
+          # In no case must the status be finalized here
+          assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+        elif op.status == constants.OP_STATUS_WAITING or waitjob:
           finalize = False
 
           if not waitjob and opctx.CheckPriorityIncrease():
@@ -2513,6 +2551,17 @@ class JobQueue(object):
 
     return self._wpool.HasRunningTasks()
 
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
+
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):
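
The common thread of the jqueue changes: an opcode that has not truly started
may be handed back as QUEUED once a shutdown is detected, so a restarted
masterd can pick the job up again. A minimal model of the decision taken
after a lock acquire attempt (statuses abbreviated to strings; illustrative
only)::

  def status_after_lock_timeout(op_status, accepting_jobs):
    # Mirrors the early returns in _JobProcessor: cancellation wins, then
    # the shutdown check, otherwise keep waiting for the locks
    if op_status == "canceling":
      return "canceling"
    if not accepting_jobs:
      return "queued"    # hand the job back to the queue
    return "waiting"
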
diff --git a/lib/mcpu.py b/lib/mcpu.py
index 6e8be77744c5600bbc70d5b74fabb1e63ab2bdee..6ecae61f77e6002470e64c0aa5ecaf620e7bbc79 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -385,10 +385,12 @@ class Processor(object):
         try:
           self.context.glm.add(level, add_locks, acquired=1, shared=share)
         except errors.LockError:
+          logging.exception("Detected lock error in level %s for locks"
+                            " %s, shared=%s", level, add_locks, share)
           raise errors.OpPrereqError(
-            "Couldn't add locks (%s), probably because of a race condition"
-            " with another job, who added them first" % add_locks,
-            errors.ECODE_FAULT)
+            "Couldn't add locks (%s), most likely because of another"
+            " job which added them first" % add_locks,
+            errors.ECODE_NOTUNIQUE)
 
         try:
           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
diff --git a/lib/opcodes.py b/lib/opcodes.py
index a4bb31f473d013e413b8a03ef7003aae02dc5595..e09e6445c270d179e0b9a4989d0a6bccd87cbb75 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -878,7 +878,7 @@ class OpClusterSetParams(OpCode):
   OP_PARAMS = [
     _PHvState,
     _PDiskState,
-    ("vg_name", None, ht.TMaybeString, "Volume group name"),
+    ("vg_name", None, ht.TOr(ht.TString, ht.TNone), "Volume group name"),
     ("enabled_hypervisors", None,
      ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
             ht.TNone),
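
The practical effect of the vg_name change is that the empty string is
accepted again (an empty volume group name is how "no volume group" is
expressed). Assuming, as in ganeti.ht, that TMaybeString only accepts
non-empty strings or None, simplified stand-ins for the two predicates show
the difference::

  t_maybe_string = lambda v: v is None or (isinstance(v, str) and v != "")
  t_string_or_none = lambda v: v is None or isinstance(v, str)

  assert not t_maybe_string("")    # old check: "" was rejected
  assert t_string_or_none("")      # new check: "" is allowed
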
diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index aacacc042e8787aea5eb4f4ef0ac071aa072d25b..e79457d8982e58728a66326e6d9a105462e71598 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -66,6 +66,18 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  """
+  if status:
+    logging.info("New job with id %s, summary: %s",
+                 info, utils.CommaJoin(op.Summary() for op in ops))
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, utils.CommaJoin(op.Summary() for op in ops))
+
+
 class ClientRequestWorker(workerpool.BaseWorker):
   # pylint: disable=W0221
   def RunTask(self, server, message, client):
@@ -267,18 +279,23 @@ class ClientOps:
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Received new job")
+      logging.info("Receiving new job")
       (job_def, ) = args
       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-      return queue.SubmitJob(ops)
+      job_id = queue.SubmitJob(ops)
+      _LogNewJob(True, job_id, ops)
+      return job_id
 
     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Received multiple jobs")
+      logging.info("Receiving multiple jobs")
       (job_defs, ) = args
       jobs = []
       for ops in job_defs:
         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      return queue.SubmitManyJobs(jobs)
+      job_ids = queue.SubmitManyJobs(jobs)
+      for ((status, job_id), ops) in zip(job_ids, jobs):
+        _LogNewJob(status, job_id, ops)
+      return job_ids
 
     elif method == luxi.REQ_CANCEL_JOB:
       (job_id, ) = args
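
_LogNewJob relies on the shape of the SubmitManyJobs result, one
(status, value) pair per job, where value is a job ID on success and an error
description on failure; the handler zips those pairs back together with the
opcode lists. Roughly (values invented for illustration)::

  job_ids = [(True, 17), (False, "Job queue is shutting down")]
  jobs = [[op_a], [op_b]]   # the corresponding opcode lists
  for ((status, info), ops) in zip(job_ids, jobs):
    _LogNewJob(status, info, ops)
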
"opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_QUEUED, + constants.OP_STATUS_QUEUED], + ["Res0", None, None]]) + self.assertFalse(job.cur_opctx) + + # Writes for waiting, running and result + for _ in range(3): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + + # The next opcode should have expiring lock acquires + acquire_timeout = True + + self._TestQueueShutdown(queue, opexec, job, 1) + + def testQueueShutdownWhileInQueue(self): + # This should never happen in reality (no new jobs are started by the + # workerpool once a shutdown has been initiated), but it's better to test + # the job processor for this scenario + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(5)] + + # Create job + job_id = 2031 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertRaises(IndexError, queue.GetNextUpdate) + + self.assertFalse(job.start_timestamp) + self.assertFalse(job.end_timestamp) + self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED + for op in job.ops)) + + opexec = _FakeExecOpCodeForProc(queue, None, None) + self._TestQueueShutdown(queue, opexec, job, 0) + + def testQueueShutdownWhileWaitlockInQueue(self): + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(5)] + + # Create job + job_id = 53125685 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + job.ops[0].status = constants.OP_STATUS_WAITING + + assert len(job.ops) == 5 + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) + + self.assertRaises(IndexError, queue.GetNextUpdate) + + opexec = _FakeExecOpCodeForProc(queue, None, None) + self._TestQueueShutdown(queue, opexec, job, 0) + def testPartiallyRun(self): # Tests calling the processor on a job that's been partially run before the # program was restarted @@ -1947,6 +2145,28 @@ class TestJobDependencyManager(unittest.TestCase): self.assertFalse(self.jdm.JobWaiting(job)) self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING])) + def testNotFinalizedThenQueued(self): + # This can happen on a queue shutdown + job = _IdOnlyFakeJob(1320) + job_id = str(22971) + + for i in range(5): + if i > 2: + self._status.append((job_id, constants.JOB_STATUS_QUEUED)) + else: + self._status.append((job_id, constants.JOB_STATUS_RUNNING)) + (result, _) = self.jdm.CheckAndRegister(job, job_id, []) + self.assertEqual(result, self.jdm.WAIT) + self.assertFalse(self._status) + self.assertFalse(self._queue) + self.assertTrue(self.jdm.JobWaiting(job)) + self.assertEqual(self.jdm._waiters, { + job_id: set([job]), + }) + self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [ + ("job/22971", None, None, [("job", [job.id])]) + ]) + def testRequireCancel(self): job = _IdOnlyFakeJob(5278) job_id = str(9610) diff --git a/tools/kvm-ifup.in b/tools/kvm-ifup.in index 2e8e0c23c10e2366a0c9586f978ab33f4b35bac8..a5e8c1b854c09dcb72f32c81b7cc6b44635a9ac1 100644 --- a/tools/kvm-ifup.in +++ b/tools/kvm-ifup.in @@ -33,14 +33,20 @@ if [ -x "@SYSCONFDIR@/ganeti/kvm-vif-bridge" ]; then exec @SYSCONFDIR@/ganeti/kvm-vif-bridge fi -ip link set $INTERFACE up - if [ "$MODE" = "bridged" ]; then + # Fix the autogenerated MAC to have the first octet set to "fe" + # to discourage the bridge from using the TAP dev's MAC + FIXED_MAC=$(ip link show $INTERFACE | awk '{if ($1 == "link/ether") printf("fe%s",substr($2,3,15))}') + ip link set 
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index dcbad149ceab1d029d051209a843af59da111a7c..f7874d93608a744af50bb336993d62872c4de29f 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -494,6 +494,7 @@ class _FakeQueueForProc:
     self._acquired = False
     self._updates = []
     self._submitted = []
+    self._accepting_jobs = True
 
     self._submit_count = itertools.count(1000)
 
@@ -529,6 +530,12 @@ class _FakeQueueForProc:
     self._submitted.extend(zip(job_ids, jobs))
     return job_ids
 
+  def StopAcceptingJobs(self):
+    self._accepting_jobs = False
+
+  def AcceptingJobsUnlocked(self):
+    return self._accepting_jobs
+
 
 class _FakeExecOpCodeForProc:
   def __init__(self, queue, before_start, after_start):
@@ -873,7 +880,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
                       ["Job canceled by request" for _ in job.ops]])
 
-  def testCancelWhileWaitlockWithTimeout(self):
+  def _TestCancelWhileSomething(self, cb):
     queue = _FakeQueueForProc()
 
     ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
@@ -896,8 +903,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))
 
-      # Fake an acquire attempt timing out
-      raise mcpu.LockAcquireTimeout()
+      cb(queue)
 
     def _AfterStart(op, cbs):
       self.fail("Should not reach this")
@@ -918,6 +924,19 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
                       ["Job canceled by request" for _ in job.ops]])
 
+    return queue
+
+  def testCancelWhileWaitlockWithTimeout(self):
+    def fn(_):
+      # Fake an acquire attempt timing out
+      raise mcpu.LockAcquireTimeout()
+
+    self._TestCancelWhileSomething(fn)
+
+  def testCancelDuringQueueShutdown(self):
+    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
+    self.assertFalse(queue.AcceptingJobsUnlocked())
+
   def testCancelWhileRunning(self):
     # Tests canceling a job with finished opcodes and more, unprocessed ones
     queue = _FakeQueueForProc()
 
@@ -964,6 +983,185 @@
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
 
+  def _TestQueueShutdown(self, queue, opexec, job, runcount):
+    self.assertTrue(queue.AcceptingJobsUnlocked())
+
+    # Simulate shutdown
+    queue.StopAcceptingJobs()
+
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
+    self.assertFalse(job.cur_opctx)
+    self.assertTrue(job.start_timestamp)
+    self.assertFalse(job.end_timestamp)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
+                               for op in job.ops[:runcount]))
+    self.assertFalse(job.ops[runcount].end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops[(runcount + 1):]))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
+                       ([constants.OP_STATUS_QUEUED] *
+                        (len(job.ops) - runcount))),
+                      (["Res%s" % i for i in range(runcount)] +
+                       ([None] * (len(job.ops) - runcount)))])
+
+    # Must have been written and replicated
+    self.assertEqual(queue.GetNextUpdate(), (job, True))
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+  def testQueueShutdownWhileRunning(self):
+    # Tests shutting down the queue while a job is running
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 2718211587
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # Run one opcode
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+    self.assertFalse(job.cur_opctx)
+
+    # Writes for waiting, running and result
+    for _ in range(3):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+
+    # Run second opcode
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", "Res1", None]])
+    self.assertFalse(job.cur_opctx)
+
+    # Writes for waiting, running and result
+    for _ in range(3):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+
+    self._TestQueueShutdown(queue, opexec, job, 2)
+
+  def testQueueShutdownWithLockTimeout(self):
+    # Tests shutting down while a lock acquire times out
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 1304231178
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    acquire_timeout = False
+
+    def _BeforeStart(timeout, priority):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+      if acquire_timeout:
+        raise mcpu.LockAcquireTimeout()
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    # Run one opcode
+    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+    self.assertFalse(job.cur_opctx)
+
+    # Writes for waiting, running and result
+    for _ in range(3):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+
+    # The next opcode should have expiring lock acquires
+    acquire_timeout = True
+
+    self._TestQueueShutdown(queue, opexec, job, 1)
+
+  def testQueueShutdownWhileInQueue(self):
+    # This should never happen in reality (no new jobs are started by the
+    # workerpool once a shutdown has been initiated), but it's better to test
+    # the job processor for this scenario
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 2031
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    self.assertFalse(job.start_timestamp)
+    self.assertFalse(job.end_timestamp)
+    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
+                               for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self._TestQueueShutdown(queue, opexec, job, 0)
+
+  def testQueueShutdownWhileWaitlockInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 53125685
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_WAITING
+
+    assert len(job.ops) == 5
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
+    opexec = _FakeExecOpCodeForProc(queue, None, None)
+    self._TestQueueShutdown(queue, opexec, job, 0)
+
   def testPartiallyRun(self):
     # Tests calling the processor on a job that's been partially run before the
     # program was restarted
@@ -1947,6 +2145,28 @@ class TestJobDependencyManager(unittest.TestCase):
     self.assertFalse(self.jdm.JobWaiting(job))
     self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
 
+  def testNotFinalizedThenQueued(self):
+    # This can happen on a queue shutdown
+    job = _IdOnlyFakeJob(1320)
+    job_id = str(22971)
+
+    for i in range(5):
+      if i > 2:
+        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
+      else:
+        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
+      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
+      self.assertEqual(result, self.jdm.WAIT)
+      self.assertFalse(self._status)
+      self.assertFalse(self._queue)
+      self.assertTrue(self.jdm.JobWaiting(job))
+      self.assertEqual(self.jdm._waiters, {
+        job_id: set([job]),
+        })
+      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
+        ("job/22971", None, None, [("job", [job.id])])
+        ])
+
   def testRequireCancel(self):
     job = _IdOnlyFakeJob(5278)
     job_id = str(9610)
diff --git a/tools/kvm-ifup.in b/tools/kvm-ifup.in
index 2e8e0c23c10e2366a0c9586f978ab33f4b35bac8..a5e8c1b854c09dcb72f32c81b7cc6b44635a9ac1 100644
--- a/tools/kvm-ifup.in
+++ b/tools/kvm-ifup.in
@@ -33,14 +33,20 @@ if [ -x "@SYSCONFDIR@/ganeti/kvm-vif-bridge" ]; then
 	exec @SYSCONFDIR@/ganeti/kvm-vif-bridge
 fi
 
-ip link set $INTERFACE up
-
 if [ "$MODE" = "bridged" ]; then
+	# Fix the autogenerated MAC to have the first octet set to "fe"
+	# to discourage the bridge from using the TAP dev's MAC
+	FIXED_MAC=$(ip link show $INTERFACE | awk '{if ($1 == "link/ether") printf("fe%s",substr($2,3,15))}')
+	ip link set $INTERFACE address $FIXED_MAC
+
+	ip link set $INTERFACE up
 	ip link set $INTERFACE mtu $(</sys/class/net/${BRIDGE}/mtu)
 
 	# Connect the interface to the bridge
 	brctl addif $BRIDGE $INTERFACE
 else
+	ip link set $INTERFACE up
+
 	if [ -z "$IP" ]; then
 		echo "Routed NIC but no IP address specified"
 		exit 1
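
For clarity, the awk expression above takes the TAP device's current MAC
(field $2, e.g. "aa:bb:cc:dd:ee:ff"), drops the first octet via
substr($2,3,15) and prepends "fe"; since a Linux bridge by default adopts the
numerically lowest MAC among its ports, the rewritten address makes it
unlikely that the bridge takes over the TAP device's MAC. The same
transformation expressed in Python (the address is an example)::

  mac = "aa:bb:cc:dd:ee:ff"
  fixed = "fe" + mac[2:]    # -> "fe:bb:cc:dd:ee:ff"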