diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 50cd34c87d5ffb81b0148b6422d918439ae40cc8..794afe7ba03b07c919ef2e0e8b31b0a3a4a32a74 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -529,6 +529,21 @@ class NodeDaemonRequestHandler(http.HTTPRequestHandler): return backend.RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir) + @staticmethod + def perspective_jobqueue_update(params): + """Update job queue. + + """ + (file_name, content) = params + return backend.JobQueueUpdate(file_name, content) + + @staticmethod + def perspective_jobqueue_purge(params): + """Purge job queue. + + """ + return backend.JobQueuePurge() + class NodeDaemonHttpServer(http.HTTPServer): def __init__(self, server_address): diff --git a/lib/backend.py b/lib/backend.py index 47570c0e0b757ec5d5ae8e1165f416e1636144f7..b61f6cc1c7d995fe0aec3cc9db57d064d08db2fa 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -1658,6 +1658,22 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): return result +def JobQueueUpdate(file_name, content): + """Updates a file in the queue directory. + + """ + queue_dir = os.path.normpath(constants.QUEUE_DIR) + if os.path.commonprefix([queue_dir, file_name]) != queue_dir: + logging.error("'%s' is not a file in the queue directory", + file_name) + return False + + # Write and replace the file atomically + utils.WriteFile(file_name, data=content) + + return True + + def JobQueuePurge(): """Removes job queue files and archived jobs diff --git a/lib/rpc.py b/lib/rpc.py index 68ac0bc7073cd613e510d66c883ac4afec2a6238..368d1f713230ecbc622c04514960e7da8f59e96f 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -801,3 +801,28 @@ def call_file_storage_dir_rename(node, old_file_storage_dir, c.connect(node) c.run() return c.getresult().get(node, False) + + +def call_jobqueue_update(node_list, file_name, content): + """Update job queue. + + This is a multi-node call. + + """ + c = Client("jobqueue_update", [file_name, content]) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result + + +def call_jobqueue_purge(node): + """Purge job queue. + + This is a single-node call. + + """ + c = Client("jobqueue_purge", []) + c.connect(node) + c.run() + return c.getresult().get(node, False)