#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
34
import logging
import threading
35
36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import constants
41
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import workerpool
43
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
44
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50

51
# Number of worker threads started for processing jobs
JOBQUEUE_THREADS = 25
# Archived jobs are grouped into directories named after
# job_id // JOBS_PER_ARCHIVE_DIRECTORY (see JobQueue._GetArchiveDirectory)
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Raised internally to abort the processing of a job.

  It is raised from the opcode callbacks or the job worker and caught
  by the worker, which then hands the job over to the queue's cancel
  handling.

  """


def TimeStampNow():
  """Returns the current time split into seconds and microseconds.

  The split form is used for all job/opcode timestamps in this module.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    New opcodes start in the queued state with no result, an empty
    log and no timestamps.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    The timestamps are optional in C{state}; missing ones are restored
    as C{None} (presumably so state written without them still loads).

    @type state: dict
    @param state: the serialized state, as produced by L{Serialize}
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # Bypass __init__, which would reset status and log; use cls so a
    # subclass restores as an instance of itself
    obj = cls.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp")
    obj.end_timestamp = state.get("end_timestamp")
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: the serial of the most recently added log entry
      (0 if there is none); the next entry gets C{log_serial + 1}
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  # __weakref__ lets instances be weakly referenced (presumably by
  # JobQueue._memcache, a WeakValueDictionary)
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if C{ops} is empty

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes; it shares the queue's lock, so it
    # can only be waited on/notified while holding that lock
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state, as produced by L{Serialize}
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = cls.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp")
    obj.start_timestamp = state.get("start_timestamp")
    obj.end_timestamp = state.get("end_timestamp")

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # Continue numbering after the highest serial found in the logs
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    In-memory attributes (lock status, change condition) are not
    included.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if all opcodes succeeded (or there are none), the job status
        is success
      - otherwise, the first opcode found in one of the states
        canceling, error or canceled stops the scan and determines
        the job status
      - otherwise, the last opcode whose status is waitlock or
        running determines the job status
      - otherwise (only queued and successful opcodes) the job is
        queued

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # This is a job status, so use the JOB_STATUS_* constant
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected, in opcode order

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(entry for entry in op.log if entry[0] > serial)

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        # Finalized opcodes may only precede the unfinished ones
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks through which the opcode processor reports back to a job.

  They record execution start, log messages and lock status in the
  job and opcode objects.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was marked as canceling while it
        was waiting for its locks

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(message)} or C{(log_type, message)}; with a single
    argument the type is C{constants.ELOG_MESSAGE}. The entry gets the
    job's next log serial, and waiters on the job's change condition
    are notified.

    """
    assert 1 <= len(args) <= 2, "Feedback expects one or two arguments"

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
423
  def RunTask(self, job): # pylint: disable-msg=W0221
Michael Hanselmann's avatar
Michael Hanselmann committed
424
425
    """Job executor.

426
427
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
428

429
430
431
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
432
    """
433
    logging.info("Worker %s processing job %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
434
                  self.worker_id, job.id)
435
    proc = mcpu.Processor(self.pool.queue.context, job.id)
436
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
437
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
438
439
440
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
441
          op_summary = op.input.Summary()
442
443
444
445
446
447
448
449
450
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
Michael Hanselmann's avatar
Michael Hanselmann committed
451
          try:
452
453
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)
Michael Hanselmann's avatar
Michael Hanselmann committed
454
455
456

            queue.acquire()
            try:
457
458
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
459
              assert op.status == constants.OP_STATUS_QUEUED
Iustin Pop's avatar
Iustin Pop committed
460
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
461
              op.result = None
462
              op.start_timestamp = TimeStampNow()
463
464
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
465
466
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
467
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
468
469
470
            finally:
              queue.release()

471
472
            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
473
                                     _OpExecCallbacks(queue, job, op))
Michael Hanselmann's avatar
Michael Hanselmann committed
474
475
476
477
478

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
479
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
480
481
482
483
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

484
485
            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
486
487
488
          except CancelJob:
            # Will be handled further up
            raise
Michael Hanselmann's avatar
Michael Hanselmann committed
489
490
491
492
493
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
494
495
496
497
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
498
                op.end_timestamp = TimeStampNow()
499
500
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
Michael Hanselmann's avatar
Michael Hanselmann committed
501
502
503
504
505
506
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

507
508
509
510
511
512
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
513
514
515
516
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
517
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
518
519
      queue.acquire()
      try:
520
        try:
521
          job.lock_status = None
522
          job.end_timestamp = TimeStampNow()
523
524
525
526
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
527
528
      finally:
        queue.release()
529

530
531
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
532
533
534


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} threads for a job queue.

  """
  def __init__(self, queue):
    """Creates the pool with L{JOBQUEUE_THREADS} job workers.

    @type queue: L{JobQueue}
    @param queue: the owning job queue; workers access it through
        C{self.pool.queue}

    """
    pool_size = JOBQUEUE_THREADS
    super(_JobQueueWorkerPool, self).__init__(pool_size, _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  The returned wrapper carries the name, docstring and module of the
  decorated method.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  # Preserve the wrapped method's identity for introspection and logs;
  # done by hand because functools.wraps needs Python 2.5+
  wrapper.__name__ = fn.__name__
  wrapper.__doc__ = fn.__doc__
  wrapper.__module__ = fn.__module__
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.
  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes: only master candidates get a copy of
    # the queue
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node (if present)
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            # Jobs left in progress by a previous master daemon are not
            # resumed; their unfinished opcodes are marked as failed
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Don't leave worker threads behind if initialization fails
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The node's queue directory is purged first. A master candidate then
    receives a copy of every non-archived job file plus the serial file,
    and becomes a replication target. Any other node is dropped from the
    replication targets instead.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_msg)

    if not node.master_candidate:
      # Not a replication target: forget it (if known) and stop here
      self._nodes.pop(name, None)
      return

    # Upload the whole queue excluding archived jobs, plus the current
    # serial file
    upload = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
    upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload:
      content = utils.ReadFile(path)
      upload_result = rpc.RpcRunner.call_jobqueue_update([name],
                                                         [node.primary_ip],
                                                         path, content)
      upload_msg = upload_result[name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_msg)

    self._nodes[name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are ignored. The node's own queue directory is
    not touched here; it is removed by the "leave node" RPC call.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
    log the case when more than half of the nodes fails.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("%s: RPC call %s failed on node %s: %s",
                      failmsg, result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node: with N = success + failed + 1 nodes in
    # total, "failed > N / 2" is equivalent to "failed > success + 1"
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("%s: More than half of the nodes failed", failmsg)

  def _GetNodeIp(self):
    """Returns the names and addresses of the queue replication nodes.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses, in
        matching order

    """
    pairs = self._nodes.items()
    return [name for (name, _) in pairs], [addr for (_, addr) in pairs]

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The local copy is written first. Failures reported by the other
    nodes are logged by L{_CheckRpcResult} rather than raised.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    # Local copy first ...
    utils.WriteFile(file_name, data=data)

    # ... then push it to every other queue node
    (node_names, node_addrs) = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the renames.

    Every rename is first done in the local queue directory and then
    replayed on all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames (mkdir=True presumably creates missing target
    # directories -- see utils.RenameFile)
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... and the same renames on all nodes
    (node_names, node_addrs) = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if C{job_id} is not an integer or
        is negative

    """
    if not isinstance(job_id, (int, long)):
      # job_id is wrapped in a 1-tuple: given a bare tuple, "%" would
      # treat it as the argument list and raise TypeError (or format
      # only its first item) instead of raising the intended error
      raise errors.ProgrammerError("Job ID '%s' not numeric" % (job_id, ))
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % (job_id, ))

    return str(job_id)

829
830
831
832
833
834
835
836
837
838
839
840
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    The directory name is the integer quotient of the numeric job ID
    and C{JOBS_PER_ARCHIVE_DIRECTORY}, so consecutive job IDs share a
    directory until that many IDs have been covered.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: plain "/" only yields an integer under
    # classic (Python 2) division and would give e.g. "0.5" under true
    # division
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)

Iustin Pop's avatar
Iustin Pop committed
841
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The
    serial file and C{self._last_serial} record the highest serial
    handed out, so the new identifiers are the C{count} serials that
    directly follow it. The new highest serial is written (and
    replicated) before C{self._last_serial} is advanced.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: C{count} formatted job identifiers, in increasing order

    """
    assert count > 0
    # New numbers: last+1 .. last+count
    first = self._last_serial + 1
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Start one past _last_serial; starting at _last_serial itself would
    # return count + 1 identifiers and reuse the previous highest serial
    result = [self._FormatJobID(v) for v in range(first, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
866

Michael Hanselmann's avatar
Michael Hanselmann committed
867
868
  @staticmethod
  def _GetJobPath(job_id):
    """Compute the path of the (non-archived) file for a job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: C{job-<job_id>} joined onto C{constants.QUEUE_DIR}

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

879
880
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Compute the path of a job's file inside the archive.

    The file C{job-<job_id>} is placed in the subdirectory returned by
    C{cls._GetArchiveDirectory}, below C{constants.JOB_QUEUE_ARCHIVE_DIR}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    subdir = cls._GetArchiveDirectory(job_id)
    relative_path = "%s/job-%s" % (subdir, job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, relative_path)
891

Michael Hanselmann's avatar
Michael Hanselmann committed
892
893
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a job file name.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the first group captured by C{cls._RE_JOB_FILE} when
        C{name} matches it, otherwise None (the name does not denote a
        valid job file)

    """
    match = cls._RE_JOB_FILE.match(name)
    if match is None:
      return None
    return match.group(1)

910
911
912
913
914
915
  def _GetJobIDsUnlocked(self, archived=False):
    """Return the IDs of all job files present in the queue directory.

    Only the disk is consulted. Every job is required to have a file on
    disk, so the in-memory cache cannot contain IDs that this listing
    misses.

    @type archived: boolean
    @param archived: intended to request inclusion of archived job IDs;
        currently unused
    @rtype: list
    @return: the job IDs, ordered by C{utils.NiceSort}

    """
    found_ids = [self._ExtractJobID(fname) for fname in self._ListJobFiles()]
    return utils.NiceSort(found_ids)
927

928
  def _ListJobFiles(self):
    """Return the names of the job files in the queue directory.

    Only visible entries of C{constants.QUEUE_DIR} whose name matches
    C{self._RE_JOB_FILE} are kept.

    @rtype: list
    @return: the list of job file names

    """
    job_file_re = self._RE_JOB_FILE
    entries = utils.ListVisibleFiles(constants.QUEUE_DIR)
    return [entry for entry in entries if job_file_re.match(entry)]

938
  def _LoadJobUnlocked(self, job_id):
939
940
941
942
943
944
945
946
947
948
949
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
950
951
    job = self._memcache.get(job_id, None)
    if job:
952
      logging.debug("Found job %s in memcache", job_id)
953
      return job
Iustin Pop's avatar
Iustin Pop committed
954

955
    filepath = self._GetJobPath(job_id)
956
957
    logging.debug("Loading job from %s", filepath)
    try:
958
      raw_data = utils.ReadFile(filepath)
959
960
961
962
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
963
964

    data = serializer.LoadJson(raw_data)
965

966
967
    try:
      job = _QueuedJob.Restore(self, data)
Iustin Pop's avatar
Iustin Pop committed
968
    except Exception, err: # pylint: disable-msg=W0703
969
970
971
972
973
974
975
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
976
        self._RenameFilesUnlocked([(filepath, new_path)])
977
978
      return None

Iustin Pop's avatar
Iustin Pop committed
979
    self._memcache[job_id] = job
980
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
981
    return job
982
983

  def _GetJobsUnlocked(self, job_ids):
984
985
986
987
988
989
990
991
992
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
993
994
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()
995

996
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
997

998
999
1000
1001
1002
1003
1004
  @staticmethod
  def _IsQueueMarkedDrain():
    """Tell whether the queue is marked for draining.

    The flag is the existence of C{constants.JOB_QUEUE_DRAIN_FILE}, which
    makes it a per-node flag. In the future it can be moved to the
    configuration file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)

1011
1012
1013
1014
1015
1016
1017
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets or clears the drain flag for the queue.

    Setting the flag writes an empty C{constants.JOB_QUEUE_DRAIN_FILE};
    clearing it removes that file. This is similar to
    L{backend.JobQueueSetDrainFlag}, and the two might be merged in the
    future.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    if not drain_flag:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
      return True

    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    return True

1028
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create a new job, store it and hand it to the worker pool.

    The job is written via C{self.UpdateJobUnlocked}, added to the
    in-memory cache and then queued on the worker pool, where the queue
    processors pick it up.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining
    @raise errors.JobQueueFull: if the number of job files has reached
        C{constants.JOB_QUEUE_SIZE_HARD_LIMIT}

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Enforce the job queue size limits
    queue_size = len(self._ListJobFiles())
    if queue_size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs (and recompute queue_size). Make sure it's
      # not done on every job submission, though.
      pass
    if queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    new_job = _QueuedJob(self, job_id, ops)

    # Write to disk (and replicate) first
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    # Let the worker pool pick it up
    self._wpool.AddTask(new_job)

    return new_job.id
1070

1071
1072
1073
1074
1075
1076
1077
1078
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a single new job.

    A fresh job ID is allocated and the job is submitted under it.

    @type ops: list
    @param ops: the list of OpCodes making up the job
    @return: the job ID of the new job
    @see: L{_SubmitJobUnlocked}

    """
    new_ids = self._NewSerialsUnlocked(1)
    return self._SubmitJobUnlocked(new_ids[0], ops)
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
Iustin Pop's avatar
Iustin Pop committed
1091
1092
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
1093
      try:
Iustin Pop's avatar
Iustin Pop committed
1094
        data = self._SubmitJobUnlocked(job_id, ops)
1095
1096
1097
1098
1099
1100
1101
1102
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

1103
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Write a job to disk, replicate it and notify waiters.

    This must be called after a job has been modified, so that the
    change is written to disk and replicated to the other nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Wake up everyone waiting on this job's change notification
    job.change.notifyAll()
1122

1123
  @utils.LockedMethod
1124
  @_RequireOpenQueue
1125
1126
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
1137
1138
    @type timeout: float
    @param timeout: maximum time to wait
1139
1140
1141
1142
1143
1144
1145
1146
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients
1147
1148

    """
1149
1150
1151
1152
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None
1153

1154
1155
    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)
1156