jqueue.py 19.7 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1 2 3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 23 24 25 26 27 28
"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""
Iustin Pop's avatar
Iustin Pop committed
29

30
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
31 32
import logging
import threading
33 34
import errno
import re
35
import time
36
import weakref
Iustin Pop's avatar
Iustin Pop committed
37

Michael Hanselmann's avatar
Michael Hanselmann committed
38
from ganeti import constants
39
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import workerpool
41
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
42
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
43
from ganeti import mcpu
44
from ganeti import utils
45
from ganeti import jstore
46
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
47 48 49 50


# Passed by _JobQueueWorkerPool as the first argument of the WorkerPool
# constructor; presumably the number of worker threads (confirm against
# ganeti.workerpool).
JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
51

52 53 54 55
def TimeStampNow():
  """Returns the current time in the split form used for job timestamps.

  The value of time.time() is passed through utils.SplitTime.

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
56 57 58
class _QueuedOpCode(object):
  """Encasulates an opcode object.

59
  The 'log' attribute holds the execution log and consists of tuples
60
  of the form (log_serial, timestamp, level, message).
61

Michael Hanselmann's avatar
Michael Hanselmann committed
62
  """
Michael Hanselmann's avatar
Michael Hanselmann committed
63 64 65 66 67
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
68 69
    self.start_timestamp = None
    self.end_timestamp = None
70 71 72

  @classmethod
  def Restore(cls, state):
Michael Hanselmann's avatar
Michael Hanselmann committed
73 74 75 76 77
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
78 79
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
80 81 82
    return obj

  def Serialize(self):
83 84 85 86 87
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
88 89
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
90
      }
91

Michael Hanselmann's avatar
Michael Hanselmann committed
92 93 94 95

class _QueuedJob(object):
  """In-memory job representation.

96 97
  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.
Michael Hanselmann's avatar
Michael Hanselmann committed
98 99

  """
Michael Hanselmann's avatar
Michael Hanselmann committed
100
  def __init__(self, queue, job_id, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
101 102 103 104
    if not ops:
      # TODO
      raise Exception("No opcodes")

Michael Hanselmann's avatar
Michael Hanselmann committed
105
    self.queue = queue
106
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
107 108
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
109 110 111 112
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
113 114

  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
115 116 117 118 119
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
120 121 122 123 124 125 126 127 128 129 130 131

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

132 133 134 135 136
    return obj

  def Serialize(self):
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
137
      "ops": [op.Serialize() for op in self.ops],
138
      "run_op_index": self.run_op_index,
139 140
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
141
  def CalcStatus(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
142 143 144
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
145 146
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
147 148 149 150
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
151
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
152
        pass
Michael Hanselmann's avatar
Michael Hanselmann committed
153
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
154
        status = constants.JOB_STATUS_RUNNING
Michael Hanselmann's avatar
Michael Hanselmann committed
155
      elif op.status == constants.OP_STATUS_ERROR:
156 157 158
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
159
      elif op.status == constants.OP_STATUS_CANCELED:
160 161
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
162 163 164 165 166 167

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

168 169 170 171 172 173 174 175 176 177 178 179
  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))

    return entries

180

Michael Hanselmann's avatar
Michael Hanselmann committed
181 182
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
183 184
    """Job executor.

185 186
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
187 188 189 190

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
191
    proc = mcpu.Processor(self.pool.queue.context)
Michael Hanselmann's avatar
Michael Hanselmann committed
192
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
193
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
194 195 196 197 198 199 200 201 202 203 204
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
205
              op.start_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
206 207
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
208
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
209 210 211
            finally:
              queue.release()

212
            def _Log(*args):
213 214 215 216 217 218 219 220 221 222 223 224 225 226
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
227

228
              queue.acquire()
229
              try:
230 231 232
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

233 234
                job.change.notifyAll()
              finally:
235
                queue.release()
236

237
            # Make sure not to hold lock while _Log is called
238
            result = proc.ExecOpCode(input_opcode, _Log)
Michael Hanselmann's avatar
Michael Hanselmann committed
239 240 241 242 243

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
244
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
245 246 247 248 249 250 251 252 253 254 255 256
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
257
                op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
258 259 260 261 262 263 264 265 266 267 268
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
269
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
270 271
      queue.acquire()
      try:
272 273 274 275 276 277
        try:
          job.run_op_idx = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
278 279
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
280
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
281
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
282 283 284


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running _JobQueueWorker instances for a job queue.

  """
  def __init__(self, queue):
    """Initializes the pool and remembers the owning queue.

    @param queue: The job queue; workers reach it through self.pool.queue

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
289 290


Michael Hanselmann's avatar
Michael Hanselmann committed
291
class JobQueue(object):
292
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
293

294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
  def _RequireOpenQueue(fn):
    """Decorator asserting that the queue is open.

    To be applied to all "public" methods, i.e. those usually called from
    other classes. The wrapped method is only called if the queue lock
    (self._queue_lock) is still set.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      queue_lock = self._queue_lock
      assert queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)

    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
314
  def __init__(self, context):
    """Opens the job queue and resumes pending work.

    Initializes and locks the on-disk queue, reads the last job serial,
    collects the list of nodes (without the local host) and sets up the
    worker pool. Jobs found queued on disk are handed to the worker pool;
    jobs found in running state are marked as failed.

    @param context: Context object; its cfg attribute provides the node
                    list and it is passed on to the opcode processor

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node. discard() is used because set.remove() raises
    # KeyError (not ValueError) if the name is missing.
    self._nodes.discard(self._my_hostname)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

367 368 369 370
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    """Registers a node and uploads the current queue to it.

    The queue directory on the node is purged first; afterwards all
    non-archived job files and the serial file are uploaded. Upload failures
    are logged only.

    @param node_name: Name of the node; must not be the local host

    """
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # The whole queue excluding archived jobs, followed by the serial file
    paths = [self._GetJobPath(jid) for jid in self._GetJobIDsUnlocked()]
    paths.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in paths:
      handle = open(path, "r")
      try:
        payload = handle.read()
      finally:
        handle.close()

      reply = rpc.call_jobqueue_update([node_name], path, payload)
      if not reply[node_name]:
        logging.error("Failed to upload %s to %s", path, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Forgets a node; unknown node names are ignored.

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.discard(node_name)

404 405 406 407 408 409
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    Nodes on which the update failed are logged and counted; the count is
    not acted upon yet.

    """
    utils.WriteFile(file_name, data=data)

    replies = rpc.call_jobqueue_update(self._nodes, file_name, data)

    failed_nodes = 0
    for node in self._nodes:
      if replies[node]:
        continue
      failed_nodes += 1
      logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

419 420 421 422 423 424 425 426 427 428
  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then on all nodes.

    Failures on remote nodes are only logged.

    """
    os.rename(old, new)

    replies = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if replies[node]:
        continue
      logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
429 430 431 432 433 434 435 436
  def _FormatJobID(self, job_id):
    """Converts a numeric job ID to its string form.

    Raises errors.ProgrammerError for non-integer or negative values.

    """
    if isinstance(job_id, (int, long)):
      if job_id < 0:
        raise errors.ProgrammerError("Job ID %s is negative" % job_id)
      return str(job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

437
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # The serial file is written and replicated before the in-memory
    # counter is updated, so the counter only advances after the write
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % next_serial)
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
456

Michael Hanselmann's avatar
Michael Hanselmann committed
457 458
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the path of a job's file in the queue directory.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
461 462
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the path of a job's file in the archive directory.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
465 466 467
  @classmethod
  def _ExtractJobID(cls, name):
    """Extracts the job ID from a job file name.

    Returns None if the name does not match the job file pattern.

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      return None
    return match.group(1)

473 474 475 476 477 478
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs, sorted.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    return sorted([self._ExtractJobID(name) for name in self._ListJobFiles()])
487

488 489 490 491
  def _ListJobFiles(self):
    """Returns the visible files in the queue directory named like job files.

    """
    job_files = []
    for name in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(name):
        job_files.append(name)
    return job_files

492
  def _LoadJobUnlocked(self, job_id):
493 494
    job = self._memcache.get(job_id, None)
    if job:
495
      logging.debug("Found job %s in memcache", job_id)
496
      return job
Iustin Pop's avatar
Iustin Pop committed
497

498
    filepath = self._GetJobPath(job_id)
499 500 501 502 503 504 505 506 507 508 509 510
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
511 512
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
513
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
514
    return job
515 516

  def _GetJobsUnlocked(self, job_ids):
    """Loads the given jobs, or all jobs on disk if no IDs are given.

    The result has one entry per requested ID, in the same order; as
    returned by _LoadJobUnlocked, an entry is None if the job file is gone.

    """
    wanted = job_ids
    if not wanted:
      wanted = self._GetJobIDsUnlocked()

    loaded = []
    for jid in wanted:
      loaded.append(self._LoadJobUnlocked(jid))
    return loaded
521 522

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    The job gets a new identifier, is written to disk, added to the
    in-memory cache and handed to the worker pool for execution.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: The identifier of the new job

    """
    new_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, new_id, ops)

    # Persist before the job becomes visible to anyone else
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # Schedule for execution
    self._wpool.AddTask(new_job)

    return new_job.id
548

549
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Writes a job to its file, replicates it and wakes up waiters.

    @param job: The job to be written (a _QueuedJob)

    """
    target = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, target)
    self._WriteAndReplicateFileUnlocked(target, serialized)

    # Anyone blocked on the job's condition may now see new data
    job.change.notifyAll()
558

559
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @return: A (job_info, log_entries) tuple if the job changed or is no
             longer queued or running, constants.JOB_NOTCHANGED if the
             timeout expired, or None if the job was not found

    """
    logging.debug("Waiting for changes in job %s", job_id)

    # Stay None if the job can't be loaded at all, so the code after the
    # loop never touches unbound names
    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting (the condition is bound to the
      # queue's lock, see _QueuedJob)
      job.change.wait(delta_time)

    if job_info is None and log_entries is None:
      # Job not found; nothing to report
      return None

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
619

620
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs which are still queued can be cancelled; unknown jobs and
    jobs in any other state are left untouched.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    still_queued = job.CalcStatus() in (constants.JOB_STATUS_QUEUED,)
    if not still_queued:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    # NOTE(review): opcodes are marked OP_STATUS_ERROR rather than
    # OP_STATUS_CANCELED, so CalcStatus reports such a job as failed;
    # confirm this is intended.
    try:
      for queued_op in job.ops:
        queued_op.status = constants.OP_STATUS_ERROR
        queued_op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
646

647
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    The job file is moved to the archive directory, locally and on all
    nodes. Unknown jobs and jobs which are not yet finished (canceled,
    successful or failed) are left untouched.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    finished = job.CalcStatus() in (constants.JOB_STATUS_CANCELED,
                                    constants.JOB_STATUS_SUCCESS,
                                    constants.JOB_STATUS_ERROR)
    if not finished:
      logging.debug("Job %s is not yet done", job.id)
      return

    self._RenameFileUnlocked(self._GetJobPath(job.id),
                             self._GetArchivedJobPath(job.id))

    logging.debug("Successfully archived job %s", job.id)
675

Michael Hanselmann's avatar
Michael Hanselmann committed
676
  def _GetJobInfoUnlocked(self, job, fields):
    """Returns the values of the requested fields for a job.

    Supported fields are "id", "status", "ops", "opresult" and "opstatus".

    @param job: The job to query (a _QueuedJob)
    @param fields: Names of the fields, in the order they should be returned
    @return: List with one value per requested field
    @raise errors.OpExecError: If an unknown field is requested

    """
    row = []
    for fname in fields:
      if fname == "id":
        value = job.id
      elif fname == "status":
        value = job.CalcStatus()
      elif fname == "ops":
        value = [op.input.__getstate__() for op in job.ops]
      elif fname == "opresult":
        value = [op.result for op in job.ops]
      elif fname == "opstatus":
        value = [op.status for op in job.ops]
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
      row.append(value)
    return row

Michael Hanselmann's avatar
Michael Hanselmann committed
693
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: One entry per job; the list of field values, or None for a job
    which could not be loaded.

    """
    def _Describe(job):
      if job is None:
        return None
      return self._GetJobInfoUnlocked(job, fields)

    return [_Describe(job) for job in self._GetJobsUnlocked(job_ids)]
Michael Hanselmann's avatar
Michael Hanselmann committed
712

713
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    Terminates the workers, then closes the queue lock. The lock attribute
    is reset to None, which makes _RequireOpenQueue reject further calls.

    """
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_lock
    queue_lock.Close()
    self._queue_lock = None