#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import time
import weakref
import threading
import itertools
import operator
import os

try:
  # pylint: disable=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
from ganeti import opcodes_base
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
from ganeti import query
from ganeti import qlang
from ganeti import pathutils
from ganeti import vcluster


JOBQUEUE_THREADS = 1


# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


class QueueShutdown(Exception):
  """Special exception to abort a job when the job queue is shutting down.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
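
# Illustrative note (not part of the original module): utils.SplitTime()
# breaks a float timestamp into whole seconds and microseconds, so for
# time.time() == 1234567890.123456 TimeStampNow() would return a tuple
# roughly equal to (1234567890, 123456).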


def _CallJqUpdate(runner, names, file_name, content):
  """Updates job queue file after virtualizing filename.

  """
  virt_file_name = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virt_file_name, content)


class _SimpleJobQuery:
  """Wrapper for job queries.

  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.

  """
  def __init__(self, fields):
    """Initializes this class.

    """
    self._query = query.Query(query.JOB_FIELDS, fields)

  def __call__(self, job):
    """Executes a job query using cached field list.

    """
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
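
# Illustrative usage sketch (hypothetical "job" object, not part of the
# original module):
#   squery = _SimpleJobQuery(["id", "status"])
#   row = squery(job)   # one value per requested field, old-style result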


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Initializes instances of this class.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)

    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "processor_lock", "writable", "archived",
               "livelock", "process_id",
               "__weakref__"]

  def AddReasons(self, pickup=False):
    """Extend the reason trail.

    Add the reason for all the opcodes of this job to be executed.

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      if pickup:
        reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
      else:
        reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                                reason_src_prefix)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count = count + 1
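
  # Illustrative note (example values, not from the original code): each
  # opcode gets one reason-trail entry appended, roughly of the form
  #   (reason_src, "job=42;index=0", <nanosecond timestamp>)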

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False
    self.livelock = None
    self.process_id = None

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived
    obj.livelock = state.get("livelock", None)
    obj.process_id = state.get("process_id", None)
    if obj.process_id is not None:
      obj.process_id = int(obj.process_id)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      "livelock": self.livelock,
      "process_id": self.process_id,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
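
  # Illustrative example (not part of the original code): for a job whose
  # opcodes have the statuses [success, running, queued] this returns
  # JOB_STATUS_RUNNING, while a single errored opcode makes the whole job
  # JOB_STATUS_ERROR.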

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
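
  # Illustrative example (not part of the original code): each log entry is a
  # tuple (serial, timestamp, type, message); GetLogEntries(2) would return
  # only the entries whose serial is 3 or higher.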

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    return _SimpleJobQuery(fields)(self)

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
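
  # Illustrative behaviour note (not part of the original code): cancelling a
  # still-queued job finalizes it immediately, while a job waiting for locks
  # is only marked OP_STATUS_CANCELING and the worker cancels it later.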

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)

  def SetPid(self, pid):
    """Sets the job's process ID

    @type pid: int
    @param pid: the process ID

    """
    status = self.CalcStatus()

    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_WAITING):
      if self.process_id is not None:
        logging.warning("Replacing the process id %s of job %s with %s",
                        self.process_id, self.id, pid)
      self.process_id = pid
    else:
      logging.warning("Can set pid only for queued/waiting jobs")

class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

    # See if queue is shutting down
    if not self._queue.AcceptingJobsUnlocked():
      logging.debug("Queue is shutting down")
      raise QueueShutdown()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)

class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.
    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._squery = _SimpleJobQuery(fields)
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.
    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = self._squery(job)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITING) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = _inotify_wm_cls()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename
    self._waiter_cls = _waiter_cls

  def Wait(self, timeout):
    """Waits for a job to change.
    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = self._waiter_cls(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(counter, job_load_fn, check_fn):
    if counter.next() > 0:
      # If this isn't the first check the job is given some more time to change
      # again. This gives better performance for jobs generating many
      # changes/messages.
      time.sleep(0.1)

    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout,
               _waiter_cls=_JobChangesWaiter):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    counter = itertools.count()
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _waiter_cls(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          counter, job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except errors.JobLost:
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
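
# Illustrative note (not part of the original code): a plain ValueError would
# be wrapped as errors.OpExecError before encoding, while an existing
# errors.GenericError subclass is encoded as-is.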


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
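
  # Illustrative behaviour note (not part of the original code): Peek()
  # returns the same pending timeout until Next() consumes it, after which a
  # fresh value is requested from the wrapped strategy function.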


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False
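
  # Illustrative note (not part of the original code): priorities behave like
  # nice values, so the numerically smallest value is the highest priority and
  # "op.priority -= 1" above raises the opcode's priority by one step.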

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()

class _JobProcessor(object):
  (DEFER,
   WAITDEP,
   FINISHED) = range(1, 4)

  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITING)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITING
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITING

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # The very last check if the job was cancelled before trying to execute
    if op.status == constants.OP_STATUS_CANCELING:
      return (constants.OP_STATUS_CANCELING, None)

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Queue is shutting down, return to queued
      if not self.queue.AcceptingJobsUnlocked():
        return (constants.OP_STATUS_QUEUED, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except QueueShutdown:
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)

      assert op.status == constants.OP_STATUS_WAITING

      # Job hadn't been started yet, so it should return to the queue
      return (constants.OP_STATUS_QUEUED, None)

    except Exception, err: # pylint: disable=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
      be deferred and C{WAITDEP} if the dependency manager
      (L{_JobDependencyManager}) will re-schedule the job when appropriate

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return self.FINISHED

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITING
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITING,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)
          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try: