#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


def TimeStampNow():
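  """Returns the current time, split via utils.SplitTime.

  Splitting the timestamp avoids losing precision when it is serialized.

  """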
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encasulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
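    """Initialize a _QueuedOpCode wrapping the given opcode.

    @param op: the opcodes.OpCode instance to encapsulate

    """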
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
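    """Restore a _QueuedOpCode from its serialized form.

    @param state: the serialized state, as returned by Serialize()

    """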
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
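    """Return a dictionary representation suitable for JSON serialization.

    The result can be passed back to Restore().

    """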
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
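    """Initialize the _QueuedJob.

    @param queue: the parent JobQueue
    @param job_id: the job identifier
    @param ops: the opcodes to run, which get wrapped in _QueuedOpCode
        instances

    """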
    if not ops:
      # TODO: use a more specific exception class here
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
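    """Restore a _QueuedJob from its serialized form.

    @param queue: the parent JobQueue
    @param state: the serialized state, as returned by Serialize()

    """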
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
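    """Return a dictionary representation suitable for JSON serialization.

    """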
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
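    """Compute the status of this job from its opcodes.

    The job is queued while no opcode has started, running as soon as
    one opcode is running, and canceled or in error as soon as one
    opcode ends up in that state; it is successful only if all opcodes
    succeeded.

    """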
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
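    """Selectively returns the log entries.

    @param newer_than: the log serial past which to return entries, or
        None to return all entries

    """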
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              op.start_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
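  """Queue used to manage the jobs.

  Typical use from the master daemon (a rough sketch, the exact wiring
  lives outside this module):

    queue = JobQueue(context)
    job_id = queue.SubmitJob([op])
    queue.QueryJobs([job_id], ["id", "status"])

  """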
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, that is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
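    """Initialize the queue: locking, on-disk state, nodes and workers.

    Jobs found on disk in the queued state are handed to the worker pool
    again; jobs still marked as running are failed, since they indicate
    an unclean master daemon shutdown.

    """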
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
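    """Register a node with the queue and bring it up to date.

    The job queue directory on the new node is purged and the current
    queue contents (all non-archived jobs plus the serial file) are
    uploaded to it.

    @param node_name: the name of the node to add, never the master

    """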
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
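    """Stop replicating the queue to the given node.

    The queue directory on the node itself is removed by the "leave
    node" RPC call.

    @param node_name: the name of the node to remove

    """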
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
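    """Verifies the status of an RPC call.

    Logs every node on which the call failed, and a second error if the
    failed nodes outnumber the successful ones plus the master.

    @param result: the per-node result of the RPC call
    @param nodes: the list of nodes the call was sent to
    @param failmsg: the message to log for failures

    """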
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
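    """Renames a file locally and replicates the rename to all nodes.

    """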
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
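    """Convert a numeric job ID to its string representation.

    @raise errors.ProgrammerError: if the job ID is not numeric or is
        negative

    """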
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
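    """Load a job from disk, using the memory cache when possible.

    @return: the _QueuedJob instance, or None if the job file does not
        exist

    """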
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
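    """Return the jobs for the given IDs, or all jobs if none are given.

    """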
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
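    """Serialize a job to disk, replicate it and notify waiters.

    """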
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  def _GetJobInfoUnlocked(self, job, fields):
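    """Returns information about a job, in the requested field order.

    @param job: the _QueuedJob instance
    @param fields: the list of field names to return

    """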
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None