Skip to content
Snippets Groups Projects
Commit daba67c7 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Allow setting task name


With this patch, the task name is added to the thread name and will show up in
logs. Log messages from jobs will look like “pid=578/JobQueue14/Job13 mcpu:289
DEBUG LU locks acquired/cluster/BGL/shared”.

Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent a4ebd726
No related branches found
No related tags found
No related merge requests found
...@@ -690,6 +690,8 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -690,6 +690,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
@param job: the job to be processed @param job: the job to be processed
""" """
self.SetTaskName("Job%s" % job.id)
logging.info("Processing job %s", job.id) logging.info("Processing job %s", job.id)
proc = mcpu.Processor(self.pool.queue.context, job.id) proc = mcpu.Processor(self.pool.queue.context, job.id)
queue = job.queue queue = job.queue
......
...@@ -49,8 +49,11 @@ class BaseWorker(threading.Thread, object): ...@@ -49,8 +49,11 @@ class BaseWorker(threading.Thread, object):
""" """
super(BaseWorker, self).__init__(name=worker_id) super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool self.pool = pool
self._worker_id = worker_id
self._current_task = None self._current_task = None
assert self.getName() == worker_id
def ShouldTerminate(self): def ShouldTerminate(self):
"""Returns whether this worker should terminate. """Returns whether this worker should terminate.
...@@ -64,6 +67,23 @@ class BaseWorker(threading.Thread, object): ...@@ -64,6 +67,23 @@ class BaseWorker(threading.Thread, object):
finally: finally:
self.pool._lock.release() self.pool._lock.release()
def SetTaskName(self, taskname):
"""Sets the name of the current task.
Should only be called from within L{RunTask}.
@type taskname: string
@param taskname: Task's name
"""
if taskname:
name = "%s/%s" % (self._worker_id, taskname)
else:
name = self._worker_id
# Set thread name
self.setName(name)
def _HasRunningTaskUnlocked(self): def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task. """Returns whether this worker is currently running a task.
...@@ -107,7 +127,11 @@ class BaseWorker(threading.Thread, object): ...@@ -107,7 +127,11 @@ class BaseWorker(threading.Thread, object):
# Run the actual task # Run the actual task
try: try:
logging.debug("Starting task %r", self._current_task) logging.debug("Starting task %r", self._current_task)
self.RunTask(*self._current_task) assert self.getName() == self._worker_id
try:
self.RunTask(*self._current_task)
finally:
self.SetTaskName(None)
logging.debug("Done with task %r", self._current_task) logging.debug("Done with task %r", self._current_task)
except: # pylint: disable-msg=W0702 except: # pylint: disable-msg=W0702
logging.exception("Caught unhandled exception") logging.exception("Caught unhandled exception")
......
...@@ -76,6 +76,13 @@ class ChecksumContext: ...@@ -76,6 +76,13 @@ class ChecksumContext:
class ChecksumBaseWorker(workerpool.BaseWorker): class ChecksumBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, number): def RunTask(self, ctx, number):
name = "number%s" % number
self.SetTaskName(name)
# This assertion needs to be checked before updating the checksum. A
# failing assertion will then cause the result to be wrong.
assert self.getName() == ("%s/%s" % (self._worker_id, name))
ctx.lock.acquire() ctx.lock.acquire()
try: try:
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment