Commit 76b4ac58 authored by Petr Pudlak's avatar Petr Pudlak

Add optional fields for job livelocks and process IDs

This will allow to check if a particular job is alive, and send signals
to it when it's running.

The fields aren't serialized, if missing, for backwards compatibility.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent 3b8150c3
......@@ -222,7 +222,9 @@ class _QueuedJob(object):
# pylint: disable=W0212
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
"received_timestamp", "start_timestamp", "end_timestamp",
"__weakref__", "processor_lock", "writable", "archived"]
"processor_lock", "writable", "archived",
"livelock", "process_id",
"__weakref__"]
def AddReasons(self, pickup=False):
"""Extend the reason trail
......@@ -271,6 +273,8 @@ class _QueuedJob(object):
self.start_timestamp = None
self.end_timestamp = None
self.archived = False
self.livelock = None
self.process_id = None
self._InitInMemory(self, writable)
......@@ -321,6 +325,10 @@ class _QueuedJob(object):
obj.start_timestamp = state.get("start_timestamp", None)
obj.end_timestamp = state.get("end_timestamp", None)
obj.archived = archived
obj.livelock = state.get("livelock", None)
obj.process_id = state.get("process_id", None)
if obj.process_id is not None:
obj.process_id = int(obj.process_id)
obj.ops = []
obj.log_serial = 0
......@@ -347,6 +355,8 @@ class _QueuedJob(object):
"start_timestamp": self.start_timestamp,
"end_timestamp": self.end_timestamp,
"received_timestamp": self.received_timestamp,
"livelock": self.livelock,
"process_id": self.process_id,
}
def CalcStatus(self):
......
......@@ -104,9 +104,11 @@ import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
import Ganeti.THH.Field
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Atomic
import Ganeti.Utils.Livelock (Livelock)
import Ganeti.VCluster (makeVirtualPath)
-- * Data types
......@@ -192,6 +194,9 @@ $(buildObject "QueuedJob" "qj"
simpleField "start_timestamp" [t| Timestamp |]
, optionalNullSerField $
simpleField "end_timestamp" [t| Timestamp |]
, optionalField $
simpleField "livelock" [t| FilePath |]
, optionalField $ processIdField "process_id"
])
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
......@@ -219,6 +224,8 @@ queuedJobFromOpCodes jobid ops = do
, qjReceivedTimestamp = Nothing
, qjStartTimestamp = Nothing
, qjEndTimestamp = Nothing
, qjLivelock = Nothing
, qjProcessId = Nothing
}
-- | Attach a received timestamp to a Queued Job.
......
......@@ -72,7 +72,7 @@ genQueuedOpCode =
emptyJob :: (Monad m) => m QueuedJob
emptyJob = do
jid0 <- makeJobId 0
return $ QueuedJob jid0 [] justNoTs justNoTs justNoTs
return $ QueuedJob jid0 [] justNoTs justNoTs justNoTs Nothing Nothing
-- | Generates a job ID.
genJobId :: Gen JobId
......@@ -94,7 +94,7 @@ prop_JobPriority =
forAll (listOf1 (genQueuedOpCode `suchThat`
(not . opStatusFinalized . qoStatus))) $ \ops -> do
jid0 <- makeJobId 0
let job = QueuedJob jid0 ops justNoTs justNoTs justNoTs
let job = QueuedJob jid0 ops justNoTs justNoTs justNoTs Nothing Nothing
calcJobPriority job ==? minimum (map qoPriority ops)
-- | Tests default job status.
......@@ -108,7 +108,7 @@ prop_JobStatus :: Property
prop_JobStatus =
forAll genJobId $ \jid ->
forAll genQueuedOpCode $ \op ->
let job1 = QueuedJob jid [op] justNoTs justNoTs justNoTs
let job1 = QueuedJob jid [op] justNoTs justNoTs justNoTs Nothing Nothing
st1 = calcJobStatus job1
op_succ = op { qoStatus = OP_STATUS_SUCCESS }
op_err = op { qoStatus = OP_STATUS_ERROR }
......@@ -139,7 +139,8 @@ case_JobStatusPri_py_equiv = do
num_ops <- choose (1, 5)
ops <- vectorOf num_ops genQueuedOpCode
jid <- genJobId
return $ QueuedJob jid ops justNoTs justNoTs justNoTs)
return $ QueuedJob jid ops justNoTs justNoTs justNoTs
Nothing Nothing)
let serialized = encode jobs
-- check for non-ASCII fields, usually due to 'arbitrary :: String'
mapM_ (\job -> when (any (not . isAscii) (encode job)) .
......@@ -197,7 +198,7 @@ prop_LoadJobs :: Property
prop_LoadJobs = monadicIO $ do
ops <- pick $ resize 5 (listOf1 genQueuedOpCode)
jid <- pick genJobId
let job = QueuedJob jid ops justNoTs justNoTs justNoTs
let job = QueuedJob jid ops justNoTs justNoTs justNoTs Nothing Nothing
job_s = encode job
-- check that jobs in the right directories are parsed correctly
(missing, current, archived, missing_current, broken) <-
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment