diff --git a/lib/jqueue.py b/lib/jqueue.py index 17f099aafc1565aed8b3c10d02b2422d36863875..eb4ebde84d2603b3866a2a818b0cf487873ace3d 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -264,10 +264,41 @@ class _JobQueueWorkerPool(workerpool.WorkerPool): self.context = context -class DiskJobStorage(object): +class JobStorageBase(object): + def __init__(self, id_prefix): + self.id_prefix = id_prefix + + if id_prefix: + prefix_pattern = re.escape("%s-" % id_prefix) + else: + prefix_pattern = "" + + # Apart from the prefix, all job IDs are numeric + self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern) + + def OwnsJobId(self, job_id): + return self._re_job_id.match(job_id) + + def FormatJobID(self, job_id): + if not isinstance(job_id, (int, long)): + raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) + if job_id < 0: + raise errors.ProgrammerError("Job ID %s is negative" % job_id) + + if self.id_prefix: + prefix = "%s-" % self.id_prefix + else: + prefix = "" + + return "%s%010d" % (prefix, job_id) + + +class DiskJobStorage(JobStorageBase): _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) - def __init__(self): + def __init__(self, id_prefix): + JobStorageBase.__init__(self, id_prefix) + self._lock = threading.Lock() self._memcache = {} self._my_hostname = utils.HostInfo().name @@ -383,7 +414,7 @@ class DiskJobStorage(object): if not result[node]: logging.error("copy of job queue file to node %s failed", node) - return str(serial) + return self.FormatJobID(serial) def _GetJobPath(self, job_id): return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id) @@ -400,7 +431,7 @@ class DiskJobStorage(object): """ jfiles = self._ListJobFiles() - jlist = [int(m.group(1)) for m in + jlist = [m.group(1) for m in [self._RE_JOB_FILE.match(name) for name in jfiles]] jlist.sort() return jlist @@ -510,10 +541,10 @@ class DiskJobStorage(object): class JobQueue: """The job queue. - """ + """ def __init__(self, context): self._lock = threading.Lock() - self._jobs = DiskJobStorage() + self._jobs = DiskJobStorage("") self._wpool = _JobQueueWorkerPool(context) for job in self._jobs.GetJobs(None):