diff --git a/lib/jqueue.py b/lib/jqueue.py index edc69b6f241f2109b087737c3b70c22b6af7673b..e55f46d5cc2ccbfd67e16a4a480e90c5a6e0cfd3 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -401,30 +401,40 @@ class JobQueue(object): except KeyError: pass + def _CheckRpcResult(self, result, nodes, failmsg): + failed = [] + success = [] + + for node in nodes: + if result[node]: + success.append(node) + else: + failed.append(node) + + if failed: + logging.error("%s failed on %s", failmsg, ", ".join(failed)) + + # +1 for the master node + if (len(success) + 1) < len(failed): + # TODO: Handle failing nodes + logging.error("More than half of the nodes failed") + def _WriteAndReplicateFileUnlocked(self, file_name, data): """Writes a file locally and then replicates it to all nodes. """ utils.WriteFile(file_name, data=data) - failed_nodes = 0 result = rpc.call_jobqueue_update(self._nodes, file_name, data) - for node in self._nodes: - if not result[node]: - failed_nodes += 1 - logging.error("Copy of job queue file to node %s failed", node) - - # TODO: check failed_nodes + self._CheckRpcResult(result, self._nodes, + "Updating %s" % file_name) def _RenameFileUnlocked(self, old, new): os.rename(old, new) result = rpc.call_jobqueue_rename(self._nodes, old, new) - for node in self._nodes: - if not result[node]: - logging.error("Moving %s to %s failed on %s", old, new, node) - - # TODO: check failed nodes + self._CheckRpcResult(result, self._nodes, + "Moving %s to %s" % (old, new)) def _FormatJobID(self, job_id): if not isinstance(job_id, (int, long)):