Commit 23752136 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Replicate jobs to all nodes

Newly added nodes are not yet taken care of. Queue locking on
non-master nodes is not yet correct.

Reviewed-by: iustinp
parent 04ab05ce
...@@ -290,6 +290,11 @@ class JobQueue(object): ...@@ -290,6 +290,11 @@ class JobQueue(object):
assert self._last_serial is not None, ("Serial file was modified between" assert self._last_serial is not None, ("Serial file was modified between"
" check in jstore and here") " check in jstore and here")
# Get initial list of nodes
self._nodes = self.context.cfg.GetNodeList()
# TODO: Check consistency across nodes
# Setup worker pool # Setup worker pool
self._wpool = _JobQueueWorkerPool(self) self._wpool = _JobQueueWorkerPool(self)
...@@ -314,6 +319,29 @@ class JobQueue(object): ...@@ -314,6 +319,29 @@ class JobQueue(object):
finally: finally:
self.release() self.release()
def _WriteAndReplicateFileUnlocked(self, file_name, data):
"""Writes a file locally and then replicates it to all nodes.
"""
utils.WriteFile(file_name, data=data)
nodes = self._nodes[:]
# Remove master node
try:
nodes.remove(self._my_hostname)
except ValueError:
pass
failed_nodes = 0
result = rpc.call_upload_file(nodes, file_name)
for node in nodes:
if not result[node]:
failed_nodes += 1
logging.error("Copy of job queue file to node %s failed", node)
# TODO: check failed_nodes
def _FormatJobID(self, job_id): def _FormatJobID(self, job_id):
if not isinstance(job_id, (int, long)): if not isinstance(job_id, (int, long)):
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
...@@ -334,23 +362,12 @@ class JobQueue(object): ...@@ -334,23 +362,12 @@ class JobQueue(object):
serial = self._last_serial + 1 serial = self._last_serial + 1
# Write to file # Write to file
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % serial) "%s\n" % serial)
# Keep it only if we were able to write the file # Keep it only if we were able to write the file
self._last_serial = serial self._last_serial = serial
# Distribute the serial to the other nodes
try:
nodes.remove(self._my_hostname)
except ValueError:
pass
result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
for node in nodes:
if not result[node]:
logging.error("copy of job queue file to node %s failed", node)
return self._FormatJobID(serial) return self._FormatJobID(serial)
@staticmethod @staticmethod
...@@ -450,9 +467,9 @@ class JobQueue(object): ...@@ -450,9 +467,9 @@ class JobQueue(object):
@_RequireOpenQueue @_RequireOpenQueue
def UpdateJobUnlocked(self, job): def UpdateJobUnlocked(self, job):
filename = self._GetJobPath(job.id) filename = self._GetJobPath(job.id)
data = serializer.DumpJson(job.Serialize(), indent=False)
logging.debug("Writing job %s to %s", job.id, filename) logging.debug("Writing job %s to %s", job.id, filename)
utils.WriteFile(filename, self._WriteAndReplicateFileUnlocked(filename, data)
data=serializer.DumpJson(job.Serialize(), indent=False))
self._CleanCacheUnlocked([job.id]) self._CleanCacheUnlocked([job.id])
def _CleanCacheUnlocked(self, exclude): def _CleanCacheUnlocked(self, exclude):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment