diff --git a/lib/http/__init__.py b/lib/http/__init__.py
index 203e0d54c0d28d7d7dd3153f4caeff13924446e0..3df7d5b3c5397c1f497c56dbd6aa10d453633906 100644
--- a/lib/http/__init__.py
+++ b/lib/http/__init__.py
@@ -598,7 +598,10 @@ class HttpBase(object):
     ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
     ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
-    ctx.set_cipher_list(constants.OPENSSL_CIPHERS)
+
+    ciphers = self.GetSslCiphers()
+    logging.debug("Setting SSL cipher string %s", ciphers)
+    ctx.set_cipher_list(ciphers)
 
     ctx.use_privatekey(self._ssl_key)
     ctx.use_certificate(self._ssl_cert)
@@ -611,6 +614,12 @@ class HttpBase(object):
 
     return OpenSSL.SSL.Connection(ctx, sock)
 
+  def GetSslCiphers(self): # pylint: disable-msg=R0201
+    """Returns the ciphers string for SSL.
+
+    """
+    return constants.OPENSSL_CIPHERS
+
   def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
     """Verify the certificate provided by the peer
 
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 2375b4550d8634df468d31530820ae61b534718e..fcd42e27f97652f0ff36f0cb73100f80a56d2d6b 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -378,17 +378,14 @@ class _QueuedJob(object):
     @param result: the opcode result
 
     """
-    try:
-      not_marked = True
-      for op in self.ops:
-        if op.status in constants.OPS_FINALIZED:
-          assert not_marked, "Finalized opcodes found after non-finalized ones"
-          continue
-        op.status = status
-        op.result = result
-        not_marked = False
-    finally:
-      self.queue.UpdateJobUnlocked(self)
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
 
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
@@ -914,47 +911,53 @@ class JobQueue(object):
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
-      # We need to lock here because WorkerPool.AddTask() may start a job while
-      # we're still doing our work.
-      self.acquire()
-      try:
-        logging.info("Inspecting job queue")
+      self._InspectQueue()
+    except:
+      self._wpool.TerminateWorkers()
+      raise
 
-        all_job_ids = self._GetJobIDsUnlocked()
-        jobs_count = len(all_job_ids)
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def _InspectQueue(self):
+    """Loads the whole job queue and resumes unfinished jobs.
+
+    This function needs the lock here because WorkerPool.AddTask() may start a
+    job while we're still doing our work.
+
+    """
+    logging.info("Inspecting job queue")
+
+    all_job_ids = self._GetJobIDsUnlocked()
+    jobs_count = len(all_job_ids)
+    lastinfo = time.time()
+    for idx, job_id in enumerate(all_job_ids):
+      # Give an update every 1000 jobs or 10 seconds
+      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
+          idx == (jobs_count - 1)):
+        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
+                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
         lastinfo = time.time()
-        for idx, job_id in enumerate(all_job_ids):
-          # Give an update every 1000 jobs or 10 seconds
-          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
-              idx == (jobs_count - 1)):
-            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
-                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
-            lastinfo = time.time()
-
-          job = self._LoadJobUnlocked(job_id)
-
-          # a failure in loading the job can cause 'None' to be returned
-          if job is None:
-            continue
 
-          status = job.CalcStatus()
+      job = self._LoadJobUnlocked(job_id)
 
-          if status in (constants.JOB_STATUS_QUEUED, ):
-            self._wpool.AddTask((job, ))
+      # a failure in loading the job can cause 'None' to be returned
+      if job is None:
+        continue
 
-          elif status in (constants.JOB_STATUS_RUNNING,
-                          constants.JOB_STATUS_WAITLOCK,
-                          constants.JOB_STATUS_CANCELING):
-            logging.warning("Unfinished job %s found: %s", job.id, job)
-            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                  "Unclean master daemon shutdown")
+      status = job.CalcStatus()
 
-        logging.info("Job queue inspection finished")
-      finally:
-        self.release()
-    except:
-      self._wpool.TerminateWorkers()
-      raise
+      if status in (constants.JOB_STATUS_QUEUED,
+                    constants.JOB_STATUS_WAITLOCK):
+        self._wpool.AddTask((job, ))
+
+      elif status in (constants.JOB_STATUS_RUNNING,
+                      constants.JOB_STATUS_CANCELING):
+        logging.warning("Unfinished job %s found: %s", job.id, job)
+        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                              "Unclean master daemon shutdown")
+        self.UpdateJobUnlocked(job)
+
+    logging.info("Job queue inspection finished")
 
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
@@ -1480,12 +1483,16 @@ class JobQueue(object):
     if job_status == constants.JOB_STATUS_QUEUED:
       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
-      return (True, "Job %s canceled" % job.id)
+      msg = "Job %s canceled" % job.id
 
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      return (True, "Job %s will be canceled" % job.id)
+      msg = "Job %s will be canceled" % job.id
+
+    self.UpdateJobUnlocked(job)
+
+    return (True, msg)
 
   @_RequireOpenQueue
   def _ArchiveJobsUnlocked(self, jobs):
diff --git a/lib/locking.py b/lib/locking.py
index ef64fb90950ddc050d9abe766a94a0b46457b170..84268de6e1f44092354db8147ac91d7c338d3bce 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -425,9 +425,9 @@ class _PipeConditionWithMode(PipeCondition):
 class SharedLock(object):
   """Implements a shared lock.
 
-  Multiple threads can acquire the lock in a shared way, calling
-  acquire_shared(). In order to acquire the lock in an exclusive way threads
-  can call acquire_exclusive().
+  Multiple threads can acquire the lock in a shared way by calling
+  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+  threads can call C{acquire(shared=0)}.
 
   Notes on data structures: C{__pending} contains a priority queue (heapq) of
   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
diff --git a/tools/setup-ssh b/tools/setup-ssh
index d54fa10a524846ec1a982e3b22733797bf279512..9dfab3843b925183c2d3bfa6f7f059ab6a7bc729 100755
--- a/tools/setup-ssh
+++ b/tools/setup-ssh
@@ -374,6 +374,8 @@ def main():
 
   SetupLogging(options)
 
+  errs = 0
+
   all_keys = LoadPrivateKeys(options)
 
   passwd = None
@@ -400,7 +402,7 @@ def main():
     if options.ssh_key_check:
       if not our_server_key:
         hexified_key = ssh.FormatParamikoFingerprint(
-          server_key.get_fingerprint())
+          paramiko.util.hexify(server_key.get_fingerprint()))
 
         msg = ("Unable to verify hostkey of host %s: %s. Do you want to accept"
                " it?" % (host, hexified_key))
@@ -443,14 +445,18 @@ def main():
         raise JoinCheckError("Host %s failed join check" % host)
       SetupSSH(transport)
     except errors.GenericError, err:
-      logging.error("While doing setup on host %s an error occured: %s",
+      logging.error("While doing setup on host %s an error occurred: %s",
                     host, err)
+      errs += 1
     finally:
       transport.close()
       # this is needed for compatibility with older Paramiko or Python
       # versions
       transport.join()
 
+  if errs > 0:
+    sys.exit(1)
+
 
 if __name__ == "__main__":
   main()
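
A note on the GetSslCiphers() hook added in lib/http/__init__.py: routing the cipher string through a method instead of reading constants.OPENSSL_CIPHERS inline lets a class derived from HttpBase supply its own cipher list. A minimal sketch of such an override follows, assuming Ganeti's modules are importable; the subclass name and the extra cipher prefix are illustrative only and not part of this patch.

# Illustrative sketch only -- not part of the patch.
from ganeti import constants
from ganeti.http import HttpBase


class StrictCipherHttpBase(HttpBase):
  """Hypothetical subclass that narrows the SSL cipher list."""

  def GetSslCiphers(self):
    # Prefer stronger ciphers first, keep the project-wide default as fallback.
    return "HIGH:!aNULL:!eNULL:" + constants.OPENSSL_CIPHERS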