Skip to content
Snippets Groups Projects
Commit ca52cdeb authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add job queue RPC functions

jobqueue_update: Uploads a job queue file's content to a node. The
most common operation is to upload something that we already have
in a string. Unlike in the upload_file function, the file is not
read again when distributing changes, but content has to be passed
as a string.

jobqueue_purge: Removes all queue related files from a node.

Reviewed-by: iustinp
parent 3956cee1
No related branches found
No related tags found
No related merge requests found
...@@ -529,6 +529,21 @@ class NodeDaemonRequestHandler(http.HTTPRequestHandler): ...@@ -529,6 +529,21 @@ class NodeDaemonRequestHandler(http.HTTPRequestHandler):
return backend.RenameFileStorageDir(old_file_storage_dir, return backend.RenameFileStorageDir(old_file_storage_dir,
new_file_storage_dir) new_file_storage_dir)
@staticmethod
def perspective_jobqueue_update(params):
"""Update job queue.
"""
(file_name, content) = params
return backend.JobQueueUpdate(file_name, content)
@staticmethod
def perspective_jobqueue_purge(params):
"""Purge job queue.
"""
return backend.JobQueuePurge()
class NodeDaemonHttpServer(http.HTTPServer): class NodeDaemonHttpServer(http.HTTPServer):
def __init__(self, server_address): def __init__(self, server_address):
......
...@@ -1658,6 +1658,22 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): ...@@ -1658,6 +1658,22 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
return result return result
def JobQueueUpdate(file_name, content):
"""Updates a file in the queue directory.
"""
queue_dir = os.path.normpath(constants.QUEUE_DIR)
if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
logging.error("'%s' is not a file in the queue directory",
file_name)
return False
# Write and replace the file atomically
utils.WriteFile(file_name, data=content)
return True
def JobQueuePurge(): def JobQueuePurge():
"""Removes job queue files and archived jobs """Removes job queue files and archived jobs
......
...@@ -801,3 +801,28 @@ def call_file_storage_dir_rename(node, old_file_storage_dir, ...@@ -801,3 +801,28 @@ def call_file_storage_dir_rename(node, old_file_storage_dir,
c.connect(node) c.connect(node)
c.run() c.run()
return c.getresult().get(node, False) return c.getresult().get(node, False)
def call_jobqueue_update(node_list, file_name, content):
"""Update job queue.
This is a multi-node call.
"""
c = Client("jobqueue_update", [file_name, content])
c.connect_list(node_list)
c.run()
result = c.getresult()
return result
def call_jobqueue_purge(node):
"""Purge job queue.
This is a single-node call.
"""
c = Client("jobqueue_purge", [])
c.connect(node)
c.run()
return c.getresult().get(node, False)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment