Skip to content
Snippets Groups Projects
ganeti.jqueue_unittest.py 42.9 KiB
Newer Older
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assert_(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        self.assert_(serial > prevserial)
        prevserial = serial

  def _CheckLogMessages(self, job, count):
    # Check serial
    self.assertEqual(job.log_serial, count)

    # No filter
    self.assertEqual(job.GetLogEntries(None),
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
                      for entry in entries])

    # Filter with serial
    assert count > 3
    self.assert_(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3),
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
                      for entry in entries][3:])

    # No log message after highest serial
    self.assertFalse(job.GetLogEntries(count))
    self.assertFalse(job.GetLogEntries(count + 3))


class _FakeTimeoutStrategy:
  def __init__(self, timeouts):
    self.timeouts = timeouts
    self.attempts = 0
    self.last_timeout = None

  def NextAttempt(self):
    self.attempts += 1
    if self.timeouts:
      timeout = self.timeouts.pop(0)
    else:
      timeout = None
    self.last_timeout = timeout
    return timeout


class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests job processing when lock acquisition times out and is retried.

  The test drives a job processor with fake callbacks. L{_BeforeStart}
  simulates lock acquisition and raises C{mcpu.LockAcquireTimeout} while
  retries are left; L{_NewTimeoutStrategy} is passed as the timeout strategy
  factory and decides, per opcode, how many timeouts and retries to simulate.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently processed, taken from "opcounter"
    self.curop = None
    self.opcounter = None
    self.timeout_strategy = None
    # Number of simulated lock acquire timeouts still to be raised
    self.retries = 0
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    # Priority at which locks were last "acquired" in _BeforeStart
    self.lock_acq_prio = None
    # Whether the last _BeforeStart call handed out the locks
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Simulates lock acquisition for the current opcode.

    Records the priority used and whether the locks were given, and raises a
    lock acquire timeout while retries are left.

    @param timeout: Timeout for this attempt; must match the value last
      returned by the current timeout strategy
    @param priority: Priority for this attempt; must match the current
      opcode's priority
    @raise mcpu.LockAcquireTimeout: If a retry is to be simulated

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume the locks are handed out; reset below if a timeout is simulated.
    # testTimeout relies on both values after each processor run.
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Checks the job's state once an opcode has started running.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode and remembers its priority and status.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    Chooses the timeouts and the number of simulated retries depending on
    the opcode index.

    @return: New L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    # All retries of the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Processes a ten-opcode job while simulating lock acquire timeouts.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Run the processor repeatedly until it reports the job as finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(self.queue, opexec, job))


# Script entry point: when this module is executed directly, hand control to
# testutils.GanetiTestProgram (presumably the project's unittest runner;
# testutils is not defined in the visible part of this file).
if __name__ == "__main__":
  testutils.GanetiTestProgram()