jqueue.py 17.1 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

24
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
25
26
import logging
import threading
27
28
import errno
import re
29
import time
Iustin Pop's avatar
Iustin Pop committed
30

Michael Hanselmann's avatar
Michael Hanselmann committed
31
from ganeti import constants
32
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
33
from ganeti import workerpool
34
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
35
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
39
40
41
42


JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
43

Michael Hanselmann's avatar
Michael Hanselmann committed
44
45
46
class _QueuedOpCode(object):
  """Encasulates an opcode object.

47
  Access is synchronized by the '_lock' attribute.
Michael Hanselmann's avatar
Michael Hanselmann committed
48

49
50
51
  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

Michael Hanselmann's avatar
Michael Hanselmann committed
52
  """
Michael Hanselmann's avatar
Michael Hanselmann committed
53
54
55
56
57
  def __new__(cls, *args, **kwargs):
    obj = object.__new__(cls, *args, **kwargs)
    # Create a special lock for logging
    obj._log_lock = threading.Lock()
    return obj
58

Michael Hanselmann's avatar
Michael Hanselmann committed
59
60
61
62
63
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
64
65
66

  @classmethod
  def Restore(cls, state):
Michael Hanselmann's avatar
Michael Hanselmann committed
67
68
69
70
71
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
72
73
74
    return obj

  def Serialize(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
75
76
77
78
79
80
81
82
83
84
    self._log_lock.acquire()
    try:
      return {
        "input": self.input.__getstate__(),
        "status": self.status,
        "result": self.result,
        "log": self.log,
        }
    finally:
      self._log_lock.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
85

86
87
88
89
  def Log(self, *args):
    """Append a log entry.

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
90
    assert len(args) < 3
91
92
93
94
95
96
97

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args

Michael Hanselmann's avatar
Michael Hanselmann committed
98
99
100
101
102
103
    self._log_lock.acquire()
    try:
      self.log.append((time.time(), log_type, log_msg))
    finally:
      self._log_lock.release()

104
105
106
107
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
108
109
110
111
112
    self._log_lock.acquire()
    try:
      return self.log[start_at:]
    finally:
      self._log_lock.release()
113

Michael Hanselmann's avatar
Michael Hanselmann committed
114
115
116
117
118
119
120

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, queue, job_id, ops):
    """Create a new job from a list of opcodes.

    @param queue: the queue object this job belongs to
    @param job_id: the job identifier
    @type ops: list
    @param ops: the opcodes of the job; must not be empty

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    # Index of the opcode that was started last; -1 while none was started
    self.run_op_index = -1

  @classmethod
  def Restore(cls, queue, state):
    """Rebuild a job from the dictionary produced by Serialize().

    @param queue: the queue object this job belongs to
    @type state: dict
    @param state: must contain the keys "id", "ops" and "run_op_index"

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
    return obj

  def Serialize(self):
    """Return the job state as a dictionary.

    @rtype: dict
    @return: dictionary with the keys "id", "ops" and "run_op_index"

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Compute the job status from the status of its opcodes.

    The result is "success" only if every opcode succeeded. The first
    failed or canceled opcode decides the status and ends the scan. A
    running opcode makes the job "running", otherwise it is "queued".

    @return: one of the constants.JOB_STATUS_* values

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report a job status here, not the opcode status constant; callers
        # compare the result against constants.JOB_STATUS_*
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

174

Michael Hanselmann's avatar
Michael Hanselmann committed
175
176
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker executing the opcodes of one job at a time.

  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job.

    The opcodes are executed in list order. Before and after each opcode
    the status and result are updated and the job is written out while
    holding the queue lock; the lock is not held while the opcode itself
    runs. The first opcode raising an exception is marked as failed and
    ends the processing, later opcodes are left untouched. Exceptions are
    logged here and not propagated to the caller.

    @param job: the job to process; its 'queue' attribute supplies the
                lock and the UpdateJobUnlocked method

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            # Mark the opcode as running and persist that fact
            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              # Fetch the opcode while still holding the lock
              input = op.input
            finally:
              queue.release()

            # Executed without the queue lock; log entries go to op.Log
            result = proc.ExecOpCode(input, op.Log)

            # Record the successful result
            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                # Write the job out even if recording the error failed
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            # Leave the loop; the handlers below do the logging
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      # Read the final state under the lock, log it after releasing
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
242
243
244
245
246
247
248
249
250


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool set up with JOBQUEUE_THREADS and _JobQueueWorker.

  """
  def __init__(self, context):
    """Initialize the base pool and remember the context.

    @param context: stored as the 'context' attribute of the pool

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.context = context


Michael Hanselmann's avatar
Michael Hanselmann committed
251
class JobQueue(object):
252
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253

Michael Hanselmann's avatar
Michael Hanselmann committed
254
  def __init__(self, context):
Iustin Pop's avatar
Iustin Pop committed
255
    self._memcache = {}
256
    self._my_hostname = utils.HostInfo().name
257

Michael Hanselmann's avatar
Michael Hanselmann committed
258
259
260
261
262
    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

263
264
265
266
267
268
269
    # Make sure our directories exists
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
      try:
        os.mkdir(path, 0700)
      except OSError, err:
        if err.errno not in (errno.EEXIST, ):
          raise
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s",
                                   version, constants.JOB_QUEUE_VERSION)
    finally:
      version_fd.close()

