Commit 6ea039ef authored by Petr Pudlak

Add a thread ID to the WConfd client id



This makes it possible to distinguish threads that don't have a job id,
which is needed for answering queries.

Since Python thread IDs aren't guaranteed to be unique over time, it
would be preferable to switch to a different, truly unique identifier in
the future.

Note that this breaks 'gnt-debug locks'.
Signed-off-by: Petr Pudlak <pudlak@google.com>
Reviewed-by: Helga Velroyen <helgav@google.com>
parent c211dcc4
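The thread-ID caveat in the message is easy to demonstrate. A minimal sketch, not part of this commit, showing that CPython may hand a finished thread's ident to a later thread:

import threading

def report():
  # current_thread().ident is the value this commit adds to the client id
  print(threading.current_thread().ident)

t1 = threading.Thread(target=report)
t1.start()
t1.join()

t2 = threading.Thread(target=report)
t2.start()
t2.join()
# t1 and t2 never overlap, so they frequently print the same ident.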
@@ -65,19 +65,19 @@ class LUWConfdClient(object):
     self.lu = lu
 
   def TryUpdateLocks(self, req):
-    jid, livelockfile = self.lu.wconfdcontext
-    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
-    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+    self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req)
+    self.lu.wconfdlocks = \
+      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
 
   def DownGradeLocksLevel(self, level):
-    jid, livelockfile = self.lu.wconfdcontext
-    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
-    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+    self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext, level)
+    self.lu.wconfdlocks = \
+      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
 
   def FreeLocksLevel(self, level):
-    jid, livelockfile = self.lu.wconfdcontext
-    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
-    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+    self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level)
+    self.lu.wconfdlocks = \
+      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
 
 class LogicalUnit(object):
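The same rewrite repeats in each method above: the context travels as one opaque value instead of being unpacked into (jid, livelockfile) at every call site. A rough sketch of the new calling convention, with a hypothetical FakeClient standing in for wconfd.Client():

class FakeClient(object):
  def TryUpdateLocks(self, cid, req):
    # The real client returns the locks the request is still blocked on;
    # an empty list means the request succeeded.
    print("TryUpdateLocks(%r, %r)" % (cid, req))
    return []

# (job id, thread id, livelock file) -- all values made up
cid = (42, 139952964777728, "/tmp/example.livelock")
FakeClient().TryUpdateLocks(cid, [["instance/inst1", "exclusive"]])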
@@ -33,6 +33,7 @@ import logging
 import random
 import time
 import itertools
+import threading
 import traceback
 
 from ganeti import opcodes
@@ -308,6 +309,9 @@ class Processor(object):
     self.hmclass = hooksmaster.HooksMaster
     self._enable_locks = enable_locks
     self.wconfd = wconfd # Indirection to allow testing
+    self._wconfdcontext = (ec_id,
+                           threading.current_thread().ident,
+                           self.context.livelock.lockfile.name)
 
   def _CheckLocksEnabled(self):
     """Checks if locking is enabled.
@@ -362,14 +366,12 @@ class Processor(object):
       names = [names]
 
     levelname = locking.LEVEL_NAMES[level]
-    jid = int(self.GetECId())
-    livelockfile = self.context.livelock.lockfile.name
 
     locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
 
     if not names:
-      logging.debug("Acquiring no locks for %d (%s) at level %s",
-                    jid, livelockfile, levelname)
+      logging.debug("Acquiring no locks for (%s) at level %s",
+                    self._wconfdcontext, levelname)
       return []
 
     if shared:
@@ -378,26 +380,26 @@ class Processor(object):
       request = [[lock, "exclusive"] for lock in locks]
 
     if opportunistic:
-      logging.debug("Opportunistically acquring some of %s for %d (%s).",
-                    locks, jid, livelockfile)
-      locks = self.wconfd.Client().OpportunisticLockUnion(jid, livelockfile,
+      logging.debug("Opportunistically acquring some of %s for %s.",
+                    locks, self._wconfdcontext)
+      locks = self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
                                                           request)
     elif timeout is None:
       while True:
         ## TODO: use asynchronous wait instead of polling
-        blockedon = self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
+        blockedon = self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
                                                         request)
-        logging.debug("Requesting %s for %d (%s) blocked on %s",
-                      request, jid, livelockfile, blockedon)
+        logging.debug("Requesting %s for %s blocked on %s",
+                      request, self._wconfdcontext, blockedon)
         if not blockedon:
           break
         time.sleep(random.random())
     else:
-      logging.debug("Trying %ss to request %s for %d (%s)",
-                    timeout, request, jid, livelockfile)
+      logging.debug("Trying %ss to request %s for %s",
+                    timeout, request, self._wconfdcontext)
       ## TODO: use blocking wait instead of polling
       blocked = utils.SimpleRetry([], self.wconfd.Client().TryUpdateLocks, 0.1,
-                                  timeout, args=[jid, livelockfile, request])
+                                  timeout, args=[self._wconfdcontext, request])
 
       if blocked:
         raise LockAcquireTimeout()
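Both TODO comments above point at the same pattern: the client polls TryUpdateLocks until nothing blocks the request. A standalone sketch of that loop, under the assumption that the callable returns the list of still-blocking locks; poll_until_unblocked is illustrative, not a Ganeti helper:

import random
import time

def poll_until_unblocked(try_update, cid, request, timeout=None):
  deadline = None if timeout is None else time.time() + timeout
  while True:
    blocked = try_update(cid, request)  # locks still blocking the request
    if not blocked:
      return []
    if deadline is not None and time.time() >= deadline:
      return blocked  # non-empty result: caller raises LockAcquireTimeout
    time.sleep(random.random())  # randomized pause between polls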
@@ -497,8 +499,7 @@ class Processor(object):
           self._AcquireLocks(level, needed_locks, share, opportunistic,
                              calc_timeout())
-          (jid, livelockfile) = lu.wconfdcontext
-          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
+          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -507,8 +508,6 @@ class Processor(object):
           lu.remove_locks[level] = add_locks
 
           try:
-            jid = int(self.GetECId())
-            livelockfile = self.context.livelock.lockfile.name
             levelname = locking.LEVEL_NAMES[level]
 
             if share:
@@ -519,13 +518,13 @@ class Processor(object):
             request = [["%s/%s" % (levelname, lock), mode]
                        for lock in add_locks]
 
-            logging.debug("Requesting %s for %d (%s)",
-                          request, jid, livelockfile)
-            blocked = \
-              self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
+            logging.debug("Requesting %s for %s",
+                          request, self._wconfdcontext)
+            blocked = self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
+                                                          request)
             assert blocked == [], "Allocating newly 'created' locks failed"
-            (jid, livelockfile) = lu.wconfdcontext
-            lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
+            lu.wconfdlocks = \
+              self.wconfd.Client().ListLocks(self._wconfdcontext)
           except errors.GenericError:
             # TODO: verify what actually caused the error
             logging.exception("Detected lock error in level %s for locks"
@@ -539,21 +538,18 @@ class Processor(object):
           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            jid = int(self.GetECId())
-            livelockfile = self.context.livelock.lockfile.name
             levelname = locking.LEVEL_NAMES[level]
             request = [["%s/%s" % (levelname, lock), "release"]
                        for lock in lu.remove_locks[level]]
             blocked = \
-              self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
+              self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
+                                                  request)
             assert blocked == [], "Release may not fail"
       finally:
-        jid = int(self.GetECId())
-        livelockfile = self.context.livelock.lockfile.name
         levelname = locking.LEVEL_NAMES[level]
-        logging.debug("Freeing locks at level %s for %d (%s)",
-                      levelname, jid, livelockfile)
-        self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
+        logging.debug("Freeing locks at level %s for %s",
+                      levelname, self._wconfdcontext)
+        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
     else:
       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
@@ -614,11 +610,9 @@ class Processor(object):
                                " disabled" % op.OP_ID)
 
       try:
-        jid = int(self.GetECId())
-        livelockfile = self.context.livelock.lockfile.name
-        lu = lu_class(self, op, self.context, self.rpc, (jid, livelockfile),
+        lu = lu_class(self, op, self.context, self.rpc, self._wconfdcontext,
                       self.wconfd)
-        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
+        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
         lu.ExpandNames()
 
         assert lu.needed_locks is not None, "needed_locks not set by LU"
@@ -630,11 +624,9 @@ class Processor(object):
           self.context.cfg.DropECReservations(self._ec_id)
       finally:
         # Release BGL if owned
-        jid = int(self.GetECId())
-        livelockfile = self.context.livelock.lockfile.name
         bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
                              locking.BGL)
-        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
+        self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
                                             [[bglname, "release"]])
     finally:
       self._cbs = None
@@ -44,7 +44,7 @@ import qualified Text.JSON as J
 
 import Ganeti.BasicTypes
 import Ganeti.Errors (ResultG, GanetiException)
-import Ganeti.JSON (readEitherString, fromJResultE)
+import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..))
 import Ganeti.Locking.Allocation
 import Ganeti.Locking.Types
 import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
@@ -171,17 +171,30 @@ instance Lock GanetiLocks where
   lockImplications (Network _) = [NetworkLockSet]
   lockImplications _ = []
 
--- | A client is identified as a job id and path to its process
+-- | A client is identified as a job id, thread id and path to its process
 -- identifier file.
+--
+-- The JobId isn't enough to identify a client as the master daemon
+-- also handles RPC calls that aren't jobs, but which use the configuration.
+-- Therefore it's needed to include the identification for threads.
+-- An alternative would be to use something like @Either JobId RpcCallId@.
+--
+-- FIXME: Python threads are only unique wrt running threads, so it's possible
+-- that a new thread will get a thread id that has been used before by another
+-- finished thread. Since we rely on threads releasing their locks anyway,
+-- this isn't a big issue, but in the future it'd be better to have a unique
+-- identifier for each operation.
 data ClientId = ClientId
-  { ciJobId :: JobId
+  { ciJobId :: Maybe JobId
+  , ciThreadId :: Integer
   , ciLockFile :: FilePath
   }
   deriving (Ord, Eq, Show)
 
 instance J.JSON ClientId where
-  showJSON (ClientId jid lf) = J.showJSON (jid, lf)
-  readJSON = liftM (uncurry ClientId) . J.readJSON
+  showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid, lf)
+  readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf)
+             . J.readJSON
 
 -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
 -- locks are jobs.
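On the wire the new ClientId is a three-element JSON array; assuming MaybeForJSON encodes Nothing as null, a job-less query thread sends null in the first slot. An illustrative round-trip from the Python side (values and path are made up):

import json

print(json.dumps([42, 140632001234688, "/var/run/ganeti/example.livelock"]))
# -> [42, 140632001234688, "/var/run/ganeti/example.livelock"]
print(json.dumps([None, 140632001234688, "/var/run/ganeti/example.livelock"]))
# -> [null, 140632001234688, "/var/run/ganeti/example.livelock"]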