jqueue.py 17 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

24
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
25
26
import logging
import threading
27
28
import errno
import re
29
import time
Iustin Pop's avatar
Iustin Pop committed
30

Michael Hanselmann's avatar
Michael Hanselmann committed
31
from ganeti import constants
32
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
33
from ganeti import workerpool
34
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
35
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import jstore
39
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
40
41
42
43


# Size of the job queue's worker pool: passed as the first argument to
# workerpool.WorkerPool.__init__ by _JobQueueWorkerPool. Presumably this is
# the number of jobs that can execute concurrently — confirm in workerpool.
JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
44

Michael Hanselmann's avatar
Michael Hanselmann committed
45
46
47
class _QueuedOpCode(object):
  """Encasulates an opcode object.

48
  Access is synchronized by the '_lock' attribute.
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50
51
52
  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

Michael Hanselmann's avatar
Michael Hanselmann committed
53
  """
Michael Hanselmann's avatar
Michael Hanselmann committed
54
55
56
57
58
  def __new__(cls, *args, **kwargs):
    obj = object.__new__(cls, *args, **kwargs)
    # Create a special lock for logging
    obj._log_lock = threading.Lock()
    return obj
59

Michael Hanselmann's avatar
Michael Hanselmann committed
60
61
62
63
64
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
65
66
67

  @classmethod
  def Restore(cls, state):
Michael Hanselmann's avatar
Michael Hanselmann committed
68
69
70
71
72
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
73
74
75
    return obj

  def Serialize(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
76
77
78
79
80
81
82
83
84
85
    self._log_lock.acquire()
    try:
      return {
        "input": self.input.__getstate__(),
        "status": self.status,
        "result": self.result,
        "log": self.log,
        }
    finally:
      self._log_lock.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
86

87
88
89
90
  def Log(self, *args):
    """Append a log entry.

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
91
    assert len(args) < 3
92
93
94
95
96
97
98

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args

Michael Hanselmann's avatar
Michael Hanselmann committed
99
100
101
102
103
104
    self._log_lock.acquire()
    try:
      self.log.append((time.time(), log_type, log_msg))
    finally:
      self._log_lock.release()

105
106
107
108
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
109
110
111
112
113
    self._log_lock.acquire()
    try:
      return self.log[start_at:]
    finally:
      self._log_lock.release()
114

Michael Hanselmann's avatar
Michael Hanselmann committed
115
116
117
118
119
120
121

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
Michael Hanselmann's avatar
Michael Hanselmann committed
122
  def __init__(self, queue, job_id, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
123
124
125
126
    if not ops:
      # TODO
      raise Exception("No opcodes")

Michael Hanselmann's avatar
Michael Hanselmann committed
127
    self.queue = queue
128
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
129
130
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
131
132

  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
133
134
135
136
137
138
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
139
140
141
142
143
    return obj

  def Serialize(self):
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
144
      "ops": [op.Serialize() for op in self.ops],
145
      "run_op_index": self.run_op_index,
146
147
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
148
  def CalcStatus(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
149
150
151
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
152
153
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
154
155
156
157
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
158
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
159
        pass
Michael Hanselmann's avatar
Michael Hanselmann committed
160
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
161
        status = constants.JOB_STATUS_RUNNING
Michael Hanselmann's avatar
Michael Hanselmann committed
162
      elif op.status == constants.OP_STATUS_ERROR:
163
164
165
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
166
      elif op.status == constants.OP_STATUS_CANCELED:
167
168
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
169
170
171
172
173
174

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

175

Michael Hanselmann's avatar
Michael Hanselmann committed
176
177
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
178
179
    """Job executor.

Michael Hanselmann's avatar
Michael Hanselmann committed
180
    This functions processes a job.
Michael Hanselmann's avatar
Michael Hanselmann committed
181
182
183
184

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
185
    proc = mcpu.Processor(self.pool.queue.context)
Michael Hanselmann's avatar
Michael Hanselmann committed
186
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
187
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
188
189
190
191
192
193
194
195
196
197
198
199
200
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
201
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
202
203
204
            finally:
              queue.release()

Iustin Pop's avatar
Iustin Pop committed
205
            result = proc.ExecOpCode(input_opcode, op.Log)
Michael Hanselmann's avatar
Michael Hanselmann committed
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
234
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
235
236
237
238
239
240
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
241
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
242
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
243
244
245


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool whose workers are L{_JobQueueWorker} instances.

  """
  def __init__(self, queue):
    """Builds the pool and remembers the job queue it serves.

    @param queue: the owning L{JobQueue}; workers access it as
        C{self.pool.queue} (presumably C{pool} is set by BaseWorker)

    """
    base_pool = super(_JobQueueWorkerPool, self)
    base_pool.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
250
251


Michael Hanselmann's avatar
Michael Hanselmann committed
252
class JobQueue(object):
253
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
254

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
271
      assert self._queue_lock is not None, "Queue should be open"
272
273
274
      return fn(self, *args, **kwargs)
    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
275
  def __init__(self, context):
276
    self.context = context
Iustin Pop's avatar
Iustin Pop committed
277
    self._memcache = {}
278
    self._my_hostname = utils.HostInfo().name
279

Michael Hanselmann's avatar
Michael Hanselmann committed
280
281
282
283
284
    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

285
286
    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)
287

288
289
290
291
    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")
292

293
    # Get initial list of nodes
294
295
296
297
298
299
300
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except ValueError:
      pass
301
302
303

    # TODO: Check consistency across nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
304
    # Setup worker pool
305
    self._wpool = _JobQueueWorkerPool(self)
Michael Hanselmann's avatar
Michael Hanselmann committed
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

328
329
330
331
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname
332

333
334
    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)
335

336
337
    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
338

339
340
341
342
    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
343
344
345
346
347
348
349
350
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
351
352
353
354
355
356
357
358
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
359
    try:
360
361
362
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
363
364
      pass

365
366
367
368
369
370
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

371
    failed_nodes = 0
372
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
373
    for node in self._nodes:
374
375
376
377
378
379
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
380
381
382
383
384
385
386
387
  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

388
  def _NewSerialUnlocked(self):
389
390
391
392
393
394
395
396
397
398
399
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
400
401
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)
402
403
404
405

    # Keep it only if we were able to write the file
    self._last_serial = serial

Michael Hanselmann's avatar
Michael Hanselmann committed
406
    return self._FormatJobID(serial)
407

Michael Hanselmann's avatar
Michael Hanselmann committed
408
409
  @staticmethod
  def _GetJobPath(job_id):
410
411
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
412
413
  @staticmethod
  def _GetArchivedJobPath(job_id):
414
415
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
416
417
418
  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
419
420
421
422
423
    if m:
      return m.group(1)
    else:
      return None

424
425
426
427
428
429
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

Iustin Pop's avatar
Iustin Pop committed
430
431
432
433
    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

434
    """
435
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
436
437
    jlist.sort()
    return jlist
438

439
440
441
442
  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

443
  def _LoadJobUnlocked(self, job_id):
Iustin Pop's avatar
Iustin Pop committed
444
    if job_id in self._memcache:
445
      logging.debug("Found job %s in memcache", job_id)
Iustin Pop's avatar
Iustin Pop committed
446
447
      return self._memcache[job_id]

448
    filepath = self._GetJobPath(job_id)
449
450
451
452
453
454
455
456
457
458
459
460
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
461
462
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
463
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
464
    return job
465
466

  def _GetJobsUnlocked(self, job_ids):
467
468
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()
469

470
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
471
472

  @utils.LockedMethod
473
  @_RequireOpenQueue
474
  def SubmitJob(self, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
475
    """Create and store a new job.
476

Michael Hanselmann's avatar
Michael Hanselmann committed
477
478
    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.
479
480

    @type ops: list
481
    @param ops: The list of OpCodes that will become the new job.
482
483

    """
484
    # Get job identifier
485
    job_id = self._NewSerialUnlocked()
486
487
488
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
Michael Hanselmann's avatar
Michael Hanselmann committed
489
    self.UpdateJobUnlocked(job)
490

491
    logging.debug("Added new job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
492
493
    self._memcache[job_id] = job

Michael Hanselmann's avatar
Michael Hanselmann committed
494
495
496
497
    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
498

499
  @_RequireOpenQueue
Michael Hanselmann's avatar
Michael Hanselmann committed
500
  def UpdateJobUnlocked(self, job):
501
    filename = self._GetJobPath(job.id)
502
    data = serializer.DumpJson(job.Serialize(), indent=False)
503
    logging.debug("Writing job %s to %s", job.id, filename)
504
    self._WriteAndReplicateFileUnlocked(filename, data)
505
    self._CleanCacheUnlocked([job.id])
Iustin Pop's avatar
Iustin Pop committed
506

507
  def _CleanCacheUnlocked(self, exclude):
Iustin Pop's avatar
Iustin Pop committed
508
509
510
511
512
513
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
514
    assert isinstance(exclude, list)
Michael Hanselmann's avatar
Michael Hanselmann committed
515

Iustin Pop's avatar
Iustin Pop committed
516
    for job in self._memcache.values():
517
      if job.id in exclude:
Iustin Pop's avatar
Iustin Pop committed
518
        continue
Michael Hanselmann's avatar
Michael Hanselmann committed
519
520
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
521
        logging.debug("Cleaning job %s from the cache", job.id)
Iustin Pop's avatar
Iustin Pop committed
522
523
524
525
        try:
          del self._memcache[job.id]
        except KeyError:
          pass
526
527

  @utils.LockedMethod
528
  @_RequireOpenQueue
529
530
531
532
533
534
535
536
537
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
538
    job = self._LoadJobUnlocked(job_id)
539
540
541
542
    if not job:
      logging.debug("Job %s not found", job_id)
      return

Michael Hanselmann's avatar
Michael Hanselmann committed
543
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
544
545
546
      logging.debug("Job %s is no longer in the queue", job.id)
      return

Michael Hanselmann's avatar
Michael Hanselmann committed
547
548
549
550
551
552
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
553

554
  @utils.LockedMethod
555
  @_RequireOpenQueue
556
  def ArchiveJob(self, job_id):
557
558
559
560
561
562
563
564
565
566
567
568
569
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

Michael Hanselmann's avatar
Michael Hanselmann committed
570
571
572
573
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
574
575
576
577
578
579
580
581
582
583
584
585
586
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      os.rename(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])
587

Michael Hanselmann's avatar
Michael Hanselmann committed
588
  def _GetJobInfoUnlocked(self, job, fields):
Michael Hanselmann's avatar
Michael Hanselmann committed
589
590
591
592
593
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
Michael Hanselmann's avatar
Michael Hanselmann committed
594
        row.append(job.CalcStatus())
595
      elif fname == "ops":
Michael Hanselmann's avatar
Michael Hanselmann committed
596
        row.append([op.input.__getstate__() for op in job.ops])
597
      elif fname == "opresult":
Michael Hanselmann's avatar
Michael Hanselmann committed
598
        row.append([op.result for op in job.ops])
599
      elif fname == "opstatus":
Michael Hanselmann's avatar
Michael Hanselmann committed
600
        row.append([op.status for op in job.ops])
601
      elif fname == "ticker":
Michael Hanselmann's avatar
Michael Hanselmann committed
602
        ji = job.run_op_index
603
604
605
        if ji < 0:
          lmsg = None
        else:
Michael Hanselmann's avatar
Michael Hanselmann committed
606
          lmsg = job.ops[ji].RetrieveLog(-1)
607
608
609
610
611
612
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
Michael Hanselmann's avatar
Michael Hanselmann committed
613
614
615
616
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

Michael Hanselmann's avatar
Michael Hanselmann committed
617
  @utils.LockedMethod
618
  @_RequireOpenQueue
Michael Hanselmann's avatar
Michael Hanselmann committed
619
620
621
622
623
624
625
626
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
627
    jobs = []
Michael Hanselmann's avatar
Michael Hanselmann committed
628

Michael Hanselmann's avatar
Michael Hanselmann committed
629
630
631
632
633
    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))
Michael Hanselmann's avatar
Michael Hanselmann committed
634

Michael Hanselmann's avatar
Michael Hanselmann committed
635
    return jobs
Michael Hanselmann's avatar
Michael Hanselmann committed
636

637
  @utils.LockedMethod
638
  @_RequireOpenQueue
Michael Hanselmann's avatar
Michael Hanselmann committed
639
640
641
642
643
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
Michael Hanselmann's avatar
Michael Hanselmann committed
644

645
646
    self._queue_lock.Close()
    self._queue_lock = None