303
304
305
306
307
    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

Michael Hanselmann's avatar
Michael Hanselmann committed
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(context)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

332
333
334
335
336
337
338
339
340
  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    At most 1024 bytes are read from constants.JOB_QUEUE_SERIAL_FILE and
    converted to an integer after stripping whitespace.

    @rtype: None or int
    @return: the serial if the file could be read and parsed,
             None otherwise

    """
    try:
      fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        contents = fd.read(1024)
      finally:
        fd.close()
      return int(contents.strip())
    except (ValueError, EnvironmentError):
      return None
352
353
354
355
356
357

  def _InitQueueUnlocked(self):
    """Write the files of a new queue.

    The version file is always written; the serial file is written with
    the value 0 only if no serial can be read.

    """
    assert self.lock_fd, "Queue should be open"

    version_data = "%s\n" % constants.JOB_QUEUE_VERSION
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, data=version_data)

    # Keep a readable serial
    if self._ReadSerial() is not None:
      return

    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, data="%s\n" % 0)
361

Michael Hanselmann's avatar
Michael Hanselmann committed
362
363
364
365
366
367
368
369
  def _FormatJobID(self, job_id):
    """Convert a numeric job ID to its string form.

    @type job_id: int or long
    @param job_id: the job ID, must not be negative
    @rtype: string
    @raise errors.ProgrammerError: if the ID is not an integer or is
        negative

    """
    is_integer = isinstance(job_id, (int, long))
    if not is_integer:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    elif job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

370
  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type nodes: list
    @param nodes: names of the nodes to which the updated serial file is
                  uploaded; the list itself is not modified
    @rtype: string
    @return: the new job identifier

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes. Build a new list instead of
    # removing our own name from the argument, which belongs to the caller.
    remote_nodes = [node for node in nodes if node != self._my_hostname]

    result = rpc.call_upload_file(remote_nodes,
                                  constants.JOB_QUEUE_SERIAL_FILE)
    for node in remote_nodes:
      if not result[node]:
        logging.error("copy of job queue file to node %s failed", node)

    return self._FormatJobID(serial)
402

Michael Hanselmann's avatar
Michael Hanselmann committed
403
404
  @staticmethod
  def _GetJobPath(job_id):
    """Return the path of the file "job-<job_id>" in the queue directory.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
407
408
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Return the path of the file "job-<job_id>" in the archive directory.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
411
412
413
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job ID from a job file name.

    @param name: the file name, without directory
    @return: the first group of _RE_JOB_FILE if the name matches,
             None otherwise

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      return None
    return match.group(1)

419
420
421
422
423
424
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the job IDs as strings, shorter IDs first and IDs of equal
             length in string order

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    # A plain string sort would put "10" before "9". Ordering by length
    # first gives numeric order for decimal IDs and can't fail on
    # unexpected contents.
    jlist.sort(key=lambda job_id: (len(job_id), job_id))
    return jlist
433

434
435
436
437
438
439
  def _ListJobFiles(self):
    """Return the names of the job files in the queue directory.

    Only names matching _RE_JOB_FILE are returned.

    """
    assert self.lock_fd, "Queue should be open"

    job_files = []
    for entry in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(entry):
        job_files.append(entry)
    return job_files

440
  def _LoadJobUnlocked(self, job_id):
441
442
    assert self.lock_fd, "Queue should be open"

Iustin Pop's avatar
Iustin Pop committed
443
    if job_id in self._memcache:
444
      logging.debug("Found job %s in memcache", job_id)
Iustin Pop's avatar
Iustin Pop committed
445
446
      return self._memcache[job_id]

447
    filepath = self._GetJobPath(job_id)
448
449
450
451
452
453
454
455
456
457
458
459
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
460
461
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
462
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
463
    return job
464
465

  def _GetJobsUnlocked(self, job_ids):
    """Load the jobs with the given IDs.

    @param job_ids: sequence of job IDs; if it is empty or None, all job
                    IDs found on disk are used
    @rtype: list
    @return: one entry per job ID, as returned by _LoadJobUnlocked

    """
    wanted = job_ids
    if not wanted:
      wanted = self._GetJobIDsUnlocked()

    jobs = []
    for job_id in wanted:
      jobs.append(self._LoadJobUnlocked(job_id))
    return jobs
470
471

  @utils.LockedMethod
  def SubmitJob(self, ops, nodes):
    """Create and store a new job.

    The job gets a new identifier, is written to disk, put into the
    memory cache and then handed to the worker pool.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.
    @return: the ID of the new job

    """
    assert self.lock_fd, "Queue should be open"

    # Allocate the identifier and build the job object
    new_id = self._NewSerialUnlocked(nodes)
    new_job = _QueuedJob(self, new_id, ops)

    # The job goes to disk first
    self.UpdateJobUnlocked(new_job)

    logging.debug("Added new job %s to the cache", new_id)
    self._memcache[new_id] = new_job

    # From here on a worker may pick it up
    self._wpool.AddTask(new_job)

    return new_job.id
501

Michael Hanselmann's avatar
Michael Hanselmann committed
502
  def UpdateJobUnlocked(self, job):
    """Write a job to its file and clean the memory cache.

    @param job: the job to be written; it is excluded from the cache
                cleanup

    """
    assert self.lock_fd, "Queue should be open"

    job_path = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, job_path)

    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    utils.WriteFile(job_path, data=serialized)

    self._CleanCacheUnlocked([job.id])
Iustin Pop's avatar
Iustin Pop committed
510

511
  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    Jobs whose status is neither queued nor running are dropped from
    the cache.

    @type exclude: list
    @param exclude: job IDs that should not be cleaned

    """
    assert isinstance(exclude, list)

    unfinished = (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_RUNNING)

    for job in self._memcache.values():
      if job.id in exclude or job.CalcStatus() in unfinished:
        continue

      logging.debug("Cleaning job %s from the cache", job.id)
      # A missing entry is not an error
      self._memcache.pop(job.id, None)
530
531

  @utils.LockedMethod
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs whose status is queued are touched: all their opcodes are
    marked as failed with an explanatory result and the job is written
    out. Unknown jobs and jobs in any other state are ignored.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    # NOTE(review): the opcodes get OP_STATUS_ERROR and not
    # OP_STATUS_CANCELED, and the job is not taken out of the worker pool
    # here - confirm that a cancelled job can't still be executed.
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() != constants.JOB_STATUS_QUEUED:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for queued_op in job.ops:
        queued_op.status = constants.OP_STATUS_ERROR
        queued_op.result = "Job cancelled by request"
    finally:
      # Write the job out in any case
      self.UpdateJobUnlocked(job)
556

557
  @utils.LockedMethod
  def ArchiveJob(self, job_id):
    """Archives a job.

    The job file is moved to the archive directory. Unknown jobs and jobs
    which are neither canceled, successful nor failed are ignored.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    finished = (constants.JOB_STATUS_CANCELED,
                constants.JOB_STATUS_SUCCESS,
                constants.JOB_STATUS_ERROR)
    if job.CalcStatus() not in finished:
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      source = self._GetJobPath(job.id)
      target = self._GetArchivedJobPath(job.id)

      os.rename(source, target)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])
589

Michael Hanselmann's avatar
Michael Hanselmann committed
590
  def _GetJobInfoUnlocked(self, job, fields):
    """Collect the requested fields of a job.

    Known fields are "id", "status", "ops", "opresult", "opstatus" and
    "ticker". The ticker is the last log entry of the opcode referenced
    by the job's run_op_index, or None if there is no such entry.

    @param job: the job to be queried
    @param fields: names of the fields to return
    @rtype: list
    @return: the field values, in the order of the fields argument
    @raise errors.OpExecError: if an unknown field is requested

    """
    def _FieldValue(fname):
      # Value of a single field
      if fname == "id":
        return job.id
      if fname == "status":
        return job.CalcStatus()
      if fname == "ops":
        return [op.input.__getstate__() for op in job.ops]
      if fname == "opresult":
        return [op.result for op in job.ops]
      if fname == "opstatus":
        return [op.status for op in job.ops]
      if fname == "ticker":
        op_index = job.run_op_index
        if op_index < 0:
          return None
        last_entries = job.ops[op_index].RetrieveLog(-1)
        # message might be empty here
        if last_entries:
          return last_entries[0]
        return None
      raise errors.OpExecError("Invalid job query field '%s'" % fname)

    return [_FieldValue(fname) for fname in fields]

Michael Hanselmann's avatar
Michael Hanselmann committed
619
  @utils.LockedMethod
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @param job_ids: sequence of job identifiers or None for all
    @param fields: names of fields to return
    @rtype: list
    @return: one entry per job: the list of field values, or None for a
             job which couldn't be loaded

    """
    result = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is not None:
        info = self._GetJobInfoUnlocked(job, fields)
      else:
        info = None
      result.append(info)

    return result
Michael Hanselmann's avatar
Michael Hanselmann committed
637

638
  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    The workers are terminated first, afterwards the queue lock file is
    closed and the 'lock_fd' attribute is reset to None.

    """
    assert self.lock_fd, "Queue should be open"

    self._wpool.TerminateWorkers()

    lock_file = self.lock_fd
    lock_file.close()
    self.lock_fd = None