jqueue.py 20.2 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 23 24 25 26 27 28
"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""
Iustin Pop's avatar
Iustin Pop committed
29

30
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
31 32
import logging
import threading
33 34
import errno
import re
35
import time
Iustin Pop's avatar
Iustin Pop committed
36

Michael Hanselmann's avatar
Michael Hanselmann committed
37
from ganeti import constants
38
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
39
from ganeti import workerpool
40
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
41
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import mcpu
43
from ganeti import utils
44
from ganeti import jstore
45
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
46 47 48 49


# Number of worker threads started by the job queue's worker pool
# (passed to workerpool.WorkerPool by _JobQueueWorkerPool).
JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
50

51 52 53 54
def TimeStampNow():
  """Returns the current time in the split form used for job timestamps.

  The value is produced by utils.SplitTime, which keeps it easy to
  serialize without losing precision.

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
55 56 57
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    """Wraps a new opcode in the queued state.

    @param op: the opcode to wrap

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restores an opcode from the dictionary produced by L{Serialize}.

    The timestamp keys are optional and default to None.

    @type state: dict
    @param state: the serialized opcode state

    """
    obj = cls.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Returns the opcode state as a dictionary suitable for L{Restore}.

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
90

Michael Hanselmann's avatar
Michael Hanselmann committed
91 92 93 94

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    """Creates a new job from a list of opcodes.

    @param queue: the job queue owning this job; its C{_lock} backs the
        C{change} condition
    @param job_id: the job identifier
    @type ops: list
    @param ops: the opcodes making up the job
    @raise Exception: if C{ops} is empty

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restores a job from the dictionary produced by L{Serialize}.

    The log serial counter is recomputed as the highest serial found in
    the restored opcode logs, so new log entries continue after it.

    @param queue: the job queue owning this job
    @type state: dict
    @param state: the serialized job state

    """
    obj = cls.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Returns the job state as a dictionary suitable for L{Restore}.

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Computes the job status from the status of its opcodes.

    The job is successful only if all opcodes succeeded. Otherwise the
    first failed or cancelled opcode determines the status (error or
    canceled); failing that, a running opcode makes the job running, and
    it is queued otherwise.

    @return: one of the C{constants.JOB_STATUS_*} values

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # This is a job status, so use the JOB_STATUS_* constant (which is
        # what e.g. ArchiveJob checks for), not the opcode one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Returns the log entries of all opcodes newer than a given serial.

    @param newer_than: only entries with a serial greater than this are
        returned; None returns all entries
    @return: list of log entries, in opcode order

    """
    if newer_than is None:
      # Serials start at 1, so this selects every entry
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

179

Michael Hanselmann's avatar
Michael Hanselmann committed
180 181
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
182 183
    """Job executor.

184 185
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
186 187 188 189

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
190
    proc = mcpu.Processor(self.pool.queue.context)
Michael Hanselmann's avatar
Michael Hanselmann committed
191
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
192
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
193 194 195 196 197 198 199 200 201 202 203
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
204
              op.start_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
205 206
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
207
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
208 209 210
            finally:
              queue.release()

211
            def _Log(*args):
212 213 214 215 216 217 218 219 220 221 222 223 224 225
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
226

227
              queue.acquire()
228
              try:
229 230 231
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

232 233
                job.change.notifyAll()
              finally:
234
                queue.release()
235

236
            # Make sure not to hold lock while _Log is called
237
            result = proc.ExecOpCode(input_opcode, _Log)
Michael Hanselmann's avatar
Michael Hanselmann committed
238 239 240 241 242

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
243
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
244 245 246 247 248 249 250 251 252 253 254 255
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
256
                op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
257 258 259 260 261 262 263 264 265 266 267
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
268
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
269 270
      queue.acquire()
      try:
271 272 273 274 275 276
        try:
          job.run_op_idx = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
277 278
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
279
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
280
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
281 282 283


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} threads for a job queue.

  """
  def __init__(self, queue):
    """Starts the pool with JOBQUEUE_THREADS workers.

    @param queue: the job queue served by this pool; workers reach it
        through C{self.pool.queue}

    """
    base = super(_JobQueueWorkerPool, self)
    base.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
288 289


Michael Hanselmann's avatar
Michael Hanselmann committed
290
class JobQueue(object):
291
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
292

293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
309
      assert self._queue_lock is not None, "Queue should be open"
310 311 312
      return fn(self, *args, **kwargs)
    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
313
  def __init__(self, context):
314
    self.context = context
Iustin Pop's avatar
Iustin Pop committed
315
    self._memcache = {}
316
    self._my_hostname = utils.HostInfo().name
317

Michael Hanselmann's avatar
Michael Hanselmann committed
318 319 320 321 322
    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

323
    # Initialize
324
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
325

326 327 328 329
    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")
330

331
    # Get initial list of nodes
332 333 334 335 336 337 338
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except ValueError:
      pass
339 340 341

    # TODO: Check consistency across nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
342
    # Setup worker pool
343
    self._wpool = _JobQueueWorkerPool(self)
Michael Hanselmann's avatar
Michael Hanselmann committed
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

366 367 368 369
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname
370

371 372
    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)
373

374 375
    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
376

377 378 379 380
    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
381 382 383 384 385 386 387 388
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
389 390 391 392 393 394 395 396
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
397
    try:
398 399 400
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
401 402
      pass

403 404 405 406 407 408
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

409
    failed_nodes = 0
410
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
411
    for node in self._nodes:
412 413 414 415 416 417
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

418 419 420 421 422 423 424 425 426 427
  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
428 429 430 431 432 433 434 435
  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

436
  def _NewSerialUnlocked(self):
437 438 439 440 441 442 443 444 445 446 447
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
448 449
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)
450 451 452 453

    # Keep it only if we were able to write the file
    self._last_serial = serial

Michael Hanselmann's avatar
Michael Hanselmann committed
454
    return self._FormatJobID(serial)
455

Michael Hanselmann's avatar
Michael Hanselmann committed
456 457
  @staticmethod
  def _GetJobPath(job_id):
458 459
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
460 461
  @staticmethod
  def _GetArchivedJobPath(job_id):
462 463
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
464 465 466
  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
467 468 469 470 471
    if m:
      return m.group(1)
    else:
      return None

472 473 474 475 476 477
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

Iustin Pop's avatar
Iustin Pop committed
478 479 480 481
    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

482
    """
483
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
484 485
    jlist.sort()
    return jlist
486

487 488 489 490
  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

491
  def _LoadJobUnlocked(self, job_id):
Iustin Pop's avatar
Iustin Pop committed
492
    if job_id in self._memcache:
493
      logging.debug("Found job %s in memcache", job_id)
Iustin Pop's avatar
Iustin Pop committed
494 495
      return self._memcache[job_id]

496
    filepath = self._GetJobPath(job_id)
497 498 499 500 501 502 503 504 505 506 507 508
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
509 510
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
511
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
512
    return job
513 514

  def _GetJobsUnlocked(self, job_ids):
515 516
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()
517

518
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
519 520

  @utils.LockedMethod
521
  @_RequireOpenQueue
522
  def SubmitJob(self, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
523
    """Create and store a new job.
524

Michael Hanselmann's avatar
Michael Hanselmann committed
525 526
    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.
527 528

    @type ops: list
529
    @param ops: The list of OpCodes that will become the new job.
530 531

    """
532
    # Get job identifier
533
    job_id = self._NewSerialUnlocked()
534 535 536
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
Michael Hanselmann's avatar
Michael Hanselmann committed
537
    self.UpdateJobUnlocked(job)
538

539
    logging.debug("Added new job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
540 541
    self._memcache[job_id] = job

Michael Hanselmann's avatar
Michael Hanselmann committed
542 543 544 545
    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
546

547
  @_RequireOpenQueue
Michael Hanselmann's avatar
Michael Hanselmann committed
548
  def UpdateJobUnlocked(self, job):
549
    filename = self._GetJobPath(job.id)
550
    data = serializer.DumpJson(job.Serialize(), indent=False)
551
    logging.debug("Writing job %s to %s", job.id, filename)
552
    self._WriteAndReplicateFileUnlocked(filename, data)
553
    self._CleanCacheUnlocked([job.id])
Iustin Pop's avatar
Iustin Pop committed
554

555
    # Notify waiters about potential changes
556
    job.change.notifyAll()
557

558
  def _CleanCacheUnlocked(self, exclude):
Iustin Pop's avatar
Iustin Pop committed
559 560 561 562 563 564
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
565
    assert isinstance(exclude, list)
Michael Hanselmann's avatar
Michael Hanselmann committed
566

Iustin Pop's avatar
Iustin Pop committed
567
    for job in self._memcache.values():
568
      if job.id in exclude:
Iustin Pop's avatar
Iustin Pop committed
569
        continue
Michael Hanselmann's avatar
Michael Hanselmann committed
570 571
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
572
        logging.debug("Cleaning job %s from the cache", job.id)
Iustin Pop's avatar
Iustin Pop committed
573 574 575 576
        try:
          del self._memcache[job.id]
        except KeyError:
          pass
577

578
  @utils.LockedMethod
579
  @_RequireOpenQueue
580 581 582 583 584 585 586 587 588 589 590 591 592
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number

    """
