Commit a06c6ae8 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Convert GetInfo to query2



This rather inefficient implementation (fields are evaluated on every
call to GetInfo) is not good for WaitForJobChanges and doesn't support
filters, but that will be rectified in later patches.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 9665bb3a
......@@ -57,6 +57,8 @@ from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
JOBQUEUE_THREADS = 25
......@@ -83,6 +85,25 @@ def TimeStampNow():
return utils.SplitTime(time.time())
class _SimpleJobQuery:
"""Wrapper for job queries.
Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
"""
def __init__(self, fields):
"""Initializes this class.
"""
self._query = query.Query(query.JOB_FIELDS, fields)
def __call__(self, job):
"""Executes a job query using cached field list.
"""
return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
class _QueuedOpCode(object):
"""Encapsulates an opcode object.
......@@ -383,41 +404,7 @@ class _QueuedJob(object):
has been passed
"""
row = []
for fname in fields:
if fname == "id":
row.append(self.id)
elif fname == "status":
row.append(self.CalcStatus())
elif fname == "priority":
row.append(self.CalcPriority())
elif fname == "ops":
row.append([op.input.__getstate__() for op in self.ops])
elif fname == "opresult":
row.append([op.result for op in self.ops])
elif fname == "opstatus":
row.append([op.status for op in self.ops])
elif fname == "oplog":
row.append([op.log for op in self.ops])
elif fname == "opstart":
row.append([op.start_timestamp for op in self.ops])
elif fname == "opexec":
row.append([op.exec_timestamp for op in self.ops])
elif fname == "opend":
row.append([op.end_timestamp for op in self.ops])
elif fname == "oppriority":
row.append([op.priority for op in self.ops])
elif fname == "received_ts":
row.append(self.received_timestamp)
elif fname == "start_ts":
row.append(self.start_timestamp)
elif fname == "end_ts":
row.append(self.end_timestamp)
elif fname == "summary":
row.append([op.input.Summary() for op in self.ops])
else:
raise errors.OpExecError("Invalid self query field '%s'" % fname)
return row
return _SimpleJobQuery(fields)(self)
def MarkUnfinishedOps(self, status, result):
"""Mark unfinished opcodes with a given status and result.
......
......@@ -305,7 +305,7 @@ class TestQueuedJob(unittest.TestCase):
self.assertEqual(len(job.ops), len(ops))
self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
for (inp, op) in zip(ops, job.ops)))
self.assertRaises(errors.OpExecError, job.GetInfo,
self.assertRaises(errors.OpPrereqError, job.GetInfo,
["unknown-field"])
self.assertEqual(job.GetInfo(["summary"]),
[[op.input.Summary() for op in job.ops]])
......@@ -674,7 +674,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
for i in range(opcount)]
# Create job
job = self._CreateJob(queue, job_id, ops)
job = self._CreateJob(queue, str(job_id), ops)
opexec = _FakeExecOpCodeForProc(queue, None, None)
......@@ -702,7 +702,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Check job status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.GetInfo(["id"]), [job_id])
self.assertEqual(job.GetInfo(["id"]), [str(job_id)])
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
# Check opcode status
......@@ -926,7 +926,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
for i in range(3)]
# Create job
job_id = 28492
job_id = str(28492)
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment