Skip to content
Snippets Groups Projects
Commit 25d6d12a authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Implement queue locking in node daemon

Reviewed-by: iustinp
parent 5d6fb8eb
No related branches found
No related tags found
No related merge requests found
...@@ -39,11 +39,15 @@ from ganeti import logger ...@@ -39,11 +39,15 @@ from ganeti import logger
from ganeti import constants from ganeti import constants
from ganeti import objects from ganeti import objects
from ganeti import errors from ganeti import errors
from ganeti import jstore
from ganeti import ssconf from ganeti import ssconf
from ganeti import http from ganeti import http
from ganeti import utils from ganeti import utils
queue_lock = None
class NodeDaemonRequestHandler(http.HTTPRequestHandler): class NodeDaemonRequestHandler(http.HTTPRequestHandler):
"""The server implementation. """The server implementation.
...@@ -539,7 +543,15 @@ class NodeDaemonRequestHandler(http.HTTPRequestHandler): ...@@ -539,7 +543,15 @@ class NodeDaemonRequestHandler(http.HTTPRequestHandler):
""" """
(file_name, content) = params (file_name, content) = params
return backend.JobQueueUpdate(file_name, content)
# Locking in exclusive, blocking mode because there could be several
# children running at the same time.
# TODO: Implement nonblocking locking with retries?
queue_lock.Exclusive(blocking=True)
try:
return backend.JobQueueUpdate(file_name, content)
finally:
queue_lock.Unlock()
@staticmethod @staticmethod
def perspective_jobqueue_purge(params): def perspective_jobqueue_purge(params):
...@@ -600,6 +612,8 @@ def main(): ...@@ -600,6 +612,8 @@ def main():
"""Main function for the node daemon. """Main function for the node daemon.
""" """
global queue_lock
options, args = ParseOptions() options, args = ParseOptions()
utils.debug = options.debug utils.debug = options.debug
for fname in (constants.SSL_CERT_FILE,): for fname in (constants.SSL_CERT_FILE,):
...@@ -640,6 +654,9 @@ def main(): ...@@ -640,6 +654,9 @@ def main():
stderr_logging=not options.fork) stderr_logging=not options.fork)
logging.info("ganeti node daemon startup") logging.info("ganeti node daemon startup")
# Prepare job queue
queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
if options.fork: if options.fork:
server = ForkingHTTPServer(('', port)) server = ForkingHTTPServer(('', port))
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment