#
#

# Copyright (C) 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""QA utility functions for testing jobs

"""

import re
import sys
import threading
import time

from ganeti import constants
from ganeti import locking
from ganeti import utils
from ganeti.utils import retry

import qa_config
import qa_logging
import qa_error

from qa_utils import AssertCommand, GetCommandOutput, GetObjectInfo, stdout_of


# Lock levels that RunWithLocks accepts; any other level is rejected there.
AVAILABLE_LOCKS = [locking.LEVEL_NODE]


def GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
  """ Gets the output of a command executed on master.

  @type cmd: string or list of string
  @param cmd: The command to run. A string is used verbatim, anything else
              is shell-quoted into a single command line.
  @type use_multiplexer: bool
  @param use_multiplexer: Passed through to C{GetCommandOutput}.
  @type log_cmd: bool
  @param log_cmd: Passed through to C{GetCommandOutput}.
  @return: The value returned by C{GetCommandOutput}; as stderr is redirected
           into stdout, error output is part of it.

  """
  if isinstance(cmd, basestring):
    cmdstr = cmd
  else:
    cmdstr = utils.ShellQuoteArgs(cmd)

  # Necessary due to the stderr stream not being captured properly on the
  # buildbot
  cmdstr += " 2>&1"

  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
                          use_multiplexer=use_multiplexer, log_cmd=log_cmd)


def ExecuteJobProducingCommand(cmd):
  """ Executes a command that contains the --submit flag, and returns a job id.

  @type cmd: list of string
  @param cmd: The command to execute, broken into constituent components.
  @rtype: int
  @return: The id of the submitted job, parsed from the command output.
  @raise qa_error.Error: if the output does not contain exactly one
                         candidate job id.

  """
  job_id_output = GetOutputFromMaster(cmd)

  # Usually, the output contains "JobID: <job_id>", but for instance related
  # commands, the output is of the form "<job_id>: <instance_name>"
  possible_job_ids = re.findall(r"JobID: ([0-9]+)", job_id_output) or \
                     re.findall(r"([0-9]+): .+", job_id_output)
  if len(possible_job_ids) != 1:
    raise qa_error.Error("Cannot parse command output to find job id: output "
                         "is %s" % job_id_output)

  return int(possible_job_ids[0])


def GetJobStatuses(job_ids=None):
  """ Runs gnt-job list on the master and maps each job id to its status.

  @type job_ids: list
  @param job_ids: the job ids whose status should be queried; with C{None},
                  no ids are passed and all current jobs are listed
  @rtype: dict of string to string
  @return: A dictionary with job ids as keys and their statuses as values

  """
  command = ["gnt-job", "list", "--no-headers", "--output=id,status"]
  if job_ids is not None:
    command.extend([str(job_id) for job_id in job_ids])

  raw_listing = GetOutputFromMaster(command)

  # Every output line consists of two columns: the job id and its status
  return dict([line.split() for line in raw_listing.splitlines()])


def _RetrieveTerminationInfo(job_id):
  """ Looks up the delay termination entry in the execution log of a job.

  Only the execution log of the first opcode of the job is examined.

  @param job_id: The id of the job started by gnt-debug delay.
  @rtype: dict or None
  @return: The termination log entry, or None if no entry was found
  @raise qa_error.Error: if the job has no opcodes or more than one
                         termination entry is present.

  """
  info = GetObjectInfo(["gnt-job", "info", str(job_id)])

  opcode_list = info[0]["Opcodes"]
  if not opcode_list:
    raise qa_error.Error("Cannot retrieve a list of opcodes")

  log_entries = opcode_list[0]["Execution log"]
  if not log_entries:
    return None

  # The second element of an entry's content identifies the kind of entry
  delay_entries = [entry for entry in log_entries
                   if entry["Content"][1] == constants.ELOG_DELAY_TEST]

  if len(delay_entries) > 1:
    raise qa_error.Error("Too many interruption information entries found!")

  if delay_entries:
    return delay_entries[0]

  return None


def _StartDelayFunction(locks, timeout):
  """ Starts the gnt-debug delay option with the given locks and timeout.

  @type locks: dict of locking level to list
  @param locks: The locks the delay job should hold; only the entries of the
                node level are turned into command line options.
  @type timeout: number
  @param timeout: The duration passed to gnt-debug delay.
  @return: The path of the socket reported by the delay job, to be handed to
           L{_TerminateDelayFunction}.
  @raise qa_error.Error: if no termination information could be retrieved
                         from the job.

  """
  # The interruptible switch must be used
  cmd = ["gnt-debug", "delay", "-i", "--submit", "--no-master"]

  # NOTE(review): a node entry that is not iterable (e.g. locking.ALL_SET)
  # would fail here - confirm that callers only pass lists of node names.
  for node in locks.get(locking.LEVEL_NODE, []):
    cmd.append("-n%s" % node)
  cmd.append(str(timeout))

  job_id = ExecuteJobProducingCommand(cmd)

  # Waits until a non-empty result is returned from the function
  log_entry = retry.SimpleRetry(lambda x: x, _RetrieveTerminationInfo, 2.0,
                                10.0, args=[job_id])

  if not log_entry:
    raise qa_error.Error("Failure when trying to retrieve delay termination "
                         "information")

  # The third element of the content is a one-element sequence holding the
  # socket path
  _, _, (socket_path, ) = log_entry["Content"]

  return socket_path


def _TerminateDelayFunction(termination_socket):
  """ Ends a running delay job by writing to its UNIX domain socket.

  @param termination_socket: The socket path as returned by
                             L{_StartDelayFunction}.

  """
  socat_cmd = "echo a | socat -u stdin UNIX-CLIENT:%s" % termination_socket
  AssertCommand(socat_cmd)


def _GetNodeUUIDMap(nodes):
  """ Given a list of nodes, retrieves a mapping of their names to UUIDs.

  @type nodes: list of string
  @param nodes: The nodes to retrieve a map for. If empty, returns information
                for all the nodes.
  @rtype: dict of string to string
  @return: A dictionary mapping node names to node UUIDs.

  """
  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
  cmd.extend(nodes)
  output = GetOutputFromMaster(cmd)

  # Every output line holds two columns: the node name and its UUID
  return dict([line.split() for line in output.splitlines()])


def _FindLockNames(locks):
  """ Finds the ids and descriptions of locks that given locks can block.

  @type locks: dict of locking level to list
  @param locks: The locks that gnt-debug delay is holding.

  @rtype: dict of string to string
  @return: The lock name to entity name map.

  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
  blocked even though they were not listed explicitly. This function has to take
  care and list all locks that can be blocked by the locks given as parameters.

  """
  lock_map = {}

  if locking.LEVEL_NODE in locks:
    node_locks = locks[locking.LEVEL_NODE]
    all_nodes_requested = node_locks == locking.ALL_SET

    if all_nodes_requested:
      # Empty list retrieves all info
      name_uuid_map = _GetNodeUUIDMap([])
    elif node_locks:
      name_uuid_map = _GetNodeUUIDMap(node_locks)
    else:
      # An explicitly empty list means that no node lock is held. It must not
      # be passed on, as an empty query would return every node instead.
      name_uuid_map = {}

    for (name, uuid) in name_uuid_map.items():
      lock_map["node/%s" % uuid] = name

    # If ALL_SET was requested explicitly, or there is at least one lock
    # Note that locking.ALL_SET is None and hence the strange form of the if
    if all_nodes_requested or node_locks:
      lock_map["node/[lockset]"] = "joint node lock"

  #TODO add other lock types here when support for these is added
  return lock_map


def _GetBlockingLocks():
  """ Finds out which locks are blocking jobs by invoking "gnt-debug locks".

  @rtype: list of string
  @return: The names of the locks currently blocking any job.
  @raise qa_error.Error: if a line of the output does not consist of exactly
                         four whitespace-separated columns.

  """
  # Due to mysterious issues when a SSH multiplexer is being used by two
  # threads, we turn it off, and block most of the logging to improve the
  # visibility of the other thread's output
  locks_output = GetOutputFromMaster("gnt-debug locks", use_multiplexer=False,
                                     log_cmd=False)

  # The first line of the output is the header, which we do not need
  lock_lines = locks_output.splitlines()[1:]

  blocking_locks = []
  for lock_line in lock_lines:
    components = lock_line.split()
    if len(components) != 4:
      raise qa_error.Error("Error while parsing gnt-debug locks output, "
                           "line at fault is: %s" % lock_line)

    lock_name, _, _, pending_jobs = components

    # A dash in the last column means that no job is waiting for the lock
    if pending_jobs != '-':
      blocking_locks.append(lock_name)

  return blocking_locks


class QAThread(threading.Thread):
  """ An exception-preserving thread that executes a given function.

  """
  def __init__(self, fn, args, kwargs):
    """ Constructor accepting the function to be invoked later.

    """
    threading.Thread.__init__(self)
    self._fn = fn
    self._args = args
    self._kwargs = kwargs
    self._exc_info = None

  def run(self):
    """ Executes the function, preserving exception info if necessary.

    """
    # pylint: disable=W0702
    # We explicitly want to catch absolutely anything
    try:
      self._fn(*self._args, **self._kwargs)
    except:
      self._exc_info = sys.exc_info()
    # pylint: enable=W0702

  def reraise(self):
    """ Reraises any exceptions that might have occured during thread execution.

    """
    if self._exc_info is not None:
      raise self._exc_info[0], self._exc_info[1], self._exc_info[2]


class QAThreadGroup(object):
  """Keeps track of a number of started QAThreads.

  """
  def __init__(self):
    """Creates a group without any threads.

    """
    self._threads = []

  def Start(self, thread):
    """Starts the given thread and, once started, records it in the group.

    @type thread: qa_job_utils.QAThread
    @param thread: the thread to start and to add to this group.

    """
    thread.start()
    self._threads.append(thread)

  def JoinAndReraise(self):
    """Waits for the threads of the group, reraising their exceptions.

    The threads are handled in the order in which they were started; each one
    is joined and then has its C{reraise} method called.

    """
    for member in self._threads:
      member.join()
      member.reraise()


class PausedWatcher(object):
  """Context manager keeping the watcher paused while its body is executed.

  """
  def __enter__(self):
    """Pauses the watcher.

    """
    AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])

  def __exit__(self, _ex_type, ex_value, _ex_traceback):
    """Resumes the watcher, without hiding an error of the inner block.

    """
    try:
      AssertCommand(["gnt-cluster", "watcher", "continue"])
    except qa_error.Error, err:
      # A failure of 'continue' is propagated only if the inner block did not
      # raise anything itself
      if ex_value is None:
        raise

      # Otherwise the failure is just reported, so that the exception of the
      # inner block is the one seen by the caller
      print qa_logging.FormatError('Re-enabling watcher failed: %s' %
                                   (err, ))


# TODO: Can this be done as a decorator? Implement as needed.
def RunWithLocks(fn, locks, timeout, block, *args, **kwargs):
  """ Runs the given function, acquiring a set of locks beforehand.

  @type fn: function
  @param fn: The function to invoke.
  @type locks: dict of string to list of string
  @param locks: The locks to acquire, per lock category.
  @type timeout: number
  @param timeout: The number of seconds the locks should be held before
                  expiring.
  @type block: bool
  @param block: Whether the test should block when locks are used or not.
  @raise qa_error.Error: if an unsupported lock level is requested, or if the
                         test was blocked although it should not have been,
                         or was not blocked although it should have been.

  This function allows a set of locks to be acquired in preparation for a QA
  test, to try and see if the function can run in parallel with other
  operations.

  Locks are acquired by invoking a gnt-debug delay operation which can be
  interrupted as needed. The QA test is then run in a separate thread, with the
  current thread observing jobs waiting for locks. When a job is spotted waiting
  for a lock held by the started delay operation, this is noted, and the delay
  is interrupted, allowing the QA test to continue.

  The watcher is paused and its jobs are rejected for the duration of the test;
  both measures are undone even if the test fails.

  A default timeout is not provided by design - the test creator must make a
  good conservative estimate.

  """
  unsupported_levels = [l_type for l_type in locks
                        if l_type not in AVAILABLE_LOCKS]
  if unsupported_levels:
    raise qa_error.Error("Attempted to acquire locks that cannot yet be "
                         "acquired in the course of a QA test.")

  # The watcher may interfere by issuing its own jobs - therefore pause it,
  # also reject all its jobs and wait for any running jobs to finish.
  # Leaving the with block revives the watcher, whatever the test outcome.
  with PausedWatcher():
    filter_uuid = stdout_of([
      "gnt-filter", "add",
      '--predicates=[["reason", ["=", "source", "gnt:watcher"]]]',
      "--action=REJECT"
    ])
    try:
      while stdout_of(["gnt-job", "list", "--no-header", "--running"]) != "":
        time.sleep(1)

      # Find out the lock names prior to starting the delay function
      lock_name_map = _FindLockNames(locks)

      blocking_owned_locks = _RunObservingLocks(fn, args, kwargs, locks,
                                                timeout, lock_name_map)
    finally:
      # The filter must not outlive the test, not even a failed one
      AssertCommand(["gnt-filter", "delete", filter_uuid])

  # The test was blocked exactly if a job was seen waiting for an owned lock
  test_blocked = bool(blocking_owned_locks)

  blocking_lock_names = ", ".join(map(lock_name_map.get, blocking_owned_locks))
  if not block and test_blocked:
    raise qa_error.Error("QA test succeded, but was blocked by locks: %s" %
                         blocking_lock_names)
  elif block and not test_blocked:
    raise qa_error.Error("QA test succeded, but was not blocked as it was "
                         "expected to by locks: %s" % blocking_lock_names)


def _RunObservingLocks(fn, args, kwargs, locks, timeout, lock_name_map):
  """ Runs fn in a thread while a delay job holds the given locks.

  @param fn: The function to invoke.
  @param args: The positional arguments for C{fn}.
  @param kwargs: The keyword arguments for C{fn}.
  @param locks: The locks the delay job should hold.
  @param timeout: The duration passed to the delay job.
  @param lock_name_map: The map of owned lock names, as returned by
                        L{_FindLockNames}.
  @return: The owned locks a job was seen waiting for; empty if C{fn} finished
           without any such job being observed.

  """
  blocking_owned_locks = []

  termination_socket = _StartDelayFunction(locks, timeout)
  delay_fn_terminated = False

  try:
    qa_thread = QAThread(fn, args, kwargs)
    qa_thread.start()

    while qa_thread.isAlive():
      blocking_locks = _GetBlockingLocks()
      blocking_owned_locks = \
        set(blocking_locks).intersection(set(lock_name_map))

      if blocking_owned_locks:
        # Set the flag first - if the termination attempt fails, we do not want
        # to redo it in the finally block
        delay_fn_terminated = True
        _TerminateDelayFunction(termination_socket)
        break

      time.sleep(5) # Set arbitrarily

    # The thread should be either finished or unblocked at this point
    qa_thread.join()

    # Raise any errors that might have occurred in the thread
    qa_thread.reraise()

  finally:
    if not delay_fn_terminated:
      _TerminateDelayFunction(termination_socket)

  return blocking_owned_locks
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443


def GetJobStatus(job_id):
  """ Retrieves the status of a job.

  @type job_id: string or int
  @param job_id: The job id; it is converted to a string before the lookup, so
                 the integer ids returned by L{ExecuteJobProducingCommand} can
                 be passed directly.
  @rtype: string or None

  @return: The job status, or None if not present.

  """
  # The status dictionary is keyed by strings, so an integer id would never
  # be found in it
  job_key = str(job_id)
  return GetJobStatuses([job_key]).get(job_key, None)


def RetryingUntilJobStatus(retry_status, job_id):
  """ Used with C{retry.Retry}, asks for a retry until a status is reached.

  @type retry_status: string
  @param retry_status: The job status to wait for.
  @type job_id: string
  @param job_id: The job id, represented as a string.
  @raise retry.RetryAgain: if the job does not have the given status.

  """
  status_reached = GetJobStatus(job_id) == retry_status
  if not status_reached:
    raise retry.RetryAgain()


def RetryingWhileJobStatus(retry_status, job_id):
  """ Used with C{retry.Retry}, asks for a retry while a status persists.

  @type retry_status: string
  @param retry_status: The old job status, expected to change.
  @type job_id: string
  @param job_id: The job id, represented as a string.

  @rtype: string or None
  @return: The new job status, or None if none could be retrieved.
  @raise retry.RetryAgain: if the job still has the given status.

  """
  current_status = GetJobStatus(job_id)
  if current_status != retry_status:
    return current_status

  raise retry.RetryAgain()