Skip to content
Snippets Groups Projects
Commit f1da30e6 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add experimental persistency to job queue

It's not perfect and it's not finished, but it's a start.

- Serial number is read only once, but written on each update
- Jobs are kept only on disk (caching will be implemented)

Reviewed-by: iustinp
parent 18682bca
No related branches found
No related tags found
No related merge requests found
......@@ -91,6 +91,7 @@ CLUSTER_CONF_FILE = DATA_DIR + "/config.data"
SSL_CERT_FILE = DATA_DIR + "/server.pem"
WATCHER_STATEFILE = DATA_DIR + "/watcher.data"
SSH_KNOWN_HOSTS_FILE = DATA_DIR + "/known_hosts"
QUEUE_DIR = DATA_DIR + "/queue"
ETC_HOSTS = "/etc/hosts"
DEFAULT_FILE_STORAGE_DIR = _autoconf.FILE_STORAGE_DIR
MASTER_SOCKET = RUN_GANETI_DIR + "/master.sock"
......@@ -244,6 +245,12 @@ IARUN_NOTFOUND = 1
IARUN_FAILURE = 2
IARUN_SUCCESS = 3
# Job queue
JOB_QUEUE_VERSION = 1
JOB_QUEUE_LOCK_FILE = QUEUE_DIR + "/lock"
JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version"
JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_RUNNING = "running"
......
......@@ -236,3 +236,8 @@ class QuitGanetiException(Exception):
"""
class JobQueueError(Exception):
"""Job queue error.
"""
......@@ -21,11 +21,16 @@
"""Module implementing the job queue handling."""
import os
import logging
import threading
import errno
import re
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
......@@ -41,10 +46,28 @@ class _QueuedOpCode(object):
"""
def __init__(self, op):
self.input = op
self.status = constants.OP_STATUS_QUEUED
self.result = None
self.__Setup(op, constants.OP_STATUS_QUEUED, None)
def __Setup(self, input, status, result):
self._lock = threading.Lock()
self.input = input
self.status = status
self.result = result
@classmethod
def Restore(cls, state):
obj = object.__new__(cls)
obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
state["status"], state["result"])
return obj
@utils.LockedMethod
def Serialize(self):
return {
"input": self.input.__getstate__(),
"status": self.status,
"result": self.result,
}
@utils.LockedMethod
def GetInput(self):
......@@ -82,17 +105,37 @@ class _QueuedJob(object):
This is what we use to track the user-submitted jobs.
"""
def __init__(self, ops, job_id):
def __init__(self, storage, job_id, ops):
if not ops:
# TODO
raise Exception("No opcodes")
self.id = job_id
self._lock = threading.Lock()
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
# _ops should not be modified again because we don't acquire the lock
# to use it.
self._ops = [_QueuedOpCode(op) for op in ops]
def __Setup(self, storage, job_id, ops):
self.storage = storage
self.id = job_id
self._ops = ops
@classmethod
def Restore(cls, storage, state):
obj = object.__new__(cls)
obj.__Setup(storage, state["id"],
[_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
return obj
def Serialize(self):
return {
"id": self.id,
"ops": [op.Serialize() for op in self._ops],
}
def SetUnclean(self, msg):
try:
for op in self._ops:
op.SetStatus(constants.OP_STATUS_ERROR, msg)
finally:
self.storage.UpdateJob(self)
def GetStatus(self):
status = constants.JOB_STATUS_QUEUED
......@@ -107,10 +150,12 @@ class _QueuedJob(object):
if op_status == constants.OP_STATUS_QUEUED:
pass
elif op_status == constants.OP_STATUS_ERROR:
status = constants.JOB_STATUS_ERROR
elif op_status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING
elif op_status == constants.OP_STATUS_ERROR:
status = constants.JOB_STATUS_ERROR
# The whole job fails if one opcode failed
break
if all_success:
status = constants.JOB_STATUS_SUCCESS
......@@ -133,15 +178,20 @@ class _QueuedJob(object):
try:
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
op.SetStatus(constants.OP_STATUS_RUNNING, None)
self.storage.UpdateJob(self)
result = proc.ExecOpCode(op.input)
op.SetStatus(constants.OP_STATUS_SUCCESS, result)
self.storage.UpdateJob(self)
logging.debug("Op %s/%s: Successfully finished %s",
idx + 1, count, op)
except Exception, err:
op.SetStatus(constants.OP_STATUS_ERROR, str(err))
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
try:
op.SetStatus(constants.OP_STATUS_ERROR, str(err))
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
finally:
self.storage.UpdateJob(self)
raise
except errors.GenericError, err:
......@@ -172,25 +222,181 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
self.context = context
class JobStorage(object):
_RE_JOB_FILE = re.compile(r"^job-\d+$")
def __init__(self):
self._lock = threading.Lock()
# Make sure our directory exists
try:
os.mkdir(constants.QUEUE_DIR, 0700)
except OSError, err:
if err.errno not in (errno.EEXIST, ):
raise
# Get queue lock
self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
try:
utils.LockFile(self.lock_fd)
except:
self.lock_fd.close()
raise
# Read version
try:
version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
except IOError, err:
if err.errno not in (errno.ENOENT, ):
raise
# Setup a new queue
self._InitQueueUnlocked()
# Try to open again
version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
try:
# Try to read version
version = int(version_fd.read(128))
# Verify version
if version != constants.JOB_QUEUE_VERSION:
raise errors.JobQueueError("Found version %s, expected %s",
version, constants.JOB_QUEUE_VERSION)
finally:
version_fd.close()
serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
try:
# Read last serial
self._last_serial = int(serial_fd.read(1024).strip())
finally:
serial_fd.close()
def Close(self):
assert self.lock_fd, "Queue should be open"
self.lock_fd.close()
self.lock_fd = None
def _InitQueueUnlocked(self):
assert self.lock_fd, "Queue should be open"
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
data="%s\n" % constants.JOB_QUEUE_VERSION)
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
def _NewSerialUnlocked(self):
"""Generates a new job identifier.
Job identifiers are unique during the lifetime of a cluster.
Returns: A string representing the job identifier.
"""
assert self.lock_fd, "Queue should be open"
# New number
serial = self._last_serial + 1
# Write to file
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % serial)
# Keep it only if we were able to write the file
self._last_serial = serial
return serial
def _GetJobPath(self, job_id):
return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
def _ListJobFiles(self):
assert self.lock_fd, "Queue should be open"
return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
if self._RE_JOB_FILE.match(name)]
def _LoadJobUnlocked(self, filepath):
assert self.lock_fd, "Queue should be open"
logging.debug("Loading job from %s", filepath)
try:
fd = open(filepath, "r")
except IOError, err:
if err.errno in (errno.ENOENT, ):
return None
raise
try:
data = serializer.LoadJson(fd.read())
finally:
fd.close()
return _QueuedJob.Restore(self, data)
def _GetJobsUnlocked(self, job_ids):
if job_ids:
files = [self._GetJobPath(job_id) for job_id in job_ids]
else:
files = [os.path.join(constants.QUEUE_DIR, filename)
for filename in self._ListJobFiles()]
return [self._LoadJobUnlocked(filepath) for filepath in files]
@utils.LockedMethod
def GetJobs(self, job_ids):
return self._GetJobsUnlocked(job_ids)
@utils.LockedMethod
def AddJob(self, ops):
assert self.lock_fd, "Queue should be open"
# Get job identifier
job_id = self._NewSerialUnlocked()
job = _QueuedJob(self, job_id, ops)
# Write to disk
self._UpdateJobUnlocked(job)
return job
def _UpdateJobUnlocked(self, job):
assert self.lock_fd, "Queue should be open"
filename = self._GetJobPath(job.id)
logging.debug("Writing job %s to %s", job.id, filename)
utils.WriteFile(filename,
data=serializer.DumpJson(job.Serialize(), indent=False))
@utils.LockedMethod
def UpdateJob(self, job):
return self._UpdateJobUnlocked(job)
def ArchiveJob(self, job_id):
raise NotImplementedError()
class JobQueue:
"""The job queue.
"""
def __init__(self, context):
self._lock = threading.Lock()
self._last_job_id = 0
self._jobs = {}
self._jobs = JobStorage()
self._wpool = _JobQueueWorkerPool(context)
def _NewJobIdUnlocked(self):
"""Generates a new job identifier.
Returns: A string representing the job identifier.
for job in self._jobs.GetJobs(None):
status = job.GetStatus()
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask(job)
"""
self._last_job_id += 1
return str(self._last_job_id)
elif status in (constants.JOB_STATUS_RUNNING, ):
logging.warning("Unfinished job %s found: %s", job.id, job)
job.SetUnclean("Unclean master daemon shutdown")
@utils.LockedMethod
def SubmitJob(self, ops):
"""Add a new job to the queue.
......@@ -201,26 +407,12 @@ class JobQueue:
- ops: Sequence of opcodes
"""
# Get job identifier
self._lock.acquire()
try:
job_id = self._NewJobIdUnlocked()
finally:
self._lock.release()
job = _QueuedJob(ops, job_id)
# Add it to our internal queue
self._lock.acquire()
try:
self._jobs[job_id] = job
finally:
self._lock.release()
job = self._jobs.AddJob(ops)
# Add to worker pool
self._wpool.AddTask(job)
return job_id
return job.id
def ArchiveJob(self, job_id):
raise NotImplementedError()
......@@ -255,16 +447,9 @@ class JobQueue:
"""
self._lock.acquire()
try:
if not job_ids:
job_ids = self._jobs.keys()
# TODO: define sort order?
job_ids.sort()
jobs = []
for job_id in job_ids:
job = self._jobs.get(job_id, None)
for job in self._jobs.GetJobs(job_ids):
if job is None:
jobs.append(None)
else:
......@@ -274,8 +459,10 @@ class JobQueue:
finally:
self._lock.release()
@utils.LockedMethod
def Shutdown(self):
"""Stops the job queue.
"""
self._wpool.TerminateWorkers()
self._jobs.Close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment