#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25


def TimeStampNow():
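  """Returns the current time in the queue's split timestamp format.

  """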
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encasulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
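    """Initialises the opcode wrapper.

    The opcode itself is kept in the 'input' attribute; status, result,
    log and timestamps start out empty.

    """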
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
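    """Restores a _QueuedOpCode from its serialized form.

    """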
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
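    """Serializes this _QueuedOpCode into a dict.

    """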
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
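    """Initialises the job from a list of opcodes.

    Each opcode is wrapped in a _QueuedOpCode; an empty opcode list is
    rejected.

    """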
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
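    """Restores a _QueuedJob from its serialized form.

    The log serial is recomputed from the restored opcode logs.

    """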
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
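    """Serializes the job, including all its opcodes, into a dict.

    """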
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
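    """Computes the overall job status from the opcode statuses.

    The job is queued by default, running if at least one opcode is
    running, and error or canceled as soon as one opcode has that
    status; it is successful only if all opcodes succeeded.

    """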
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
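    """Returns log entries with a serial number greater than newer_than.

    Passing None for newer_than returns all entries.

    """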
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
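  """Worker for the job queue; processes one job per RunTask call.

  """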
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold the queue lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
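  """Pool of _JobQueueWorker threads used by the job queue.

  """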
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
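  """Queue used to manage the jobs.

  """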
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used on all "public" functions, that is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
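    """Initialises the queue and restores any jobs found on disk.

    The on-disk queue is opened and verified, the serial file is read,
    the list of other nodes is built and the worker pool is started.
    Jobs still marked as queued are re-added to the pool, while jobs
    left in the running state are marked as failed because of the
    unclean master daemon shutdown.

    """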
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
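    """Registers a new node with the queue.

    The node's queue directory is purged and the current queue contents
    (excluding archived jobs) plus the serial file are uploaded to it.

    """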
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
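    """Removes a node from the queue's replication targets.

    """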
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
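    """Checks the per-node results of a queue RPC call and logs failures.

    """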
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
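    """Renames a job file locally and on all other nodes.

    """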
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
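    """Converts a numeric job ID to its string representation.

    Raises ProgrammerError for non-numeric or negative values.

    """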
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
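    """Loads a job from the memory cache or from disk.

    Returns None if the job file doesn't exist; jobs read from disk are
    added to the cache.

    """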
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
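    """Loads the given jobs, or all known jobs if job_ids is empty.

    """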
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
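    """Writes a job to disk, replicates it and notifies waiters.

    """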
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  def _GetJobInfoUnlocked(self, job, fields):
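    """Returns the values of the requested fields for the given job.

    """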
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None