jqueue.py 45.4 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
34
import logging
import threading
35
36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Guido Trotter's avatar
Guido Trotter committed
40
41
42
43
44
45
46
try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
Michael Hanselmann's avatar
Michael Hanselmann committed
47
from ganeti import constants
48
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
49
from ganeti import workerpool
50
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
51
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
56

57

58
# Number of worker threads started for processing jobs; this is the size
# handed to the worker pool by _JobQueueWorkerPool
JOBQUEUE_THREADS = 25
# NOTE(review): presumably the number of archived jobs stored per archive
# directory; the code using it is not in this part of the file - confirm
JOBS_PER_ARCHIVE_DIRECTORY = 10000
Michael Hanselmann's avatar
Michael Hanselmann committed
60

Iustin Pop's avatar
Iustin Pop committed
61

62
class CancelJob(Exception):
  """Special exception to cancel a job.

  It is raised while an opcode is being started, when the opcode turns
  out to have been marked as canceled/canceling, and it is caught by the
  job worker, which then cancels the whole job.

  """


68
def TimeStampNow():
  """Returns the current timestamp.

  The value of C{time.time()} is passed through L{utils.SplitTime}.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


Michael Hanselmann's avatar
Michael Hanselmann committed
78
class _QueuedOpCode(object):
Michael Hanselmann's avatar
Michael Hanselmann committed
79
  """Encapsulates an opcode object.
Michael Hanselmann's avatar
Michael Hanselmann committed
80

81
82
83
84
85
86
  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
Iustin Pop's avatar
Iustin Pop committed
87
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
88
  @ivar stop_timestamp: timestamp for the end of the execution
89

Michael Hanselmann's avatar
Michael Hanselmann committed
90
  """
91
  __slots__ = ["input", "status", "result", "log",
Iustin Pop's avatar
Iustin Pop committed
92
               "start_timestamp", "exec_timestamp", "end_timestamp",
93
94
               "__weakref__"]

Michael Hanselmann's avatar
Michael Hanselmann committed
95
  def __init__(self, op):
96
97
98
99
100
101
    """Constructor for the _QuededOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
102
103
104
105
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
106
    self.start_timestamp = None
Iustin Pop's avatar
Iustin Pop committed
107
    self.exec_timestamp = None
108
    self.end_timestamp = None
109
110
111

  @classmethod
  def Restore(cls, state):
112
113
114
115
116
117
118
119
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
120
121
122
123
124
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
125
    obj.start_timestamp = state.get("start_timestamp", None)
Iustin Pop's avatar
Iustin Pop committed
126
    obj.exec_timestamp = state.get("exec_timestamp", None)
127
    obj.end_timestamp = state.get("end_timestamp", None)
128
129
130
    return obj

  def Serialize(self):
131
132
133
134
135
136
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
137
138
139
140
141
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
142
      "start_timestamp": self.start_timestamp,
Iustin Pop's avatar
Iustin Pop committed
143
      "exec_timestamp": self.exec_timestamp,
144
      "end_timestamp": self.end_timestamp,
145
      }
146

Michael Hanselmann's avatar
Michael Hanselmann committed
147
148
149
150

class _QueuedJob(object):
  """In-memory job representation.

151
152
153
154
155
156
157
158
159
160
161
162
163
  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestmap: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
164
  @ivar lock_status: In-memory locking information for debugging
Michael Hanselmann's avatar
Michael Hanselmann committed
165
166

  """
Iustin Pop's avatar
Iustin Pop committed
167
  # pylint: disable-msg=W0212
168
  __slots__ = ["queue", "id", "ops", "log_serial",
169
               "received_timestamp", "start_timestamp", "end_timestamp",
170
               "lock_status", "change",
171
172
               "__weakref__"]

Michael Hanselmann's avatar
Michael Hanselmann committed
173
  def __init__(self, queue, job_id, ops):
174
175
176
177
178
179
180
181
182
183
184
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
185
    if not ops:
Guido Trotter's avatar
Guido Trotter committed
186
      raise errors.GenericError("A job needs at least one opcode")
Michael Hanselmann's avatar
Michael Hanselmann committed
187

Michael Hanselmann's avatar
Michael Hanselmann committed
188
    self.queue = queue
189
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
190
    self.ops = [_QueuedOpCode(op) for op in ops]
191
    self.log_serial = 0
192
193
194
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
195

196
197
198
    # In-memory attributes
    self.lock_status = None

199
200
201
202
203
204
205
  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

206
  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
207
  def Restore(cls, queue, state):
208
209
210
211
212
213
214
215
216
217
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _JobQueue
    @return: the restored _JobQueue instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
218
219
220
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
221
222
223
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
224

225
226
227
    # In-memory attributes
    obj.lock_status = None

228
229
230
231
232
233
234
235
    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

236
237
238
    return obj

  def Serialize(self):
239
240
241
242
243
244
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
245
246
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
247
      "ops": [op.Serialize() for op in self.ops],
248
249
250
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
251
252
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
253
  def CalcStatus(self):
254
255
256
257
258
259
260
261
262
263
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
264
          - canceling
265
266
267
268
269
270
271
272
273
274
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
275
276
277
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
278
279
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
280
281
282
283
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
284
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
285
        pass
Iustin Pop's avatar
Iustin Pop committed
286
287
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
288
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
289
        status = constants.JOB_STATUS_RUNNING
290
291
292
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
293
      elif op.status == constants.OP_STATUS_ERROR:
294
295
296
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
297
      elif op.status == constants.OP_STATUS_CANCELED:
298
299
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
300
301
302
303
304
305

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

306
  def GetLogEntries(self, newer_than):
307
308
309
    """Selectively returns the log entries.

    @type newer_than: None or int
Michael Hanselmann's avatar
Michael Hanselmann committed
310
    @param newer_than: if this is None, return all log entries,
311
312
313
314
315
316
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
317
318
319
320
321
322
323
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
324
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325
326
327

    return entries

328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

393

394
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks tying the execution of one opcode to its job in the queue.

  An instance is passed to the opcode processor for every opcode that is
  executed; the callbacks update the opcode and job objects.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    @raise CancelJob: if the opcode has been marked as canceling

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either a single argument, the message, which is logged with
    the type C{constants.ELOG_MESSAGE}, or two arguments, the log type
    and the message.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = TimeStampNow()

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
      self._queue.UpdateJobUnlocked(self._job, replicate=False)
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

473

Guido Trotter's avatar
Guido Trotter committed
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
class _WaitForJobChangesHelper(object):
  """Helper class using initofy to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  @type job_id: string
  @ivar job_id: id of the job we're watching
  @type prev_job_info: string
  @ivar prev_job_info: previous job info, as passed by the luxi client
  @type prev_log_serial: string
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
  @type queue: L{JobQueue}
  @ivar queue: job queue (used for a few utility functions)
  @type job_path: string
  @ivar job_path: absolute path of the job file
  @type wm: pyinotify.WatchManager (or None)
  @ivar wm: inotify watch manager to watch for changes
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
  @ivar inotify_handler: single file event handler, used for watching
  @type notifier: pyinotify.Notifier
  @ivar notifier: inotify single-threaded notifier, used for watching

  """
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
    self.job_id = job_id
    self.fields = fields
    self.prev_job_info = prev_job_info
    self.prev_log_serial = prev_log_serial
    self.queue = queue
    # pylint: disable-msg=W0212
    self.job_path = self.queue._GetJobPath(self.job_id)
    self.wm = None
    self.inotify_handler = None
    self.notifier = None

  def _SetupInotify(self):
    """Create the inotify

    @raises errors.InotifyError: if the notifier cannot be setup

    """
    if self.wm:
      return
    self.wm = pyinotify.WatchManager()
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
                                                                self.OnInotify,
                                                                self.job_path)
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
    self.inotify_handler.enable()

  def _LoadDiskStatus(self):
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
    if not job:
      raise errors.JobLost()
    self.job_status = job.CalcStatus()

    job_info = job.GetInfo(self.fields)
    log_entries = job.GetLogEntries(self.prev_log_serial)
    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

  def _CheckForChanges(self):
    self._LoadDiskStatus()
    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
                                constants.JOB_STATUS_RUNNING,
                                constants.JOB_STATUS_WAITLOCK) or
        self.prev_job_info != self.job_info or
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
      logging.debug("Job %s changed", self.job_id)
      return (self.job_info, self.log_entries)

    raise utils.RetryAgain()

  def OnInotify(self, notifier_enabled):
    if not notifier_enabled:
      self.inotify_handler.enable()

  def WaitFn(self, timeout):
    self._SetupInotify()
    if self.notifier.check_events(timeout*1000):
      self.notifier.read_events()
    self.notifier.process_events()

  def WaitForChanges(self, timeout):
    try:
      return utils.Retry(self._CheckForChanges,
                         utils.RETRY_REMAINING_TIME,
                         timeout,
                         wait_fn=self.WaitFn)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  def Close(self):
    if self.wm:
      self.notifier.stop()


583
584
585
586
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
587
  def RunTask(self, job): # pylint: disable-msg=W0221
Michael Hanselmann's avatar
Michael Hanselmann committed
588
589
    """Job executor.

590
591
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
592

593
594
595
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
596
    """
597
    logging.info("Processing job %s", job.id)
598
    proc = mcpu.Processor(self.pool.queue.context, job.id)
599
    queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
600
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
601
602
603
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
604
          op_summary = op.input.Summary()
605
606
607
608
609
610
611
612
613
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
Michael Hanselmann's avatar
Michael Hanselmann committed
614
          try:
615
616
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)
Michael Hanselmann's avatar
Michael Hanselmann committed
617
618
619

            queue.acquire()
            try:
620
621
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
622
              assert op.status == constants.OP_STATUS_QUEUED
Iustin Pop's avatar
Iustin Pop committed
623
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
624
              op.result = None
625
              op.start_timestamp = TimeStampNow()
626
627
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
628
629
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
630
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
631
632
633
            finally:
              queue.release()

634
635
            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
636
                                     _OpExecCallbacks(queue, job, op))
Michael Hanselmann's avatar
Michael Hanselmann committed
637
638
639
640
641

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
642
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
643
644
645
646
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

647
648
            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
649
650
651
          except CancelJob:
            # Will be handled further up
            raise
Michael Hanselmann's avatar
Michael Hanselmann committed
652
653
654
655
656
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
657
658
659
660
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
661
                op.end_timestamp = TimeStampNow()
662
663
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
Michael Hanselmann's avatar
Michael Hanselmann committed
664
665
666
667
668
669
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

670
671
672
673
674
675
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
676
677
678
679
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
680
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
681
682
      queue.acquire()
      try:
683
        try:
684
          job.lock_status = None
685
          job.end_timestamp = TimeStampNow()
686
687
688
689
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
690
691
      finally:
        queue.release()
692

693
      logging.info("Finished job %s, status = %s", job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
694
695
696


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  @ivar queue: the job queue this pool works for; the workers read it
      to get at the queue's context

  """
  def __init__(self, queue):
    """Creates the "JobQueue" pool of L{_JobQueueWorker} workers.

    The pool is sized by L{JOBQUEUE_THREADS}.

    @param queue: the job queue this pool works for

    """
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
705
706


Iustin Pop's avatar
Iustin Pop committed
707
708
def _RequireOpenQueue(fn):
  """Decorator for "public" functions.
709

Iustin Pop's avatar
Iustin Pop committed
710
711
712
713
  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
714
  a '_queue_filelock' argument.
715

Iustin Pop's avatar
Iustin Pop committed
716
  @warning: Use this decorator only after utils.LockedMethod!
717

Iustin Pop's avatar
Iustin Pop committed
718
719
720
721
722
  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass
723

Iustin Pop's avatar
Iustin Pop committed
724
725
  """
  def wrapper(self, *args, **kwargs):
Iustin Pop's avatar
Iustin Pop committed
726
    # pylint: disable-msg=W0212
727
    assert self._queue_filelock is not None, "Queue should be open"
Iustin Pop's avatar
Iustin Pop committed
728
729
    return fn(self, *args, **kwargs)
  return wrapper
730
731


Iustin Pop's avatar
Iustin Pop committed
732
733
class JobQueue(object):
  """Queue used to manage the jobs.
734

Iustin Pop's avatar
Iustin Pop committed
735
736
737
738
  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
739

Michael Hanselmann's avatar
Michael Hanselmann committed
740
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor initializes the job queue object and then inspects
    all jobs currently found on disk: jobs which were still queued are
    handed over to the worker pool, while jobs which were already being
    processed get their unfinished opcodes marked as failed.

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes: all master candidates, except for the
    # master node itself
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        job_ids = self._GetJobIDsUnlocked()
        total = len(job_ids)
        last_idx = total - 1

        # Statuses of jobs which were being processed when the daemon stopped
        interrupted = (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING)

        last_report = time.time()
        for idx, job_id in enumerate(job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (last_report + 10.0) or
              idx == last_idx):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, last_idx, 100.0 * (idx + 1) / total)
            last_report = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status == constants.JOB_STATUS_QUEUED:
            self._wpool.AddTask(job)

          elif status in interrupted:
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              # Write the job back even if marking the opcodes failed
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Don't leave worker threads behind if the setup failed
      self._wpool.TerminateWorkers()
      raise
Michael Hanselmann's avatar
Michael Hanselmann committed
832

833
834
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first (a failure is only
    logged). A node which is not a master candidate is then removed from
    the list of known nodes; a master candidate gets all non-archived
    job files and the serial file uploaded before it is recorded.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    purge_msg = rpc.RpcRunner.call_jobqueue_purge(name).fail_msg
    if purge_msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      name, purge_msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors; there is nothing to replicate
      self._nodes.pop(name, None)
      return

    # Upload the whole queue excluding archived jobs ...
    to_upload = []
    for job_id in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(job_id))

    # ... followed by the current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      data = utils.ReadFile(path)
      reply = rpc.RpcRunner.call_jobqueue_update([name],
                                                 [node.primary_ip],
                                                 path, data)
      upload_msg = reply[name].fail_msg
      if upload_msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      path, name, upload_msg)

    self._nodes[name] = node.primary_ip
877
878
879
880

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    if node_name in self._nodes:
      del self._nodes[node_name]
888

889
890
  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
891
892
893
894
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
Michael Hanselmann's avatar
Michael Hanselmann committed
895
    log the case when more than half of the nodes fails.
896
897
898
899
900
901
902
903

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
904
905
906
907
    failed = []
    success = []

    for node in nodes:
908
      msg = result[node].fail_msg
909
      if msg:
910
        failed.append(node)
911
912
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
913
914
      else:
        success.append(node)
915
916
917
918
919
920

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

921
922
923
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

924
925
926
927
    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

928
929
930
931
932
    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

933
  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    The local file is always (re)written; the copy to the nodes known to
    the queue is only done when requested, and its result is checked via
    L{_CheckRpcResult}.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if not replicate:
      return

    node_names, node_addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
953

954
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All renames are first done in the local queue directory and then
    sent, in a single call, to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Local renames first
    for (src, dst) in rename:
      utils.RenameFile(src, dst, mkdir=True)

    # ... then the same on all nodes
    node_names, node_addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
972

973
974
  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this is just C{str(job_id)} on a validated value, but
    keeping it in one place allows changing the job id format later.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job id is not an integer or
        is negative

    """
    if isinstance(job_id, (int, long)):
      if job_id >= 0:
        return str(job_id)
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped into archive directories of
    C{JOBS_PER_ARCHIVE_DIRECTORY} consecutive job IDs each; the
    directory name is the number of the group.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name
    @raise ValueError: if the job identifier can't be converted to an
        integer

    """
    # Explicit floor division: the result must stay an integer no matter
    # which division semantics are in effect, otherwise the directory name
    # would turn into something like "1.2345".
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)

Iustin Pop's avatar
Iustin Pop committed
1006
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    The new highest serial is written to the serial file (and
    replicated) before the in-memory counter is advanced, so the
    counter only changes if the write succeeded.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list with exactly C{count} job identifiers, in
        increasing order

    """
    assert count > 0

    # New highest number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    # Only serials above the previously recorded one are handed out. Starting
    # at self._last_serial itself would return count + 1 identifiers, the
    # last of which would be returned again as the first one of the next
    # call.
    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
1031

Michael Hanselmann's avatar
Michael Hanselmann committed
1032
1033
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    The file is named C{job-<id>} and lives directly in the queue
    directory.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.QUEUE_DIR, file_name)
1043

1044
1045
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    The file is named C{job-<id>} and is placed in the sub-directory of
    the archive directory given by L{_GetArchiveDirectory}.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    subdir = cls._GetArchiveDirectory(job_id)
    file_name = "job-%s" % job_id
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, subdir, file_name)
1056

Guido Trotter's avatar
Guido Trotter committed
1057
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    Only the queue directory on disk is examined, because it's a
    requirement that all jobs are present on disk (so the _memcache
    can't hold any extra IDs). Files whose name doesn't match
    L{_RE_JOB_FILE} are ignored.

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    matches = [self._RE_JOB_FILE.match(name)
               for name in utils.ListVisibleFiles(constants.QUEUE_DIR)]
    job_ids = [m.group(1) for m in matches if m]

    if not sort:
      return job_ids

    return utils.NiceSort(job_ids)
1078
1079

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    A job file which can't be parsed is moved to the archive and None
    is returned; None is also returned if there is no job file at all.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    if job is None:
      # No such job file. This must not reach the cache: the cache is a
      # WeakValueDictionary and None can't be weakly referenced, so storing
      # it would raise a TypeError instead of reporting the missing job.
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
1125
    filepath = self._GetJobPath(job_id)
1126
1127
    logging.debug("Loading job from %s", filepath)
    try:
1128
      raw_data = utils.ReadFile(filepath)
1129
    except EnvironmentError, err:
1130
1131
1132
      if err.errno in (errno.ENOENT, ):
        return None
      raise
1133

1134
    try:
1135
      data = serializer.LoadJson(raw_data)
1136
      job = _QueuedJob.Restore(self, data)
Iustin Pop's avatar
Iustin Pop committed
1137
    except Exception, err: # pylint: disable-msg=W0703
1138
      raise errors.JobFileCorrupted(err)
1139

Iustin Pop's avatar
Iustin Pop committed
1140
    return job
1141

1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk, without raising on errors.

    This is L{_LoadJobFromDisk} with corrupted job files and
    environment errors turned into a None result; the exception is
    logged in that case.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      job = self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      job = None

    return job

1161
1162
1163
1164
1165
1166
1167
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently checks for the existence of the queue drain file,
    which makes it a per-node flag. In the future this can be moved to
    the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)

1174
1175
1176
1177
1178
1179
1180
1181
1182
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    The size is the number of job IDs currently found on disk; it is
    stored in C{self._queue_size}.

    """
    job_ids = self._GetJobIDsUnlocked(sort=False)
    self._queue_size = len(job_ids)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    Setting the flag creates the (empty) queue drain file, unsetting it
    removes the file; the value is also kept in C{self._drained}.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE

    if not drain_flag:
      utils.RemoveFile(drain_file)
    else:
      utils.WriteFile(drain_file, data="", close=True)

    self._drained = drain_flag

    return True

1198
  @_RequireOpenQueue
Iustin Pop's avatar
Iustin Pop committed
1199
  def _SubmitJobUnlocked(self, job_id, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
1200
    """Create and store a new job.
1201