#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


def TimeStampNow():
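  """Returns the current time for use in the job queue.

  The time is split (via utils.SplitTime) to make serialization easier
  and to not lose precision.

  """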
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encasulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
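    """Constructor for the _QueuedOpCode.

    @type op: opcodes.OpCode
    @param op: the opcode we encapsulate

    """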
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
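    """Restores a _QueuedOpCode from its serialized form.

    @type state: dict
    @param state: the serialized state, as returned by Serialize

    """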
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
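    """Serializes this _QueuedOpCode to a dict.

    The result can be fed back into Restore to recreate the object.

    """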
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
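    """Constructor for the _QueuedJob.

    @type queue: JobQueue
    @param queue: the queue this job belongs to
    @type job_id: string
    @param job_id: the job ID of this job
    @type ops: list
    @param ops: the list of opcodes, which will be wrapped in
        _QueuedOpCode objects

    """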
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
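    """Restores a _QueuedJob from its serialized form.

    @type queue: JobQueue
    @param queue: the queue the restored job belongs to
    @type state: dict
    @param state: the serialized state, as returned by Serialize

    """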
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
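    """Serializes this _QueuedJob to a dict.

    The result can be fed back into Restore to recreate the job.

    """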
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
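    """Computes the status of this job from its opcodes.

    A failed or cancelled opcode marks the whole job as failed or
    cancelled respectively, a running opcode marks it as running, and
    the job is successful only once all opcodes have succeeded.

    """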
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
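    """Selects the log entries newer than the given serial.

    @type newer_than: int or None
    @param newer_than: the last log serial the caller has already seen,
        or None to return all entries

    """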
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              op.start_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) in (1, 2)

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
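  """Simple worker pool processing jobs from the job queue.

  """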
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
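  """Queue used to manage the jobs.

  """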
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, that is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
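    """Constructor for JobQueue.

    Besides initializing the in-memory structures, this loads all jobs
    found on disk: queued jobs are handed to the worker pool again,
    while jobs found in the running state are marked as errored, since
    this indicates an unclean master daemon shutdown.

    @param context: the context object, used to get the node list and
        passed on to the opcode processor

    """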
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
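    """Registers a new node with the queue.

    The node's queue directory is purged and the current serial file
    and all non-archived jobs are then copied to it.

    @type node_name: str
    @param node_name: the name of the node to add

    """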
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
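    """Removes a node from the set of nodes the queue replicates to.

    The queue files on the node itself are removed by the "leave node"
    RPC call.

    @type node_name: str
    @param node_name: the name of the node to remove

    """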
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
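    """Verifies the result of an RPC call made to all queue nodes.

    Failed nodes are logged; if more than half of the nodes failed
    (counting this master node as successful), an extra error is
    logged.

    @param result: the per-node result of the RPC call
    @type nodes: list
    @param nodes: the list of nodes the call was made to
    @type failmsg: str
    @param failmsg: message prefix used when logging failures

    """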
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
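    """Renames a job queue file locally and on all other nodes.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """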
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
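    """Converts a numeric job ID to its string representation.

    Currently this is a plain conversion, e.g. the numeric ID 143
    becomes the string "143".

    @type job_id: int or long
    @param job_id: the numeric job ID

    """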
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
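    """Loads a job from the memory cache or from disk.

    Returns None if the job file does not exist; otherwise the job is
    deserialized and added to the memory cache.

    @type job_id: string
    @param job_id: the ID of the job to load

    """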
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also adds it to the worker
    pool, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
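    """Writes a job to disk and replicates it to all queue nodes.

    Waiters registered on the job's change condition are notified
    afterwards.

    @param job: the job to write

    """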
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
624
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  def _GetJobInfoUnlocked(self, job, fields):
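    """Returns the requested fields of a job as a list.

    @param job: the job whose fields are queried
    @type fields: list of strings
    @param fields: the names of the fields to return; valid names are
        "id", "status", "ops", "opresult" and "opstatus"

    """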
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None