jqueue.py 19.7 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
24
25
26
27
28
"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""
Iustin Pop's avatar
Iustin Pop committed
29

30
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
31
32
import logging
import threading
33
34
import errno
import re
35
import time
36
import weakref
Iustin Pop's avatar
Iustin Pop committed
37

Michael Hanselmann's avatar
Michael Hanselmann committed
38
from ganeti import constants
39
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import workerpool
41
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
42
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
43
from ganeti import mcpu
44
from ganeti import utils
45
from ganeti import jstore
46
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
47
48
49
50


# Size of the job queue worker pool; passed as the first argument to
# workerpool.WorkerPool by _JobQueueWorkerPool.
JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
51

52
53
54
55
def TimeStampNow():
  """Returns the current time in split form.

  The value is utils.SplitTime() applied to time.time(); it is the format
  stored in the opcode start/end timestamps.

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
56
57
58
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    """Wraps a newly submitted opcode in the queued state.

    @param op: the opcode to be executed

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Rebuilds a queued opcode from its serialized form.

    @type state: dict
    @param state: data as produced by L{Serialize}
    @return: the restored opcode wrapper

    """
    restored = _QueuedOpCode.__new__(cls)
    restored.input = opcodes.OpCode.LoadOpCode(state["input"])
    restored.status = state["status"]
    restored.result = state["result"]
    restored.log = state["log"]
    # Timestamps are optional in the serialized state
    restored.start_timestamp = state.get("start_timestamp", None)
    restored.end_timestamp = state.get("end_timestamp", None)
    return restored

  def Serialize(self):
    """Returns a dict describing this opcode, suitable for serialization.

    """
    return dict(input=self.input.__getstate__(),
                status=self.status,
                result=self.result,
                log=self.log,
                start_timestamp=self.start_timestamp,
                end_timestamp=self.end_timestamp)
91

Michael Hanselmann's avatar
Michael Hanselmann committed
92
93
94
95

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    """Creates a new job in the queued state.

    @param queue: the owning job queue; its C{_lock} is used for the
        change notification condition
    @param job_id: the job identifier
    @type ops: list
    @param ops: the opcodes making up the job; must not be empty

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Rebuilds a job from its serialized form.

    The job's log serial is set to the highest serial found in the
    restored opcode logs (0 if there are none).

    @param queue: the owning job queue
    @type state: dict
    @param state: data as produced by L{Serialize}

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Returns a dict describing this job, suitable for serialization.

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Computes the job status from the status of its opcodes.

    The job is successful only if all opcodes succeeded. The first failed
    or cancelled opcode determines the result; otherwise a running opcode
    makes the job running, and it is queued in all remaining cases.

    @return: one of the C{constants.JOB_STATUS_*} values

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # This must be a job status constant, not the opcode one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Returns the log entries of all opcodes newer than a given serial.

    @type newer_than: int or None
    @param newer_than: only entries with a larger serial are returned;
        None returns all entries
    @return: list of log entries (log_serial, timestamp, level, message),
        in opcode order

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

180

Michael Hanselmann's avatar
Michael Hanselmann committed
181
182
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
183
184
    """Job executor.

185
186
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
187
188
189
190

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
191
    proc = mcpu.Processor(self.pool.queue.context)
Michael Hanselmann's avatar
Michael Hanselmann committed
192
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
193
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
194
195
196
197
198
199
200
201
202
203
204
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
205
              op.start_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
206
207
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
208
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
209
210
211
            finally:
              queue.release()

212
            def _Log(*args):
213
214
215
216
217
218
219
220
221
222
223
224
225
226
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
227

228
              queue.acquire()
229
              try:
230
231
232
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

233
234
                job.change.notifyAll()
              finally:
235
                queue.release()
236

237
            # Make sure not to hold lock while _Log is called
238
            result = proc.ExecOpCode(input_opcode, _Log)
Michael Hanselmann's avatar
Michael Hanselmann committed
239
240
241
242
243

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
244
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
245
246
247
248
249
250
251
252
253
254
255
256
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
257
                op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
258
259
260
261
262
263
264
265
266
267
268
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
269
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
270
271
      queue.acquire()
      try:
272
273
274
275
276
277
        try:
          job.run_op_idx = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
278
279
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
280
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
281
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
282
283
284


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} instances for a job queue.

  """
  def __init__(self, queue):
    """Creates the pool.

    @param queue: the job queue; workers access it as C{self.pool.queue}

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
289
290


Michael Hanselmann's avatar
Michael Hanselmann committed
291
class JobQueue(object):
292
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
293

294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    The returned wrapper carries the name and docstring of the decorated
    function, so introspection still shows the original method.

    """
    def wrapper(self, *args, **kwargs):
      # Shutdown() sets _queue_lock to None once the queue is closed
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    # Manual equivalent of functools.wraps (functools needs Python 2.5)
    wrapper.__name__ = fn.__name__
    wrapper.__doc__ = fn.__doc__
    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
314
  def __init__(self, context):
    """Constructor for JobQueue.

    Opens the on-disk queue via jstore, reads the last job serial, builds
    the set of other nodes to replicate to and starts the worker pool. Jobs
    found on disk that are still queued are handed to the workers again;
    jobs found running are marked as failed.

    @param context: context object; its C{cfg} provides the node list and
        it is passed to the opcode processor by the workers

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node. set.remove() raises KeyError (not ValueError) when
    # the element is missing; discard() silently ignores that case.
    self._nodes.discard(self._my_hostname)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # The job file may have vanished between listing and loading
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

367
368
369
370
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    """Registers a node and copies the current queue to it.

    The node's queue directory is purged first, then every non-archived job
    file and the serial file are uploaded. Upload failures are logged but do
    not prevent the node from being added.

    @type node_name: string
    @param node_name: name of the node; must not be the local node

    """
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs, followed by the current
    # serial file
    upload = []
    for job_id in self._GetJobIDsUnlocked():
      upload.append(self._GetJobPath(job_id))
    upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload:
      # Read file content
      handle = open(path, "r")
      try:
        content = handle.read()
      finally:
        handle.close()

      result = rpc.call_jobqueue_update([node_name], path, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", path, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Stops replicating the queue to a node.

    Unknown node names are ignored.

    @type node_name: string
    @param node_name: name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.discard(node_name)

404
405
406
407
408
409
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    Failures on remote nodes are only logged; nothing is reported to the
    caller.

    @type file_name: string
    @param file_name: path of the file to write
    @param data: the new file contents

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    failed_nodes = []
    for node in self._nodes:
      if result[node]:
        continue
      failed_nodes.append(node)
      logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

419
420
421
422
423
424
425
426
427
428
  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and on all other nodes.

    Remote failures are logged but otherwise ignored.

    @param old: current path
    @param new: new path

    """
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node_name in self._nodes:
      if result[node_name]:
        continue
      logging.error("Moving %s to %s failed on %s", old, new, node_name)

    # TODO: check failed nodes

Michael Hanselmann's avatar
Michael Hanselmann committed
429
430
431
432
433
434
435
436
  def _FormatJobID(self, job_id):
    """Converts a numeric job ID into its string form.

    @type job_id: int or long
    @param job_id: non-negative job number
    @rtype: string
    @raise errors.ProgrammerError: if job_id is not an integer or negative

    """
    if isinstance(job_id, (int, long)):
      if job_id >= 0:
        return str(job_id)
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

437
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster. The new
    serial is written to the serial file (and replicated) before it is
    remembered in memory.

    Returns: A string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # Persist first; the in-memory value is only advanced if this succeeds
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % next_serial)
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
456

Michael Hanselmann's avatar
Michael Hanselmann committed
457
458
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the path of the file holding a non-archived job.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
461
462
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the path of the file holding an archived job.

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
465
466
467
  @classmethod
  def _ExtractJobID(cls, name):
    """Returns the job ID embedded in a job file name, or None.

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      return None
    return match.group(1)

473
474
475
476
477
478
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    Note that the IDs are strings and are sorted as such, not numerically.

    """
    id_list = [self._ExtractJobID(name)
               for name in self._ListJobFiles()]
    return sorted(id_list)
487

488
489
490
491
  def _ListJobFiles(self):
    """Returns the names of all job files in the queue directory.

    """
    matching = []
    for name in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(name):
        matching.append(name)
    return matching

492
  def _LoadJobUnlocked(self, job_id):
493
494
    job = self._memcache.get(job_id, None)
    if job:
495
      logging.debug("Found job %s in memcache", job_id)
496
      return job
Iustin Pop's avatar
Iustin Pop committed
497

498
    filepath = self._GetJobPath(job_id)
499
500
501
502
503
504
505
506
507
508
509
510
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
511
512
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
513
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
514
    return job
515
516

  def _GetJobsUnlocked(self, job_ids):
    """Loads several jobs.

    @param job_ids: job identifiers; an empty value (e.g. None) selects all
        jobs on disk
    @return: list of L{_QueuedJob} objects, with None for missing jobs

    """
    wanted = job_ids or self._GetJobIDsUnlocked()
    jobs = []
    for job_id in wanted:
      jobs.append(self._LoadJobUnlocked(job_id))
    return jobs
521
522

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: the identifier of the new job

    """
    new_job = _QueuedJob(self, self._NewSerialUnlocked(), ops)

    # Write to disk (and replicate)
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", new_job.id)
    self._memcache[new_job.id] = new_job

    # Hand the job to the worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
548

549
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Writes a job to disk, replicates it and wakes up waiters.

    The caller must hold the queue lock, since the job's change condition
    is built on it and notifyAll() requires the lock to be held.

    @type job: L{_QueuedJob}
    @param job: the job to save

    """
    path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, path)
    self._WriteAndReplicateFileUnlocked(path, serialized)

    # Notify waiters about potential changes
    job.change.notifyAll()
558

559
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @return: C{constants.JOB_NOTCHANGED} if the timeout expired, otherwise a
        tuple (job_info, log_entries); this is (None, None) if the job is
        not found at the first check, and the last information read if the
        job disappears while waiting

    """
    logging.debug("Waiting for changes in job %s", job_id)

    # Must be bound before the loop, otherwise a job that cannot be found at
    # the first check leads to an UnboundLocalError on return
    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
619

620
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs whose computed status is queued are affected; all their
    opcodes are marked with OP_STATUS_ERROR and a fixed result message.
    Unknown jobs and jobs in any other state are left alone.

    NOTE(review): opcodes are set to OP_STATUS_ERROR rather than
    OP_STATUS_CANCELED — confirm this is intended.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() != constants.JOB_STATUS_QUEUED:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    # Save the job even if updating an opcode fails half-way
    try:
      for queued_op in job.ops:
        queued_op.status = constants.OP_STATUS_ERROR
        queued_op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
646

647
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    Only finished jobs (cancelled, successful or failed) are archived, by
    moving their file into the archive directory on all nodes.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    finished = (constants.JOB_STATUS_CANCELED,
                constants.JOB_STATUS_SUCCESS,
                constants.JOB_STATUS_ERROR)
    if job.CalcStatus() not in finished:
      logging.debug("Job %s is not yet done", job.id)
      return

    self._RenameFileUnlocked(self._GetJobPath(job.id),
                             self._GetArchivedJobPath(job.id))

    logging.debug("Successfully archived job %s", job.id)
675

Michael Hanselmann's avatar
Michael Hanselmann committed
676
  def _GetJobInfoUnlocked(self, job, fields):
    """Returns the requested fields of a job.

    Supported fields: "id", "status", "ops" (serialized input opcodes),
    "opresult" and "opstatus" (per-opcode result and status).

    @type job: L{_QueuedJob}
    @param job: the job to describe
    @type fields: list of strings
    @param fields: names of the fields to return, in order
    @return: list with one value per requested field
    @raise errors.OpExecError: for an unknown field name

    """
    field_getters = {
      "id": lambda: job.id,
      "status": job.CalcStatus,
      "ops": lambda: [op.input.__getstate__() for op in job.ops],
      "opresult": lambda: [op.result for op in job.ops],
      "opstatus": lambda: [op.status for op in job.ops],
      }

    values = []
    for fname in fields:
      if fname not in field_getters:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
      values.append(field_getters[fname]())
    return values

Michael Hanselmann's avatar
Michael Hanselmann committed
693
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: one entry per job, either the list of requested field values
    or None if the job could not be found.

    """
    result = []
    for job in self._GetJobsUnlocked(job_ids):
      if job is not None:
        result.append(self._GetJobInfoUnlocked(job, fields))
      else:
        result.append(None)
    return result
Michael Hanselmann's avatar
Michael Hanselmann committed
712

713
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    Terminates the worker threads and closes the queue lock obtained from
    jstore.InitAndVerifyQueue. Afterwards, methods decorated with
    _RequireOpenQueue fail their assertion.

    """
    workers = self._wpool
    workers.TerminateWorkers()

    # Close first; the attribute is only cleared if closing succeeded
    self._queue_lock.Close()
    self._queue_lock = None