#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status
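
    Example (illustrative)::
      opcode statuses [SUCCESS, RUNNING, QUEUED] -> JOB_STATUS_RUNNING
      opcode statuses [SUCCESS, ERROR, QUEUED]   -> JOB_STATUS_ERROR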

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected
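
    Example (illustrative)::
      # if log serials 1..4 exist, only entries 3 and 4 are returned
      job.GetLogEntries(newer_than=2)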

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
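
    Example (illustrative)::
      _FormatJobID(1234) == "1234"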

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name
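
    Example (illustrative, with the default of 10000 jobs per directory)::
      _GetArchiveDirectory("21034") == "2"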

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file
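
    Example (illustrative; the directory is C{constants.QUEUE_DIR})::
      _GetJobPath("1234")  # e.g. ".../queue/job-1234"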

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file
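
    Example (illustrative; the base is C{constants.JOB_QUEUE_ARCHIVE_DIR})::
      _GetArchivedJobPath("21034")  # e.g. ".../archive/2/job-21034"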

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file
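
    Example (illustrative)::
      _ExtractJobID("job-1234") == "1234"
      _ExtractJobID("job-1234.bak") is None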

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked from drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

1019
1020
1021
    @rtype: boolean
    @return: True of the job queue is marked for draining

1022
1023
1024
    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

1025
1026
1027
1028
1029
1030
1031
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}
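
    Example (illustrative; the opcode chosen is arbitrary)::
      job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])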

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))