From c8457ce738e9aea1d17d9bb4e98d6a46675f4a16 Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Wed, 10 Jun 2009 19:27:48 +0200 Subject: [PATCH] Convert the jobqueue rpc to new style result This patch converts the job queue rpc calls to the new style result. It's done in a single patch as there are helper function (in both jqueue and backend) that are used by multiple rpcs and need synchronized change. Signed-off-by: Iustin Pop <iustin@google.com> Reviewed-by: Guido Trotter <ultrotter@google.com> --- daemons/ganeti-noded | 2 +- lib/backend.py | 37 ++++++++++++++++++------------------- lib/jqueue.py | 24 +++++++++++++++--------- 3 files changed, 34 insertions(+), 29 deletions(-) diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 3e53f2b66..15af1014c 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -708,7 +708,7 @@ class NodeHttpServer(http.server.HttpServer): """ # TODO: What if a file fails to rename? - return [backend.JobQueueRename(old, new) for old, new in params] + return True, [backend.JobQueueRename(old, new) for old, new in params] @staticmethod def perspective_jobqueue_set_drain(params): diff --git a/lib/backend.py b/lib/backend.py index 79e44bfbe..e5077c171 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -147,11 +147,13 @@ def _CleanDirectory(path, exclude=None): def JobQueuePurge(): """Removes job queue files and archived jobs. - @rtype: None + @rtype: tuple + @return: True, None """ _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE]) _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR) + return True, None def GetMasterInfo(): @@ -2138,23 +2140,21 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): return True, None -def _IsJobQueueFile(file_name): +def _EnsureJobQueueFile(file_name): """Checks whether the given filename is in the queue directory. @type file_name: str @param file_name: the file name we should check - @rtype: boolean - @return: whether the file is under the queue directory + @rtype: None + @raises RPCFail: if the file is not valid """ queue_dir = os.path.normpath(constants.QUEUE_DIR) result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir) if not result: - logging.error("'%s' is not a file in the queue directory", - file_name) - - return result + _Fail("Passed job queue file '%s' does not belong to" + " the queue directory '%s'", file_name, queue_dir) def JobQueueUpdate(file_name, content): @@ -2171,13 +2171,12 @@ def JobQueueUpdate(file_name, content): @return: the success of the operation """ - if not _IsJobQueueFile(file_name): - return False + _EnsureJobQueueFile(file_name) # Write and replace the file atomically utils.WriteFile(file_name, data=_Decompress(content)) - return True + return True, None def JobQueueRename(old, new): @@ -2189,16 +2188,16 @@ def JobQueueRename(old, new): @param old: the old (actual) file name @type new: str @param new: the desired file name - @rtype: boolean - @return: the success of the operation + @rtype: tuple + @return: the success of the operation and payload """ - if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)): - return False + _EnsureJobQueueFile(old) + _EnsureJobQueueFile(new) utils.RenameFile(old, new, mkdir=True) - return True + return True, None def JobQueueSetDrainFlag(drain_flag): @@ -2208,8 +2207,8 @@ def JobQueueSetDrainFlag(drain_flag): @type drain_flag: boolean @param drain_flag: if True, will set the drain flag, otherwise reset it. - @rtype: boolean - @return: always True + @rtype: truple + @return: always True, None @warning: the function always returns True """ @@ -2218,7 +2217,7 @@ def JobQueueSetDrainFlag(drain_flag): else: utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) - return True + return True, None def BlockdevClose(instance_name, disks): diff --git a/lib/jqueue.py b/lib/jqueue.py index 90757a5d0..ed2b9a908 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -601,7 +601,11 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - rpc.RpcRunner.call_jobqueue_purge(node_name) + result = rpc.RpcRunner.call_jobqueue_purge(node_name) + msg = result.RemoteFailMsg() + if msg: + logging.warning("Cannot cleanup queue directory on node %s: %s", + node_name, msg) if not node.master_candidate: # remove if existing, ignoring errors @@ -626,8 +630,10 @@ class JobQueue(object): result = rpc.RpcRunner.call_jobqueue_update([node_name], [node.primary_ip], file_name, content) - if not result[node_name]: - logging.error("Failed to upload %s to %s", file_name, node_name) + msg = result[node_name].RemoteFailMsg() + if msg: + logging.error("Failed to upload file %s to node %s: %s", + file_name, node_name, msg) self._nodes[node_name] = node.primary_ip @@ -664,13 +670,13 @@ class JobQueue(object): success = [] for node in nodes: - if result[node]: - success.append(node) - else: + msg = result[node].RemoteFailMsg() + if msg: failed.append(node) - - if failed: - logging.error("%s failed on %s", failmsg, ", ".join(failed)) + logging.error("RPC call %s failed on node %s: %s", + result[node].call, node, msg) + else: + success.append(node) # +1 for the master node if (len(success) + 1) < len(failed): -- GitLab