diff --git a/lib/constants.py b/lib/constants.py
index 053ee04a5935bac82305c47d33b63ab98b6b8821..d3a8c10c308a26d4dd5ee61b9cfb723a55edf7ae 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -91,6 +91,7 @@ CLUSTER_CONF_FILE = DATA_DIR + "/config.data"
 SSL_CERT_FILE = DATA_DIR + "/server.pem"
 WATCHER_STATEFILE = DATA_DIR + "/watcher.data"
 SSH_KNOWN_HOSTS_FILE = DATA_DIR + "/known_hosts"
+QUEUE_DIR = DATA_DIR + "/queue"
 ETC_HOSTS = "/etc/hosts"
 DEFAULT_FILE_STORAGE_DIR = _autoconf.FILE_STORAGE_DIR
 MASTER_SOCKET = RUN_GANETI_DIR + "/master.sock"
@@ -244,6 +245,12 @@ IARUN_NOTFOUND = 1
 IARUN_FAILURE = 2
 IARUN_SUCCESS = 3
 
+# Job queue
+JOB_QUEUE_VERSION = 1
+JOB_QUEUE_LOCK_FILE = QUEUE_DIR + "/lock"
+JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version"
+JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
+
 # Job status
 JOB_STATUS_QUEUED = "queued"
 JOB_STATUS_RUNNING = "running"
diff --git a/lib/errors.py b/lib/errors.py
index 9a62409b7905efe7dd39a2298b873e859fbf23e0..50a194117cbe1ed68a819727e2bec163dc428261 100644
--- a/lib/errors.py
+++ b/lib/errors.py
@@ -236,3 +236,8 @@ class QuitGanetiException(Exception):
 
   """
 
+
+class JobQueueError(Exception):
+  """Job queue error.
+
+  """
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 67054aefc6358b89cfec6a20c9ff5518c7f2ddd1..2e765af0d31498070beb48ee4a6f0c14aa2f4008 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -21,11 +21,16 @@
 
 """Module implementing the job queue handling."""
 
+import os
 import logging
 import threading
+import errno
+import re
 
 from ganeti import constants
+from ganeti import serializer
 from ganeti import workerpool
+from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
@@ -41,10 +46,28 @@ class _QueuedOpCode(object):
 
   """
   def __init__(self, op):
-    self.input = op
-    self.status = constants.OP_STATUS_QUEUED
-    self.result = None
+    self.__Setup(op, constants.OP_STATUS_QUEUED, None)
+
+  def __Setup(self, input, status, result):
     self._lock = threading.Lock()
+    self.input = input
+    self.status = status
+    self.result = result
+
+  @classmethod
+  def Restore(cls, state):
+    obj = object.__new__(cls)
+    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
+                state["status"], state["result"])
+    return obj
+
+  @utils.LockedMethod
+  def Serialize(self):
+    return {
+      "input": self.input.__getstate__(),
+      "status": self.status,
+      "result": self.result,
+      }
 
   @utils.LockedMethod
   def GetInput(self):
@@ -82,17 +105,37 @@ class _QueuedJob(object):
   This is what we use to track the user-submitted jobs.
 
   """
-  def __init__(self, ops, job_id):
+  def __init__(self, storage, job_id, ops):
     if not ops:
       # TODO
       raise Exception("No opcodes")
 
-    self.id = job_id
-    self._lock = threading.Lock()
+    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
 
-    # _ops should not be modified again because we don't acquire the lock
-    # to use it.
-    self._ops = [_QueuedOpCode(op) for op in ops]
+  def __Setup(self, storage, job_id, ops):
+    self.storage = storage
+    self.id = job_id
+    self._ops = ops
+
+  @classmethod
+  def Restore(cls, storage, state):
+    obj = object.__new__(cls)
+    obj.__Setup(storage, state["id"],
+                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
+    return obj
+
+  def Serialize(self):
+    return {
+      "id": self.id,
+      "ops": [op.Serialize() for op in self._ops],
+      }
+
+  def SetUnclean(self, msg):
+    try:
+      for op in self._ops:
+        op.SetStatus(constants.OP_STATUS_ERROR, msg)
+    finally:
+      self.storage.UpdateJob(self)
 
   def GetStatus(self):
     status = constants.JOB_STATUS_QUEUED
@@ -107,10 +150,12 @@ class _QueuedJob(object):
 
       if op_status == constants.OP_STATUS_QUEUED:
         pass
-      elif op_status == constants.OP_STATUS_ERROR:
-        status = constants.JOB_STATUS_ERROR
       elif op_status == constants.OP_STATUS_RUNNING:
         status = constants.JOB_STATUS_RUNNING
+      elif op_status == constants.OP_STATUS_ERROR:
+        status = constants.JOB_STATUS_ERROR
+        # The whole job fails if one opcode failed
+        break
 
     if all_success:
       status = constants.JOB_STATUS_SUCCESS
@@ -133,15 +178,20 @@ class _QueuedJob(object):
         try:
           logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
           op.SetStatus(constants.OP_STATUS_RUNNING, None)
+          self.storage.UpdateJob(self)
 
           result = proc.ExecOpCode(op.input)
 
           op.SetStatus(constants.OP_STATUS_SUCCESS, result)
+          self.storage.UpdateJob(self)
           logging.debug("Op %s/%s: Successfully finished %s",
                         idx + 1, count, op)
         except Exception, err:
-          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
-          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+          try:
+            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
+            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
+          finally:
+            self.storage.UpdateJob(self)
           raise
 
     except errors.GenericError, err:
@@ -172,25 +222,181 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
     self.context = context
 
 
+class JobStorage(object):
+  _RE_JOB_FILE = re.compile(r"^job-\d+$")
+
+  def __init__(self):
+    self._lock = threading.Lock()
+
+    # Make sure our directory exists
+    try:
+      os.mkdir(constants.QUEUE_DIR, 0700)
+    except OSError, err:
+      if err.errno not in (errno.EEXIST, ):
+        raise
+
+    # Get queue lock
+    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
+    try:
+      utils.LockFile(self.lock_fd)
+    except:
+      self.lock_fd.close()
+      raise
+
+    # Read version
+    try:
+      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
+    except IOError, err:
+      if err.errno not in (errno.ENOENT, ):
+        raise
+
+      # Setup a new queue
+      self._InitQueueUnlocked()
+
+      # Try to open again
+      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
+
+    try:
+      # Try to read version
+      version = int(version_fd.read(128))
+
+      # Verify version
+      if version != constants.JOB_QUEUE_VERSION:
+        raise errors.JobQueueError("Found version %s, expected %s",
+                                   version, constants.JOB_QUEUE_VERSION)
+    finally:
+      version_fd.close()
+
+    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
+    try:
+      # Read last serial
+      self._last_serial = int(serial_fd.read(1024).strip())
+    finally:
+      serial_fd.close()
+
+  def Close(self):
+    assert self.lock_fd, "Queue should be open"
+
+    self.lock_fd.close()
+    self.lock_fd = None
+
+  def _InitQueueUnlocked(self):
+    assert self.lock_fd, "Queue should be open"
+
+    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
+                    data="%s\n" % constants.JOB_QUEUE_VERSION)
+    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
+                    data="%s\n" % 0)
+
+  def _NewSerialUnlocked(self):
+    """Generates a new job identifier.
+
+    Job identifiers are unique during the lifetime of a cluster.
+
+    Returns: A string representing the job identifier.
+
+    """
+    assert self.lock_fd, "Queue should be open"
+
+    # New number
+    serial = self._last_serial + 1
+
+    # Write to file
+    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
+                    data="%s\n" % serial)
+
+    # Keep it only if we were able to write the file
+    self._last_serial = serial
+
+    return serial
+
+  def _GetJobPath(self, job_id):
+    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
+
+  def _ListJobFiles(self):
+    assert self.lock_fd, "Queue should be open"
+
+    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
+            if self._RE_JOB_FILE.match(name)]
+
+  def _LoadJobUnlocked(self, filepath):
+    assert self.lock_fd, "Queue should be open"
+
+    logging.debug("Loading job from %s", filepath)
+    try:
+      fd = open(filepath, "r")
+    except IOError, err:
+      if err.errno in (errno.ENOENT, ):
+        return None
+      raise
+    try:
+      data = serializer.LoadJson(fd.read())
+    finally:
+      fd.close()
+
+    return _QueuedJob.Restore(self, data)
+
+  def _GetJobsUnlocked(self, job_ids):
+    if job_ids:
+      files = [self._GetJobPath(job_id) for job_id in job_ids]
+    else:
+      files = [os.path.join(constants.QUEUE_DIR, filename)
+               for filename in self._ListJobFiles()]
+
+    return [self._LoadJobUnlocked(filepath) for filepath in files]
+
+  @utils.LockedMethod
+  def GetJobs(self, job_ids):
+    return self._GetJobsUnlocked(job_ids)
+
+  @utils.LockedMethod
+  def AddJob(self, ops):
+    assert self.lock_fd, "Queue should be open"
+
+    # Get job identifier
+    job_id = self._NewSerialUnlocked()
+    job = _QueuedJob(self, job_id, ops)
+
+    # Write to disk
+    self._UpdateJobUnlocked(job)
+
+    return job
+
+  def _UpdateJobUnlocked(self, job):
+    assert self.lock_fd, "Queue should be open"
+
+    filename = self._GetJobPath(job.id)
+    logging.debug("Writing job %s to %s", job.id, filename)
+    utils.WriteFile(filename,
+                    data=serializer.DumpJson(job.Serialize(), indent=False))
+
+  @utils.LockedMethod
+  def UpdateJob(self, job):
+    return self._UpdateJobUnlocked(job)
+
+  def ArchiveJob(self, job_id):
+    raise NotImplementedError()
+
+
 class JobQueue:
   """The job queue.
 
   """
   def __init__(self, context):
     self._lock = threading.Lock()
-    self._last_job_id = 0
-    self._jobs = {}
+    self._jobs = JobStorage()
     self._wpool = _JobQueueWorkerPool(context)
 
-  def _NewJobIdUnlocked(self):
-    """Generates a new job identifier.
-
-    Returns: A string representing the job identifier.
+    for job in self._jobs.GetJobs(None):
+      status = job.GetStatus()
+      if status in (constants.JOB_STATUS_QUEUED, ):
+        self._wpool.AddTask(job)
 
-    """
-    self._last_job_id += 1
-    return str(self._last_job_id)
-
+      elif status in (constants.JOB_STATUS_RUNNING, ):
+        logging.warning("Unfinished job %s found: %s", job.id, job)
+        job.SetUnclean("Unclean master daemon shutdown")
+
+  @utils.LockedMethod
   def SubmitJob(self, ops):
     """Add a new job to the queue.
 
@@ -201,26 +407,12 @@ class JobQueue:
     - ops: Sequence of opcodes
 
     """
-    # Get job identifier
-    self._lock.acquire()
-    try:
-      job_id = self._NewJobIdUnlocked()
-    finally:
-      self._lock.release()
-
-    job = _QueuedJob(ops, job_id)
-
-    # Add it to our internal queue
-    self._lock.acquire()
-    try:
-      self._jobs[job_id] = job
-    finally:
-      self._lock.release()
+    job = self._jobs.AddJob(ops)
 
     # Add to worker pool
     self._wpool.AddTask(job)
 
-    return job_id
+    return job.id
 
   def ArchiveJob(self, job_id):
     raise NotImplementedError()
@@ -255,16 +447,9 @@ class JobQueue:
     """
     self._lock.acquire()
    try:
-      if not job_ids:
-        job_ids = self._jobs.keys()
-
-      # TODO: define sort order?
-      job_ids.sort()
-
       jobs = []
 
-      for job_id in job_ids:
-        job = self._jobs.get(job_id, None)
+      for job in self._jobs.GetJobs(job_ids):
         if job is None:
           jobs.append(None)
         else:
@@ -274,8 +459,10 @@ class JobQueue:
     finally:
       self._lock.release()
 
+  @utils.LockedMethod
  def Shutdown(self):
     """Stops the job queue.
 
     """
     self._wpool.TerminateWorkers()
+    self._jobs.Close()
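
Reviewer note, not part of the patch: the on-disk format introduced by JobStorage is small enough to sketch in isolation. The sketch below only mirrors the behaviour of _NewSerialUnlocked, _GetJobPath, _QueuedJob.Serialize and _UpdateJobUnlocked; everything concrete in it is an assumption made for illustration - the demo directory path, the plain json module standing in for the serializer wrapper, the literal "queued" standing in for constants.OP_STATUS_QUEUED, and the placeholder opcode dict standing in for a real opcodes.OpCode.__getstate__() result.

# queue_format_sketch.py - standalone, hypothetical illustration of the
# queue layout written by the patch: a "serial" file holding the last job
# id, plus one JSON "job-<id>" file per submitted job.
import json
import os

QUEUE_DIR = "/tmp/ganeti-queue-demo"  # the patch uses constants.QUEUE_DIR


def AllocateJobId():
  """Bump the serial file and return a new job id, like _NewSerialUnlocked."""
  serial_file = os.path.join(QUEUE_DIR, "serial")
  if os.path.exists(serial_file):
    last_serial = int(open(serial_file).read(1024).strip())
  else:
    last_serial = 0
  serial = last_serial + 1
  fd = open(serial_file, "w")
  fd.write("%s\n" % serial)
  fd.close()
  return serial


def WriteJob(job_id, op_states):
  """Write one job file shaped like _QueuedJob.Serialize() output."""
  job = {"id": job_id,
         "ops": [{"input": op_state,
                  "status": "queued",  # stands in for OP_STATUS_QUEUED
                  "result": None}
                 for op_state in op_states]}
  # As in _GetJobPath: one "job-<id>" file inside the queue directory
  filename = os.path.join(QUEUE_DIR, "job-%s" % job_id)
  fd = open(filename, "w")
  fd.write(json.dumps(job))  # the patch goes through serializer.DumpJson
  fd.close()
  return filename


if __name__ == "__main__":
  if not os.path.isdir(QUEUE_DIR):
    os.makedirs(QUEUE_DIR)
  # The opcode dict is a placeholder for a real opcode's __getstate__() dict.
  WriteJob(AllocateJobId(), [{"OP_ID": "OP_EXAMPLE"}])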