From daba67c785a583b30528b51549db165885b3ba59 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Mon, 23 Aug 2010 13:31:55 +0200 Subject: [PATCH] workerpool: Allow setting task name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With this patch, the task name is added to the thread name and will show up in logs. Log messages from jobs will look like βpid=578/JobQueue14/Job13 mcpu:289 DEBUG LU locks acquired/cluster/BGL/sharedβ. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Guido Trotter <ultrotter@google.com> --- lib/jqueue.py | 2 ++ lib/workerpool.py | 26 +++++++++++++++++++++++++- test/ganeti.workerpool_unittest.py | 7 +++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index 96608e24f..4126db747 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -690,6 +690,8 @@ class _JobQueueWorker(workerpool.BaseWorker): @param job: the job to be processed """ + self.SetTaskName("Job%s" % job.id) + logging.info("Processing job %s", job.id) proc = mcpu.Processor(self.pool.queue.context, job.id) queue = job.queue diff --git a/lib/workerpool.py b/lib/workerpool.py index 812732946..9f00b91c8 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -49,8 +49,11 @@ class BaseWorker(threading.Thread, object): """ super(BaseWorker, self).__init__(name=worker_id) self.pool = pool + self._worker_id = worker_id self._current_task = None + assert self.getName() == worker_id + def ShouldTerminate(self): """Returns whether this worker should terminate. @@ -64,6 +67,23 @@ class BaseWorker(threading.Thread, object): finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + def _HasRunningTaskUnlocked(self): """Returns whether this worker is currently running a task. @@ -107,7 +127,11 @@ class BaseWorker(threading.Thread, object): # Run the actual task try: logging.debug("Starting task %r", self._current_task) - self.RunTask(*self._current_task) + assert self.getName() == self._worker_id + try: + self.RunTask(*self._current_task) + finally: + self.SetTaskName(None) logging.debug("Done with task %r", self._current_task) except: # pylint: disable-msg=W0702 logging.exception("Caught unhandled exception") diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 586cc5e9f..549edba53 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -76,6 +76,13 @@ class ChecksumContext: class ChecksumBaseWorker(workerpool.BaseWorker): def RunTask(self, ctx, number): + name = "number%s" % number + self.SetTaskName(name) + + # This assertion needs to be checked before updating the checksum. A + # failing assertion will then cause the result to be wrong. + assert self.getName() == ("%s/%s" % (self._worker_id, name)) + ctx.lock.acquire() try: ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number) -- GitLab