jqueue.py 40.5 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
"""Module implementing the job queue handling.

24
25
26
27
28
Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs
29
30

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
34
import logging
import threading
35
36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import constants
41
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import workerpool
43
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
44
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50

51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
Michael Hanselmann's avatar
Michael Hanselmann committed
53

Iustin Pop's avatar
Iustin Pop committed
54

55
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised from within the worker code (e.g. from L{_JobQueueWorker._NotifyStart}
  when an opcode is found in the CANCELING state) and caught in
  L{_JobQueueWorker.RunTask}, which then finalizes the cancellation.

  """


61
def TimeStampNow():
  """Return the current time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format,
      as produced by L{utils.SplitTime}

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
71
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  # __weakref__ must be listed explicitly since __slots__ suppresses it;
  # instances are kept in the job queue's weak-value cache
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # use cls, not the hardcoded class name, so subclasses restore to
    # their own type
    obj = cls.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    # .get() tolerates serialized states that lack the timestamp fields
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
135

Michael Hanselmann's avatar
Michael Hanselmann committed
136
137
138
139

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if no opcodes were given

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    # use cls, not the hardcoded class name, so subclasses restore to
    # their own type
    obj = cls.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    # .get() tolerates serialized states that lack the timestamp fields
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # keep log_serial at the highest serial seen in any opcode log
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # this must be the job-level constant, consistent with all other
        # branches (previously the opcode-level constant was used here)
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(entry for entry in op.log if entry[0] > serial)

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

336

Michael Hanselmann's avatar
Michael Hanselmann committed
337
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode was switched to CANCELING while
        waiting for locks

    """
    # both attributes are set by RunTask() before ExecOpCode is invoked
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    # keep the queue on self as well: _NotifyStart needs it
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              # was this opcode canceled before we got to run it?
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              Accepts either C{(message)} or C{(log_type, message)}.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                # wake up anybody waiting on the job's change condition
                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            # mark the opcode as failed (persisting the change) and
            # re-raise so the remaining opcodes are not executed
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      # finalize the job in all cases: reset the opcode index, record
      # the end timestamp and compute the final status for logging
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
500
501
502


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  Starts L{JOBQUEUE_THREADS} workers of type L{_JobQueueWorker} and
  keeps a reference to the owning queue, which the workers use.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    # back-reference used by the workers (e.g. for the mcpu context)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
510
511


Michael Hanselmann's avatar
Michael Hanselmann committed
512
class JobQueue(object):
Michael Hanselmann's avatar
Michael Hanselmann committed
513
  """Queue used to manage the jobs.
514
515
516
517

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
518
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
519

520
521
522
  def _RequireOpenQueue(fn):
    """Decorator asserting that the job queue is open.

    This should be used on all 'public' functions, i.e. functions
    usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def _checked(self, *args, **kwargs):
      # self._queue_lock is set once the queue has been opened (see __init__)
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return _checked

Michael Hanselmann's avatar
Michael Hanselmann committed
540
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # weak-value cache: entries disappear once nothing else references
    # the job object (presumably filled by _LoadJobUnlocked)
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            # jobs that never started can simply be requeued
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            # jobs interrupted by the daemon restart cannot be resumed;
            # mark their unfinished opcodes as failed
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # don't leave worker threads running if initialization failed
      self._wpool.TerminateWorkers()
      raise
Michael Hanselmann's avatar
Michael Hanselmann committed
630

631
632
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    Cleans the node's queue directory and, if the node is a master
    candidate, uploads all current queue files to it and records it as
    a replication target.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.RemoteFailMsg()
    if msg:
      # best-effort: a failed cleanup is logged but not fatal
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].RemoteFailMsg()
      if msg:
        # upload failures are logged only; the node is registered anyway
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    # only now does the node become a replication target
    self._nodes[node_name] = node.primary_ip
679
680
681
682

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue itself is removed by the "leave node" RPC call; here we
    # only drop the node from the replication targets, ignoring unknown
    # nodes
    self._nodes.pop(node_name, None)

695
  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
    log the case when more than half of the nodes fails.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].RemoteFailMsg()
      if msg:
        failed.append(node)
        # include failmsg so the log identifies which operation failed
        # (previously the parameter was accepted but never used)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

726
727
728
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    names = list(self._nodes)
    addrs = [self._nodes[node_name] for node_name in names]
    return names, addrs

738
739
740
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    # write the local copy first ...
    utils.WriteFile(file_name, data=data)

    # ... then push it to all other queue nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                   file_name, data)
    self._CheckRpcResult(rpc_result, self._nodes, "Updating %s" % file_name)
756

757
  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # apply the renames locally first ...
    for old_name, new_name in rename:
      utils.RenameFile(old_name, new_name, mkdir=True)

    # ... then mirror them on all other queue nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    self._CheckRpcResult(rpc_result, self._nodes,
                         "Renaming files (%r)" % rename)
775

Michael Hanselmann's avatar
Michael Hanselmann committed
776
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    # reject anything that is not a plain (possibly long) integer
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    # serials start at zero and only ever grow, so negatives are bugs
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
    return str(job_id)

796
797
798
799
800
801
802
803
804
805
806
807
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped JOBS_PER_ARCHIVE_DIRECTORY per directory, named
    after the bucket index.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # use explicit floor division: "/" happens to truncate for ints in
    # Python 2, but would yield a float under "from __future__ import
    # division" or Python 3, producing e.g. "0.0123" directory names
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)

808
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # Persist and replicate the new serial first; if the write fails,
    # self._last_serial stays untouched and the number is never handed out
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % next_serial)
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
828

Michael Hanselmann's avatar
Michael Hanselmann committed
829
830
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    # Live (non-archived) jobs live directly in the queue directory
    filename = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, filename)

841
842
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    # Archived jobs are grouped in per-bucket subdirectories
    archive_subdir = cls._GetArchiveDirectory(job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                        "job-%s" % job_id)
853

Michael Hanselmann's avatar
Michael Hanselmann committed
854
855
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      # Not a job file at all
      return None
    return match.group(1)

872
873
874
875
876
877
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # Extract the id from every job file and sort them "nicely"
    return utils.NiceSort([self._ExtractJobID(name)
                           for name in self._ListJobFiles()])
889

890
  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    # Keep only the entries matching the job file name pattern
    visible = utils.ListVisibleFiles(constants.QUEUE_DIR)
    return [fname for fname in visible if self._RE_JOB_FILE.match(fname)]

900
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    # Fast path: return the cached object if we already have it
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      # A missing file simply means there is no such job; any other
      # I/O error is unexpected and must be propagated
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      # The file exists but cannot be restored into a job object; move
      # it into the archive so it doesn't break job listing forever,
      # unless it already lives at its archive location
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    # Loaded successfully; remember it for future lookups
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
946
947

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    # An empty list of IDs means "all known jobs"
    wanted = job_ids or self._GetJobIDsUnlocked()
    return [self._LoadJobUnlocked(job_id) for job_id in wanted]
961

962
963
964
965
966
967
968
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked from drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True of the job queue is marked for draining

    """
    # The mere existence of the file is the flag
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)

975
976
977
978
979
980
981
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    if drain_flag:
      # Create an empty flag file
      utils.WriteFile(drain_file, data="", close=True)
    else:
      utils.RemoveFile(drain_file)
    return True

992
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    # Refuse any new work while the queue is drained
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Enforce the queue size limits
    queue_size = len(self._ListJobFiles())
    if queue_size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      pass

    if queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Allocate a fresh job identifier and build the job object
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Persist (and replicate) before making the job visible anywhere else
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Hand the job over to the worker pool for execution
    self._wpool.AddTask(job)

    return job.id
1034

1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    Acquires the queue lock and delegates the actual work.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._SubmitJobUnlocked(ops)
    return job_id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    for ops in jobs:
      try:
        data = self._SubmitJobUnlocked(ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results


1066
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Notify waiters about potential changes
    job.change.notifyAll()
1085

1086
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      # Recompute the remaining time on every iteration; the wait below
      # can return early due to notifications
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      # Either the job info or the log entries differ from what the
      # client last saw; report the change immediately
      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    # job_info/log_entries are still None only if the job was never
    # found on disk
    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)
1162

1163
  @utils.LockedMethod
1164
  @_RequireOpenQueue
1165
1166
1167
  def CancelJob(self, job_id):
    """Cancels a job.

1168
1169
    This will only succeed if the job has not started yet.

1170
    @type job_id: string
1171
    @param job_id: job ID of job to be cancelled.
1172
1173

    """
1174
    logging.info("Cancelling job %s", job_id)
1175

Michael Hanselmann's avatar
Michael Hanselmann committed
1176
    job = self._LoadJobUnlocked(job_id)
1177
1178
    if not job:
      logging.debug("Job %s not found", job_id)
1179
1180
1181
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()
1182

1183
1184
    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
1185
1186
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1187
1188
1189
1190

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)
1191

1192
1193
1194
    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
1195
        job.