#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encasulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).
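
  An illustrative entry (values are made up; the timestamp is the split
  value produced by utils.SplitTime in the worker code below):
    (3, (1205500000, 437353), constants.ELOG_MESSAGE, "waiting for locks")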

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
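    """Compute the status of this job from its opcodes.

    The job is JOB_STATUS_SUCCESS only if all opcodes succeeded; it is
    JOB_STATUS_RUNNING if at least one opcode is running, and
    JOB_STATUS_ERROR or JOB_STATUS_CANCELED as soon as one opcode failed
    or was cancelled; otherwise it stays JOB_STATUS_QUEUED.

    """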
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    failed_nodes = 0
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    for node in self._nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
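
    Example (illustrative; op is any opcodes.OpCode instance):
      job_id = queue.SubmitJob([op])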

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])

    # Notify waiters about potential changes
    job.change.notifyAll()

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
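
    Example (illustrative):
      job_info, log_entries = queue.WaitForJobChanges(job_id, ["status"],
                                                      None, 0)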

    """
    logging.debug("Waiting for changes in job %s", job_id)

    while True:
      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        new_state = None
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
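      # (For instance, a (1, 2) tuple comes back as the list [1, 2] from the
      # DumpJson/LoadJson round trip below.)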
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait()

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      self._RenameFileUnlocked(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return
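
    Example (illustrative):
      queue.QueryJobs(None, ["id", "status"])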

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None