diff --git a/lib/jqueue.py b/lib/jqueue.py index dfaf61570020edb51b73513492de86174d05c933..a96947883073171613e2915754f524002273648e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -325,18 +325,33 @@ class JobQueue(object): finally: self.release() - def _WriteAndReplicateFileUnlocked(self, file_name, data): - """Writes a file locally and then replicates it to all nodes. + @utils.LockedMethod + @_RequireOpenQueue + def AddNode(self, node_name): + assert node_name != self._my_hostname - """ - utils.WriteFile(file_name, data=data) + # TODO: Clean queue directory on added node - nodes = self._nodes[:] + # Upload the whole queue excluding archived jobs + files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] - # Remove master node + # Upload current serial file + files.append(constants.JOB_QUEUE_SERIAL_FILE) + + for file_name in files: + result = rpc.call_upload_file([node_name], file_name) + if not result[node_name]: + logging.error("Failed to upload %s to %s", file_name, node_name) + + self._nodes.add(node_name) + + @utils.LockedMethod + @_RequireOpenQueue + def RemoveNode(self, node_name): try: - nodes.remove(self._my_hostname) - except ValueError: + # The queue is removed by the "leave node" RPC call. + self._nodes.remove(node_name) + except KeyError: pass def _WriteAndReplicateFileUnlocked(self, file_name, data):