Commit 76b62028 authored by Iustin Pop's avatar Iustin Pop
Browse files

Switch job IDs to numeric



This has been a long-standing cleanup item, which we've always
refrained from doing due to the high estimated effort needed.

In reality, it turned out that after some infrastructure improvements
(the previous patches), the actual job queue-related changes are quite
small.

We will need to update the NEWS file later, but so far the RAPI
documentation doesn't mention that the job ID is a string (it only
says it is "a number"), so it doesn't look like it needs update.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarRené Nussbaumer <rn@google.com>
parent 6f287cf3
......@@ -205,7 +205,7 @@ execJobSet master nl il cref (js:jss) = do
case jids of
Bad x -> return $ Bad x
Ok x -> do
putStrLn $ "Got job IDs " ++ commaJoin x
putStrLn $ "Got job IDs " ++ commaJoin (map show x)
waitForJobs client x
)
case jrs of
......
......@@ -44,6 +44,7 @@ module Ganeti.Luxi
) where
import Data.IORef
import Data.Ratio (numerator, denominator)
import Control.Monad
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
......@@ -73,7 +74,7 @@ withTimeout secs descr action = do
-- * Generic protocol functionality
-- | The Ganeti job type.
type JobId = String
type JobId = Int
$(declareSADT "QrViaLuxi"
[ ("QRLock", 'qrLock)
......@@ -107,7 +108,7 @@ $(genLuxiOp "LuxiOp"
, ("lock", [t| Bool |], [| id |])
])
, (luxiReqQueryJobs,
[ ("ids", [t| [Int] |], [| map show |])
[ ("ids", [t| [Int] |], [| id |])
, ("fields", [t| [String] |], [| id |])
])
, (luxiReqQueryExports,
......@@ -129,21 +130,21 @@ $(genLuxiOp "LuxiOp"
[ ("ops", [t| [[OpCode]] |], [| id |]) ]
)
, (luxiReqWaitForJobChange,
[ ("job", [t| Int |], [| show |])
[ ("job", [t| Int |], [| id |])
, ("fields", [t| [String]|], [| id |])
, ("prev_job", [t| JSValue |], [| id |])
, ("prev_log", [t| JSValue |], [| id |])
, ("tmout", [t| Int |], [| id |])
])
, (luxiReqArchiveJob,
[ ("job", [t| Int |], [| show |]) ]
[ ("job", [t| Int |], [| id |]) ]
)
, (luxiReqAutoArchiveJobs,
[ ("age", [t| Int |], [| id |])
, ("tmout", [t| Int |], [| id |])
])
, (luxiReqCancelJob,
[ ("job", [t| Int |], [| show |]) ]
[ ("job", [t| Int |], [| id |]) ]
)
, (luxiReqSetDrainFlag,
[ ("flag", [t| Bool |], [| id |]) ]
......@@ -267,7 +268,7 @@ decodeCall (LuxiCall call args) =
case call of
ReqQueryJobs -> do
(jid, jargs) <- fromJVal args
rid <- mapM (tryRead "parsing job ID" . fromJSString) jid
rid <- mapM parseJobId jid
let rargs = map fromJSString jargs
return $ QueryJobs rid rargs
ReqQueryInstances -> do
......@@ -307,11 +308,11 @@ decodeCall (LuxiCall call args) =
J.readJSON d `ap`
J.readJSON e
_ -> J.Error "Not enough values"
rid <- tryRead "parsing job ID" jid
rid <- parseJobId jid
return $ WaitForJobChange rid fields pinfo pidx wtmout
ReqArchiveJob -> do
[jid] <- fromJVal args
rid <- tryRead "parsing job ID" jid
rid <- parseJobId jid
return $ ArchiveJob rid
ReqAutoArchiveJobs -> do
(age, tmout) <- fromJVal args
......@@ -327,7 +328,7 @@ decodeCall (LuxiCall call args) =
return $ QueryTags kind name
ReqCancelJob -> do
[job] <- fromJVal args
rid <- tryRead "parsing job ID" job
rid <- parseJobId job
return $ CancelJob rid
ReqSetDrainFlag -> do
[flag] <- fromJVal args
......@@ -359,7 +360,12 @@ callMethod method s = do
-- | Parses a job ID.
parseJobId :: JSValue -> Result JobId
parseJobId (JSString x) = Ok $ fromJSString x
parseJobId (JSString x) = tryRead "parsing job id" . fromJSString $ x
parseJobId (JSRational _ x) =
if denominator x /= 1
then Bad $ "Got fractional job ID from master daemon?! Value:" ++ show x
-- FIXME: potential integer overflow here on 32-bit platforms
else Ok . fromIntegral . numerator $ x
parseJobId x = Bad $ "Wrong type/value for job id: " ++ show x
-- | Parse job submission result.
......@@ -383,7 +389,7 @@ submitManyJobs s jobs = do
-- | Custom queryJobs call.
queryJobsStatus :: Client -> [JobId] -> IO (Result [JobStatus])
queryJobsStatus s jids = do
rval <- callMethod (QueryJobs (map read jids) ["status"]) s
rval <- callMethod (QueryJobs jids ["status"]) s
return $ case rval of
Bad x -> Bad x
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
......
......@@ -312,7 +312,8 @@ def _TestJobSubmission(opts):
result = cl.SubmitManyJobs(jobs)
if not (len(result) == 2 and
compat.all(len(i) == 2 for i in result) and
compat.all(isinstance(i[1], basestring) for i in result) and
isinstance(result[0][1], int) and
isinstance(result[1][1], basestring) and
result[0][0] and not result[1][0]):
raise errors.OpExecError("Submitting multiple jobs did not work as"
" expected, result %s" % result)
......
#
#
# Copyright (C) 2006, 2007 Google Inc.
# Copyright (C) 2006, 2007, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -60,6 +60,21 @@ def _FormatStatus(value):
raise errors.ProgrammerError("Unknown job status code '%s'" % value)
def _ParseJobIds(args):
"""Parses a list of string job IDs into integers.
@param args: list of strings
@return: list of integers
@raise OpPrereqError: in case of invalid values
"""
try:
return [int(a) for a in args]
except (ValueError, TypeError), err:
raise errors.OpPrereqError("Invalid job ID passed: %s" % err,
errors.ECODE_INVAL)
def ListJobs(opts, args):
"""List the jobs
......@@ -85,7 +100,7 @@ def ListJobs(opts, args):
opts.separator, not opts.no_headers,
format_override=fmtoverride, verbose=opts.verbose,
force_filter=opts.force_filter, namefield="id",
qfilter=qfilter)
qfilter=qfilter, isnumeric=True)
def ListJobFields(opts, args):
......@@ -203,8 +218,8 @@ def ShowJobs(opts, args):
"opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
]
result = GetClient().Query(constants.QR_JOB, selected_fields,
qlang.MakeSimpleFilter("id", args)).data
qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
result = GetClient().Query(constants.QR_JOB, selected_fields, qfilter).data
first = True
......
#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -220,7 +220,7 @@ class _QueuedJob(object):
raise errors.GenericError("A job needs at least one opcode")
self.queue = queue
self.id = job_id
self.id = int(job_id)
self.ops = [_QueuedOpCode(op) for op in ops]
self.log_serial = 0
self.received_timestamp = TimeStampNow()
......@@ -267,7 +267,7 @@ class _QueuedJob(object):
"""
obj = _QueuedJob.__new__(cls)
obj.queue = queue
obj.id = state["id"]
obj.id = int(state["id"])
obj.received_timestamp = state.get("received_timestamp", None)
obj.start_timestamp = state.get("start_timestamp", None)
obj.end_timestamp = state.get("end_timestamp", None)
......@@ -1384,14 +1384,14 @@ class _JobDependencyManager:
@type job: L{_QueuedJob}
@param job: Job object
@type dep_job_id: string
@type dep_job_id: int
@param dep_job_id: ID of dependency job
@type dep_status: list
@param dep_status: Required status
"""
assert ht.TString(job.id)
assert ht.TString(dep_job_id)
assert ht.TJobId(job.id)
assert ht.TJobId(dep_job_id)
assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
if job.id == dep_job_id:
......@@ -1446,11 +1446,11 @@ class _JobDependencyManager:
@attention: Do not call until L{CheckAndRegister} returned a status other
than C{WAITDEP} for C{job_id}, or behaviour is undefined
@type job_id: string
@type job_id: int
@param job_id: Job ID
"""
assert ht.TString(job_id)
assert ht.TJobId(job_id)
self._lock.acquire()
try:
......@@ -1803,8 +1803,8 @@ class JobQueue(object):
@type count: integer
@param count: how many serials to return
@rtype: str
@return: a string representing the job identifier.
@rtype: list of int
@return: a list of job identifiers.
"""
assert ht.TPositiveInt(count)
......@@ -1870,9 +1870,9 @@ class JobQueue(object):
for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
m = constants.JOB_FILE_RE.match(filename)
if m:
jlist.append(m.group(1))
jlist.append(int(m.group(1)))
if sort:
jlist = utils.NiceSort(jlist)
jlist.sort()
return jlist
def _LoadJobUnlocked(self, job_id):
......@@ -1882,6 +1882,7 @@ class JobQueue(object):
existing, or try to load the job from the disk. If loading from
disk, it will also add the job to the cache.
@type job_id: int
@param job_id: the job id
@rtype: L{_QueuedJob} or None
@return: either None or the job object
......@@ -1920,7 +1921,7 @@ class JobQueue(object):
Given a job file, read, load and restore it in a _QueuedJob format.
@type job_id: string
@type job_id: int
@param job_id: job identifier
@type try_archived: bool
@param try_archived: Whether to try loading an archived job
......@@ -1968,7 +1969,7 @@ class JobQueue(object):
In case of error reading the job, it gets returned as None, and the
exception is logged.
@type job_id: string
@type job_id: int
@param job_id: job identifier
@type try_archived: bool
@param try_archived: Whether to try loading an archived job
......@@ -2181,14 +2182,11 @@ class JobQueue(object):
def _GetJobStatusForDependencies(self, job_id):
"""Gets the status of a job for dependencies.
@type job_id: string
@type job_id: int
@param job_id: Job ID
@raise errors.JobLost: If job can't be found
"""
if not isinstance(job_id, basestring):
job_id = jstore.FormatJobID(job_id)
# Not using in-memory cache as doing so would require an exclusive lock
# Try to load from disk
......@@ -2229,7 +2227,7 @@ class JobQueue(object):
timeout):
"""Waits for changes in a job.
@type job_id: string
@type job_id: int
@param job_id: Job identifier
@type fields: list of strings
@param fields: Which fields to check for changes
......@@ -2264,7 +2262,7 @@ class JobQueue(object):
This will only succeed if the job has not started yet.
@type job_id: string
@type job_id: int
@param job_id: job ID of job to be cancelled.
"""
......@@ -2331,7 +2329,7 @@ class JobQueue(object):
This is just a wrapper over L{_ArchiveJobsUnlocked}.
@type job_id: string
@type job_id: int
@param job_id: Job ID of job to be archived.
@rtype: bool
@return: Whether job was archived
......@@ -2433,9 +2431,9 @@ class JobQueue(object):
@param qfilter: Query filter
"""
(qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
(qobj, ctx, _) = self._Query(fields, qfilter)
return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
def OldStyleQueryJobs(self, job_ids, fields):
"""Returns a list of jobs in queue.
......@@ -2449,11 +2447,13 @@ class JobQueue(object):
the requested fields
"""
# backwards compat:
job_ids = [int(jid) for jid in job_ids]
qfilter = qlang.MakeSimpleFilter("id", job_ids)
(qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
(qobj, ctx, _) = self._Query(fields, qfilter)
return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
return qobj.OldStyleQuery(ctx, sort_by_name=False)
@locking.ssynchronized(_LOCK)
def PrepareShutdown(self):
......
#
#
# Copyright (C) 2006, 2007 Google Inc.
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -174,15 +174,14 @@ def SetDrainFlag(drain_flag):
def FormatJobID(job_id):
"""Convert a job ID to string format.
"""Convert a job ID to int format.
Currently this just does C{str(job_id)} after performing some
checks, but if we want to change the job id format this will
abstract this change.
Currently this just is a no-op that performs some checks, but if we
want to change the job id format this will abstract this change.
@type job_id: int or long
@param job_id: the numeric job id
@rtype: str
@rtype: int
@return: the formatted job id
"""
......@@ -191,7 +190,7 @@ def FormatJobID(job_id):
if job_id < 0:
raise errors.ProgrammerError("Job ID %s is negative" % job_id)
return str(job_id)
return job_id
def GetArchiveDirectory(job_id):
......
......@@ -298,7 +298,7 @@ def _MakeFilterPart(namefield, text, isnumeric=False):
try:
number = int(text)
except (TypeError, ValueError), err:
raise errors.OpPrereqError("Invalid integer passed: %s" % str(err),
raise errors.OpPrereqError("Invalid job ID passed: %s" % str(err),
errors.ECODE_INVAL)
return [OP_EQUAL, namefield, number]
elif _CheckGlobbing(text):
......
......@@ -746,6 +746,7 @@ class Query:
(status, name) = _ProcessResult(self._name_fn(ctx, item))
assert status == constants.RS_NORMAL
# TODO: Are there cases where we wouldn't want to use NiceSort?
# Answer: if the name field is non-string...
result.append((utils.NiceSortKey(name), idx, row))
else:
result.append(row)
......@@ -2267,7 +2268,7 @@ def _BuildJobFields():
"""
fields = [
(_MakeField("id", "ID", QFT_TEXT, "Job ID"),
(_MakeField("id", "ID", QFT_NUMBER, "Job ID"),
None, QFF_JOB_ID, lambda _, (job_id, job): job_id),
(_MakeField("status", "Status", QFT_TEXT, "Job status"),
None, 0, _JobUnavail(lambda job: job.CalcStatus())),
......
......@@ -148,7 +148,8 @@ def _DoTests(uris):
def _VerifyReturnsJob(data):
AssertMatch(data, r"^\d+$")
if not isinstance(data, int):
AssertMatch(data, r"^\d+$")
def TestVersion():
......
#!/usr/bin/python
#
# Copyright (C) 2010, 2011 Google Inc.
# Copyright (C) 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -702,7 +702,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Check job status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.GetInfo(["id"]), [str(job_id)])
self.assertEqual(job.GetInfo(["id"]), [job_id])
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
# Check opcode status
......@@ -926,7 +926,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
for i in range(3)]
# Create job
job_id = str(28492)
job_id = 28492
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......
......@@ -36,10 +36,10 @@ import testutils
class TestFormatJobID(testutils.GanetiTestCase):
def test(self):
self.assertEqual(jstore.FormatJobID(0), "0")
self.assertEqual(jstore.FormatJobID(30498), "30498")
self.assertEqual(jstore.FormatJobID(0), 0)
self.assertEqual(jstore.FormatJobID(30498), 30498)
self.assertEqual(jstore.FormatJobID(319472592764518609),
"319472592764518609")
319472592764518609)
def testErrors(self):
for i in [-1, -2288, -9667, -0.205641, 0.0, 0.1, 13041.4472, "", "Hello",
......
......@@ -1162,14 +1162,19 @@ class TestQueryFilter(unittest.TestCase):
for (what, fielddefs) in query.ALL_FIELDS.items():
if what == constants.QR_JOB:
namefield = "id"
elif what == constants.QR_EXPORT:
namefield = "export"
nameval = 123
namevalempty = 0
genval = lambda i: i * 10
randvals = [17361, 22015, 13193, 15215]
else:
namefield = "name"
nameval = "abc"
namevalempty = ""
genval = lambda i: "x%s" % i
randvals = ["x17361", "x22015", "x13193", "x15215"]
nameval = "abc"
namevalempty = ""
genval = lambda i: "x%s" % i
randvals = ["x17361", "x22015", "x13193", "x15215"]
if what == constants.QR_EXPORT:
namefield = "export"
else:
namefield = "name"
assert namefield in fielddefs
......@@ -1247,11 +1252,13 @@ class TestQueryFilter(unittest.TestCase):
for (what, fielddefs) in query.ALL_FIELDS.items():
if what == constants.QR_JOB:
namefield = "id"
nameval = 123
elif what == constants.QR_EXPORT:
namefield = "export"
nameval = "value"
else:
namefield = "name"
nameval = "value"
nameval = "value"
checks = [
[], ["="], ["=", "foo"], ["unknownop"], ["!"],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment