#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 23 24 25 26 27 28
"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""
Iustin Pop's avatar
Iustin Pop committed
29

30
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
31 32
import logging
import threading
33 34
import errno
import re
35
import time
36
import weakref
Iustin Pop's avatar
Iustin Pop committed
37

Michael Hanselmann's avatar
Michael Hanselmann committed
38
from ganeti import constants
39
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import workerpool
41
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
42
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
43
from ganeti import mcpu
44
from ganeti import utils
45
from ganeti import jstore
46
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
47 48


49
JOBQUEUE_THREADS = 25
Michael Hanselmann's avatar
Michael Hanselmann committed
50

Iustin Pop's avatar
Iustin Pop committed
51

52 53 54 55
def TimeStampNow():
  """Returns the current time in the form produced by utils.SplitTime.

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
56 57 58
class _QueuedOpCode(object):
  """Encasulates an opcode object.

59
  The 'log' attribute holds the execution log and consists of tuples
60
  of the form (log_serial, timestamp, level, message).
61

Michael Hanselmann's avatar
Michael Hanselmann committed
62
  """
Michael Hanselmann's avatar
Michael Hanselmann committed
63 64 65 66 67
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
68 69
    self.start_timestamp = None
    self.end_timestamp = None
70 71 72

  @classmethod
  def Restore(cls, state):
Michael Hanselmann's avatar
Michael Hanselmann committed
73 74 75 76 77
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
78 79
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
80 81 82
    return obj

  def Serialize(self):
83 84 85 86 87
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
88 89
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
90
      }
91

Michael Hanselmann's avatar
Michael Hanselmann committed
92 93 94 95

class _QueuedJob(object):
  """In-memory job representation.

96 97
  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.
Michael Hanselmann's avatar
Michael Hanselmann committed
98 99

  """
Michael Hanselmann's avatar
Michael Hanselmann committed
100
  def __init__(self, queue, job_id, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
101 102 103 104
    if not ops:
      # TODO
      raise Exception("No opcodes")

Michael Hanselmann's avatar
Michael Hanselmann committed
105
    self.queue = queue
106
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
107 108
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
109
    self.log_serial = 0
110 111 112
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
113 114 115

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
116 117

  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
118 119 120 121 122
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
123 124 125
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
126 127 128 129 130 131 132 133 134 135 136 137

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

138 139 140 141 142
    return obj

  def Serialize(self):
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
143
      "ops": [op.Serialize() for op in self.ops],
144
      "run_op_index": self.run_op_index,
145 146 147
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
148 149
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
150
  def CalcStatus(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
151 152 153
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
154 155
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
156 157 158 159
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
160
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
161
        pass
Iustin Pop's avatar
Iustin Pop committed
162 163
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
164
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
165
        status = constants.JOB_STATUS_RUNNING
Michael Hanselmann's avatar
Michael Hanselmann committed
166
      elif op.status == constants.OP_STATUS_ERROR:
167 168 169
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
170
      elif op.status == constants.OP_STATUS_CANCELED:
171 172
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
173 174 175 176 177 178

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

179 180 181 182 183 184 185 186 187 188 189 190
  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))

    return entries

191

Michael Hanselmann's avatar
Michael Hanselmann committed
192
class _JobQueueWorker(workerpool.BaseWorker):
Iustin Pop's avatar
Iustin Pop committed
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

Michael Hanselmann's avatar
Michael Hanselmann committed
211
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
212 213
    """Job executor.

214 215
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
216 217 218 219

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
220
    proc = mcpu.Processor(self.pool.queue.context)
Iustin Pop's avatar
Iustin Pop committed
221
    self.queue = queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
222
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
223 224 225 226 227 228 229 230 231
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
Iustin Pop's avatar
Iustin Pop committed
232
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
233
              op.result = None
234
              op.start_timestamp = TimeStampNow()
235 236
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
237 238
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
239
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
240 241 242
            finally:
              queue.release()

243
            def _Log(*args):
244 245 246 247 248 249 250 251 252 253 254 255 256 257
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
258

259
              queue.acquire()
260
              try:
261 262 263
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

264 265
                job.change.notifyAll()
              finally:
266
                queue.release()
267

268
            # Make sure not to hold lock while _Log is called
Iustin Pop's avatar
Iustin Pop committed
269 270
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
Michael Hanselmann's avatar
Michael Hanselmann committed
271 272 273 274 275

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
276
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
277 278 279 280 281 282 283 284 285 286 287 288
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
289
                op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
290 291 292 293 294 295 296 297 298 299 300
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
301
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
302 303
      queue.acquire()
      try:
304 305
        try:
          job.run_op_idx = -1
306
          job.end_timestamp = TimeStampNow()
307 308 309 310
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
311 312
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
313
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
314
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
315 316 317


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool for the job queue.

  The base class is initialized with JOBQUEUE_THREADS and the
  _JobQueueWorker class.

  """
  def __init__(self, queue):
    """Initializes the pool.

    @param queue: the queue object; stored in the 'queue' attribute, which
        the workers read through their pool

    """
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
322 323


Michael Hanselmann's avatar
Michael Hanselmann committed
324
class JobQueue(object):
325
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
326

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
343
      assert self._queue_lock is not None, "Queue should be open"
344 345 346
      return fn(self, *args, **kwargs)
    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
347
  def __init__(self, context):
    """Opens the queue and re-schedules the jobs found on disk.

    Jobs still queued are handed to the worker pool. Jobs found in running
    or lock-waiting state are marked as failed.

    @param context: context object; its 'cfg' attribute is used to get the
        node list

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node; discard() is used because a missing element makes
    # set.remove() raise KeyError
    self._nodes.discard(self._my_hostname)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

401 402 403 404
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    """Registers a new node and uploads the current queue to it.

    Upload failures are only logged; the node is added in any case.

    @param node_name: name of the node to add; must not be the local host

    """
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Removes a node from the set of nodes the queue is replicated to.

    Unknown node names are silently ignored.

    @param node_name: name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  def _CheckRpcResult(self, result, nodes, failmsg):
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

456 457 458 459 460 461
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    Replication failures are only logged (see _CheckRpcResult).

    @param file_name: the path of the file to write
    @param data: the new file contents

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)
465

466 467 468 469
  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then on all nodes.

    Failures on the other nodes are only logged (see _CheckRpcResult).

    @param old: the current path
    @param new: the new path

    """
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))
472

Michael Hanselmann's avatar
Michael Hanselmann committed
473 474 475 476 477 478 479 480
  def _FormatJobID(self, job_id):
    """Converts a numeric job ID to its string form.

    @type job_id: int or long
    @param job_id: the numeric, non-negative job ID
    @return: the job ID as a string
    @raise errors.ProgrammerError: if the ID is not numeric or is negative

    """
    numeric_types = (int, long)
    if isinstance(job_id, numeric_types):
      if job_id >= 0:
        return str(job_id)
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

481
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
500

Michael Hanselmann's avatar
Michael Hanselmann committed
501 502
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the path of a job's file in the queue directory.

    @param job_id: the job identifier
    @return: constants.QUEUE_DIR joined with "job-<job_id>"

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
505 506
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the path of a job's file in the archive directory.

    @param job_id: the job identifier
    @return: constants.JOB_QUEUE_ARCHIVE_DIR joined with "job-<job_id>"

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
509 510 511
  @classmethod
  def _ExtractJobID(cls, name):
    """Extracts the job ID from a job file name.

    @param name: the file name (without directory)
    @return: the job ID as a string, or None if the name does not match
        _RE_JOB_FILE

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

517 518 519 520 521 522
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @return: the job IDs, ordered by utils.NiceSort

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist
531

532 533 534 535
  def _ListJobFiles(self):
    """Returns the names of the job files in the queue directory.

    Only names matching _RE_JOB_FILE are returned.

    """
    visible = utils.ListVisibleFiles(constants.QUEUE_DIR)
    job_files = []
    for fname in visible:
      if self._RE_JOB_FILE.match(fname):
        job_files.append(fname)
    return job_files

536
  def _LoadJobUnlocked(self, job_id):
537 538
    job = self._memcache.get(job_id, None)
    if job:
539
      logging.debug("Found job %s in memcache", job_id)
540
      return job
Iustin Pop's avatar
Iustin Pop committed
541

542
    filepath = self._GetJobPath(job_id)
543 544 545 546 547 548 549 550 551 552 553 554
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

Iustin Pop's avatar
Iustin Pop committed
555 556
    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
557
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
558
    return job
559 560

  def _GetJobsUnlocked(self, job_ids):
    """Loads a list of jobs.

    @param job_ids: the job IDs to load; any false value (None or an empty
        sequence) selects all jobs known on disk
    @return: list with one entry per job ID; an entry is None if the job
        could not be found

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
565 566

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @return: the ID of the new job

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
592

593
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Writes a job to disk, replicates it and wakes up waiters.

    @param job: the job to be written (see _QueuedJob)

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()
602

603
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @return: constants.JOB_NOTCHANGED if the timeout expired, None if the
        job could not be found at all, otherwise a tuple of
        (job_info, log_entries)

    """
    logging.debug("Waiting for changes in job %s", job_id)

    # Both stay None if the job cannot be loaded on the first iteration
    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      # The job was never found, there is nothing to report
      return None

    return (job_info, log_entries)
664

665
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    Only jobs that are still queued are cancelled; unknown jobs and jobs in
    any other state are left untouched.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        # NOTE(review): the opcodes are marked as failed, so the job reports
        # an error status; confirm whether OP_STATUS_CANCELED was intended
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
691

692
  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    Unknown jobs and jobs that are not yet finished (cancelled, successful
    or failed) are not archived.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)
719

Iustin Pop's avatar
Iustin Pop committed
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a single job while holding the queue lock.

    This is the locked entry point for _ArchiveJobUnlocked.

    @type job_id: string
    @param job_id: ID of the job to archive
    @return: whatever _ArchiveJobUnlocked returns

    """
    outcome = self._ArchiveJobUnlocked(job_id)
    return outcome

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered (and the received timestamp if that is
    missing too). The special '-1' age will cause archival of all jobs
    (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job is None:
        # The job file is gone (_LoadJobUnlocked returns None in that case)
        continue

      # CalcStatus returns job-level statuses, so compare against those
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # Restored jobs may lack all timestamps; such jobs are only archived
      # when everything is requested (age == -1)
      if age == -1 or (job_age is not None and now - job_age[0] > age):
        self._ArchiveJobUnlocked(jid)

Michael Hanselmann's avatar
Michael Hanselmann committed
765
  def _GetJobInfoUnlocked(self, job, fields):
Michael Hanselmann's avatar
Michael Hanselmann committed
766 767 768 769 770
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
Michael Hanselmann's avatar
Michael Hanselmann committed
771
        row.append(job.CalcStatus())
772
      elif fname == "ops":
Michael Hanselmann's avatar
Michael Hanselmann committed
773
        row.append([op.input.__getstate__() for op in job.ops])
774
      elif fname == "opresult":
Michael Hanselmann's avatar
Michael Hanselmann committed
775
        row.append([op.result for op in job.ops])
776
      elif fname == "opstatus":
Michael Hanselmann's avatar
Michael Hanselmann committed
777
        row.append([op.status for op in job.ops])
778 779
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
780 781 782 783 784 785 786 787 788 789
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
790 791
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
Michael Hanselmann's avatar
Michael Hanselmann committed
792 793 794 795
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

Michael Hanselmann's avatar
Michael Hanselmann committed
796
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: a list with one entry per job, each being the list of the
    requested field values, or None for a job that could not be found.

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
Michael Hanselmann's avatar
Michael Hanselmann committed
815

816
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    The workers are terminated and the queue lock is closed. Afterwards
    methods decorated with _RequireOpenQueue fail their assertion.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None