From 4c36bdf562bcc1ff4b57a981daa7891b2c69070b Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Thu, 17 Jun 2010 08:44:25 +0100 Subject: [PATCH] jqueue: make replication on job update optional Sometimes it's useful to write to the local filesystem, but immediate replication to all master candidates is not needed. The _WriteAndReplicateFileUnlocked function gets renamed to _UpdateJobQueueFile, as calling "write and replicate, but don't replicate" seemed a bit strange. Signed-off-by: Guido Trotter <ultrotter@google.com> Reviewed-by: Michael Hanselmann <hansmi@google.com> --- lib/jqueue.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index 6e0ccec20..001bc43e2 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -822,7 +822,7 @@ class JobQueue(object): addr_list = [self._nodes[name] for name in name_list] return name_list, addr_list - def _WriteAndReplicateFileUnlocked(self, file_name, data): + def _UpdateJobQueueFile(self, file_name, data, replicate): """Writes a file locally and then replicates it to all nodes. This function will replace the contents of a file on the local @@ -832,14 +832,16 @@ class JobQueue(object): @param file_name: the path of the file to be replicated @type data: str @param data: the new contents of the file + @type replicate: boolean + @param replicate: whether to spread the changes to the remote nodes """ utils.WriteFile(file_name, data=data) - names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) - self._CheckRpcResult(result, self._nodes, - "Updating %s" % file_name) + if replicate: + names, addrs = self._GetNodeIp() + result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFilesUnlocked(self, rename): """Renames a file locally and then replicate the change. @@ -909,8 +911,8 @@ class JobQueue(object): serial = self._last_serial + count # Write to file - self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, - "%s\n" % serial) + self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE, + "%s\n" % serial, True) result = [self._FormatJobID(v) for v in range(self._last_serial, serial + 1)] @@ -1149,7 +1151,7 @@ class JobQueue(object): return results @_RequireOpenQueue - def UpdateJobUnlocked(self, job): + def UpdateJobUnlocked(self, job, replicate=True): """Update a job's on disk storage. After a job has been modified, this function needs to be called in @@ -1158,12 +1160,14 @@ class JobQueue(object): @type job: L{_QueuedJob} @param job: the changed job + @type replicate: boolean + @param replicate: whether to replicate the change to remote nodes """ filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) - self._WriteAndReplicateFileUnlocked(filename, data) + self._UpdateJobQueueFile(filename, data, replicate) # Notify waiters about potential changes job.change.notifyAll() -- GitLab