Commit f1048938 authored by Iustin Pop's avatar Iustin Pop
Browse files

First version of user feedback fixes

This patch contains a raw version for fixing feedback_fn.

The new mechanism works as follows:
  - instead of a per-Processor feedback_fn, there's one for each
    ExecOpCode call, so that feedback for different opcodes can go
    through different functions
  - each _QueuedOpCode gets a message buffer, a method for adding
    feedback and a method for retrieving (parts of) the feedback
  - the _QueuedJob object gets a new attribute that is equal to the
    index of the currently executing opcode
  - job queries get an extra field called 'ticker' that will return
    the latest message of the currently executing opcode
  - the cli.py job completion poll will show the new status if different
    from the old one

Of course, quick messages will be lost, as currently only the latest one
is available. Also, changes between opcodes are not represented at all.

Reviewed-by: imsnah
parent ac0930b9
......@@ -382,8 +382,9 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
job_id = cl.SubmitJob([op])
lastmsg = None
while True:
jobs = cl.QueryJobs([job_id], ["status"])
jobs = cl.QueryJobs([job_id], ["status", "ticker"])
if not jobs:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
......@@ -392,6 +393,10 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
status = jobs[0][0]
if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
break
msg = jobs[0][1]
if msg is not None and msg != lastmsg:
print "%s %s" % (time.ctime(msg[0]), msg[2])
lastmsg = msg
time.sleep(1)
jobs = cl.QueryJobs([job_id], ["status", "opresult"])
......
......@@ -262,3 +262,7 @@ OP_STATUS_QUEUED = "queued"
OP_STATUS_RUNNING = "running"
OP_STATUS_SUCCESS = "success"
OP_STATUS_ERROR = "error"
# Execution log types
ELOG_MESSAGE = "message"
ELOG_PROGRESS = "progress"
......@@ -26,6 +26,7 @@ import logging
import threading
import errno
import re
import time
from ganeti import constants
from ganeti import serializer
......@@ -44,21 +45,25 @@ class _QueuedOpCode(object):
Access is synchronized by the '_lock' attribute.
The 'log' attribute holds the execution log and consists of tuples
of the form (timestamp, level, message).
"""
def __init__(self, op):
self.__Setup(op, constants.OP_STATUS_QUEUED, None)
self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
def __Setup(self, input, status, result):
def __Setup(self, input_, status, result, log):
self._lock = threading.Lock()
self.input = input
self.input = input_
self.status = status
self.result = result
self.log = log
@classmethod
def Restore(cls, state):
obj = object.__new__(cls)
obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
state["status"], state["result"])
state["status"], state["result"], state["log"])
return obj
@utils.LockedMethod
......@@ -67,6 +72,7 @@ class _QueuedOpCode(object):
"input": self.input.__getstate__(),
"status": self.status,
"result": self.result,
"log": self.log,
}
@utils.LockedMethod
......@@ -98,6 +104,27 @@ class _QueuedOpCode(object):
"""
return self.result
@utils.LockedMethod
def Log(self, *args):
  """Append an entry to the execution log.

  Two call forms are accepted:
    - Log(message): the entry is recorded with type
      constants.ELOG_MESSAGE
    - Log(log_type, message): the entry is recorded with the given type

  Each entry is stored in self.log as a (timestamp, log_type, message)
  tuple, with the timestamp taken from time.time().

  """
  # Both the one- and the two-argument form are valid, so the check has
  # to allow up to two arguments (and at least one).
  assert 1 <= len(args) <= 2, "Log expects (message) or (log_type, message)"
  if len(args) == 1:
    log_type = constants.ELOG_MESSAGE
    log_msg = args[0]
  else:
    log_type, log_msg = args
  self.log.append((time.time(), log_type, log_msg))
@utils.LockedMethod
def RetrieveLog(self, start_at=0):
  """Return the execution log entries from index start_at onwards.

  With the default start_at of 0 the whole log is returned. A negative
  value counts from the end of the log, as in ordinary list slicing, so
  RetrieveLog(-1) yields a list holding only the newest entry (or an
  empty list if nothing has been logged yet).

  """
  tail = self.log[start_at:]
  return tail
class _QueuedJob(object):
"""In-memory job representation.
......@@ -110,24 +137,27 @@ class _QueuedJob(object):
# TODO
raise Exception("No opcodes")
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
def __Setup(self, storage, job_id, ops):
def __Setup(self, storage, job_id, ops, run_op_index):
self._lock = threading.Lock()
self.storage = storage
self.id = job_id
self._ops = ops
self.run_op_index = run_op_index
@classmethod
def Restore(cls, storage, state):
obj = object.__new__(cls)
obj.__Setup(storage, state["id"],
[_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
return obj
def Serialize(self):
return {
"id": self.id,
"ops": [op.Serialize() for op in self._ops],
"run_op_index": self.run_op_index,
}
def SetUnclean(self, msg):
......@@ -162,6 +192,10 @@ class _QueuedJob(object):
return status
@utils.LockedMethod
def GetRunOpIndex(self):
  """Return the stored index of the opcode being executed.

  The value is read from the run_op_index attribute; a negative value
  means no opcode has been started.

  """
  current_index = self.run_op_index
  return current_index
def Run(self, proc):
"""Job executor.
......@@ -177,10 +211,17 @@ class _QueuedJob(object):
for idx, op in enumerate(self._ops):
try:
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
self._lock.acquire()
try:
self.run_op_index = idx
finally:
self._lock.release()
op.SetStatus(constants.OP_STATUS_RUNNING, None)
self.storage.UpdateJob(self)
result = proc.ExecOpCode(op.input)
result = proc.ExecOpCode(op.input, op.Log)
op.SetStatus(constants.OP_STATUS_SUCCESS, result)
self.storage.UpdateJob(self)
......@@ -207,7 +248,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.debug("Worker %s processing job %s",
self.worker_id, job.id)
# TODO: feedback function
proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
proc = mcpu.Processor(self.pool.context)
try:
job.Run(proc)
finally:
......@@ -477,6 +518,18 @@ class JobQueue:
row.append([op.GetResult() for op in job._ops])
elif fname == "opstatus":
row.append([op.GetStatus() for op in job._ops])
elif fname == "ticker":
ji = job.GetRunOpIndex()
if ji < 0:
lmsg = None
else:
lmsg = job._ops[ji].RetrieveLog(-1)
# message might be empty here
if lmsg:
lmsg = lmsg[0]
else:
lmsg = None
row.append(lmsg)
else:
raise errors.OpExecError("Invalid job query field '%s'" % fname)
return row
......
......@@ -90,7 +90,7 @@ class Processor(object):
opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
}
def __init__(self, context, feedback=None):
def __init__(self, context):
"""Constructor for Processor
Args:
......@@ -98,7 +98,7 @@ class Processor(object):
interesting events are happening
"""
self.context = context
self._feedback_fn = feedback
self._feedback_fn = None
self.exclusive_BGL = False
def _ExecLU(self, lu):
......@@ -146,7 +146,7 @@ class Processor(object):
return result
def ExecOpCode(self, op):
def ExecOpCode(self, op, feedback_fn):
"""Execute an opcode.
Args:
......@@ -157,6 +157,7 @@ class Processor(object):
raise errors.ProgrammerError("Non-opcode instance passed"
" to ExecOpcode")
self._feedback_fn = feedback_fn
lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment