diff --git a/lib/jqueue.py b/lib/jqueue.py index a3ef74836eaa5b3710ccab9813b8937304a880db..4ad1397a87ecbb3572fd4d72ce186afc8aa51662 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -21,9 +21,11 @@ """Module implementing the job queue handling. -Locking: -There's a single, large lock in the JobQueue class. It's used by all other -classes in this module. +Locking: there's a single, large lock in the L{JobQueue} class. It's +used by all other classes in this module. + +@var JOBQUEUE_THREADS: the number of worker threads we start for + processing jobs """ @@ -51,17 +53,34 @@ JOBQUEUE_THREADS = 25 def TimeStampNow(): + """Returns the current timestamp. + + @rtype: tuple + @return: the current time in the (seconds, microseconds) format + + """ return utils.SplitTime(time.time()) class _QueuedOpCode(object): """Encasulates an opcode object. - The 'log' attribute holds the execution log and consists of tuples - of the form (log_serial, timestamp, level, message). + @ivar log: holds the execution log and consists of tuples + of the form C{(log_serial, timestamp, level, message)} + @ivar input: the OpCode we encapsulate + @ivar status: the current status + @ivar result: the result of the LU execution + @ivar start_timestamp: timestamp for the start of the execution + @ivar stop_timestamp: timestamp for the end of the execution """ def __init__(self, op): + """Constructor for the _QueuedOpCode. + + @type op: L{opcodes.OpCode} + @param op: the opcode we encapsulate + + """ self.input = op self.status = constants.OP_STATUS_QUEUED self.result = None @@ -71,6 +90,14 @@ class _QueuedOpCode(object): @classmethod def Restore(cls, state): + """Restore the _QueuedOpCode from the serialized form.
+ + @type state: dict + @param state: the serialized state + @rtype: _QueuedOpCode + @return: a new _QueuedOpCode instance + + """ obj = _QueuedOpCode.__new__(cls) obj.input = opcodes.OpCode.LoadOpCode(state["input"]) obj.status = state["status"] @@ -81,6 +108,12 @@ class _QueuedOpCode(object): return obj def Serialize(self): + """Serializes this _QueuedOpCode. + + @rtype: dict + @return: the dictionary holding the serialized state + + """ return { "input": self.input.__getstate__(), "status": self.status, @@ -94,13 +127,39 @@ class _QueuedOpCode(object): class _QueuedJob(object): """In-memory job representation. - This is what we use to track the user-submitted jobs. Locking must be taken - care of by users of this class. + This is what we use to track the user-submitted jobs. Locking must + be taken care of by users of this class. + + @type queue: L{JobQueue} + @ivar queue: the parent queue + @ivar id: the job ID + @type ops: list + @ivar ops: the list of _QueuedOpCode that constitute the job + @type run_op_index: int + @ivar run_op_index: the currently executing opcode, or -1 if + we didn't yet start executing + @type log_serial: int + @ivar log_serial: holds the index for the next log entry + @ivar received_timestamp: the timestamp for when the job was received + @ivar start_timestamp: the timestamp for start of execution + @ivar end_timestamp: the timestamp for end of execution + @ivar change: a Condition variable we use for waiting for job changes """ def __init__(self, queue, job_id, ops): + """Constructor for the _QueuedJob.
+ + @type queue: L{JobQueue} + @param queue: our parent queue + @type job_id: job_id + @param job_id: our job id + @type ops: list + @param ops: the list of opcodes we hold, which will be encapsulated + in _QueuedOpCodes + + """ if not ops: - # TODO + # TODO: use a better exception raise Exception("No opcodes") self.queue = queue @@ -117,6 +176,16 @@ class _QueuedJob(object): @classmethod def Restore(cls, queue, state): + """Restore a _QueuedJob from serialized state: + + @type queue: L{JobQueue} + @param queue: to which queue the restored job belongs + @type state: dict + @param state: the serialized state + @rtype: _QueuedJob + @return: the restored _QueuedJob instance + + """ obj = _QueuedJob.__new__(cls) obj.queue = queue obj.id = state["id"] @@ -139,6 +208,12 @@ class _QueuedJob(object): return obj def Serialize(self): + """Serialize the _QueuedJob instance. + + @rtype: dict + @return: the serialized state + + """ return { "id": self.id, "ops": [op.Serialize() for op in self.ops], @@ -149,6 +224,26 @@ class _QueuedJob(object): } def CalcStatus(self): + """Compute the status of this job. + + This function iterates over all the _QueuedOpCodes in the job and + based on their status, computes the job status. + + The algorithm is: + - if we find a cancelled, or finished with error, the job + status will be the same + - otherwise, the last opcode with the status one of: + - waitlock + - running + + will determine the job status + + - otherwise, it means either all opcodes are queued, or success, + and the job status will be the same + + @return: the job status + + """ status = constants.JOB_STATUS_QUEUED all_success = True @@ -178,6 +273,16 @@ class _QueuedJob(object): return status def GetLogEntries(self, newer_than): + """Selectively returns the log entries.
+ + @type newer_than: None or int + @param newer_than: if this is None, return all log entries, + otherwise return only the log entries with serial higher + than this value + @rtype: list + @return: the list of the log entries selected + + """ if newer_than is None: serial = -1 else: @@ -191,6 +296,9 @@ class _QueuedJob(object): class _JobQueueWorker(workerpool.BaseWorker): + """The actual job workers. + + """ def _NotifyStart(self): """Mark the opcode as running, not lock-waiting. @@ -215,6 +323,9 @@ class _JobQueueWorker(workerpool.BaseWorker): This functions processes a job. It is closely tied to the _QueuedJob and _QueuedOpCode classes. + @type job: L{_QueuedJob} + @param job: the job to be processed + """ logging.debug("Worker %s processing job %s", self.worker_id, job.id) @@ -316,6 +427,9 @@ class _JobQueueWorker(workerpool.BaseWorker): class _JobQueueWorkerPool(workerpool.WorkerPool): + """Simple class implementing a job-processing workerpool. + + """ def __init__(self, queue): super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS, _JobQueueWorker) @@ -323,17 +437,22 @@ class _JobQueueWorkerPool(workerpool.WorkerPool): class JobQueue(object): + """Queue used to manage the jobs. + + @cvar _RE_JOB_FILE: regex matching the valid job file names + + """ _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) def _RequireOpenQueue(fn): """Decorator for "public" functions. - This function should be used for all "public" functions. That is, functions - usually called from other classes. + This function should be used for all 'public' functions. That is, + functions usually called from other classes. - Important: Use this decorator only after utils.LockedMethod! + @warning: Use this decorator only after utils.LockedMethod! - Example: + Example:: @utils.LockedMethod @_RequireOpenQueue def Example(self): @@ -346,6 +465,18 @@ class JobQueue(object): return wrapper def __init__(self, context): + """Constructor for JobQueue.
+ + The constructor will initialize the job queue object and then + start loading the current jobs from disk, either for starting them + (if they were queued) or for aborting them (if they were already + running). + + @type context: GanetiContext + @param context: the context object for access to the configuration + data and other ganeti objects + + """ self.context = context self._memcache = weakref.WeakValueDictionary() self._my_hostname = utils.HostInfo().name @@ -443,6 +574,12 @@ class JobQueue(object): @utils.LockedMethod @_RequireOpenQueue def RemoveNode(self, node_name): + """Callback called when removing nodes from the cluster. + + @type node_name: str + @param node_name: the name of the node to remove + + """ try: # The queue is removed by the "leave node" RPC call. del self._nodes[node_name] @@ -450,6 +587,19 @@ class JobQueue(object): pass def _CheckRpcResult(self, result, nodes, failmsg): + """Verifies the status of an RPC call. + + Since we aim to keep consistency should this node (the current + master) fail, we will log errors if our RPC calls fail, and especially + log the case when more than half of the nodes fail. + + @param result: the data as returned from the rpc call + @type nodes: list + @param nodes: the list of nodes we made the call to + @type failmsg: str + @param failmsg: the identifier to be used for logging + + """ failed = [] success = [] @@ -470,6 +620,10 @@ class JobQueue(object): def _GetNodeIp(self): """Helper for returning the node name/ip list. + @rtype: (list, list) + @return: a tuple of two lists, the first one with the node + names and the second one with the node addresses + """ name_list = self._nodes.keys() addr_list = [self._nodes[name] for name in name_list] @@ -478,6 +632,14 @@ class JobQueue(object): def _WriteAndReplicateFileUnlocked(self, file_name, data): """Writes a file locally and then replicates it to all nodes.
+ This function will replace the contents of a file on the local + node and then replicate it to all the other nodes we have. + + @type file_name: str + @param file_name: the path of the file to be replicated + @type data: str + @param data: the new contents of the file + """ utils.WriteFile(file_name, data=data) @@ -487,6 +649,17 @@ class JobQueue(object): "Updating %s" % file_name) def _RenameFileUnlocked(self, old, new): + """Renames a file locally and then replicates the change. + + This function will rename a file in the local queue directory + and then replicate this rename to all the other nodes we have. + + @type old: str + @param old: the current name of the file + @type new: str + @param new: the new name of the file + + """ os.rename(old, new) names, addrs = self._GetNodeIp() @@ -495,6 +668,18 @@ class JobQueue(object): "Moving %s to %s" % (old, new)) def _FormatJobID(self, job_id): + """Convert a job ID to string format. + + Currently this just does C{str(job_id)} after performing some + checks, but if we want to change the job id format this will + abstract this change. + + @type job_id: int or long + @param job_id: the numeric job id + @rtype: str + @return: the formatted job id + + """ if not isinstance(job_id, (int, long)): raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) if job_id < 0: @@ -507,7 +692,8 @@ class JobQueue(object): Job identifiers are unique during the lifetime of a cluster. - Returns: A string representing the job identifier. + @rtype: str + @return: a string representing the job identifier. """ # New number @@ -524,14 +710,40 @@ class JobQueue(object): @staticmethod def _GetJobPath(job_id): + """Returns the job file for a given job id. + + @type job_id: str + @param job_id: the job identifier + @rtype: str + @return: the path to the job file + + """ return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id) @staticmethod def _GetArchivedJobPath(job_id): + """Returns the archived job file for a given job id.
+ + @type job_id: str + @param job_id: the job identifier + @rtype: str + @return: the path to the archived job file + + """ return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id) @classmethod def _ExtractJobID(cls, name): + """Extract the job id from a filename. + + @type name: str + @param name: the job filename + @rtype: job id or None + @return: the job id corresponding to the given filename, + or None if the filename does not represent a valid + job file + + """ m = cls._RE_JOB_FILE.match(name) if m: return m.group(1) @@ -548,16 +760,36 @@ class JobQueue(object): jobs are present on disk (so in the _memcache we don't have any extra IDs). + @rtype: list + @return: the list of job IDs + """ jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()] jlist = utils.NiceSort(jlist) return jlist def _ListJobFiles(self): + """Returns the list of current job files. + + @rtype: list + @return: the list of job file names + + """ return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR) if self._RE_JOB_FILE.match(name)] def _LoadJobUnlocked(self, job_id): + """Loads a job from the disk or memory. + + Given a job id, this will return the cached job object if + existing, or try to load the job from the disk. If loading from + disk, it will also add the job to the cache. + + @param job_id: the job id + @rtype: L{_QueuedJob} or None + @return: either None or the job object + + """ job = self._memcache.get(job_id, None) if job: logging.debug("Found job %s in memcache", job_id) @@ -594,6 +826,15 @@ class JobQueue(object): return job def _GetJobsUnlocked(self, job_ids): + """Return a list of jobs based on their IDs. 
+ + @type job_ids: list + @param job_ids: either an empty list (meaning all jobs), + or a list of job IDs + @rtype: list + @return: the list of job objects + + """ if not job_ids: job_ids = self._GetJobIDsUnlocked() @@ -606,6 +847,9 @@ class JobQueue(object): This currently uses the queue drain file, which makes it a per-node flag. In the future this can be moved to the config file. + @rtype: boolean + @return: True if the job queue is marked for draining + """ return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) @@ -616,6 +860,9 @@ class JobQueue(object): This is similar to the function L{backend.JobQueueSetDrainFlag}, and in the future we might merge them. + @type drain_flag: boolean + @param drain_flag: whether to set or unset the drain flag + """ if drain_flag: utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) @@ -633,6 +880,9 @@ class JobQueue(object): @type ops: list @param ops: The list of OpCodes that will become the new job. + @rtype: job ID + @return: the job ID of the newly created job + @raise errors.JobQueueDrainError: if the job queue is marked for draining """ if self._IsQueueMarkedDrain(): @@ -654,6 +904,16 @@ class JobQueue(object): @_RequireOpenQueue def UpdateJobUnlocked(self, job): + """Update a job's on disk storage. + + After a job has been modified, this function needs to be called in + order to write the changes to disk and replicate them to the other + nodes.
+ + @type job: L{_QueuedJob} + @param job: the changed job + + """ filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) @@ -678,6 +938,14 @@ class JobQueue(object): @param prev_log_serial: Last job message serial number @type timeout: float @param timeout: maximum time to wait + @rtype: tuple (job info, log entries) + @return: a tuple of the job information as required via + the fields parameter, and the log entries as a list + + if the job has not changed and the timeout has expired, + we instead return a special value, + L{constants.JOB_NOTCHANGED}, which should be interpreted + as such by the clients """ logging.debug("Waiting for changes in job %s", job_id) @@ -729,8 +997,10 @@ class JobQueue(object): def CancelJob(self, job_id): """Cancels a job. + This will only succeed if the job has not started yet. + @type job_id: string - @param job_id: Job ID of job to be cancelled. + @param job_id: job ID of job to be cancelled. """ logging.debug("Cancelling job %s", job_id) @@ -784,6 +1054,8 @@ class JobQueue(object): def ArchiveJob(self, job_id): """Archives a job. + This is just a wrapper over L{_ArchiveJobUnlocked}. + @type job_id: string @param job_id: Job ID of job to be archived. @@ -825,6 +1097,18 @@ class JobQueue(object): self._ArchiveJobUnlocked(jid) def _GetJobInfoUnlocked(self, job, fields): + """Returns information about a job. + + @type job: L{_QueuedJob} + @param job: the job which we query + @type fields: list + @param fields: names of fields to return + @rtype: list + @return: list with one element for each field + @raise errors.OpExecError: when an invalid field + has been passed + + """ row = [] for fname in fields: if fname == "id": @@ -860,9 +1144,16 @@ class JobQueue(object): def QueryJobs(self, job_ids, fields): """Returns a list of jobs in queue. 
- Args: - - job_ids: Sequence of job identifiers or None for all - - fields: Names of fields to return + This is a wrapper of L{_GetJobsUnlocked}, which actually does the + processing for each job. + + @type job_ids: list + @param job_ids: sequence of job identifiers or None for all + @type fields: list + @param fields: names of fields to return + @rtype: list + @return: list with one element per job, each element being a list with + the requested fields """ jobs = [] @@ -880,6 +1171,8 @@ class JobQueue(object): def Shutdown(self): """Stops the job queue. + This shuts down all the worker threads and closes the queue. + """ self._wpool.TerminateWorkers()