593 594 595
    logging.debug("Waiting for changes in job %s", job_id)

    while True:
596 597 598 599 600
      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        new_state = None
        break
601

602 603 604
      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)
605 606 607 608 609 610

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
611 612
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
613

614 615 616 617
      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
618 619
        break

620 621 622 623 624 625 626 627
      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait()
628 629 630

    logging.debug("Job %s changed", job_id)

631
    return (job_info, log_entries)
632

633
  @utils.LockedMethod
634
  @_RequireOpenQueue
635 636 637 638 639 640 641 642 643
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
644
    job = self._LoadJobUnlocked(job_id)
645 646 647 648
    if not job:
      logging.debug("Job %s not found", job_id)
      return

Michael Hanselmann's avatar
Michael Hanselmann committed
649
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
650 651 652
      logging.debug("Job %s is no longer in the queue", job.id)
      return

Michael Hanselmann's avatar
Michael Hanselmann committed
653 654 655 656 657 658
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
659

660
  @utils.LockedMethod
661
  @_RequireOpenQueue
662
  def ArchiveJob(self, job_id):
663 664 665 666 667 668 669 670 671 672 673 674 675
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

Michael Hanselmann's avatar
Michael Hanselmann committed
676 677 678 679
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
680 681 682 683 684 685
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

686
      self._RenameFileUnlocked(old, new)
687 688 689 690 691 692

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])
693

Michael Hanselmann's avatar
Michael Hanselmann committed
694
  def _GetJobInfoUnlocked(self, job, fields):
Michael Hanselmann's avatar
Michael Hanselmann committed
695 696 697 698 699
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
Michael Hanselmann's avatar
Michael Hanselmann committed
700
        row.append(job.CalcStatus())
701
      elif fname == "ops":
Michael Hanselmann's avatar
Michael Hanselmann committed
702
        row.append([op.input.__getstate__() for op in job.ops])
703
      elif fname == "opresult":
Michael Hanselmann's avatar
Michael Hanselmann committed
704
        row.append([op.result for op in job.ops])
705
      elif fname == "opstatus":
Michael Hanselmann's avatar
Michael Hanselmann committed
706
        row.append([op.status for op in job.ops])
Michael Hanselmann's avatar
Michael Hanselmann committed
707 708 709 710
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

Michael Hanselmann's avatar
Michael Hanselmann committed
711
  @utils.LockedMethod
712
  @_RequireOpenQueue
Michael Hanselmann's avatar
Michael Hanselmann committed
713 714 715 716 717 718 719 720
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
721
    jobs = []
Michael Hanselmann's avatar
Michael Hanselmann committed
722

Michael Hanselmann's avatar
Michael Hanselmann committed
723 724 725 726 727
    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))
Michael Hanselmann's avatar
Michael Hanselmann committed
728

Michael Hanselmann's avatar
Michael Hanselmann committed
729
    return jobs
Michael Hanselmann's avatar
Michael Hanselmann committed
730

731
  @utils.LockedMethod
732
  @_RequireOpenQueue
Michael Hanselmann's avatar
Michael Hanselmann committed
733 734 735 736 737
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
Michael Hanselmann's avatar
Michael Hanselmann committed
738

739 740
    self._queue_lock.Close()
    self._queue_lock = None