jqueue.py 17 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

24
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
25 26
import logging
import threading
27 28
import errno
import re
29
import time
Iustin Pop's avatar
Iustin Pop committed
30

Michael Hanselmann's avatar
Michael Hanselmann committed
31
from ganeti import constants
32
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
33
from ganeti import workerpool
34
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
35
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import jstore
39
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
40 41 42 43


# Passed as the first argument to the worker pool constructor by
# _JobQueueWorkerPool; presumably the number of worker threads that process
# jobs -- TODO confirm against workerpool.WorkerPool
JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
44

Michael Hanselmann's avatar
Michael Hanselmann committed
45 46 47
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access to the execution log is synchronized by the '_log_lock'
  attribute. The lock is created in __new__ so that it also exists on
  instances built by Restore(), which does not run __init__.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __new__(cls, *args, **kwargs):
    # The constructor arguments are only meant for __init__. Forwarding them
    # to object.__new__ is deprecated and rejected by newer interpreters.
    obj = object.__new__(cls)
    # Create a special lock for logging
    obj._log_lock = threading.Lock()
    return obj

  def __init__(self, op):
    """Wraps a freshly submitted opcode.

    @param op: the opcode to be executed; stored as 'input'

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    """Rebuilds an instance from the dict produced by Serialize().

    @type state: dict
    @param state: needs the keys "input", "status", "result" and "log"

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    """Returns a dict describing this opcode, suitable for Restore().

    """
    self._log_lock.acquire()
    try:
      return {
        "input": self.input.__getstate__(),
        "status": self.status,
        "result": self.result,
        # Hand out a snapshot: the caller processes the returned data after
        # the lock has been released, while Log() may still append entries.
        "log": self.log[:],
        }
    finally:
      self._log_lock.release()

  def Log(self, *args):
    """Append a log entry.

    Accepts either a single message, which is logged with the type
    constants.ELOG_MESSAGE, or a (type, message) pair. The entry is
    stamped with the current time.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args

    self._log_lock.acquire()
    try:
      self.log.append((time.time(), log_type, log_msg))
    finally:
      self._log_lock.release()

  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    @type start_at: int
    @param start_at: index of the first entry to return; slice semantics
        apply, so a negative value counts from the end
    @return: a new list with the selected log entries

    """
    self._log_lock.acquire()
    try:
      return self.log[start_at:]
    finally:
      self._log_lock.release()
114

Michael Hanselmann's avatar
Michael Hanselmann committed
115 116 117 118 119 120 121

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. A job is an
  ordered list of queued opcodes plus the index of the opcode that was
  started last ('run_op_index', -1 if none was started yet).

  """
  def __init__(self, queue, job_id, ops):
    """Creates a new job.

    @param queue: the job queue this job belongs to
    @param job_id: identifier of the job
    @type ops: list
    @param ops: the opcodes making up the job; must not be empty

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1

  @classmethod
  def Restore(cls, queue, state):
    """Rebuilds a job from the dict produced by Serialize().

    @param queue: the job queue this job belongs to
    @type state: dict
    @param state: needs the keys "id", "ops" and "run_op_index"

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
    return obj

  def Serialize(self):
    """Returns a dict describing this job, suitable for Restore().

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Derives the job status from the status of its opcodes.

    The job is successful only if all opcodes were successful. The first
    failed or canceled opcode determines the status of the whole job;
    otherwise the job is running if an opcode is running, and queued if
    not.

    @return: one of the constants.JOB_STATUS_* values

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status here, not the opcode status constant
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

175

Michael Hanselmann's avatar
Michael Hanselmann committed
176 177
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
178 179
    """Job executor.

Michael Hanselmann's avatar
Michael Hanselmann committed
180
    This functions processes a job.
Michael Hanselmann's avatar
Michael Hanselmann committed
181 182 183 184

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
185
    proc = mcpu.Processor(self.pool.queue.context)
Michael Hanselmann's avatar
Michael Hanselmann committed
186
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
187
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
188 189 190 191 192 193 194 195 196 197 198 199 200
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
201
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
202 203 204
            finally:
              queue.release()

Iustin Pop's avatar
Iustin Pop committed
205
            result = proc.ExecOpCode(input_opcode, op.Log)
Michael Hanselmann's avatar
Michael Hanselmann committed
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
234
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
235 236 237 238 239 240
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
241
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
242
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
243 244 245


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    """Sets up the pool with JOBQUEUE_THREADS and _JobQueueWorker.

    @param queue: the job queue owning this pool; kept as 'queue' so that
        workers can reach it through their pool

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
250 251


Michael Hanselmann's avatar
Michael Hanselmann committed
252
class JobQueue(object):
253
  # Matches names of the form "job-<id>"; group 1 captures the part
  # described by constants.JOB_ID_TEMPLATE, i.e. the job ID
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
254

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    The wrapped function is only called if the queue is open, which is
    checked by asserting that the '_queue_lock' attribute is set.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)

    # Keep name and docstring of the wrapped function so that tracebacks and
    # introspection still show the real method.
    wrapper.__name__ = fn.__name__
    wrapper.__doc__ = fn.__doc__
    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
275
  def __init__(self, context):
    """Opens the job queue and resumes the jobs found on disk.

    Initializes the on-disk queue through jstore (exclusively), reads the
    last used serial number, builds the set of nodes queue files are
    replicated to and creates the worker pool. Jobs that are still queued
    are handed to the workers again; jobs found in running state have all
    their opcodes marked as failed.

    @param context: made available as 'context' attribute (used by the
        workers); its cfg.GetNodeList() provides the initial node list

    """
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node. discard() is used because set.remove() signals a
    # missing element with KeyError, which must not abort the startup.
    self._nodes.discard(self._my_hostname)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        if job is None:
          # The job file vanished between listing and loading it
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

328 329 330 331
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    """Registers a node and copies the current queue to it.

    The queue directory on the node is purged first, then all job files
    (archived jobs excluded) and the serial file are uploaded. Failed
    uploads are logged only; the node is added to the node set in any case.

    @param node_name: name of the node to add; must not be this host

    """
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # The whole queue excluding archived jobs, followed by the serial file
    to_upload = [self._GetJobPath(job_id)
                 for job_id in self._GetJobIDsUnlocked()]
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      handle = open(path, "r")
      try:
        content = handle.read()
      finally:
        handle.close()

      result = rpc.call_jobqueue_update([node_name], path, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", path, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Stops replicating queue files to a node.

    Unknown node names are ignored.

    @param node_name: name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.discard(node_name)

365 366 367 368 369 370
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    Nodes for which the update call reports a failure are logged.

    @param file_name: path of the file to write
    @param data: the new file content

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)

    failed_nodes = 0
    for node in self._nodes:
      if result[node]:
        continue
      failed_nodes += 1
      logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
380 381 382 383 384 385 386 387
  def _FormatJobID(self, job_id):
    """Converts a numeric job ID to its string form.

    @type job_id: int or long
    @param job_id: the number to format; must not be negative
    @raise errors.ProgrammerError: if the ID is not an integer or is negative

    """
    if isinstance(job_id, (int, long)):
      if job_id < 0:
        raise errors.ProgrammerError("Job ID %s is negative" % job_id)
      return str(job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

388
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    The incremented serial number is written to the serial file (and
    replicated) before it is remembered in memory.

    Returns: A string representing the job identifier.

    """
    next_serial = self._last_serial + 1
    serial_data = "%s\n" % next_serial

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        serial_data)

    # Keep it only if we were able to write the file
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
407

Michael Hanselmann's avatar
Michael Hanselmann committed
408 409
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the path of a job's file below constants.QUEUE_DIR.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
412 413
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the path of a job's file below the archive directory.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
416 417 418
  @classmethod
  def _ExtractJobID(cls, name):
    """Extracts the job ID from a job file name.

    @param name: file name (without directory)
    @return: the ID part of the name, or None if the name does not match
        _RE_JOB_FILE

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      return None
    return match.group(1)

424 425 426 427 428 429
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    The IDs are returned as strings, sorted by their numeric value.

    """
    def _SortKey(job_id):
      # IDs are strings; compare numeric ones by value so that "9" sorts
      # before "10". Anything non-numeric is sorted as text after them.
      try:
        return (0, int(job_id), job_id)
      except (TypeError, ValueError):
        return (1, 0, job_id)

    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort(key=_SortKey)
    return jlist
438

439 440 441 442
  def _ListJobFiles(self):
    """Returns the names of the job files in the queue directory.

    Only visible files whose name matches _RE_JOB_FILE are returned.

    """
    job_files = []
    for name in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(name):
        job_files.append(name)
    return job_files

443
  def _LoadJobUnlocked(self, job_id):
Iustin Pop's avatar
Iustin Pop committed
444
    if job_id in self._memcache:
445
      logging.debug("Found job %s in memcache", job_id)
Iustin Pop's avatar
Iustin Pop committed
446 447
      return self._memcache[job_id]

448
    filepath = self._GetJobPath(job_id)
449 450 451 452 453 454 455 456 457 458 459 460
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
461 462
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
463
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
464
    return job
465 466

  def _GetJobsUnlocked(self, job_ids):
    """Loads a list of jobs.

    @param job_ids: IDs of the jobs to load; if empty or None, all jobs
        found by _GetJobIDsUnlocked are loaded
    @return: list with one entry per ID, as returned by _LoadJobUnlocked
        (None for jobs that could not be found)

    """
    if job_ids:
      wanted = job_ids
    else:
      wanted = self._GetJobIDsUnlocked()

    loaded = []
    for job_id in wanted:
      loaded.append(self._LoadJobUnlocked(job_id))
    return loaded
471 472

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    The job gets a new identifier, is written to disk, is entered into
    the memory cache and is finally handed to the worker pool for
    processing.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: the identifier of the new job

    """
    # Get job identifier
    new_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, new_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(new_job)

    logging.debug("Added new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # Add to worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
498

499
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Writes the current state of a job to its file and replicates it.

    Afterwards the memory cache is cleaned, keeping this job in it.

    @param job: the job to store

    """
    target = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, target)
    self._WriteAndReplicateFileUnlocked(target, serialized)
    self._CleanCacheUnlocked([job.id])
Iustin Pop's avatar
Iustin Pop committed
506

507
  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    Jobs that are neither queued nor running are dropped from the cache.

    @type exclude: list
    @param exclude: job IDs that should not be cleaned

    """
    assert isinstance(exclude, list)

    active_states = (constants.JOB_STATUS_QUEUED,
                     constants.JOB_STATUS_RUNNING)

    # values() is a separate list here (Python 2), so entries can be
    # removed from the dict while looping
    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() in active_states:
        continue
      logging.debug("Cleaning job %s from the cache", job.id)
      self._memcache.pop(job.id, None)
526 527

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs that are still queued are touched: all their opcodes are
    set to the error status with a result text saying that the job was
    cancelled, and the job is written out. Unknown jobs and jobs in any
    other state are left alone.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() != constants.JOB_STATUS_QUEUED:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for queued_op in job.ops:
        queued_op.status = constants.OP_STATUS_ERROR
        queued_op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
553

554
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    The file of a finished job (canceled, successful or failed) is moved
    from the queue directory to the archive directory. Unknown and
    unfinished jobs are left alone.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    finished_states = (constants.JOB_STATUS_CANCELED,
                       constants.JOB_STATUS_SUCCESS,
                       constants.JOB_STATUS_ERROR)
    if job.CalcStatus() not in finished_states:
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      queue_path = self._GetJobPath(job.id)
      archive_path = self._GetArchivedJobPath(job.id)

      os.rename(queue_path, archive_path)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])
587

Michael Hanselmann's avatar
Michael Hanselmann committed
588
  def _GetJobInfoUnlocked(self, job, fields):
    """Collects the requested fields of a job.

    Known fields are "id", "status", "ops", "opresult", "opstatus" and
    "ticker" (the most recent log entry of the opcode started last, or
    None if there is none).

    @param job: the job to describe
    @param fields: names of the fields to return
    @return: list of values, in the order of the given fields
    @raise errors.OpExecError: if an unknown field is requested

    """
    row = []
    for fname in fields:
      if fname == "id":
        value = job.id
      elif fname == "status":
        value = job.CalcStatus()
      elif fname == "ops":
        value = [op.input.__getstate__() for op in job.ops]
      elif fname == "opresult":
        value = [op.result for op in job.ops]
      elif fname == "opstatus":
        value = [op.status for op in job.ops]
      elif fname == "ticker":
        value = None
        current = job.run_op_index
        if current >= 0:
          # The log might still be empty here
          tail = job.ops[current].RetrieveLog(-1)
          if tail:
            value = tail[0]
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
      row.append(value)
    return row

Michael Hanselmann's avatar
Michael Hanselmann committed
617
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: one entry per job, being the list of requested field values,
    or None for jobs which could not be loaded.

    """
    rows = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is not None:
        info = self._GetJobInfoUnlocked(job, fields)
      else:
        info = None
      rows.append(info)

    return rows
Michael Hanselmann's avatar
Michael Hanselmann committed
636

637
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    Terminates the workers of the pool and closes the queue lock. After
    this the queue counts as closed for methods checking '_queue_lock'.

    """
    # NOTE(review): workers take the queue lock through queue.acquire(); if
    # utils.LockedMethod holds that same lock while TerminateWorkers() waits
    # for the workers, this could block -- confirm against utils/workerpool
    self._wpool.TerminateWorkers()

    queue_lock = self._queue_lock
    queue_lock.Close()
    self._queue_lock = None