#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QuededOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
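
  # Illustrative sketch (editor's example, not exercised by the module): the
  # Serialize/Restore pair above is meant to round-trip through the JSON
  # helpers used elsewhere in this file, roughly:
  #
  #   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
  #   copy = _QueuedOpCode.Restore(state)
  #   assert copy.status == qop.status
  #
  # where "qop" is assumed to be an existing _QueuedOpCode instance.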


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked from drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
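
  # Illustrative sketch (editor's example, not part of the module): a caller
  # submitting two jobs gets back one (status, data) pair per job, roughly:
  #
  #   for success, data in queue.SubmitManyJobs([[op1], [op2]]):
  #     if success:
  #       job_id = data     # the new job's ID
  #     else:
  #       error_msg = data  # the error message
  #
  # where op1 and op2 are assumed to be opcodes.OpCode instances.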


  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
1181
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()