Commit a68fe106 authored by Michael Hanselmann's avatar Michael Hanselmann

Merge branch 'devel-2.2'

* devel-2.2:
  Fix pylint warning in http/__init__.py
  Allow SSL ciphers to be overridden in HTTP server
  jqueue: Resume jobs from “waitlock” status
  jqueue: Move queue inspection into separate function
  jqueue: Don't update file in MarkUnfinishedOps
  locking.SharedLock: Update class docstring
  If we had any errors in setup in one of the hosts, exit with non-zero
  Fix the output of the key fingerprint from binary to hex
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parents d3b18b8e a6d350cc
......@@ -598,7 +598,10 @@ class HttpBase(object):
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
ctx.set_cipher_list(constants.OPENSSL_CIPHERS)
ciphers = self.GetSslCiphers()
logging.debug("Setting SSL cipher string %s", ciphers)
ctx.set_cipher_list(ciphers)
ctx.use_privatekey(self._ssl_key)
ctx.use_certificate(self._ssl_cert)
......@@ -611,6 +614,12 @@ class HttpBase(object):
return OpenSSL.SSL.Connection(ctx, sock)
def GetSslCiphers(self): # pylint: disable-msg=R0201
  """Returns the cipher string to configure on the SSL context.

  The base implementation returns the project-wide default cipher
  list; subclasses may override this method to supply a different
  cipher string (which is why pylint's "method could be a function"
  warning R0201 is disabled above).

  @rtype: string
  @return: an OpenSSL cipher list string, by default
    C{constants.OPENSSL_CIPHERS}

  """
  return constants.OPENSSL_CIPHERS
def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
"""Verify the certificate provided by the peer
......
......@@ -378,17 +378,14 @@ class _QueuedJob(object):
@param result: the opcode result
"""
try:
not_marked = True
for op in self.ops:
if op.status in constants.OPS_FINALIZED:
assert not_marked, "Finalized opcodes found after non-finalized ones"
continue
op.status = status
op.result = result
not_marked = False
finally:
self.queue.UpdateJobUnlocked(self)
not_marked = True
for op in self.ops:
if op.status in constants.OPS_FINALIZED:
assert not_marked, "Finalized opcodes found after non-finalized ones"
continue
op.status = status
op.result = result
not_marked = False
class _OpExecCallbacks(mcpu.OpExecCbBase):
......@@ -914,47 +911,53 @@ class JobQueue(object):
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
try:
# We need to lock here because WorkerPool.AddTask() may start a job while
# we're still doing our work.
self.acquire()
try:
logging.info("Inspecting job queue")
self._InspectQueue()
except:
self._wpool.TerminateWorkers()
raise
all_job_ids = self._GetJobIDsUnlocked()
jobs_count = len(all_job_ids)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
"""Loads the whole job queue and resumes unfinished jobs.
This function needs the lock here because WorkerPool.AddTask() may start a
job while we're still doing our work.
"""
logging.info("Inspecting job queue")
all_job_ids = self._GetJobIDsUnlocked()
jobs_count = len(all_job_ids)
lastinfo = time.time()
for idx, job_id in enumerate(all_job_ids):
# Give an update every 1000 jobs or 10 seconds
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
idx == (jobs_count - 1)):
logging.info("Job queue inspection: %d/%d (%0.1f %%)",
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
lastinfo = time.time()
for idx, job_id in enumerate(all_job_ids):
# Give an update every 1000 jobs or 10 seconds
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
idx == (jobs_count - 1)):
logging.info("Job queue inspection: %d/%d (%0.1f %%)",
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
lastinfo = time.time()
job = self._LoadJobUnlocked(job_id)
# a failure in loading the job can cause 'None' to be returned
if job is None:
continue
status = job.CalcStatus()
job = self._LoadJobUnlocked(job_id)
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask((job, ))
# a failure in loading the job can cause 'None' to be returned
if job is None:
continue
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
status = job.CalcStatus()
logging.info("Job queue inspection finished")
finally:
self.release()
except:
self._wpool.TerminateWorkers()
raise
if status in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_WAITLOCK):
self._wpool.AddTask((job, ))
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
self.UpdateJobUnlocked(job)
logging.info("Job queue inspection finished")
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
......@@ -1480,12 +1483,16 @@ class JobQueue(object):
if job_status == constants.JOB_STATUS_QUEUED:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
return (True, "Job %s canceled" % job.id)
msg = "Job %s canceled" % job.id
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
return (True, "Job %s will be canceled" % job.id)
msg = "Job %s will be canceled" % job.id
self.UpdateJobUnlocked(job)
return (True, msg)
@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
......
......@@ -425,9 +425,9 @@ class _PipeConditionWithMode(PipeCondition):
class SharedLock(object):
"""Implements a shared lock.
Multiple threads can acquire the lock in a shared way, calling
acquire_shared(). In order to acquire the lock in an exclusive way threads
can call acquire_exclusive().
Multiple threads can acquire the lock in a shared way by calling
C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
threads can call C{acquire(shared=0)}.
Notes on data structures: C{__pending} contains a priority queue (heapq) of
all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
......
......@@ -374,6 +374,8 @@ def main():
SetupLogging(options)
errs = 0
all_keys = LoadPrivateKeys(options)
passwd = None
......@@ -400,7 +402,7 @@ def main():
if options.ssh_key_check:
if not our_server_key:
hexified_key = ssh.FormatParamikoFingerprint(
server_key.get_fingerprint())
paramiko.util.hexify(server_key.get_fingerprint()))
msg = ("Unable to verify hostkey of host %s: %s. Do you want to accept"
" it?" % (host, hexified_key))
......@@ -443,14 +445,18 @@ def main():
raise JoinCheckError("Host %s failed join check" % host)
SetupSSH(transport)
except errors.GenericError, err:
logging.error("While doing setup on host %s an error occured: %s",
logging.error("While doing setup on host %s an error occurred: %s",
host, err)
errs += 1
finally:
transport.close()
# this is needed for compatibility with older Paramiko or Python
# versions
transport.join()
if errs > 0:
sys.exit(1)
# Script entry point: run main() only when executed directly, not
# when this file is imported as a module.
if __name__ == "__main__":
  main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment