#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    """Restore a _QueuedOpCode from its serialized form.

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    """Return a serializable representation of this opcode.

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO: use a more specific exception
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }
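
  # Illustrative shape of the serialized form produced above (all values are
  # invented):
  #   {"id": "42",
  #    "run_op_index": -1,
  #    "ops": [{"input": <opcode state dict>, "status": "queued",
  #             "result": None, "log": []}]}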

  def CalcStatus(self):
    """Compute the status of this job from its opcodes.

    The job is in error or canceled as soon as one opcode has that
    status, running while any opcode is running, successful only when
    all opcodes succeeded, and queued otherwise.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Return the log entries with a serial higher than newer_than.

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, i.e. the
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      # set.remove() raises KeyError, not ValueError, when the element is
      # missing
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    """Register a new node and upload the queue files to it.

    """
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    failed_nodes = 0
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    for node in self._nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

  def _FormatJobID(self, job_id):
    """Convert a numeric job ID to its string representation.

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.
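
    For example (illustrative): if the previous serial was 42, this writes
    "43\n" to the serial file and returns "43".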

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
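
  # Illustrative results (the real prefixes come from constants.QUEUE_DIR and
  # constants.JOB_QUEUE_ARCHIVE_DIR):
  #   _GetJobPath("42")         -> <QUEUE_DIR>/job-42
  #   _GetArchivedJobPath("42") -> <JOB_QUEUE_ARCHIVE_DIR>/job-42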

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
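
    Example (illustrative, where op is any opcodes.OpCode instance):
      job_id = queue.SubmitJob([op])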

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Write a job to disk, replicate it and notify waiters.

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])

    # Notify waiters about potential changes
    job.change.notifyAll()

  def _CleanCacheUnlocked(self, exclude):
Iustin Pop's avatar
Iustin Pop committed
546 547 548 549 550 551
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
552
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
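    @return: A tuple of (job_info, log_entries)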

    """
    logging.debug("Waiting for changes in job %s", job_id)

    while True:
      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        # Make sure the return value is defined even on the first iteration
        job_info = None
        log_entries = None
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait()

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      self._RenameFileUnlocked(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    """Return the values of the requested fields for a job.

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None