Skip to content
Snippets Groups Projects
Commit d2e03a33 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Implement {Add,Remove}Node

These functions will be used to notify the queue about newly added
or removed nodes.

Reviewed-by: iustinp
parent 4c848b18
No related branches found
No related tags found
No related merge requests found
...@@ -325,18 +325,33 @@ class JobQueue(object): ...@@ -325,18 +325,33 @@ class JobQueue(object):
finally: finally:
self.release() self.release()
def _WriteAndReplicateFileUnlocked(self, file_name, data): @utils.LockedMethod
"""Writes a file locally and then replicates it to all nodes. @_RequireOpenQueue
def AddNode(self, node_name):
assert node_name != self._my_hostname
""" # TODO: Clean queue directory on added node
utils.WriteFile(file_name, data=data)
nodes = self._nodes[:] # Upload the whole queue excluding archived jobs
files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
# Remove master node # Upload current serial file
files.append(constants.JOB_QUEUE_SERIAL_FILE)
for file_name in files:
result = rpc.call_upload_file([node_name], file_name)
if not result[node_name]:
logging.error("Failed to upload %s to %s", file_name, node_name)
self._nodes.add(node_name)
@utils.LockedMethod
@_RequireOpenQueue
def RemoveNode(self, node_name):
try: try:
nodes.remove(self._my_hostname) # The queue is removed by the "leave node" RPC call.
except ValueError: self._nodes.remove(node_name)
except KeyError:
pass pass
def _WriteAndReplicateFileUnlocked(self, file_name, data): def _WriteAndReplicateFileUnlocked(self, file_name, data):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment