Commit c3f0a12f authored by Iustin Pop's avatar Iustin Pop
Browse files

Distribute the queue serial file after each update

This patch adds distribution of the queue serial file after each write
to it (but before a new job is created and written with that ID, and
before a response is returned, so we should be safe from crashes in
between).

Currently it only logs if a node cannot be contacted; it should abort if
more than 50% of the nodes report errors.

Reviewed-by: imsnah
parent c4beba1c
......@@ -208,7 +208,11 @@ class ClientOps:
if method == luxi.REQ_SUBMIT_JOB:
ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
return queue.SubmitJob(ops)
# we need to compute the node list here, since from now on all
# operations require locks on the queue or the storage, and we
# shouldn't get another lock
node_list = self.server.context.cfg.GetNodeList()
return queue.SubmitJob(ops, node_list)
elif method == luxi.REQ_CANCEL_JOB:
(job_id, ) = args
......
......@@ -986,6 +986,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
constants.ETC_HOSTS,
constants.SSH_KNOWN_HOSTS_FILE,
constants.VNC_PASSWORD_FILE,
constants.JOB_QUEUE_SERIAL_FILE,
]
allowed_files.extend(ssconf.SimpleStore().GetFileList())
if file_name not in allowed_files:
......
......@@ -35,6 +35,7 @@ from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc
JOBQUEUE_THREADS = 5
......@@ -269,6 +270,7 @@ class JobStorage(object):
def __init__(self):
self._lock = threading.Lock()
self._memcache = {}
self._my_hostname = utils.HostInfo().name
# Make sure our directory exists
try:
......@@ -350,7 +352,7 @@ class JobStorage(object):
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
def _NewSerialUnlocked(self):
def _NewSerialUnlocked(self, nodes):
"""Generates a new job identifier.
Job identifiers are unique during the lifetime of a cluster.
......@@ -370,6 +372,17 @@ class JobStorage(object):
# Keep it only if we were able to write the file
self._last_serial = serial
# Distribute the serial to the other nodes
try:
nodes.remove(self._my_hostname)
except ValueError:
pass
result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
for node in nodes:
if not result[node]:
logging.error("copy of job queue file to node %s failed", node)
return serial
def _GetJobPath(self, job_id):
......@@ -434,11 +447,20 @@ class JobStorage(object):
return self._GetJobsUnlocked(job_ids)
@utils.LockedMethod
def AddJob(self, ops):
def AddJob(self, ops, nodes):
"""Create and store on disk a new job.
@type ops: list
@param ops: The list of OpCodes that will become the new job.
@type nodes: list
@param nodes: The list of nodes to which the new job serial will be
distributed.
"""
assert self.lock_fd, "Queue should be open"
# Get job identifier
job_id = self._NewSerialUnlocked()
job_id = self._NewSerialUnlocked(nodes)
job = _QueuedJob(self, job_id, ops)
# Write to disk
......@@ -504,17 +526,20 @@ class JobQueue:
job.SetUnclean("Unclean master daemon shutdown")
@utils.LockedMethod
def SubmitJob(self, ops):
def SubmitJob(self, ops, nodes):
"""Add a new job to the queue.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
Args:
- ops: Sequence of opcodes
@type ops: list
@param ops: the sequence of opcodes that will become the new job
@type nodes: list
@param nodes: the list of nodes to which the queue should be
distributed
"""
job = self._jobs.AddJob(ops)
job = self._jobs.AddJob(ops, nodes)
# Add to worker pool
self._wpool.AddTask(job)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment