workerpool: Change signature of AddTask function to not use *args

By changing it to a normal parameter, which must be a sequence, we can
start using keyword parameters.

Before this patch all arguments to “AddTask(self, *args)” were passed as
arguments to the worker's “RunTask” method. Priorities, which should be
optional and will be implemented in a future patch, must be passed as a keyword
parameter. This means “*args” can no longer be used as one can't combine *args
and keyword parameters in a clean way:

>>> def f(name=None, *args):
...   print "%r, %r" % (args, name)
>>> f("p1", "p2", "p3", name="thename")
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
 TypeError: f() got multiple values for keyword argument 'name'
Signed-off-by: default avatarMichael Hanselmann <>
Reviewed-by: default avatarIustin Pop <>
......@@ -114,7 +114,7 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
self.server = server
def handle_message(self, message, _):
self.server.request_workers.AddTask(self.server, message, self)
self.server.request_workers.AddTask((self.server, message, self))
class MasterServer(daemon.AsyncStreamServer):
......@@ -912,7 +912,7 @@ class JobQueue(object):
status = job.CalcStatus()
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask((job, ))
elif status in (constants.JOB_STATUS_RUNNING,
......@@ -1339,7 +1339,7 @@ class JobQueue(object):
job_id = self._NewSerialsUnlocked(1)[0]
self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
return job_id
......@@ -189,9 +189,10 @@ class WorkerPool(object):
# Notify a waiting worker
def AddTask(self, *args):
def AddTask(self, args):
"""Adds a task to the queue.
@type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
......@@ -93,7 +93,7 @@ class TestWorkerpool(unittest.TestCase):
self._CheckWorkerCount(wp, 3)
for i in range(10):
wp.AddTask(ctx, "Hello world %s" % i)
wp.AddTask((ctx, "Hello world %s" % i))
......@@ -133,7 +133,7 @@ class TestWorkerpool(unittest.TestCase):
checksum = ChecksumContext.CHECKSUM_START
for i in range(1, 100):
checksum = ChecksumContext.UpdateChecksum(checksum, i)
wp.AddTask(ctx, i)
wp.AddTask((ctx, i))
......@@ -156,8 +156,8 @@ class TestWorkerpool(unittest.TestCase):
self._CheckWorkerCount(wp, 3)
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask(ctx, "A separate hello")
wp.AddTask(ctx, "Once more, hi!")
wp.AddTask((ctx, "A separate hello"))
wp.AddTask((ctx, "Once more, hi!"))
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
......@@ -180,7 +180,7 @@ class TestWorkerpool(unittest.TestCase):
[i for i in range(10)])
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask(ctx, "A separate hello")
wp.AddTask((ctx, "A separate hello"))
......@@ -819,7 +819,7 @@ def main():
# Add instance moves to workerpool
for move in moves:
wp.AddTask(rapi_factory, move)
wp.AddTask((rapi_factory, move))
# Wait for all moves to finish
