Commit 4e338533 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

cli: Make PollJob generic to support other protocols



By separating the LUXI-specific code and stdio-related code
into separate classes, we can make cli.PollJob protocol-
agnostic, allowing it to be used with RAPI.

This patch also adds unittests for cli.PollJob.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 507fd05a
...@@ -1223,41 +1223,31 @@ def SendJob(ops, cl=None): ...@@ -1223,41 +1223,31 @@ def SendJob(ops, cl=None):
return job_id return job_id
def PollJob(job_id, cl=None, feedback_fn=None): def GenericPollJob(job_id, cbs, report_cbs):
"""Function to poll for the result of a job. """Generic job-polling function.
@type job_id: job identified @type job_id: number
@param job_id: the job to poll for results @param job_id: Job ID
@type cl: luxi.Client @type cbs: Instance of L{JobPollCbBase}
@param cl: the luxi client to use for communicating with the master; @param cbs: Data callbacks
if None, a new client will be created @type report_cbs: Instance of L{JobPollReportCbBase}
@param report_cbs: Reporting callbacks
""" """
if cl is None:
cl = GetClient()
prev_job_info = None prev_job_info = None
prev_logmsg_serial = None prev_logmsg_serial = None
status = None status = None
notified_queued = False
notified_waitlock = False
while True: while True:
result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info, result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
prev_logmsg_serial) prev_logmsg_serial)
if not result: if not result:
# job not found, go away! # job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id) raise errors.JobLost("Job with id %s lost" % job_id)
elif result == constants.JOB_NOTCHANGED:
if status is not None and not callable(feedback_fn): if result == constants.JOB_NOTCHANGED:
if status == constants.JOB_STATUS_QUEUED and not notified_queued: report_cbs.ReportNotChanged(job_id, status)
ToStderr("Job %s is waiting in queue", job_id)
notified_queued = True
elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
ToStderr("Job %s is trying to acquire all necessary locks", job_id)
notified_waitlock = True
# Wait again # Wait again
continue continue
...@@ -1268,12 +1258,9 @@ def PollJob(job_id, cl=None, feedback_fn=None): ...@@ -1268,12 +1258,9 @@ def PollJob(job_id, cl=None, feedback_fn=None):
if log_entries: if log_entries:
for log_entry in log_entries: for log_entry in log_entries:
(serial, timestamp, _, message) = log_entry (serial, timestamp, log_type, message) = log_entry
if callable(feedback_fn): report_cbs.ReportLogMessage(job_id, serial, timestamp,
feedback_fn(log_entry[1:]) log_type, message)
else:
encoded = utils.SafeEncode(message)
ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
prev_logmsg_serial = max(prev_logmsg_serial, serial) prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs # TODO: Handle canceled and archived jobs
...@@ -1285,30 +1272,189 @@ def PollJob(job_id, cl=None, feedback_fn=None): ...@@ -1285,30 +1272,189 @@ def PollJob(job_id, cl=None, feedback_fn=None):
prev_job_info = job_info prev_job_info = job_info
jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"]) jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
if not jobs: if not jobs:
raise errors.JobLost("Job with id %s lost" % job_id) raise errors.JobLost("Job with id %s lost" % job_id)
status, opstatus, result = jobs[0] status, opstatus, result = jobs[0]
if status == constants.JOB_STATUS_SUCCESS: if status == constants.JOB_STATUS_SUCCESS:
return result return result
elif status in (constants.JOB_STATUS_CANCELING,
constants.JOB_STATUS_CANCELED): if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
raise errors.OpExecError("Job was canceled") raise errors.OpExecError("Job was canceled")
has_ok = False
for idx, (status, msg) in enumerate(zip(opstatus, result)):
if status == constants.OP_STATUS_SUCCESS:
has_ok = True
elif status == constants.OP_STATUS_ERROR:
errors.MaybeRaise(msg)
if has_ok:
raise errors.OpExecError("partial failure (opcode %d): %s" %
(idx, msg))
raise errors.OpExecError(str(msg))
# default failure mode
raise errors.OpExecError(result)
class JobPollCbBase:
"""Base class for L{GenericPollJob} callbacks.
"""
def __init__(self):
"""Initializes this class.
"""
def WaitForJobChangeOnce(self, job_id, fields,
prev_job_info, prev_log_serial):
"""Waits for changes on a job.
"""
raise NotImplementedError()
def QueryJobs(self, job_ids, fields):
"""Returns the selected fields for the selected job IDs.
@type job_ids: list of numbers
@param job_ids: Job IDs
@type fields: list of strings
@param fields: Fields
"""
raise NotImplementedError()
class JobPollReportCbBase:
"""Base class for L{GenericPollJob} reporting callbacks.
"""
def __init__(self):
"""Initializes this class.
"""
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
"""Handles a log message.
"""
raise NotImplementedError()
def ReportNotChanged(self, job_id, status):
"""Called for if a job hasn't changed in a while.
@type job_id: number
@param job_id: Job ID
@type status: string or None
@param status: Job status if available
"""
raise NotImplementedError()
class _LuxiJobPollCb(JobPollCbBase):
def __init__(self, cl):
"""Initializes this class.
"""
JobPollCbBase.__init__(self)
self.cl = cl
def WaitForJobChangeOnce(self, job_id, fields,
prev_job_info, prev_log_serial):
"""Waits for changes on a job.
"""
return self.cl.WaitForJobChangeOnce(job_id, fields,
prev_job_info, prev_log_serial)
def QueryJobs(self, job_ids, fields):
"""Returns the selected fields for the selected job IDs.
"""
return self.cl.QueryJobs(job_ids, fields)
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
def __init__(self, feedback_fn):
"""Initializes this class.
"""
JobPollReportCbBase.__init__(self)
self.feedback_fn = feedback_fn
assert callable(feedback_fn)
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
"""Handles a log message.
"""
self.feedback_fn((timestamp, log_type, log_msg))
def ReportNotChanged(self, job_id, status):
"""Called if a job hasn't changed in a while.
"""
# Ignore
class StdioJobPollReportCb(JobPollReportCbBase):
def __init__(self):
"""Initializes this class.
"""
JobPollReportCbBase.__init__(self)
self.notified_queued = False
self.notified_waitlock = False
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
"""Handles a log message.
"""
ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
utils.SafeEncode(log_msg))
def ReportNotChanged(self, job_id, status):
"""Called if a job hasn't changed in a while.
"""
if status is None:
return
if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
ToStderr("Job %s is waiting in queue", job_id)
self.notified_queued = True
elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
ToStderr("Job %s is trying to acquire all necessary locks", job_id)
self.notified_waitlock = True
def PollJob(job_id, cl=None, feedback_fn=None):
"""Function to poll for the result of a job.
@type job_id: job identified
@param job_id: the job to poll for results
@type cl: luxi.Client
@param cl: the luxi client to use for communicating with the master;
if None, a new client will be created
"""
if cl is None:
cl = GetClient()
if feedback_fn:
reporter = FeedbackFnJobPollReportCb(feedback_fn)
else: else:
has_ok = False reporter = StdioJobPollReportCb()
for idx, (status, msg) in enumerate(zip(opstatus, result)):
if status == constants.OP_STATUS_SUCCESS: return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
has_ok = True
elif status == constants.OP_STATUS_ERROR:
errors.MaybeRaise(msg)
if has_ok:
raise errors.OpExecError("partial failure (opcode %d): %s" %
(idx, msg))
else:
raise errors.OpExecError(str(msg))
# default failure mode
raise errors.OpExecError(result)
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None): def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
......
...@@ -29,6 +29,8 @@ import testutils ...@@ -29,6 +29,8 @@ import testutils
from ganeti import constants from ganeti import constants
from ganeti import cli from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti.errors import OpPrereqError, ParameterError from ganeti.errors import OpPrereqError, ParameterError
...@@ -100,7 +102,7 @@ class TestIdentKeyVal(unittest.TestCase): ...@@ -100,7 +102,7 @@ class TestIdentKeyVal(unittest.TestCase):
class TestToStream(unittest.TestCase): class TestToStream(unittest.TestCase):
"""Thes the ToStream functions""" """Test the ToStream functions"""
def testBasic(self): def testBasic(self):
for data in ["foo", for data in ["foo",
...@@ -246,5 +248,175 @@ class TestGenerateTable(unittest.TestCase): ...@@ -246,5 +248,175 @@ class TestGenerateTable(unittest.TestCase):
None, None, "m", exp) None, None, "m", exp)
class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase):
def __init__(self, tc, job_id):
self.tc = tc
self.job_id = job_id
self._wfjcr = []
self._jobstatus = []
self._expect_notchanged = False
self._expect_log = []
def CheckEmpty(self):
self.tc.assertFalse(self._wfjcr)
self.tc.assertFalse(self._jobstatus)
self.tc.assertFalse(self._expect_notchanged)
self.tc.assertFalse(self._expect_log)
def AddWfjcResult(self, *args):
self._wfjcr.append(args)
def AddQueryJobsResult(self, *args):
self._jobstatus.append(args)
def WaitForJobChangeOnce(self, job_id, fields,
prev_job_info, prev_log_serial):
self.tc.assertEqual(job_id, self.job_id)
self.tc.assertEqualValues(fields, ["status"])
self.tc.assertFalse(self._expect_notchanged)
self.tc.assertFalse(self._expect_log)
(exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0)
self.tc.assertEqualValues(prev_job_info, exp_prev_job_info)
self.tc.assertEqual(prev_log_serial, exp_prev_log_serial)
if result == constants.JOB_NOTCHANGED:
self._expect_notchanged = True
elif result:
(_, logmsgs) = result
if logmsgs:
self._expect_log.extend(logmsgs)
return result
def QueryJobs(self, job_ids, fields):
self.tc.assertEqual(job_ids, [self.job_id])
self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"])
self.tc.assertFalse(self._expect_notchanged)
self.tc.assertFalse(self._expect_log)
result = self._jobstatus.pop(0)
self.tc.assertEqual(len(fields), len(result))
return [result]
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
self.tc.assertEqual(job_id, self.job_id)
self.tc.assertEqualValues((serial, timestamp, log_type, log_msg),
self._expect_log.pop(0))
def ReportNotChanged(self, job_id, status):
self.tc.assertEqual(job_id, self.job_id)
self.tc.assert_(self._expect_notchanged)
self._expect_notchanged = False
class TestGenericPollJob(testutils.GanetiTestCase):
def testSuccessWithLog(self):
job_id = 29609
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None,
((constants.JOB_STATUS_QUEUED, ), None))
cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
constants.JOB_NOTCHANGED)
cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
((constants.JOB_STATUS_RUNNING, ),
[(1, utils.SplitTime(1273491611.0),
constants.ELOG_MESSAGE, "Step 1"),
(2, utils.SplitTime(1273491615.9),
constants.ELOG_MESSAGE, "Step 2"),
(3, utils.SplitTime(1273491625.02),
constants.ELOG_MESSAGE, "Step 3"),
(4, utils.SplitTime(1273491635.05),
constants.ELOG_MESSAGE, "Step 4"),
(37, utils.SplitTime(1273491645.0),
constants.ELOG_MESSAGE, "Step 5"),
(203, utils.SplitTime(127349155.0),
constants.ELOG_MESSAGE, "Step 6")]))
cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203,
((constants.JOB_STATUS_RUNNING, ),
[(300, utils.SplitTime(1273491711.01),
constants.ELOG_MESSAGE, "Step X"),
(302, utils.SplitTime(1273491815.8),
constants.ELOG_MESSAGE, "Step Y"),
(303, utils.SplitTime(1273491925.32),
constants.ELOG_MESSAGE, "Step Z")]))
cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303,
((constants.JOB_STATUS_SUCCESS, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS,
[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS],
["Hello World", "Foo man bar"])
self.assertEqual(["Hello World", "Foo man bar"],
cli.GenericPollJob(job_id, cbs, cbs))
cbs.CheckEmpty()
def testJobLost(self):
job_id = 13746
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None, None)
self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testError(self):
job_id = 31088
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_ERROR],
["Hello World", "Error code 123"])
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testError2(self):
job_id = 22235
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
encexc = errors.EncodeException(errors.LockError("problem"))
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
[constants.OP_STATUS_ERROR], [encexc])
self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testWeirdError(self):
job_id = 28847
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
[constants.OP_STATUS_RUNNING,
constants.OP_STATUS_RUNNING],
[None, None])
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testCancel(self):
job_id = 4275
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING,
[constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELING],
[None, None])
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
if __name__ == '__main__': if __name__ == '__main__':
testutils.GanetiTestProgram() testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment