jqueue.py 41.6 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1 2 3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 23
"""Module implementing the job queue handling.

24 25 26 27 28
Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs
29 30

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33 34
import logging
import threading
35 36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import constants
41
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import workerpool
43
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
44
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50

51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
Michael Hanselmann's avatar
Michael Hanselmann committed
53

Iustin Pop's avatar
Iustin Pop committed
54

55
class CancelJob(Exception):
  """Raised internally to abort the processing of a job.

  Signals that the opcode about to run was cancelled, so that the job
  worker stops executing the job and hands it over to the queue's
  cancellation handling.

  """


61
def TimeStampNow():
  """Return the current time as a (seconds, microseconds) tuple.

  The float timestamp is split so that it can be serialized without
  losing precision.

  @rtype: tuple
  @return: the current time, as split by L{utils.SplitTime}

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
71
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    The opcode starts in the queued state, without result, log entries
    or timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state, as produced by L{Serialize}
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # Bypass __init__: every attribute is taken from the serialized state
    obj = cls.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    # The timestamps may be missing from the serialized state
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
135

Michael Hanselmann's avatar
Michael Hanselmann committed
136 137 138 139

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: the serial of the most recent log entry of the job
      (0 if there is none yet)
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if C{ops} is empty

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes; it shares the queue's lock, so it
    # must only be used while holding that lock
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = cls.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # The log serial is not serialized; recompute it as the highest
      # serial found in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    In-memory attributes (lock status, condition variable, log serial)
    are not part of the serialized state.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find an opcode that is canceling, cancelled or finished
        with error, the job status will be the corresponding job status
        and no further opcodes are examined
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, the job status is success if all opcodes succeeded,
        or queued if the remaining opcodes are still queued

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report the job-level constant, not the opcode-level one
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    Entries are returned in opcode order, and within an opcode in the
    order they were logged.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend([entry for entry in op.log if entry[0] > serial])

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes are expected only before the first
        # unfinished one
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

338

339
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks used by the opcode processor for a single queued opcode.

  They update the job and opcode state (status, log, lock information)
  while the opcode is being executed.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was marked for cancellation while
        waiting for its locks

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Called either as C{Feedback(message)}, in which case the log type is
    L{constants.ELOG_MESSAGE}, or as C{Feedback(log_type, message)}.
    Waiters on the job's change condition are notified.

    """
    assert 1 <= len(args) <= 2, "Feedback expects one or two arguments"

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

418 419 420 421 422

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
423
  def RunTask(self, job): # pylint: disable-msg=W0221
Michael Hanselmann's avatar
Michael Hanselmann committed
424 425
    """Job executor.

426 427
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
428

429 430 431
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
432
    """
433
    logging.info("Worker %s processing job %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
434
                  self.worker_id, job.id)
435
    proc = mcpu.Processor(self.pool.queue.context, job.id)
436
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
437
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
438 439 440
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
441
          op_summary = op.input.Summary()
442 443 444 445 446 447 448 449 450
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
Michael Hanselmann's avatar
Michael Hanselmann committed
451
          try:
452 453
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)
Michael Hanselmann's avatar
Michael Hanselmann committed
454 455 456

            queue.acquire()
            try:
457 458
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
459
              assert op.status == constants.OP_STATUS_QUEUED
Iustin Pop's avatar
Iustin Pop committed
460
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
461
              op.result = None
462
              op.start_timestamp = TimeStampNow()
463 464
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
465 466
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
467
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
468 469 470
            finally:
              queue.release()

471 472
            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
473
                                     _OpExecCallbacks(queue, job, op))
Michael Hanselmann's avatar
Michael Hanselmann committed
474 475 476 477 478

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
479
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
480 481 482 483
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

484 485
            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
486 487 488
          except CancelJob:
            # Will be handled further up
            raise
Michael Hanselmann's avatar
Michael Hanselmann committed
489 490 491 492 493
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
494 495 496 497
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
498
                op.end_timestamp = TimeStampNow()
499 500
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
Michael Hanselmann's avatar
Michael Hanselmann committed
501 502 503 504 505 506
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

507 508 509 510 511 512
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
513 514 515 516
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
517
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
518 519
      queue.acquire()
      try:
520
        try:
521
          job.lock_status = None
522
          job.end_timestamp = TimeStampNow()
523 524 525 526
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
527 528
      finally:
        queue.release()
529

530 531
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
532 533 534


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool whose workers process the jobs of a L{JobQueue}.

  """
  def __init__(self, queue):
    """Create the pool and remember the queue it serves.

    @type queue: L{JobQueue}
    @param queue: the job queue; workers reach it as C{self.pool.queue}

    """
    pool_name = "JobQueue"
    super(_JobQueueWorkerPool, self).__init__(pool_name, JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
543 544


Iustin Pop's avatar
Iustin Pop committed
545 546
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  # Keep the wrapped method's identity visible to tracebacks, logging and
  # documentation tools (the same attributes functools.wraps would copy)
  wrapper.__name__ = fn.__name__
  wrapper.__doc__ = fn.__doc__
  wrapper.__module__ = fn.__module__
  return wrapper
568 569


Iustin Pop's avatar
Iustin Pop committed
570 571
class JobQueue(object):
  """Queue used to manage the jobs.
572

Iustin Pop's avatar
Iustin Pop committed
573 574 575 576
  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
577

Michael Hanselmann's avatar
Michael Hanselmann committed
578
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes; only master candidates receive a copy of
    # the queue
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            # Report the number of jobs inspected so far, consistent with
            # the percentage
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx + 1, jobs_count, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            # Nobody is executing this job anymore, so it can't finish
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Don't leave worker threads behind if initialization fails
      self._wpool.TerminateWorkers()
      raise
Michael Hanselmann's avatar
Michael Hanselmann committed
668

669 670
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a node with the queue and bring its queue copy up to date.

    The node's queue directory is purged first. A node that is not a
    master candidate is only dropped from the replication targets; a
    master candidate receives every non-archived job file plus the
    serial file and is then added to the replication targets.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_msg)

    if not node.master_candidate:
      # Not a replication target: forget it (if known) and stop here
      self._nodes.pop(name, None)
      return

    # Every job file except archived ones, followed by the serial file
    to_upload = [self._GetJobPath(jid) for jid in self._GetJobIDsUnlocked()]
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      upload = rpc.RpcRunner.call_jobqueue_update([name], [node.primary_ip],
                                                  path, utils.ReadFile(path))
      upload_msg = upload[name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_msg)

    self._nodes[name] = node.primary_ip
713 714 715 716

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Stop replicating the queue to a node that is leaving the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue itself is removed by the "leave node" RPC call, so only
    # the replication target needs to be forgotten here.
    self._nodes.pop(node_name, None)

729 730
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Log the failures of a replicated RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, every node whose result carries a failure message is
    logged. An additional error is logged when the failures outnumber the
    successful nodes plus the master, i.e. when more than half of all
    nodes failed. Nothing is raised.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    ok_count = 0
    failed_count = 0
    for node in nodes:
      node_result = result[node]
      error = node_result.fail_msg
      if not error:
        ok_count += 1
        continue
      failed_count += 1
      logging.error("RPC call %s (%s) failed on node %s: %s",
                    node_result.call, failmsg, node, error)

    # The master node counts as a success, hence the +1
    if failed_count > ok_count + 1:
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

761 762 763
  def _GetNodeIp(self):
    """Return the replication targets as parallel name and address lists.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses, in
        matching order

    """
    names = []
    addresses = []
    for (name, address) in self._nodes.items():
      names.append(name)
      addresses.append(address)
    return names, addresses

773 774 775
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Write a file locally, then push the same contents to all nodes.

    The local file is replaced first; the update is then sent to every
    replication target and failures are logged, not raised.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    (node_names, node_addrs) = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
791

792
  def _RenameFilesUnlocked(self, rename):
    """Rename files in the local queue directory, then on all nodes.

    The renames are applied locally first and the same list is then sent
    to every replication target; failures there are logged, not raised.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first (mkdir=True presumably lets utils.RenameFile
    # create the target directory -- confirm in utils)
    for (old_name, new_name) in rename:
      utils.RenameFile(old_name, new_name, mkdir=True)

    # ... then replicate the very same renames to the other nodes
    (node_names, node_addrs) = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
810

811 812
  @staticmethod
  def _FormatJobID(job_id):
813 814 815 816 817 818 819 820 821 822 823 824
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
825 826 827 828 829 830 831
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

832 833 834 835 836 837 838 839 840 841 842 843
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

Iustin Pop's avatar
Iustin Pop committed
844
  def _NewSerialsUnlocked(self, count):
845 846 847 848
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

Iustin Pop's avatar
Iustin Pop committed
849 850
    @type count: integer
    @param count: how many serials to return
851 852
    @rtype: str
    @return: a string representing the job identifier.
853 854

    """
Iustin Pop's avatar
Iustin Pop committed
855
    assert count > 0
856
    # New number
Iustin Pop's avatar
Iustin Pop committed
857
    serial = self._last_serial + count
858 859

    # Write to file
860 861
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)
862

Iustin Pop's avatar
Iustin Pop committed
863 864
    result = [self._FormatJobID(v)
              for v in range(self._last_serial, serial + 1)]
865 866 867
    # Keep it only if we were able to write the file
    self._last_serial = serial

Iustin Pop's avatar
Iustin Pop committed
868
    return result
869

Michael Hanselmann's avatar
Michael Hanselmann committed
870 871
  @staticmethod
  def _GetJobPath(job_id):
872 873 874 875 876 877 878 879
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
880 881
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

882 883
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
884 885 886 887 888 889 890 891
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
892 893
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
894

Michael Hanselmann's avatar
Michael Hanselmann committed
895 896
  @classmethod
  def _ExtractJobID(cls, name):
897 898 899 900 901 902 903 904 905 906
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
907
    m = cls._RE_JOB_FILE.match(name)
908 909 910 911 912
    if m:
      return m.group(1)
    else:
      return None

913 914 915 916 917 918
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

Iustin Pop's avatar
Iustin Pop committed
919 920 921 922
    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

923 924 925
    @rtype: list
    @return: the list of job IDs

926
    """
927
    # pylint: disable-msg=W0613
928
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
Iustin Pop's avatar
Iustin Pop committed
929
    jlist = utils.NiceSort(jlist)
930
    return jlist
931

932
  def _ListJobFiles(self):
933 934 935 936 937 938
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
939 940 941
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

942
  def _LoadJobUnlocked(self, job_id):
943 944 945 946 947 948 949 950 951 952 953
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
954 955
    job = self._memcache.get(job_id, None)
    if job:
956
      logging.debug("Found job %s in memcache", job_id)
957
      return job
Iustin Pop's avatar
Iustin Pop committed
958

959
    filepath = self._GetJobPath(job_id)
960 961
    logging.debug("Loading job from %s", filepath)
    try:
962
      raw_data = utils.ReadFile(filepath)
963 964 965 966
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
967 968

    data = serializer.LoadJson(raw_data)
969

970 971
    try:
      job = _QueuedJob.Restore(self, data)
Iustin Pop's avatar
Iustin Pop committed
972
    except Exception, err: # pylint: disable-msg=W0703
973 974 975 976 977 978 979
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
980
        self.