Commit c8457ce7 authored by Iustin Pop's avatar Iustin Pop
Browse files

Convert the jobqueue rpc to new style result



This patch converts the job queue rpc calls to the new style result.
It's done in a single patch as there are helper function (in both jqueue
and backend) that are used by multiple rpcs and need synchronized
change.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent b2b8bcce
......@@ -708,7 +708,7 @@ class NodeHttpServer(http.server.HttpServer):
"""
# TODO: What if a file fails to rename?
return [backend.JobQueueRename(old, new) for old, new in params]
return True, [backend.JobQueueRename(old, new) for old, new in params]
@staticmethod
def perspective_jobqueue_set_drain(params):
......
......@@ -147,11 +147,13 @@ def _CleanDirectory(path, exclude=None):
def JobQueuePurge():
"""Removes job queue files and archived jobs.
@rtype: None
@rtype: tuple
@return: True, None
"""
_CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
_CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
return True, None
def GetMasterInfo():
......@@ -2138,23 +2140,21 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
return True, None
def _IsJobQueueFile(file_name):
def _EnsureJobQueueFile(file_name):
"""Checks whether the given filename is in the queue directory.
@type file_name: str
@param file_name: the file name we should check
@rtype: boolean
@return: whether the file is under the queue directory
@rtype: None
@raises RPCFail: if the file is not valid
"""
queue_dir = os.path.normpath(constants.QUEUE_DIR)
result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
if not result:
logging.error("'%s' is not a file in the queue directory",
file_name)
return result
_Fail("Passed job queue file '%s' does not belong to"
" the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
......@@ -2171,13 +2171,12 @@ def JobQueueUpdate(file_name, content):
@return: the success of the operation
"""
if not _IsJobQueueFile(file_name):
return False
_EnsureJobQueueFile(file_name)
# Write and replace the file atomically
utils.WriteFile(file_name, data=_Decompress(content))
return True
return True, None
def JobQueueRename(old, new):
......@@ -2189,16 +2188,16 @@ def JobQueueRename(old, new):
@param old: the old (actual) file name
@type new: str
@param new: the desired file name
@rtype: boolean
@return: the success of the operation
@rtype: tuple
@return: the success of the operation and payload
"""
if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
return False
_EnsureJobQueueFile(old)
_EnsureJobQueueFile(new)
utils.RenameFile(old, new, mkdir=True)
return True
return True, None
def JobQueueSetDrainFlag(drain_flag):
......@@ -2208,8 +2207,8 @@ def JobQueueSetDrainFlag(drain_flag):
@type drain_flag: boolean
@param drain_flag: if True, will set the drain flag, otherwise reset it.
@rtype: boolean
@return: always True
@rtype: truple
@return: always True, None
@warning: the function always returns True
"""
......@@ -2218,7 +2217,7 @@ def JobQueueSetDrainFlag(drain_flag):
else:
utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
return True
return True, None
def BlockdevClose(instance_name, disks):
......
......@@ -601,7 +601,11 @@ class JobQueue(object):
assert node_name != self._my_hostname
# Clean queue directory on added node
rpc.RpcRunner.call_jobqueue_purge(node_name)
result = rpc.RpcRunner.call_jobqueue_purge(node_name)
msg = result.RemoteFailMsg()
if msg:
logging.warning("Cannot cleanup queue directory on node %s: %s",
node_name, msg)
if not node.master_candidate:
# remove if existing, ignoring errors
......@@ -626,8 +630,10 @@ class JobQueue(object):
result = rpc.RpcRunner.call_jobqueue_update([node_name],
[node.primary_ip],
file_name, content)
if not result[node_name]:
logging.error("Failed to upload %s to %s", file_name, node_name)
msg = result[node_name].RemoteFailMsg()
if msg:
logging.error("Failed to upload file %s to node %s: %s",
file_name, node_name, msg)
self._nodes[node_name] = node.primary_ip
......@@ -664,13 +670,13 @@ class JobQueue(object):
success = []
for node in nodes:
if result[node]:
success.append(node)
else:
msg = result[node].RemoteFailMsg()
if msg:
failed.append(node)
if failed:
logging.error("%s failed on %s", failmsg, ", ".join(failed))
logging.error("RPC call %s failed on node %s: %s",
result[node].call, node, msg)
else:
success.append(node)
# +1 for the master node
if (len(success) + 1) < len(failed):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment