diff --git a/lib/jqueue.py b/lib/jqueue.py index 6e0ccec206eaae45f6a7c66035452606189a593b..001bc43e2bd86efd4ee9edd547556a50f7f6c363 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -822,7 +822,7 @@ class JobQueue(object): addr_list = [self._nodes[name] for name in name_list] return name_list, addr_list - def _WriteAndReplicateFileUnlocked(self, file_name, data): + def _UpdateJobQueueFile(self, file_name, data, replicate): """Writes a file locally and then replicates it to all nodes. This function will replace the contents of a file on the local @@ -832,14 +832,16 @@ class JobQueue(object): @param file_name: the path of the file to be replicated @type data: str @param data: the new contents of the file + @type replicate: boolean + @param replicate: whether to spread the changes to the remote nodes """ utils.WriteFile(file_name, data=data) - names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) - self._CheckRpcResult(result, self._nodes, - "Updating %s" % file_name) + if replicate: + names, addrs = self._GetNodeIp() + result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFilesUnlocked(self, rename): """Renames a file locally and then replicate the change. @@ -909,8 +911,8 @@ class JobQueue(object): serial = self._last_serial + count # Write to file - self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, - "%s\n" % serial) + self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE, + "%s\n" % serial, True) result = [self._FormatJobID(v) for v in range(self._last_serial, serial + 1)] @@ -1149,7 +1151,7 @@ class JobQueue(object): return results @_RequireOpenQueue - def UpdateJobUnlocked(self, job): + def UpdateJobUnlocked(self, job, replicate=True): """Update a job's on disk storage. After a job has been modified, this function needs to be called in @@ -1158,12 +1160,14 @@ class JobQueue(object): @type job: L{_QueuedJob} @param job: the changed job + @type replicate: boolean + @param replicate: whether to replicate the change to remote nodes """ filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) - self._WriteAndReplicateFileUnlocked(filename, data) + self._UpdateJobQueueFile(filename, data, replicate) # Notify waiters about potential changes job.change.notifyAll()