#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

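# Example (sketch, not part of the original code): utils.SplitTime splits a
# float timestamp into integer parts, roughly
#   utils.SplitTime(1234.5) == (1234, 500000)
# i.e. (seconds, microseconds); two integers survive the JSON round trip to
# the job file without the precision loss a raw float could suffer.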

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution
  @ivar priority: the opcode's current priority

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)

    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }

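# Example (sketch, not part of the original code): the serialized form is a
# plain JSON-friendly dict, so a round trip is simply
#   state = _QueuedOpCode(op).Serialize()
#   clone = _QueuedOpCode.Restore(state)
# leaving clone.status, clone.log and the timestamps equal to the originals;
# this dict is what the on-disk job file stores for every opcode.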
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

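  # Worked example (sketch, not part of the original code): a job whose
  # opcode statuses are [success, running, queued] skips the first opcode,
  # records JOB_STATUS_RUNNING for the second and leaves it unchanged for
  # the queued one; all_success is False, so CalcStatus() returns "running".
  # A canceled or failed opcode would short-circuit the loop instead.
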
  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

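  # Example (sketch, not part of the original code): priorities are
  # numerically descending (OP_PRIO_HIGHEST <= p <= OP_PRIO_LOWEST), so with
  # pending opcode priorities [0, -5, 10] the job runs at min(...) == -5,
  # i.e. the most important unfinished opcode drives the whole job.
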
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

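  # Example (sketch, not part of the original code): if the opcode logs hold
  # entries with serials 1..5, GetLogEntries(3) returns only those with
  # serials 4 and 5; clients poll this way for "messages since last call".
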
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.
    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.
    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client
    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial
  def __call__(self, job):
    """Checks whether job has changed.
    @type job: L{_QueuedJob}
    @param job: Job object
    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)
    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup
    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
  def Wait(self, timeout):
    """Waits for a job to change.
    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

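# Usage sketch (not part of the original code): the master daemon invokes an
# instance of this helper roughly as
#   helper = _WaitForJobChangesHelper()
#   result = helper(filename, job_load_fn, fields, prev_info, prev_serial, 10)
# where result is None if the job is gone, constants.JOB_NOTCHANGED on
# timeout, and an (info, log_entries) tuple when something changed.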

def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)

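# Example (sketch, not part of the original code): _EncodeOpError(ValueError("x"))
# first wraps the non-Ganeti exception as errors.OpExecError("x"), so the
# encoded result can always be decoded back into a Ganeti error by clients.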

class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result

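# Usage sketch (not part of the original code): Peek() looks at the next
# timeout without consuming it, Next() consumes it (Python 2 iterators):
#   w = _TimeoutStrategyWrapper(iter([1.0, 4.0, None]).next)
#   w.Peek()   # -> 1.0 (still pending)
#   w.Next()   # -> 1.0 (consumed)
#   w.Next()   # -> 4.0
# A timeout of None means the following lock acquisition should block.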

class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()

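# Sketch (not part of the original code): priorities are numerically
# descending, so after the timeout strategy is exhausted each
# CheckPriorityIncrease() call moves op.priority one step towards
# OP_PRIO_HIGHEST; once there, GetNextLockTimeout() yields None and the
# next lock acquisition blocks instead of timing out.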

class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])

      elif op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITLOCK
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITLOCK

    return update

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITLOCK, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      if op.status not in (constants.OP_STATUS_CANCELING,
                           constants.OP_STATUS_CANCELED):
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITLOCK)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
        assert job.start_timestamp and op.start_timestamp

        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        queue.release()
        try:
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = op_status
        op.result = op_result

        if op.status == constants.OP_STATUS_WAITLOCK:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()
          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED
      if op.status == constants.OP_STATUS_WAITLOCK:
        finalize = False

        if opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True
          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])
        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        elif op.status == constants.OP_STATUS_CANCELED:
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        # Finalizing or last opcode?
        if finalize or opctx.index == (opcount - 1):
          # All opcodes have been run, finalize job
          job.end_timestamp = TimeStampNow()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)
        if finalize or opctx.index == (opcount - 1):
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return True

      return False
    finally:
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the L{_QueuedJob} and
    L{_QueuedOpCode} classes.
    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    proc = mcpu.Processor(queue.context, job.id)

    if not _JobProcessor(queue, proc.ExecOpCode, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())

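# Flow sketch (not part of the original code): _JobProcessor() returns False
# while opcodes remain, so RunTask() re-queues the job via DeferTask at the
# job's current priority rather than looping itself; one worker pass thus
# handles at most one opcode (or one lock-acquire attempt).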

class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.
  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.
  @warning: Use this decorator only after locking.ssynchronized
  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass
  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.
  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release
    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = jstore.CheckDrainFlag()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count -