#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encasulates an opcode object.

54
  The 'log' attribute holds the execution log and consists of tuples
55
  of the form (log_serial, timestamp, level, message).
56

Michael Hanselmann's avatar
Michael Hanselmann committed
57
  """
Michael Hanselmann's avatar
Michael Hanselmann committed
58
59
60
61
62
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    """Restores a _QueuedOpCode from its serialized form.

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode to a dict form.

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restores a _QueuedJob from its serialized form.

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serializes this _QueuedJob to a dict form.

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Computes the overall job status from the opcode statuses.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
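  # Worked example (illustrative, not part of the original code): for opcode
  # statuses (success, running, queued) CalcStatus returns JOB_STATUS_RUNNING;
  # for (success, error, queued) it returns JOB_STATUS_ERROR; only all-success
  # opcodes yield JOB_STATUS_SUCCESS.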

  def GetLogEntries(self, newer_than):
    """Returns all log entries with a serial higher than newer_than.

    If newer_than is None, all entries are returned.

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    """
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    """Registers a new node with the queue and replicates the queue to it.

    """
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Removes a node from the set of nodes the queue is replicated to.

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    failed_nodes = 0
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    for node in self._nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then on all other nodes.

    """
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

  def _FormatJobID(self, job_id):
    """Converts a numeric job ID to a string job identifier.

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the path of the on-disk file for a job.

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the path of the archive file for a job.

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extracts the job ID from a queue file name, or returns None.

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    """Returns the list of job files in the queue directory.

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the memory cache or from disk.

    Returns None if the job file doesn't exist.

    """
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Returns the jobs for the given job IDs, or all jobs if none given.

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
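  # A minimal usage sketch (illustrative only, not part of the original code):
  # code that holds a JobQueue instance can submit opcodes and later query the
  # resulting job; the opcode used here is just an example.
  #
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #   (status, ) = queue.QueryJobs([job_id], ["status"])[0]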

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Writes a job to disk, replicates it and updates the memory cache.

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])

    # Notify waiters about potential changes
    job.change.notifyAll()

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number

    """
    logging.debug("Waiting for changes in job %s", job_id)

    while True:
      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        job_info = None
        log_entries = None
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait()

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
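  # A minimal caller sketch (illustrative only, not part of the original code):
  # how a client could poll a job until it leaves the queued/running states.
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     info, log_entries = queue.WaitForJobChanges(job_id, ["status"],
  #                                                 prev_info, prev_serial)
  #     if info is None or info[0] not in (constants.JOB_STATUS_QUEUED,
  #                                        constants.JOB_STATUS_RUNNING):
  #       break
  #     prev_info = info
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]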

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      self._RenameFileUnlocked(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job, as a list in the order of fields.

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None