Commit 4c36bdf5 authored by Guido Trotter's avatar Guido Trotter
Browse files

jqueue: make replication on job update optional



Sometimes it's useful to write to the local filesystem, but immediate
replication to all master candidates is not needed.

The _WriteAndReplicateFileUnlocked function gets renamed to
_UpdateJobQueueFile, as calling "write and replicate, but don't
replicate" seemed a bit strange.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 6a290889
......@@ -822,7 +822,7 @@ class JobQueue(object):
addr_list = [self._nodes[name] for name in name_list]
return name_list, addr_list
def _WriteAndReplicateFileUnlocked(self, file_name, data):
def _UpdateJobQueueFile(self, file_name, data, replicate):
"""Writes a file locally and then replicates it to all nodes.
This function will replace the contents of a file on the local
......@@ -832,14 +832,16 @@ class JobQueue(object):
@param file_name: the path of the file to be replicated
@type data: str
@param data: the new contents of the file
@type replicate: boolean
@param replicate: whether to spread the changes to the remote nodes
"""
utils.WriteFile(file_name, data=data)
names, addrs = self._GetNodeIp()
result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
self._CheckRpcResult(result, self._nodes,
"Updating %s" % file_name)
if replicate:
names, addrs = self._GetNodeIp()
result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
def _RenameFilesUnlocked(self, rename):
"""Renames a file locally and then replicate the change.
......@@ -909,8 +911,8 @@ class JobQueue(object):
serial = self._last_serial + count
# Write to file
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
"%s\n" % serial)
self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
"%s\n" % serial, True)
result = [self._FormatJobID(v)
for v in range(self._last_serial, serial + 1)]
......@@ -1149,7 +1151,7 @@ class JobQueue(object):
return results
@_RequireOpenQueue
def UpdateJobUnlocked(self, job):
def UpdateJobUnlocked(self, job, replicate=True):
"""Update a job's on disk storage.
After a job has been modified, this function needs to be called in
......@@ -1158,12 +1160,14 @@ class JobQueue(object):
@type job: L{_QueuedJob}
@param job: the changed job
@type replicate: boolean
@param replicate: whether to replicate the change to remote nodes
"""
filename = self._GetJobPath(job.id)
data = serializer.DumpJson(job.Serialize(), indent=False)
logging.debug("Writing job %s to %s", job.id, filename)
self._WriteAndReplicateFileUnlocked(filename, data)
self._UpdateJobQueueFile(filename, data, replicate)
# Notify waiters about potential changes
job.change.notifyAll()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment