diff --git a/lib/jqueue.py b/lib/jqueue.py index 42ad15f3d1cc4b832c1c72f83150f292b90d112c..feaedfb0a0fb3821d6e27c23e3a484c8101edd8c 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -290,6 +290,11 @@ class JobQueue(object): assert self._last_serial is not None, ("Serial file was modified between" " check in jstore and here") + # Get initial list of nodes + self._nodes = self.context.cfg.GetNodeList() + + # TODO: Check consistency across nodes + # Setup worker pool self._wpool = _JobQueueWorkerPool(self) @@ -314,6 +319,29 @@ class JobQueue(object): finally: self.release() + def _WriteAndReplicateFileUnlocked(self, file_name, data): + """Writes a file locally and then replicates it to all nodes. + + """ + utils.WriteFile(file_name, data=data) + + nodes = self._nodes[:] + + # Remove master node + try: + nodes.remove(self._my_hostname) + except ValueError: + pass + + failed_nodes = 0 + result = rpc.call_upload_file(nodes, file_name) + for node in nodes: + if not result[node]: + failed_nodes += 1 + logging.error("Copy of job queue file to node %s failed", node) + + # TODO: check failed_nodes + def _FormatJobID(self, job_id): if not isinstance(job_id, (int, long)): raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) @@ -334,23 +362,12 @@ class JobQueue(object): serial = self._last_serial + 1 # Write to file - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, - data="%s\n" % serial) + self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, + "%s\n" % serial) # Keep it only if we were able to write the file self._last_serial = serial - # Distribute the serial to the other nodes - try: - nodes.remove(self._my_hostname) - except ValueError: - pass - - result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE) - for node in nodes: - if not result[node]: - logging.error("copy of job queue file to node %s failed", node) - return self._FormatJobID(serial) @staticmethod @@ -450,9 +467,9 @@ class JobQueue(object): @_RequireOpenQueue def UpdateJobUnlocked(self, job): filename = self._GetJobPath(job.id) + data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) - utils.WriteFile(filename, - data=serializer.DumpJson(job.Serialize(), indent=False)) + self._WriteAndReplicateFileUnlocked(filename, data) self._CleanCacheUnlocked([job.id]) def _CleanCacheUnlocked(self, exclude):