Commit 6c5a7090 authored by Michael Hanselmann

Make sure that client programs get all messages

This is a large patch, but I can't figure out how to split it without
breaking stuff. The old way of getting messages, by always fetching the
last one, didn't deliver all messages to the client if they were added
too fast, thereby making commands like “gnt-cluster verify” less than
useful. These changes introduce a serial number per log entry to keep
track of which messages a client has already received. They also remove
the per-opcode log lock to make reading log entries thread safe.

Reviewed-by: ultrotter
parent 305cb9bb
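For context, the scheme works as follows: every log entry carries a monotonically increasing serial number, and the client hands the highest serial it has seen back to the server, which replies with every entry newer than that. The standalone sketch below illustrates the idea; the names are made up for illustration and are not Ganeti's API:

  class LogBuffer(object):
    """Toy model of a per-job log with serial numbers."""

    def __init__(self):
      self.serial = 0
      self.entries = []  # list of (serial, message) tuples

    def Append(self, message):
      self.serial += 1
      self.entries.append((self.serial, message))

    def NewerThan(self, prev_serial):
      # Each entry is delivered exactly once, no matter how quickly
      # messages were added between two client polls.
      return [e for e in self.entries if e[0] > prev_serial]

  buf = LogBuffer()
  buf.Append("first message")
  buf.Append("second message")

  prev = 0
  for (serial, message) in buf.NewerThan(prev):
    print(message)
    prev = max(prev, serial)  # remember the newest serial we consumed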
@@ -217,8 +217,9 @@ class ClientOps:
       return queue.ArchiveJob(job_id)
 
     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
-      (job_id, fields, previous) = args
-      return queue.WaitForJobChanges(job_id, fields, previous)
+      (job_id, fields, prev_job_info, prev_log_serial) = args
+      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+                                     prev_log_serial)
 
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
@@ -405,25 +405,34 @@ def PollJob(job_id, cl=None, feedback_fn=None):
   if cl is None:
     cl = GetClient()
 
-  state = None
-  lastmsg = None
+  prev_job_info = None
+  prev_logmsg_serial = None
 
   while True:
-    state = cl.WaitForJobChange(job_id, ["status", "ticker"], state)
-    if not state:
+    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+                                 prev_logmsg_serial)
+    if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
 
+    # Split result, a tuple of (field values, log entries)
+    (job_info, log_entries) = result
+
+    (status, ) = job_info
+
+    if log_entries:
+      for log_entry in log_entries:
+        (serial, timestamp, _, message) = log_entry
+        if callable(feedback_fn):
+          feedback_fn(log_entry[1:])
+        else:
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+        prev_logmsg_serial = max(prev_logmsg_serial, serial)
+
     # TODO: Handle canceled and archived jobs
-    status = state[0]
-    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
       break
 
-    msg = state[1]
-    if msg is not None and msg != lastmsg:
-      if callable(feedback_fn):
-        feedback_fn(msg)
-      else:
-        print "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
-    lastmsg = msg
+    prev_job_info = job_info
 
   jobs = cl.QueryJobs([job_id], ["status", "opresult"])
   if not jobs:
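Note that feedback_fn receives log_entry[1:], i.e. a (timestamp, log_type, message) tuple with the serial already stripped off. A minimal callback could therefore look like the sketch below; the timestamp is assumed to be the (seconds, microseconds) pair produced by utils.SplitTime:

  import time

  def ShowFeedback(entry):
    # entry is (timestamp, log_type, message); merge the split timestamp
    # back into float seconds, the same idea as utils.MergeTime.
    (timestamp, log_type, message) = entry
    seconds = timestamp[0] + timestamp[1] / 1000000.0
    print("%s [%s] %s" % (time.ctime(seconds), log_type, message))

  # PollJob(job_id, feedback_fn=ShowFeedback) would then print each entry once.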
@@ -19,7 +19,13 @@
 # 02110-1301, USA.
 
-"""Module implementing the job queue handling."""
+"""Module implementing the job queue handling.
+
+Locking:
+There's a single, large lock in the JobQueue class. It's used by all other
+classes in this module.
+
+"""
 
 import os
 import logging
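The single-lock design works because threading.Condition accepts an existing lock, so waiting on a job's change condition transparently releases and reacquires the queue lock. A small illustration of that property, independent of any Ganeti code:

  import threading

  queue_lock = threading.Lock()
  # The condition shares the queue lock instead of owning a private one.
  change = threading.Condition(queue_lock)

  def Waiter():
    change.acquire()  # acquires queue_lock
    try:
      # wait() releases queue_lock while blocked and reacquires it before
      # returning, letting notifiers make progress in the meantime.
      change.wait(1.0)  # timeout guards against the notify racing ahead
    finally:
      change.release()

  thread = threading.Thread(target=Waiter)
  thread.start()

  change.acquire()
  try:
    change.notifyAll()  # the shared lock must be held to notify
  finally:
    change.release()
  thread.join()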
@@ -45,18 +51,10 @@ JOBQUEUE_THREADS = 5
 class _QueuedOpCode(object):
   """Encapsulates an opcode object.
 
-  Access is synchronized by the '_lock' attribute.
-
   The 'log' attribute holds the execution log and consists of tuples
-  of the form (timestamp, level, message).
+  of the form (log_serial, timestamp, level, message).
 
   """
-  def __new__(cls, *args, **kwargs):
-    obj = object.__new__(cls, *args, **kwargs)
-    # Create a special lock for logging
-    obj._log_lock = threading.Lock()
-    return obj
-
   def __init__(self, op):
     self.input = op
     self.status = constants.OP_STATUS_QUEUED
@@ -73,60 +71,21 @@ class _QueuedOpCode(object):
     return obj
 
   def Serialize(self):
-    self._log_lock.acquire()
-    try:
-      return {
-        "input": self.input.__getstate__(),
-        "status": self.status,
-        "result": self.result,
-        "log": self.log,
-        }
-    finally:
-      self._log_lock.release()
-
-  def Log(self, *args):
-    """Append a log entry.
-
-    """
-    assert len(args) < 3
-
-    if len(args) == 1:
-      log_type = constants.ELOG_MESSAGE
-      log_msg = args[0]
-    else:
-      log_type, log_msg = args
-
-    self._log_lock.acquire()
-    try:
-      # The time is split to make serialization easier and not lose more
-      # precision.
-      self.log.append((utils.SplitTime(time.time()), log_type, log_msg))
-    finally:
-      self._log_lock.release()
-
-  def RetrieveLog(self, start_at=0):
-    """Retrieve (a part of) the execution log.
-
-    """
-    self._log_lock.acquire()
-    try:
-      return self.log[start_at:]
-    finally:
-      self._log_lock.release()
+    return {
+      "input": self.input.__getstate__(),
+      "status": self.status,
+      "result": self.result,
+      "log": self.log,
+      }
 
 
 class _QueuedJob(object):
   """In-memory job representation.
 
-  This is what we use to track the user-submitted jobs.
+  This is what we use to track the user-submitted jobs. Locking must be taken
+  care of by users of this class.
 
   """
-  def __new__(cls, *args, **kwargs):
-    obj = object.__new__(cls, *args, **kwargs)
-    # Condition to wait for changes
-    obj.change = threading.Condition()
-    return obj
-
   def __init__(self, queue, job_id, ops):
     if not ops:
       # TODO
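After this change, every entry in an opcode's log has the form (log_serial, timestamp, level, message), with the timestamp stored split into (seconds, microseconds) so JSON serialization loses no precision. A sketch of one entry, with hypothetical stand-ins for utils.SplitTime and utils.MergeTime:

  import time

  def SplitTimeStandIn(value):
    # float seconds -> (seconds, microseconds), what utils.SplitTime provides
    seconds = int(value)
    return (seconds, int((value - seconds) * 1000000))

  def MergeTimeStandIn(pair):
    # (seconds, microseconds) -> float seconds, what utils.MergeTime provides
    return pair[0] + pair[1] / 1000000.0

  # "message" stands in for the constants.ELOG_MESSAGE level
  entry = (1, SplitTimeStandIn(time.time()), "message", "verifying cluster")
  (serial, timestamp, level, message) = entry
  print("%d %.6f %s %s" % (serial, MergeTimeStandIn(timestamp), level, message))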
@@ -136,14 +95,29 @@ class _QueuedJob(object):
     self.id = job_id
     self.ops = [_QueuedOpCode(op) for op in ops]
     self.run_op_index = -1
+    self.log_serial = 0
+
+    # Condition to wait for changes
+    self.change = threading.Condition(self.queue._lock)
 
   @classmethod
   def Restore(cls, queue, state):
     obj = _QueuedJob.__new__(cls)
     obj.queue = queue
     obj.id = state["id"]
-    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
     obj.run_op_index = state["run_op_index"]
+    obj.ops = []
+    obj.log_serial = 0
+    for op_state in state["ops"]:
+      op = _QueuedOpCode.Restore(op_state)
+      for log_entry in op.log:
+        obj.log_serial = max(obj.log_serial, log_entry[0])
+      obj.ops.append(op)
+
+    # Condition to wait for changes
+    obj.change = threading.Condition(obj.queue._lock)
+
     return obj
 
   def Serialize(self):
@@ -180,12 +154,25 @@ class _QueuedJob(object):
     return status
 
+  def GetLogEntries(self, newer_than):
+    if newer_than is None:
+      serial = -1
+    else:
+      serial = newer_than
+
+    entries = []
+    for op in self.ops:
+      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
+
+    return entries
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   def RunTask(self, job):
     """Job executor.
 
-    This function processes a job.
+    This function processes a job. It is closely tied to the _QueuedJob and
+    _QueuedOpCode classes.
 
     """
     logging.debug("Worker %s processing job %s",
@@ -211,14 +198,31 @@ class _JobQueueWorker(workerpool.BaseWorker):
           queue.release()
 
         def _Log(*args):
-          op.Log(*args)
+          """Append a log entry.
+
+          """
+          assert len(args) < 3
+
+          if len(args) == 1:
+            log_type = constants.ELOG_MESSAGE
+            log_msg = args[0]
+          else:
+            log_type, log_msg = args
+
+          # The time is split to make serialization easier and not lose
+          # precision.
+          timestamp = utils.SplitTime(time.time())
 
-          job.change.acquire()
+          queue.acquire()
           try:
+            job.log_serial += 1
+            op.log.append((job.log_serial, timestamp, log_type, log_msg))
+
             job.change.notifyAll()
           finally:
-            job.change.release()
+            queue.release()
 
+        # Make sure not to hold lock while _Log is called
         result = proc.ExecOpCode(input_opcode, _Log)
 
         queue.acquire()
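The comment before ExecOpCode matters: _Log now takes the queue lock for every message, so executing the opcode while already holding that (non-reentrant) lock would deadlock on the first feedback call. A compressed illustration of the hazard, with made-up names:

  import threading

  lock = threading.Lock()  # stands in for the queue lock

  def Log(message):
    with lock:  # _Log takes the lock for each appended entry
      pass      # append to the log and notify waiters here

  def ExecOpCode():
    Log("progress report")  # opcodes emit feedback while running

  # Correct: run the opcode without holding the lock.
  ExecOpCode()

  # Wrong: this would block forever on the first message, because a
  # plain threading.Lock cannot be acquired twice by the same thread:
  #   with lock:
  #     ExecOpCode()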
@@ -532,11 +536,7 @@ class JobQueue(object):
       self._CleanCacheUnlocked([job.id])
 
     # Notify waiters about potential changes
-    job.change.acquire()
-    try:
-      job.change.notifyAll()
-    finally:
-      job.change.release()
+    job.change.notifyAll()
 
   def _CleanCacheUnlocked(self, exclude):
     """Clean the memory cache.
@@ -558,42 +558,60 @@ class JobQueue(object):
     except KeyError:
       pass
 
+  @utils.LockedMethod
   @_RequireOpenQueue
-  def WaitForJobChanges(self, job_id, fields, previous):
+  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
     """Waits for changes in a job.
 
+    @type job_id: string
+    @param job_id: Job identifier
+    @type fields: list of strings
+    @param fields: Which fields to check for changes
+    @type prev_job_info: list or None
+    @param prev_job_info: Last job information returned
+    @type prev_log_serial: int
+    @param prev_log_serial: Last job message serial number
+
     """
     logging.debug("Waiting for changes in job %s", job_id)
     while True:
-      self.acquire()
-      try:
-        job = self._LoadJobUnlocked(job_id)
-        if not job:
-          logging.debug("Job %s not found", job_id)
-          new_state = None
-          break
+      job = self._LoadJobUnlocked(job_id)
+      if not job:
+        logging.debug("Job %s not found", job_id)
+        new_state = None
+        break
 
-        new_state = self._GetJobInfoUnlocked(job, fields)
-      finally:
-        self.release()
+      status = job.CalcStatus()
+      job_info = self._GetJobInfoUnlocked(job, fields)
+      log_entries = job.GetLogEntries(prev_log_serial)
 
       # Serializing and deserializing data can cause type changes (e.g. from
       # tuple to list) or precision loss. We're doing it here so that we get
       # the same modifications as the data received from the client. Without
       # this, the comparison afterwards might fail without the data being
       # significantly different.
-      new_state = serializer.LoadJson(serializer.DumpJson(new_state))
+      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
 
-      if previous != new_state:
+      if status not in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_RUNNING):
+        # Don't even try to wait if the job is no longer running, there will
+        # be no changes.
         break
 
-      job.change.acquire()
-      try:
-        job.change.wait()
-      finally:
-        job.change.release()
+      if (prev_job_info != job_info or
+          (log_entries and prev_log_serial != log_entries[0][0])):
+        break
+
+      logging.debug("Waiting again")
+
+      # Release the queue lock while waiting
+      job.change.wait()
 
     logging.debug("Job %s changed", job_id)
 
-    return new_state
+    return (job_info, log_entries)
 
   @utils.LockedMethod
   @_RequireOpenQueue
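The double round-trip through the serializer is what keeps the prev_job_info comparison honest: JSON has no tuple type, so values the client received earlier come back as lists, and comparing a fresh tuple against the client's list would always look like a change. The effect, shown with the standard json module standing in for Ganeti's serializer wrapper:

  import json

  job_info = ("running",)
  # What the client holds after one trip through JSON:
  prev_job_info = json.loads(json.dumps(job_info))  # becomes ["running"]

  # Without normalization the comparison misfires despite no real change:
  assert job_info != prev_job_info

  # Normalizing the fresh value the same way makes them comparable:
  assert json.loads(json.dumps(job_info)) == prev_job_info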
@@ -669,18 +687,6 @@ class JobQueue(object):
         row.append([op.result for op in job.ops])
       elif fname == "opstatus":
         row.append([op.status for op in job.ops])
-      elif fname == "ticker":
-        ji = job.run_op_index
-        if ji < 0:
-          lmsg = None
-        else:
-          lmsg = job.ops[ji].RetrieveLog(-1)
-          # message might be empty here
-          if lmsg:
-            lmsg = lmsg[0]
-          else:
-            lmsg = None
-        row.append(lmsg)
       else:
         raise errors.OpExecError("Invalid job query field '%s'" % fname)
     return row
@@ -289,9 +289,9 @@ class Client(object):
   def ArchiveJob(self, job_id):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
-  def WaitForJobChange(self, job_id, fields, previous):
+  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                           (job_id, fields, previous))
+                           (job_id, fields, prev_job_info, prev_log_serial))
 
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))