Commit 56d8ff91 authored by Iustin Pop's avatar Iustin Pop
Browse files

Add a luxi call for multi-job submit

As a workaround for the job submit timeouts that we have, this patch
adds a new luxi call for multi-job submit; the advantage is that all the
jobs are added in the queue and only after the workers can start
processing them.

This is definitely faster than per-job submit, where the submission of
new jobs competes with the workers processing jobs.

On a pure no-op OpDelay opcode (not on master, not on nodes), we have:
  - 100 jobs:
    - individual: submit time ~21s, processing time ~21s
    - multiple:   submit time 7-9s, processing time ~22s
  - 250 jobs:
    - individual: submit time ~56s, processing time ~57s
                  run 2:      ~54s                  ~55s
    - multiple:   submit time ~20s, processing time ~51s
                  run 2:      ~17s                  ~52s

which shows that we indeed gain on the client side, and maybe even on
the total processing time for a high number of jobs. For just 10 or so I
expect the difference to be just noise.

This will probably require increasing the timeout a little when
submitting too many jobs - 250 jobs at ~20 seconds is close to the
current rw timeout of 60s.
Signed-off-by: default avatarIustin Pop <>
Reviewed-by: default avatarGuido Trotter <>
(cherry picked from commit 2971c913)
parent f6424741
......@@ -214,6 +214,13 @@ class ClientOps:
ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
return queue.SubmitJob(ops)
if method == luxi.REQ_SUBMIT_MANY_JOBS:"Received multiple jobs")
jobs = []
for ops in args:
jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
return queue.SubmitManyJobs(jobs)
elif method == luxi.REQ_CANCEL_JOB:
job_id = args"Received job cancel request for %s", job_id)
......@@ -961,9 +961,8 @@ class JobQueue(object):
return True
def SubmitJob(self, ops):
def _SubmitJobUnlocked(self, ops):
"""Create and store a new job.
This enters the job into our job queue and also puts it on the new
......@@ -977,7 +976,7 @@ class JobQueue(object):
if self._IsQueueMarkedDrain():
raise errors.JobQueueDrainError()
raise errors.JobQueueDrainError("Job queue is drained, refusing job")
# Check job queue size
size = len(self._ListJobFiles())
......@@ -1005,6 +1004,37 @@ class JobQueue(object):
def SubmitJob(self, ops):
"""Create and store a new job.
@see: L{_SubmitJobUnlocked}
return self._SubmitJobUnlocked(ops)
def SubmitManyJobs(self, jobs):
"""Create and store multiple jobs.
@see: L{_SubmitJobUnlocked}
results = []
for ops in jobs:
data = self._SubmitJobUnlocked(ops)
status = True
except errors.GenericError, err:
data = str(err)
status = False
results.append((status, data))
return results
def UpdateJobUnlocked(self, job):
"""Update a job's on disk storage.
......@@ -45,6 +45,7 @@ KEY_SUCCESS = "success"
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
......@@ -342,6 +343,12 @@ class Client(object):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
def SubmitManyJobs(self, jobs):
jobs_state = []
for ops in jobs:
jobs_state.append([op.__getstate__() for op in ops])
return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
return self.CallMethod(REQ_CANCEL_JOB, job_id)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment