#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import logging
import errno
import re
import time
import weakref
import threading

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
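
# Illustrative note, added for clarity (not in the original source):
# utils.SplitTime() splits a float UNIX timestamp into the documented
# (seconds, microseconds) form, e.g. 1300000000.25 -> (1300000000, 250000).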


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)

    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__", "processor_lock", "writable"]

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self, writable)

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
293
          - canceling
294
295
296
297
298
299
300
301
302
303
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
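
  # Illustrative example (added; not in the original source): a job whose
  # opcode statuses are [SUCCESS, RUNNING, QUEUED] reports
  # JOB_STATUS_RUNNING, while [SUCCESS, ERROR, QUEUED] reports
  # JOB_STATUS_ERROR, since a single failed opcode fails the whole job.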

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)
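
  # Note added for clarity: priorities are numerically inverted, with
  # constants.OP_PRIO_HIGHEST being the smallest value, so min() selects
  # the most urgent unfinished opcode (e.g. [0, -5] resolves to -5).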

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
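
  # Illustrative example (added; not in the original source): with log
  # serials 1..5 recorded across the opcodes, GetLogEntries(3) returns
  # only the entries with serials 4 and 5; GetLogEntries(None) returns
  # all five.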

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

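  # Usage note (illustrative, not from the original file): Feedback() may
  # be called as Feedback("message") for a plain ELOG_MESSAGE entry, or
  # as Feedback(log_type, "message") to set the log type explicitly; per
  # the assert above, at most two positional arguments are accepted.
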
  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    assert not job.writable, "Expected read-only job"

    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
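
  # Summary note (added; not in the original source): this returns
  # (job_info, log_entries) once a change is observed, None if the job
  # file disappeared or inotify failed, and constants.JOB_NOTCHANGED if
  # the timeout expired without any change.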


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)
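
# Illustrative example (added; not in the original source): a plain
# ValueError raised by an opcode is wrapped in errors.OpExecError before
# being encoded, so clients can always decode the result back into a
# Ganeti exception.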


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
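
  # Behaviour note (added for clarity): Peek() keeps returning the same
  # value until Next() consumes it, after which the following call pulls
  # a fresh timeout from the wrapped function.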


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Create local copy to modify
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
      self.jobdeps = op.input.depends[:]
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False
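
  # Note added for clarity: priorities are numerically inverted, so
  # "op.priority -= 1" above moves the opcode one step closer to
  # constants.OP_PRIO_HIGHEST each time the lock timeouts are exhausted.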

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITLOCK
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITLOCK

    return update

  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Checks if an opcode has dependencies and if so, processes them.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    result = False

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                          dep_status)
      assert ht.TNonEmptyString(depmsg), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, depmsg)

      if depresult == _JobDependencyManager.CONTINUE:
        # Remove dependency and continue
        opctx.jobdeps.pop(0)

      elif depresult == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        result = True
        break

      elif depresult == _JobDependencyManager.CANCEL:
        # Job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        break

      elif depresult in (_JobDependencyManager.WRONGSTATUS,
                         _JobDependencyManager.ERROR):
        # Job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
        break

      else:
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                     depresult)

    return result
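
  # Note added for clarity: each entry of opctx.jobdeps is a
  # (dep_job_id, dep_status) pair as unpacked above; entries are removed
  # one at a time as the dependency manager reports CONTINUE for them.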

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITLOCK, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      assert job.writable, "Expected writable job"

      # Don't do anything for finalized jobs
      if job.CalcStatus() in constants.JOBS_FINALIZED:
        return True

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
        job.cur_opctx = None
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELING)
                        for i in job.ops[opctx.index + 1:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      waitjob = None

      if op.status != constants.OP_STATUS_CANCELING:
        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITLOCK)

        # Prepare to start opcode
        if self._MarkWaitlock(job, op):
          # Write to disk
          queue.UpdateJobUnlocked(job)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
        assert job.start_timestamp and op.start_timestamp
        assert waitjob is None

        # Check if waiting for a job is necessary
        waitjob = self._CheckDependencies(queue, job, opctx)

        assert op.status in (constants.OP_STATUS_WAITLOCK,
                             constants.OP_STATUS_CANCELING,
                             constants.OP_STATUS_ERROR)

        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
                                         constants.OP_STATUS_ERROR)):
          logging.info("%s: opcode %s waiting for locks",
                       opctx.log_prefix, opctx.summary)

          assert not opctx.jobdeps, "Not all dependencies were removed"

          queue.release()
          try:
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
          finally:
            queue.acquire(shared=1)

          op.status = op_status
          op.result = op_result

          assert not waitjob

        if op.status == constants.OP_STATUS_WAITLOCK:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
        finalize = False

        if not waitjob and opctx.CheckPriorityIncrease():
          # Priority was changed, need to update on-disk file
          queue.UpdateJobUnlocked(job)

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        if opctx.index == (opcount - 1):
          # Finalize on last opcode
          finalize = True

        if finalize:
          # All opcodes have been run, finalize job
          job.Finalize()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        assert not waitjob

        if finalize:
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          # TODO: Check locking
          queue.depmgr.NotifyWaiters(job.id)
          return True

      assert not waitjob or queue.depmgr.JobWaiting(job)

      return bool(waitjob)
    finally:
      assert job.writable, "Job became read-only while being processed"
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Ensure only one worker is active on a single job. If a job registers for
    # a dependency job, and the other job notifies before the first worker is
    # done, the job can end up in the tasklist more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
    setname_fn(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Create wrapper for setting thread name
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
                                    proc.ExecOpCode)

    if not _JobProcessor(queue, wrap_execop_fn, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
                      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Sets the worker thread name.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    parts = ["Job%s" % job.id]

    if op:
      parts.append(op.TinySummary())

    return "/".join(parts)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("Jq",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue

