From a06c6ae87a44ea6951579f0256245f3cc6a68490 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Wed, 28 Mar 2012 14:28:50 +0200 Subject: [PATCH] jqueue: Convert GetInfo to query2 This rather inefficient implementation (fields are evaluated on every call to GetInfo) is not good for WaitForJobChanges and doesn't support filters, but that will be rectified in later patches. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/jqueue.py | 57 +++++++++++++--------------------- test/ganeti.jqueue_unittest.py | 8 ++--- 2 files changed, 26 insertions(+), 39 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index 6c707af04..75516fc51 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -57,6 +57,8 @@ from ganeti import runtime from ganeti import netutils from ganeti import compat from ganeti import ht +from ganeti import query +from ganeti import qlang JOBQUEUE_THREADS = 25 @@ -83,6 +85,25 @@ def TimeStampNow(): return utils.SplitTime(time.time()) +class _SimpleJobQuery: + """Wrapper for job queries. + + Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. + + """ + def __init__(self, fields): + """Initializes this class. + + """ + self._query = query.Query(query.JOB_FIELDS, fields) + + def __call__(self, job): + """Executes a job query using cached field list. + + """ + return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0] + + class _QueuedOpCode(object): """Encapsulates an opcode object. @@ -383,41 +404,7 @@ class _QueuedJob(object): has been passed """ - row = [] - for fname in fields: - if fname == "id": - row.append(self.id) - elif fname == "status": - row.append(self.CalcStatus()) - elif fname == "priority": - row.append(self.CalcPriority()) - elif fname == "ops": - row.append([op.input.__getstate__() for op in self.ops]) - elif fname == "opresult": - row.append([op.result for op in self.ops]) - elif fname == "opstatus": - row.append([op.status for op in self.ops]) - elif fname == "oplog": - row.append([op.log for op in self.ops]) - elif fname == "opstart": - row.append([op.start_timestamp for op in self.ops]) - elif fname == "opexec": - row.append([op.exec_timestamp for op in self.ops]) - elif fname == "opend": - row.append([op.end_timestamp for op in self.ops]) - elif fname == "oppriority": - row.append([op.priority for op in self.ops]) - elif fname == "received_ts": - row.append(self.received_timestamp) - elif fname == "start_ts": - row.append(self.start_timestamp) - elif fname == "end_ts": - row.append(self.end_timestamp) - elif fname == "summary": - row.append([op.input.Summary() for op in self.ops]) - else: - raise errors.OpExecError("Invalid self query field '%s'" % fname) - return row + return _SimpleJobQuery(fields)(self) def MarkUnfinishedOps(self, status, result): """Mark unfinished opcodes with a given status and result. diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index ea5e8f4be..dcbad149c 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -305,7 +305,7 @@ class TestQueuedJob(unittest.TestCase): self.assertEqual(len(job.ops), len(ops)) self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__() for (inp, op) in zip(ops, job.ops))) - self.assertRaises(errors.OpExecError, job.GetInfo, + self.assertRaises(errors.OpPrereqError, job.GetInfo, ["unknown-field"]) self.assertEqual(job.GetInfo(["summary"]), [[op.input.Summary() for op in job.ops]]) @@ -674,7 +674,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): for i in range(opcount)] # Create job - job = self._CreateJob(queue, job_id, ops) + job = self._CreateJob(queue, str(job_id), ops) opexec = _FakeExecOpCodeForProc(queue, None, None) @@ -702,7 +702,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): # Check job status self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR) - self.assertEqual(job.GetInfo(["id"]), [job_id]) + self.assertEqual(job.GetInfo(["id"]), [str(job_id)]) self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR]) # Check opcode status @@ -926,7 +926,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): for i in range(3)] # Create job - job_id = 28492 + job_id = str(28492) job = self._CreateJob(queue, job_id, ops) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) -- GitLab