#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
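
# A hedged illustration of the timestamp format (assuming utils.SplitTime
# keeps its (seconds, microseconds) contract):
#
#   TimeStampNow()  # -> e.g. (1215501161, 607513)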


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
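
  # Illustrative sketch of the Serialize/Restore round-trip (OpTestDelay
  # is used purely as an example opcode):
  #
  #   op = _QueuedOpCode(opcodes.OpTestDelay(duration=10))
  #   state = op.Serialize()
  #   assert state["status"] == constants.OP_STATUS_QUEUED
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.input.duration == 10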


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: str
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
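
  # A commented walk-through of the CalcStatus() rules (opcode status
  # sequences are illustrative):
  #
  #   success, success, success -> JOB_STATUS_SUCCESS
  #   success, running, queued  -> JOB_STATUS_RUNNING
  #   success, error,   queued  -> JOB_STATUS_ERROR (first failure decides)
  #   canceled, queued, queued  -> JOB_STATUS_CANCELED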

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
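
  # Example of the serial filter (log entries are tuples of the form
  # (serial, timestamp, type, message)):
  #
  #   job.GetLogEntries(None)  # all entries
  #   job.GetLogEntries(1)     # only entries with serial > 1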


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all 'public' functions; that is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename a list of files in the local queue
    directory and then replicate those renames to all the other nodes
    we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
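
  # Example behaviour (assuming the current str() based format):
  #
  #   self._FormatJobID(42)    # -> "42"
  #   self._FormatJobID("42")  # raises errors.ProgrammerError (not numeric)
  #   self._FormatJobID(-1)    # raises errors.ProgrammerError (negative)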

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
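
  # With JOBS_PER_ARCHIVE_DIRECTORY = 10000, the integer division above
  # buckets jobs by ten-thousands:
  #
  #   JobQueue._GetArchiveDirectory("123")    # -> "0" (jobs 0..9999)
  #   JobQueue._GetArchiveDirectory("12345")  # -> "1" (jobs 10000..19999)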

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
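
  # Example of the resulting layout (the exact value of
  # constants.JOB_QUEUE_ARCHIVE_DIR is installation dependent):
  #
  #   JobQueue._GetArchivedJobPath("12345")
  #   # -> "<JOB_QUEUE_ARCHIVE_DIR>/1/job-12345"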

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
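
  # Example (assuming constants.JOB_ID_TEMPLATE matches digits only):
  #
  #   JobQueue._ExtractJobID("job-123")  # -> "123"
  #   JobQueue._ExtractJobID("job-abc")  # -> None (not a valid job file)
  #   JobQueue._ExtractJobID("foo-123")  # -> None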

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked from drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
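
  # Usage sketch: draining rejects new submissions while leaving running
  # jobs alone; the flag is simply the presence of the drain file:
  #
  #   queue.SetDrainFlag(True)     # creates JOB_QUEUE_DRAIN_FILE
  #   queue._IsQueueMarkedDrain()  # -> True; SubmitJob() would now raise
  #                                #    errors.JobQueueDrainError
  #   queue.SetDrainFlag(False)    # removes the file again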

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
950
    @param ops: The list of OpCodes that will become the new job.
951 952 953
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """