diff --git a/lib/jqueue.py b/lib/jqueue.py index 6b60c5e02fb3309506928ee5ff47436ae11f66fb..d137443c591b75f471e4f2805af68123545b733d 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1137,21 +1137,56 @@ class _JobQueueWorker(workerpool.BaseWorker): queue = job.queue assert queue == self.pool.queue - self.SetTaskName("Job%s" % job.id) + setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) + setname_fn(None) proc = mcpu.Processor(queue.context, job.id) - if not _JobProcessor(queue, proc.ExecOpCode, job)(): + # Create wrapper for setting thread name + wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, + proc.ExecOpCode) + + if not _JobProcessor(queue, wrap_execop_fn, job)(): # Schedule again raise workerpool.DeferTask(priority=job.CalcPriority()) + @staticmethod + def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs): + """Updates the worker thread name to include a short summary of the opcode. + + @param setname_fn: Callable setting worker thread name + @param execop_fn: Callable for executing opcode (usually + L{mcpu.Processor.ExecOpCode}) + + """ + setname_fn(op) + try: + return execop_fn(op, *args, **kwargs) + finally: + setname_fn(None) + + @staticmethod + def _GetWorkerName(job, op): + """Sets the worker thread name. + + @type job: L{_QueuedJob} + @type op: L{opcodes.OpCode} + + """ + parts = ["Job%s" % job.id] + + if op: + parts.append(op.TinySummary()) + + return "/".join(parts) + class _JobQueueWorkerPool(workerpool.WorkerPool): """Simple class implementing a job-processing workerpool. """ def __init__(self, queue): - super(_JobQueueWorkerPool, self).__init__("JobQueue", + super(_JobQueueWorkerPool, self).__init__("Jq", JOBQUEUE_THREADS, _JobQueueWorker) self.queue = queue