jqueue.py 47.7 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
import logging
34
35
import errno
import re
36
import time
37
import weakref
Iustin Pop's avatar
Iustin Pop committed
38

Guido Trotter's avatar
Guido Trotter committed
39
40
41
42
43
44
45
try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
Michael Hanselmann's avatar
Michael Hanselmann committed
46
from ganeti import constants
47
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
51
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import netutils
57
from ganeti import compat
Michael Hanselmann's avatar
Michael Hanselmann committed
58

59

60
# Number of worker threads started for processing jobs; this value is handed
# to the worker pool constructor in _JobQueueWorkerPool.__init__
JOBQUEUE_THREADS = 25
61
# NOTE(review): not referenced in the part of the module visible here;
# presumably the number of jobs grouped into one archive directory - confirm
# against the code using it
JOBS_PER_ARCHIVE_DIRECTORY = 10000
Michael Hanselmann's avatar
Michael Hanselmann committed
62

63
64
65
# Names of the instance members which are passed to the
# @locking.ssynchronized decorator so that it knows which member of "self" to
# use for locking (e.g. _QUEUE is used by _OpExecCallbacks, whose "_queue"
# member is the job queue)
_LOCK = "_lock"
_QUEUE = "_queue"
66

Iustin Pop's avatar
Iustin Pop committed
67

68
class CancelJob(Exception):
  """Special exception to cancel a job.

  It is raised while an opcode is being processed (see
  L{_OpExecCallbacks.NotifyStart} and L{_JobQueueWorker.RunTask}) once a
  cancellation request has been noticed; L{_JobQueueWorker.RunTask} catches
  it and marks all unfinished opcodes of the job as canceled.

  """


74
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


Michael Hanselmann's avatar
Michael Hanselmann committed
84
class _QueuedOpCode(object):
Michael Hanselmann's avatar
Michael Hanselmann committed
85
  """Encapsulates an opcode object.
Michael Hanselmann's avatar
Michael Hanselmann committed
86

87
88
89
90
91
92
  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
Iustin Pop's avatar
Iustin Pop committed
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar stop_timestamp: timestamp for the end of the execution
95

Michael Hanselmann's avatar
Michael Hanselmann committed
96
  """
97
  __slots__ = ["input", "status", "result", "log",
Iustin Pop's avatar
Iustin Pop committed
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
100
               "__weakref__"]

Michael Hanselmann's avatar
Michael Hanselmann committed
101
  def __init__(self, op):
102
103
104
105
106
107
    """Constructor for the _QuededOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
108
109
110
111
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
112
    self.start_timestamp = None
Iustin Pop's avatar
Iustin Pop committed
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115
116
117

  @classmethod
  def Restore(cls, state):
118
119
120
121
122
123
124
125
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
126
127
128
129
130
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
131
    obj.start_timestamp = state.get("start_timestamp", None)
Iustin Pop's avatar
Iustin Pop committed
132
    obj.exec_timestamp = state.get("exec_timestamp", None)
133
    obj.end_timestamp = state.get("end_timestamp", None)
134
135
136
    return obj

  def Serialize(self):
137
138
139
140
141
142
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
143
144
145
146
147
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
148
      "start_timestamp": self.start_timestamp,
Iustin Pop's avatar
Iustin Pop committed
149
      "exec_timestamp": self.exec_timestamp,
150
      "end_timestamp": self.end_timestamp,
151
      }
152

Michael Hanselmann's avatar
Michael Hanselmann committed
153
154
155
156

class _QueuedJob(object):
  """In-memory job representation.

157
158
159
160
161
162
163
164
165
166
167
168
169
  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestmap: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
170
  @ivar lock_status: In-memory locking information for debugging
Michael Hanselmann's avatar
Michael Hanselmann committed
171
172

  """
Iustin Pop's avatar
Iustin Pop committed
173
  # pylint: disable-msg=W0212
174
  __slots__ = ["queue", "id", "ops", "log_serial",
175
               "received_timestamp", "start_timestamp", "end_timestamp",
176
               "lock_status", "change",
177
178
               "__weakref__"]

Michael Hanselmann's avatar
Michael Hanselmann committed
179
  def __init__(self, queue, job_id, ops):
180
181
182
183
184
185
186
187
188
189
190
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
191
    if not ops:
Guido Trotter's avatar
Guido Trotter committed
192
      raise errors.GenericError("A job needs at least one opcode")
Michael Hanselmann's avatar
Michael Hanselmann committed
193

Michael Hanselmann's avatar
Michael Hanselmann committed
194
    self.queue = queue
195
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
196
    self.ops = [_QueuedOpCode(op) for op in ops]
197
    self.log_serial = 0
198
199
200
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
201

202
203
204
    # In-memory attributes
    self.lock_status = None

205
206
207
208
209
210
211
  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

212
  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
213
  def Restore(cls, queue, state):
214
215
216
217
218
219
220
221
222
223
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _JobQueue
    @return: the restored _JobQueue instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
224
225
226
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
227
228
229
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
230

231
232
233
    # In-memory attributes
    obj.lock_status = None

234
235
236
237
238
239
240
241
    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

242
243
244
    return obj

  def Serialize(self):
245
246
247
248
249
250
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
251
252
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
253
      "ops": [op.Serialize() for op in self.ops],
254
255
256
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
257
258
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
259
  def CalcStatus(self):
260
261
262
263
264
265
266
267
268
269
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
270
          - canceling
271
272
273
274
275
276
277
278
279
280
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
281
282
283
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
284
285
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
286
287
288
289
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
290
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
291
        pass
Iustin Pop's avatar
Iustin Pop committed
292
293
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
294
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
295
        status = constants.JOB_STATUS_RUNNING
296
297
298
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
299
      elif op.status == constants.OP_STATUS_ERROR:
300
301
302
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
303
      elif op.status == constants.OP_STATUS_CANCELED:
304
305
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
306
307
308
309
310
311

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

312
  def GetLogEntries(self, newer_than):
313
314
315
    """Selectively returns the log entries.

    @type newer_than: None or int
Michael Hanselmann's avatar
Michael Hanselmann committed
316
    @param newer_than: if this is None, return all log entries,
317
318
319
320
321
322
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
323
324
325
326
327
328
329
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
330
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331
332
333

    return entries

334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

379
380
381
382
383
384
385
386
387
388
389
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
390
391
392
393
394
395
396
397
398
399
400
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)
401

402

403
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks passed to the opcode processor while executing an opcode.

  They update the in-memory job and opcode (status, log entries, locking
  information) and have the queue write the changes.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode has been marked as canceling

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # All locks are acquired by now
    self._job.lock_status = None

    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      raise CancelJob()

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    The job-wide log serial is incremented and the new entry is appended to
    the log of the opcode; the job is then updated without replication.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(log_msg, )}, in which case the log type defaults to
    L{constants.ELOG_MESSAGE}, or C{(log_type, log_msg)}.

    """
    # Zero arguments would otherwise only fail later, while unpacking
    assert len(args) in (1, 2), "Feedback takes one or two arguments"

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()
    self._AppendFeedback(timestamp, log_type, log_msg)

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

484

485
486
487
class _JobChangesChecker(object):
  """Callable checking whether a job differs from what a client last saw.

  """
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: a C{(job_info, log_entries)} tuple if the job is no longer
        queued, running or waiting for locks, or if its information or log
        differ from the previous values; C{None} otherwise

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  """Waits for changes of a job file using inotify.

  """
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raise errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    Enables the handler again if it is reported as no longer enabled.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    # The timeout is converted from seconds to milliseconds for the notifier
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
Guido Trotter's avatar
Guido Trotter committed
597

598
599
  def Wait(self, timeout):
    """Waits for a job to change.
Guido Trotter's avatar
Guido Trotter committed
600

601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
Guido Trotter's avatar
Guido Trotter committed
662
    try:
663
664
665
666
667
668
669
670
671
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
Guido Trotter's avatar
Guido Trotter committed
672
673
674
675
676
677
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


678
679
680
681
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
682
  def RunTask(self, job): # pylint: disable-msg=W0221
Michael Hanselmann's avatar
Michael Hanselmann committed
683
684
    """Job executor.

685
686
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
687

688
689
690
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
691
    """
692
    logging.info("Processing job %s", job.id)
693
    proc = mcpu.Processor(self.pool.queue.context, job.id)
694
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
695
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
696
697
698
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
699
          op_summary = op.input.Summary()
700
701
702
703
704
705
706
707
708
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
Michael Hanselmann's avatar
Michael Hanselmann committed
709
          try:
710
711
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)
Michael Hanselmann's avatar
Michael Hanselmann committed
712

713
            queue.acquire(shared=1)
Michael Hanselmann's avatar
Michael Hanselmann committed
714
            try:
715
716
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
717
              assert op.status == constants.OP_STATUS_QUEUED
Iustin Pop's avatar
Iustin Pop committed
718
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
719
              op.result = None
720
              op.start_timestamp = TimeStampNow()
721
722
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
723
724
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
725
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
726
727
728
            finally:
              queue.release()

729
730
            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
731
                                     _OpExecCallbacks(queue, job, op))
Michael Hanselmann's avatar
Michael Hanselmann committed
732

733
            queue.acquire(shared=1)
Michael Hanselmann's avatar
Michael Hanselmann committed
734
735
736
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
737
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
738
739
740
741
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

742
743
            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
744
745
746
          except CancelJob:
            # Will be handled further up
            raise
Michael Hanselmann's avatar
Michael Hanselmann committed
747
          except Exception, err:
748
            queue.acquire(shared=1)
Michael Hanselmann's avatar
Michael Hanselmann committed
749
750
751
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
752
753
754
755
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
756
                op.end_timestamp = TimeStampNow()
757
758
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
Michael Hanselmann's avatar
Michael Hanselmann committed
759
760
761
762
763
764
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

765
      except CancelJob:
766
        queue.acquire(shared=1)
767
        try:
768
769
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
770
771
        finally:
          queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
772
773
774
775
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
776
    finally:
777
      queue.acquire(shared=1)
Michael Hanselmann's avatar
Michael Hanselmann committed
778
      try:
779
        try:
780
          job.lock_status = None
781
          job.end_timestamp = TimeStampNow()
782
783
784
785
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
786
787
      finally:
        queue.release()
788

789
      logging.info("Finished job %s, status = %s", job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
790
791
792


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    """Initializes the pool with L{JOBQUEUE_THREADS} L{_JobQueueWorker}s.

    @param queue: the job queue; stored as C{self.queue}, which the workers
        use to get at the queue's context

    """
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
801
802


Iustin Pop's avatar
Iustin Pop committed
803
804
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.
805

Iustin Pop's avatar
Iustin Pop committed
806
807
808
809
  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
810
  a '_queue_filelock' argument.
811

812
  @warning: Use this decorator only after locking.ssynchronized
813

Iustin Pop's avatar
Iustin Pop committed
814
  Example::
815
    @locking.ssynchronized(_LOCK)
Iustin Pop's avatar
Iustin Pop committed
816
817
818
    @_RequireOpenQueue
    def Example(self):
      pass
819

Iustin Pop's avatar
Iustin Pop committed
820
821
  """
  def wrapper(self, *args, **kwargs):
Iustin Pop's avatar
Iustin Pop committed
822
    # pylint: disable-msg=W0212
823
    assert self._queue_filelock is not None, "Queue should be open"
Iustin Pop's avatar
Iustin Pop committed
824
825
    return fn(self, *args, **kwargs)
  return wrapper
826
827


Iustin Pop's avatar
Iustin Pop committed
828
829
class JobQueue(object):
  """Queue used to manage the jobs.
830

Iustin Pop's avatar
Iustin Pop committed
831
832
833
834
  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
835

Michael Hanselmann's avatar
Michael Hanselmann committed
836
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor initializes the job queue object and then inspects
    all the jobs currently on disk: jobs which are still queued are
    handed to the worker pool, while jobs found in a running, waiting
    or canceling state have their unfinished opcodes marked as failed.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.HostInfo().name

    # The Big JobQueue lock. Code taking it in shared mode must be safe to run
    # concurrently with all other code taking it in shared mode, including
    # itself. Code which doesn't take it at all must be safe to run
    # concurrently with both the shared and the exclusive holders.
    self._lock = locking.SharedLock()

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue and take the file lock, so that no other process
    # can work on the job queue at the same time.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Initial node list: all master candidates...
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    self._nodes = candidates

    # ... except the master node itself
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # The lock is needed because WorkerPool.AddTask() may start a job while
      # the inspection below is still in progress.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        job_ids = self._GetJobIDsUnlocked()
        total = len(job_ids)
        final_idx = total - 1

        last_report = time.time()
        for pos, job_id in enumerate(job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (pos % 1000 == 0 or
              time.time() >= (last_report + 10.0) or
              pos == final_idx):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         pos, final_idx, 100.0 * (pos + 1) / total)
            last_report = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status == constants.JOB_STATUS_QUEUED:
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Don't leave worker threads behind if the setup fails for any reason
      self._wpool.TerminateWorkers()
      raise
Michael Hanselmann's avatar
Michael Hanselmann committed
930

931
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first (a failure is only
    logged). Nodes which are not master candidates are dropped from the
    list of known nodes and get nothing else; master candidates receive
    all non-archived job files plus the serial file and are then added
    to the list of known nodes.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_error = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_error:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_error)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs...
    to_upload = []
    for job_id in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(job_id))

    # ... followed by the current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      data = utils.ReadFile(path)

      reply = rpc.RpcRunner.call_jobqueue_update([name], [node.primary_ip],
                                                 path, data)
      upload_error = reply[name].fail_msg
      if upload_error:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_error)

    self._nodes[name] = node.primary_ip
975

976
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Forgets the node, if it is known; unknown names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    if node_name in self._nodes:
      del self._nodes[node_name]
986

987
988
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
989
990
991
992
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
Michael Hanselmann's avatar
Michael Hanselmann committed
993
    log the case when more than half of the nodes fails.
994
995
996
997
998
999
1000
1001

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
1002
1003
1004
1005
    failed = []
    success = []

    for node in nodes:
1006
      msg = result[node].fail_msg
1007
      if msg:
1008
        failed.append(node)
1009
1010
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
1011
1012
      else:
        success.append(node)
1013
1014
1015
1016
1017
1018

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

1019
1020
1021
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

1022
1023
1024
1025
    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

1026
1027
1028
1029
1030
    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

1031
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The local copy is always written first; the upload to the other
    known nodes happens only when requested, and its per-node results
    are checked (and failures logged) via L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if not replicate:
      return

    node_names, node_addrs = self._GetNodeIp()
    reply = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                               file_name, data)
    self._CheckRpcResult(reply, self._nodes, "Updating %s" % file_name)
1051

1052
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    Every (old, new) pair is first renamed in the local queue directory
    (creating missing target directories); afterwards the same list of
    renames is sent to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first
    for src, dst in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... then the same on all other nodes
    node_names, node_addrs = self._GetNodeIp()
    reply = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs, rename)
    self._CheckRpcResult(reply, self._nodes, "Renaming files (%r)" % rename)
1070

1071
1072
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job ID is not an integer or
        is negative

    """
    if not isinstance(job_id, (int, long)):
      # The value is wrapped in a one-element tuple: a tuple passed directly
      # to the % operator would be unpacked as the argument list and raise
      # TypeError instead of the intended ProgrammerError.
      raise errors.ProgrammerError("Job ID '%s' not numeric" % (job_id, ))
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Archived jobs are grouped into directories; the directory name is the
    numeric job ID divided (rounding down) by
    C{JOBS_PER_ARCHIVE_DIRECTORY}.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division, so the result is an integer no matter which
    # division semantics are in effect for the "/" operator
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)

Iustin Pop's avatar
Iustin Pop committed
1104
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster. The
    new highest serial is written to the serial file (and replicated)
    before any identifier is handed out, and the in-memory serial is
    updated only once that write has succeeded.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} formatted job identifiers, in
        increasing order

    """
    assert count > 0

    # C{self._last_serial} is the last serial already handed out, so the new
    # block starts right after it and ends at the new last serial.
    first = self._last_serial + 1
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v) for v in range(first, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1129

Michael Hanselmann's avatar
Michael Hanselmann committed
1130
1131
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<id>} and lives directly in the queue
    directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1141

1142
1143
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<id>} and is placed in the sub-directory of
    the archive directory returned by L{_GetArchiveDirectory}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    archive_subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, archive_subdir,
                          file_name)
1154

Guido Trotter's avatar
Guido Trotter committed
1155
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs). Files in the queue directory whose name doesn't match
    L{_RE_JOB_FILE} are ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = (self._RE_JOB_FILE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR))
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
1176
1177

  def _LoadJobUnlocked(self, job_id):
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
1189
1190
    job = self._memcache.get(job_id, None)
    if job:
1191
      logging.debug("Found job %s in memcache", job_id)
1192
      return job
Iustin Pop's avatar
Iustin Pop committed
1193

1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None
1207
1208
1209
1210
1211
1212
1213